// WebSocketChannel.h
  1. #ifndef HV_WEBSOCKET_CHANNEL_H_
  2. #define HV_WEBSOCKET_CHANNEL_H_
  3. #include <mutex>
  4. #include "Channel.h"
  5. #include "wsdef.h"
  6. #include "hmath.h"
  7. namespace hv {
  8. class WebSocketChannel : public SocketChannel {
  9. public:
  10. ws_session_type type;
  11. WebSocketChannel(hio_t* io, ws_session_type type = WS_CLIENT)
  12. : SocketChannel(io)
  13. , type(type)
  14. {}
  15. ~WebSocketChannel() {}
  16. // isConnected, send, close
  17. int send(const std::string& msg, enum ws_opcode opcode = WS_OPCODE_TEXT, bool fin = true) {
  18. return send(msg.c_str(), msg.size(), opcode, fin);
  19. }
  20. int send(const char* buf, int len, enum ws_opcode opcode = WS_OPCODE_BINARY, bool fin = true) {
  21. int fragment = 0xFFFF; // 65535
  22. if (len > fragment) {
  23. return send(buf, len, fragment, opcode);
  24. }
  25. std::lock_guard<std::mutex> locker(mutex_);
  26. return sendFrame(buf, len, opcode, fin);
  27. }
  28. // websocket fragment
  29. // lock ->
  30. // send(p, fragment, opcode, false) ->
  31. // send(p, fragment, WS_OPCODE_CONTINUE, false) ->
  32. // ... ->
  33. // send(p, remain, WS_OPCODE_CONTINUE, true)
  34. // unlock
  35. int send(const char* buf, int len, int fragment, enum ws_opcode opcode = WS_OPCODE_BINARY) {
  36. std::lock_guard<std::mutex> locker(mutex_);
  37. if (len <= fragment) {
  38. return sendFrame(buf, len, opcode, true);
  39. }
  40. // first fragment
  41. int nsend = sendFrame(buf, fragment, opcode, false);
  42. if (nsend < 0) return nsend;
  43. const char* p = buf + fragment;
  44. int remain = len - fragment;
  45. while (remain > fragment) {
  46. nsend = sendFrame(p, fragment, WS_OPCODE_CONTINUE, false);
  47. if (nsend < 0) return nsend;
  48. p += fragment;
  49. remain -= fragment;
  50. }
  51. // last fragment
  52. nsend = sendFrame(p, remain, WS_OPCODE_CONTINUE, true);
  53. if (nsend < 0) return nsend;
  54. return len;
  55. }
  56. int sendPing() {
  57. std::lock_guard<std::mutex> locker(mutex_);
  58. if (type == WS_CLIENT) {
  59. return write(WS_CLIENT_PING_FRAME, WS_CLIENT_MIN_FRAME_SIZE);
  60. }
  61. return write(WS_SERVER_PING_FRAME, WS_SERVER_MIN_FRAME_SIZE);
  62. }
  63. int sendPong() {
  64. std::lock_guard<std::mutex> locker(mutex_);
  65. if (type == WS_CLIENT) {
  66. return write(WS_CLIENT_PONG_FRAME, WS_CLIENT_MIN_FRAME_SIZE);
  67. }
  68. return write(WS_SERVER_PONG_FRAME, WS_SERVER_MIN_FRAME_SIZE);
  69. }
  70. int close() {
  71. return SocketChannel::close(type == WS_SERVER);
  72. }
  73. protected:
  74. int sendFrame(const char* buf, int len, enum ws_opcode opcode = WS_OPCODE_BINARY, bool fin = true) {
  75. bool has_mask = false;
  76. char mask[4] = {0};
  77. if (type == WS_CLIENT) {
  78. *(int*)mask = rand();
  79. has_mask = true;
  80. }
  81. int frame_size = ws_calc_frame_size(len, has_mask);
  82. if (sendbuf_.len < (size_t)frame_size) {
  83. sendbuf_.resize(ceil2e(frame_size));
  84. }
  85. ws_build_frame(sendbuf_.base, buf, len, mask, has_mask, opcode, fin);
  86. return write(sendbuf_.base, frame_size);
  87. }
  88. public:
  89. enum ws_opcode opcode;
  90. private:
  91. Buffer sendbuf_;
  92. std::mutex mutex_;
  93. };
  94. }
  95. typedef std::shared_ptr<hv::WebSocketChannel> WebSocketChannelPtr;
  96. #endif // HV_WEBSOCKET_CHANNEL_H_