hevent.c 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. #include "hevent.h"
  2. #include "hsocket.h"
  3. #include "hatomic.h"
  4. #include "hlog.h"
  5. uint64_t hloop_next_event_id() {
  6. static hatomic_t s_id = HATOMIC_VAR_INIT(0);
  7. return ++s_id;
  8. }
  9. uint32_t hio_next_id() {
  10. static hatomic_t s_id = HATOMIC_VAR_INIT(0);
  11. return ++s_id;
  12. }
  13. uint32_t hio_id (hio_t* io) {
  14. return io->id;
  15. }
  16. int hio_fd(hio_t* io) {
  17. return io->fd;
  18. }
  19. hio_type_e hio_type(hio_t* io) {
  20. return io->io_type;
  21. }
  22. int hio_error(hio_t* io) {
  23. return io->error;
  24. }
  25. int hio_events(hio_t* io) {
  26. return io->events;
  27. }
  28. int hio_revents(hio_t* io) {
  29. return io->revents;
  30. }
  31. struct sockaddr* hio_localaddr(hio_t* io) {
  32. return io->localaddr;
  33. }
  34. struct sockaddr* hio_peeraddr(hio_t* io) {
  35. return io->peeraddr;
  36. }
  37. void hio_set_context(hio_t* io, void* ctx) {
  38. io->ctx = ctx;
  39. }
  40. void* hio_context(hio_t* io) {
  41. return io->ctx;
  42. }
  43. haccept_cb hio_getcb_accept(hio_t* io) {
  44. return io->accept_cb;
  45. }
  46. hconnect_cb hio_getcb_connect(hio_t* io) {
  47. return io->connect_cb;
  48. }
  49. hread_cb hio_getcb_read(hio_t* io) {
  50. return io->read_cb;
  51. }
  52. hwrite_cb hio_getcb_write(hio_t* io) {
  53. return io->write_cb;
  54. }
  55. hclose_cb hio_getcb_close(hio_t* io) {
  56. return io->close_cb;
  57. }
  58. void hio_setcb_accept (hio_t* io, haccept_cb accept_cb) {
  59. io->accept_cb = accept_cb;
  60. }
  61. void hio_setcb_connect (hio_t* io, hconnect_cb connect_cb) {
  62. io->connect_cb = connect_cb;
  63. }
  64. void hio_setcb_read (hio_t* io, hread_cb read_cb) {
  65. io->read_cb = read_cb;
  66. }
  67. void hio_setcb_write (hio_t* io, hwrite_cb write_cb) {
  68. io->write_cb = write_cb;
  69. }
  70. void hio_setcb_close (hio_t* io, hclose_cb close_cb) {
  71. io->close_cb = close_cb;
  72. }
  73. void hio_set_type(hio_t* io, hio_type_e type) {
  74. io->io_type = type;
  75. }
  76. void hio_set_localaddr(hio_t* io, struct sockaddr* addr, int addrlen) {
  77. if (io->localaddr == NULL) {
  78. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  79. }
  80. memcpy(io->localaddr, addr, addrlen);
  81. }
  82. void hio_set_peeraddr (hio_t* io, struct sockaddr* addr, int addrlen) {
  83. if (io->peeraddr == NULL) {
  84. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  85. }
  86. memcpy(io->peeraddr, addr, addrlen);
  87. }
  88. int hio_enable_ssl(hio_t* io) {
  89. io->io_type = HIO_TYPE_SSL;
  90. return 0;
  91. }
  92. bool hio_is_ssl(hio_t* io) {
  93. return io->io_type == HIO_TYPE_SSL;
  94. }
  95. hssl_t hio_get_ssl(hio_t* io) {
  96. return io->ssl;
  97. }
  98. int hio_set_ssl(hio_t* io, hssl_t ssl) {
  99. io->io_type = HIO_TYPE_SSL;
  100. io->ssl = ssl;
  101. return 0;
  102. }
  103. void hio_set_readbuf(hio_t* io, void* buf, size_t len) {
  104. assert(io && buf && len != 0);
  105. io->readbuf.base = (char*)buf;
  106. io->readbuf.len = len;
  107. io->readbuf.offset = 0;
  108. }
  109. void hio_del_connect_timer(hio_t* io) {
  110. if (io->connect_timer) {
  111. htimer_del(io->connect_timer);
  112. io->connect_timer = NULL;
  113. io->connect_timeout = 0;
  114. }
  115. }
  116. void hio_del_close_timer(hio_t* io) {
  117. if (io->close_timer) {
  118. htimer_del(io->close_timer);
  119. io->close_timer = NULL;
  120. io->close_timeout = 0;
  121. }
  122. }
  123. void hio_del_keepalive_timer(hio_t* io) {
  124. if (io->keepalive_timer) {
  125. htimer_del(io->keepalive_timer);
  126. io->keepalive_timer = NULL;
  127. io->keepalive_timeout = 0;
  128. }
  129. }
  130. void hio_del_heartbeat_timer(hio_t* io) {
  131. if (io->heartbeat_timer) {
  132. htimer_del(io->heartbeat_timer);
  133. io->heartbeat_timer = NULL;
  134. io->heartbeat_interval = 0;
  135. io->heartbeat_fn = NULL;
  136. }
  137. }
  138. void hio_set_connect_timeout(hio_t* io, int timeout_ms) {
  139. io->connect_timeout = timeout_ms;
  140. }
  141. void hio_set_close_timeout(hio_t* io, int timeout_ms) {
  142. io->close_timeout = timeout_ms;
  143. }
  144. static void __keepalive_timeout_cb(htimer_t* timer) {
  145. hio_t* io = (hio_t*)timer->privdata;
  146. if (io) {
  147. char localaddrstr[SOCKADDR_STRLEN] = {0};
  148. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  149. hlogw("keepalive timeout [%s] <=> [%s]",
  150. SOCKADDR_STR(io->localaddr, localaddrstr),
  151. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  152. io->error = ETIMEDOUT;
  153. hio_close(io);
  154. }
  155. }
  156. void hio_set_keepalive_timeout(hio_t* io, int timeout_ms) {
  157. if (timeout_ms == 0) {
  158. // del
  159. hio_del_keepalive_timer(io);
  160. return;
  161. }
  162. if (io->keepalive_timer) {
  163. // reset
  164. ((struct htimeout_s*)io->keepalive_timer)->timeout = timeout_ms;
  165. htimer_reset(io->keepalive_timer);
  166. } else {
  167. // add
  168. io->keepalive_timer = htimer_add(io->loop, __keepalive_timeout_cb, timeout_ms, 1);
  169. io->keepalive_timer->privdata = io;
  170. }
  171. io->keepalive_timeout = timeout_ms;
  172. }
  173. static void __heartbeat_timer_cb(htimer_t* timer) {
  174. hio_t* io = (hio_t*)timer->privdata;
  175. if (io && io->heartbeat_fn) {
  176. io->heartbeat_fn(io);
  177. }
  178. }
  179. void hio_set_heartbeat(hio_t* io, int interval_ms, hio_send_heartbeat_fn fn) {
  180. if (interval_ms == 0) {
  181. // del
  182. hio_del_heartbeat_timer(io);
  183. return;
  184. }
  185. if (io->heartbeat_timer) {
  186. // reset
  187. ((struct htimeout_s*)io->heartbeat_timer)->timeout = interval_ms;
  188. htimer_reset(io->heartbeat_timer);
  189. } else {
  190. // add
  191. io->heartbeat_timer = htimer_add(io->loop, __heartbeat_timer_cb, interval_ms, INFINITE);
  192. io->heartbeat_timer->privdata = io;
  193. }
  194. io->heartbeat_interval = interval_ms;
  195. io->heartbeat_fn = fn;
  196. }
  197. bool hio_is_alloced_readbuf(hio_t* io) {
  198. return io->alloced_readbuf &&
  199. io->readbuf.base &&
  200. io->readbuf.len &&
  201. io->readbuf.base != io->loop->readbuf.base;
  202. }
  203. void hio_alloc_readbuf(hio_t* io, int len) {
  204. if (hio_is_alloced_readbuf(io)) {
  205. io->readbuf.base = (char*)safe_realloc(io->readbuf.base, len, io->readbuf.len);
  206. } else {
  207. HV_ALLOC(io->readbuf.base, len);
  208. }
  209. io->readbuf.len = len;
  210. io->alloced_readbuf = 1;
  211. }
  212. void hio_free_readbuf(hio_t* io) {
  213. if (hio_is_alloced_readbuf(io)) {
  214. HV_FREE(io->readbuf.base);
  215. io->alloced_readbuf = 0;
  216. // reset to loop->readbuf
  217. io->readbuf.base = io->loop->readbuf.base;
  218. io->readbuf.len = io->loop->readbuf.len;
  219. }
  220. }
  221. void hio_unset_unpack(hio_t* io) {
  222. if (io->unpack_setting) {
  223. io->unpack_setting = NULL;
  224. // NOTE: unpack has own readbuf
  225. hio_free_readbuf(io);
  226. }
  227. }
  228. void hio_set_unpack(hio_t* io, unpack_setting_t* setting) {
  229. hio_unset_unpack(io);
  230. if (setting == NULL) return;
  231. io->unpack_setting = setting;
  232. if (io->unpack_setting->package_max_length == 0) {
  233. io->unpack_setting->package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
  234. }
  235. if (io->unpack_setting->mode == UNPACK_BY_FIXED_LENGTH) {
  236. assert(io->unpack_setting->fixed_length != 0 &&
  237. io->unpack_setting->fixed_length <= io->unpack_setting->package_max_length);
  238. }
  239. else if (io->unpack_setting->mode == UNPACK_BY_DELIMITER) {
  240. if (io->unpack_setting->delimiter_bytes == 0) {
  241. io->unpack_setting->delimiter_bytes = strlen((char*)io->unpack_setting->delimiter);
  242. }
  243. }
  244. else if (io->unpack_setting->mode == UNPACK_BY_LENGTH_FIELD) {
  245. assert(io->unpack_setting->body_offset >=
  246. io->unpack_setting->length_field_offset +
  247. io->unpack_setting->length_field_bytes);
  248. }
  249. // NOTE: unpack must have own readbuf
  250. if (io->unpack_setting->mode == UNPACK_BY_FIXED_LENGTH) {
  251. io->readbuf.len = io->unpack_setting->fixed_length;
  252. } else {
  253. io->readbuf.len = HLOOP_READ_BUFSIZE;
  254. }
  255. hio_alloc_readbuf(io, io->readbuf.len);
  256. }