smux.h 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. #ifndef HV_SMUX_H_
  2. #define HV_SMUX_H_
  3. /*
  4. * smux: Simple MUltipleXing used by kcptun
  5. * @see: https://github.com/xtaci/smux
  6. *
  7. */
  8. #include <map>
  9. #include "hplatform.h"
  10. #include "hbase.h"
  11. #include "hbuf.h"
  12. #include "hloop.h"
  13. typedef enum {
  14. // v1
  15. SMUX_CMD_SYN = 0, // stream open
  16. SMUX_CMD_FIN = 1, // stream close
  17. SMUX_CMD_PSH = 2, // data push
  18. SMUX_CMD_NOP = 3, // no operation
  19. // v2
  20. SMUX_CMD_UPD = 4, // update
  21. } smux_cmd_e;
  22. typedef struct {
  23. uint8_t version;
  24. uint8_t cmd;
  25. uint16_t length;
  26. uint32_t sid;
  27. } smux_head_t;
  28. #define SMUX_HEAD_LENGTH 8
  29. typedef struct {
  30. smux_head_t head;
  31. const char* data;
  32. } smux_frame_t;
  33. static inline unsigned int smux_package_length(const smux_head_t* head) {
  34. return SMUX_HEAD_LENGTH + head->length;
  35. }
  36. static inline void smux_head_init(smux_head_t* head) {
  37. head->version = 1;
  38. head->cmd = (uint8_t)SMUX_CMD_PSH;
  39. head->length = 0;
  40. head->sid = 0;
  41. }
  42. static inline void smux_frame_init(smux_frame_t* frame) {
  43. smux_head_init(&frame->head);
  44. frame->data = NULL;
  45. }
  46. // @retval >0 package_length, <0 error
  47. int smux_frame_pack(const smux_frame_t* frame, void* buf, int len);
  48. // @retval >0 package_length, <0 error
  49. int smux_frame_unpack(smux_frame_t* frame, const void* buf, int len);
  50. typedef struct smux_config_s {
  51. int version;
  52. int keepalive_interval;
  53. int keepalive_timeout;
  54. int max_frame_size;
  55. smux_config_s() {
  56. version = 1;
  57. keepalive_interval = 10000;
  58. keepalive_timeout = 30000;
  59. max_frame_size = 1024;
  60. }
  61. } smux_config_t;
  62. typedef struct {
  63. uint32_t stream_id;
  64. smux_frame_t frame;
  65. hbuf_t rbuf;
  66. hbuf_t wbuf;
  67. hio_t* io;
  68. htimer_t* timer;
  69. } smux_stream_t;
  70. // @retval >0 package_length, <0 error, data => wbuf
  71. static inline int smux_stream_output(smux_stream_t* stream, smux_frame_t* frame) {
  72. return smux_frame_pack(frame, stream->wbuf.base, stream->wbuf.len);
  73. }
  74. static inline int smux_stream_output(smux_stream_t* stream, smux_cmd_e cmd) {
  75. smux_frame_t frame;
  76. smux_frame_init(&frame);
  77. frame.head.sid = stream->stream_id;
  78. frame.head.cmd = (uint8_t)cmd;
  79. return smux_frame_pack(&frame, stream->wbuf.base, stream->wbuf.len);
  80. }
  81. // @retval >0 package_length, <0 error, data => frame
  82. static inline int smux_stream_input(smux_stream_t* stream, const void* buf, int len) {
  83. return smux_frame_unpack(&stream->frame, buf, len);
  84. }
  85. typedef struct {
  86. uint32_t next_stream_id;
  87. // stream_id => smux_stream_t
  88. std::map<uint32_t, smux_stream_t*> streams;
  89. } smux_session_t;
  90. static inline smux_stream_t* smux_session_open_stream(smux_session_t* session, uint32_t stream_id = 0, hio_t* io = NULL) {
  91. smux_stream_t* stream = NULL;
  92. HV_ALLOC_SIZEOF(stream);
  93. if (stream_id == 0) {
  94. session->next_stream_id += 2;
  95. stream_id = session->next_stream_id;
  96. }
  97. stream->stream_id = stream_id;
  98. session->streams[stream_id] = stream;
  99. stream->io = io;
  100. return stream;
  101. }
  102. static inline smux_stream_t* smux_session_get_stream(smux_session_t* session, uint32_t stream_id) {
  103. auto iter = session->streams.find(stream_id);
  104. if (iter != session->streams.end()) {
  105. return iter->second;
  106. }
  107. return NULL;
  108. }
  109. static inline void smux_session_close_stream(smux_session_t* session, uint32_t stream_id) {
  110. auto iter = session->streams.find(stream_id);
  111. if (iter != session->streams.end()) {
  112. HV_FREE(iter->second);
  113. session->streams.erase(iter);
  114. }
  115. }
  116. #endif // HV_SMUX_H_