hevent.c 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938
  1. #include "hevent.h"
  2. #include "hsocket.h"
  3. #include "hatomic.h"
  4. #include "hlog.h"
  5. #include "herr.h"
  6. #include "unpack.h"
  7. uint64_t hloop_next_event_id() {
  8. static hatomic_t s_id = HATOMIC_VAR_INIT(0);
  9. return ++s_id;
  10. }
  11. uint32_t hio_next_id() {
  12. static hatomic_t s_id = HATOMIC_VAR_INIT(0);
  13. return ++s_id;
  14. }
  15. static void fill_io_type(hio_t* io) {
  16. int type = 0;
  17. socklen_t optlen = sizeof(int);
  18. int ret = getsockopt(io->fd, SOL_SOCKET, SO_TYPE, (char*)&type, &optlen);
  19. printd("getsockopt SO_TYPE fd=%d ret=%d type=%d errno=%d\n", io->fd, ret, type, socket_errno());
  20. if (ret == 0) {
  21. switch (type) {
  22. case SOCK_STREAM: io->io_type = HIO_TYPE_TCP; break;
  23. case SOCK_DGRAM: io->io_type = HIO_TYPE_UDP; break;
  24. case SOCK_RAW: io->io_type = HIO_TYPE_IP; break;
  25. default: io->io_type = HIO_TYPE_SOCKET; break;
  26. }
  27. }
  28. else if (socket_errno() == ENOTSOCK) {
  29. switch (io->fd) {
  30. case 0: io->io_type = HIO_TYPE_STDIN; break;
  31. case 1: io->io_type = HIO_TYPE_STDOUT; break;
  32. case 2: io->io_type = HIO_TYPE_STDERR; break;
  33. default: io->io_type = HIO_TYPE_FILE; break;
  34. }
  35. }
  36. else {
  37. io->io_type = HIO_TYPE_TCP;
  38. }
  39. }
  40. static void hio_socket_init(hio_t* io) {
  41. nonblocking(io->fd);
  42. // fill io->localaddr io->peeraddr
  43. if (io->localaddr == NULL) {
  44. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  45. }
  46. if (io->peeraddr == NULL) {
  47. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  48. }
  49. socklen_t addrlen = sizeof(sockaddr_u);
  50. int ret = getsockname(io->fd, io->localaddr, &addrlen);
  51. printd("getsockname fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
  52. // NOTE: udp peeraddr set by recvfrom/sendto
  53. if (io->io_type & HIO_TYPE_SOCK_STREAM) {
  54. addrlen = sizeof(sockaddr_u);
  55. ret = getpeername(io->fd, io->peeraddr, &addrlen);
  56. printd("getpeername fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
  57. }
  58. }
  59. void hio_init(hio_t* io) {
  60. // alloc localaddr,peeraddr when hio_socket_init
  61. /*
  62. if (io->localaddr == NULL) {
  63. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  64. }
  65. if (io->peeraddr == NULL) {
  66. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  67. }
  68. */
  69. // write_queue init when hwrite try_write failed
  70. // write_queue_init(&io->write_queue, 4);
  71. hrecursive_mutex_init(&io->write_mutex);
  72. }
  73. void hio_ready(hio_t* io) {
  74. if (io->ready) return;
  75. // flags
  76. io->ready = 1;
  77. io->connected = 0;
  78. io->closed = 0;
  79. io->accept = io->connect = io->connectex = 0;
  80. io->recv = io->send = 0;
  81. io->recvfrom = io->sendto = 0;
  82. io->close = 0;
  83. // public:
  84. io->id = hio_next_id();
  85. io->io_type = HIO_TYPE_UNKNOWN;
  86. io->error = 0;
  87. io->events = io->revents = 0;
  88. io->last_read_hrtime = io->last_write_hrtime = io->loop->cur_hrtime;
  89. // readbuf
  90. io->alloced_readbuf = 0;
  91. hio_use_loop_readbuf(io);
  92. io->readbuf.head = io->readbuf.tail = 0;
  93. io->read_flags = 0;
  94. io->read_until_length = 0;
  95. io->max_read_bufsize = MAX_READ_BUFSIZE;
  96. io->small_readbytes_cnt = 0;
  97. // write_queue
  98. io->write_bufsize = 0;
  99. io->max_write_bufsize = MAX_WRITE_BUFSIZE;
  100. // callbacks
  101. io->read_cb = NULL;
  102. io->write_cb = NULL;
  103. io->close_cb = NULL;
  104. io->accept_cb = NULL;
  105. io->connect_cb = NULL;
  106. // timers
  107. io->connect_timeout = 0;
  108. io->connect_timer = NULL;
  109. io->close_timeout = 0;
  110. io->close_timer = NULL;
  111. io->read_timeout = 0;
  112. io->read_timer = NULL;
  113. io->write_timeout = 0;
  114. io->write_timer = NULL;
  115. io->keepalive_timeout = 0;
  116. io->keepalive_timer = NULL;
  117. io->heartbeat_interval = 0;
  118. io->heartbeat_fn = NULL;
  119. io->heartbeat_timer = NULL;
  120. // upstream
  121. io->upstream_io = NULL;
  122. // unpack
  123. io->unpack_setting = NULL;
  124. // ssl
  125. io->ssl = NULL;
  126. io->ssl_ctx = NULL;
  127. io->alloced_ssl_ctx = 0;
  128. io->hostname = NULL;
  129. // context
  130. io->ctx = NULL;
  131. // private:
  132. #if defined(EVENT_POLL) || defined(EVENT_KQUEUE)
  133. io->event_index[0] = io->event_index[1] = -1;
  134. #endif
  135. #ifdef EVENT_IOCP
  136. io->hovlp = NULL;
  137. #endif
  138. // io_type
  139. fill_io_type(io);
  140. if (io->io_type & HIO_TYPE_SOCKET) {
  141. hio_socket_init(io);
  142. }
  143. #if WITH_RUDP
  144. if ((io->io_type & HIO_TYPE_SOCK_DGRAM) || (io->io_type & HIO_TYPE_SOCK_RAW)) {
  145. rudp_init(&io->rudp);
  146. }
  147. #endif
  148. }
  149. void hio_done(hio_t* io) {
  150. if (!io->ready) return;
  151. io->ready = 0;
  152. hio_del(io, HV_RDWR);
  153. // readbuf
  154. hio_free_readbuf(io);
  155. // write_queue
  156. offset_buf_t* pbuf = NULL;
  157. hrecursive_mutex_lock(&io->write_mutex);
  158. while (!write_queue_empty(&io->write_queue)) {
  159. pbuf = write_queue_front(&io->write_queue);
  160. HV_FREE(pbuf->base);
  161. write_queue_pop_front(&io->write_queue);
  162. }
  163. write_queue_cleanup(&io->write_queue);
  164. hrecursive_mutex_unlock(&io->write_mutex);
  165. #if WITH_RUDP
  166. if ((io->io_type & HIO_TYPE_SOCK_DGRAM) || (io->io_type & HIO_TYPE_SOCK_RAW)) {
  167. rudp_cleanup(&io->rudp);
  168. }
  169. #endif
  170. }
  171. void hio_free(hio_t* io) {
  172. if (io == NULL || io->destroy) return;
  173. io->destroy = 1;
  174. hio_close(io);
  175. hrecursive_mutex_destroy(&io->write_mutex);
  176. HV_FREE(io->localaddr);
  177. HV_FREE(io->peeraddr);
  178. HV_FREE(io);
  179. }
  180. bool hio_is_opened(hio_t* io) {
  181. if (io == NULL) return false;
  182. return io->ready == 1 && io->closed == 0;
  183. }
  184. bool hio_is_connected(hio_t* io) {
  185. if (io == NULL) return false;
  186. return io->ready == 1 && io->connected == 1 && io->closed == 0;
  187. }
  188. bool hio_is_closed(hio_t* io) {
  189. if (io == NULL) return true;
  190. return io->ready == 0 && io->closed == 1;
  191. }
  192. uint32_t hio_id (hio_t* io) {
  193. return io->id;
  194. }
  195. int hio_fd(hio_t* io) {
  196. return io->fd;
  197. }
  198. hio_type_e hio_type(hio_t* io) {
  199. return io->io_type;
  200. }
  201. int hio_error(hio_t* io) {
  202. return io->error;
  203. }
  204. int hio_events(hio_t* io) {
  205. return io->events;
  206. }
  207. int hio_revents(hio_t* io) {
  208. return io->revents;
  209. }
  210. struct sockaddr* hio_localaddr(hio_t* io) {
  211. return io->localaddr;
  212. }
  213. struct sockaddr* hio_peeraddr(hio_t* io) {
  214. return io->peeraddr;
  215. }
  216. void hio_set_context(hio_t* io, void* ctx) {
  217. io->ctx = ctx;
  218. }
  219. void* hio_context(hio_t* io) {
  220. return io->ctx;
  221. }
  222. haccept_cb hio_getcb_accept(hio_t* io) {
  223. return io->accept_cb;
  224. }
  225. hconnect_cb hio_getcb_connect(hio_t* io) {
  226. return io->connect_cb;
  227. }
  228. hread_cb hio_getcb_read(hio_t* io) {
  229. return io->read_cb;
  230. }
  231. hwrite_cb hio_getcb_write(hio_t* io) {
  232. return io->write_cb;
  233. }
  234. hclose_cb hio_getcb_close(hio_t* io) {
  235. return io->close_cb;
  236. }
  237. void hio_setcb_accept(hio_t* io, haccept_cb accept_cb) {
  238. io->accept_cb = accept_cb;
  239. }
  240. void hio_setcb_connect(hio_t* io, hconnect_cb connect_cb) {
  241. io->connect_cb = connect_cb;
  242. }
  243. void hio_setcb_read(hio_t* io, hread_cb read_cb) {
  244. io->read_cb = read_cb;
  245. }
  246. void hio_setcb_write(hio_t* io, hwrite_cb write_cb) {
  247. io->write_cb = write_cb;
  248. }
  249. void hio_setcb_close(hio_t* io, hclose_cb close_cb) {
  250. io->close_cb = close_cb;
  251. }
  252. void hio_accept_cb(hio_t* io) {
  253. /*
  254. char localaddrstr[SOCKADDR_STRLEN] = {0};
  255. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  256. printd("accept connfd=%d [%s] <= [%s]\n", io->fd,
  257. SOCKADDR_STR(io->localaddr, localaddrstr),
  258. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  259. */
  260. if (io->accept_cb) {
  261. // printd("accept_cb------\n");
  262. io->accept_cb(io);
  263. // printd("accept_cb======\n");
  264. }
  265. }
  266. void hio_connect_cb(hio_t* io) {
  267. /*
  268. char localaddrstr[SOCKADDR_STRLEN] = {0};
  269. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  270. printd("connect connfd=%d [%s] => [%s]\n", io->fd,
  271. SOCKADDR_STR(io->localaddr, localaddrstr),
  272. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  273. */
  274. io->connected = 1;
  275. if (io->connect_cb) {
  276. // printd("connect_cb------\n");
  277. io->connect_cb(io);
  278. // printd("connect_cb======\n");
  279. }
  280. }
  281. void hio_handle_read(hio_t* io, void* buf, int readbytes) {
  282. #if WITH_KCP
  283. if (io->io_type == HIO_TYPE_KCP) {
  284. hio_read_kcp(io, buf, readbytes);
  285. io->readbuf.head = io->readbuf.tail = 0;
  286. return;
  287. }
  288. #endif
  289. if (io->unpack_setting) {
  290. // hio_set_unpack
  291. hio_unpack(io, buf, readbytes);
  292. } else {
  293. const unsigned char* sp = (const unsigned char*)io->readbuf.base + io->readbuf.head;
  294. const unsigned char* ep = (const unsigned char*)buf + readbytes;
  295. if (io->read_flags & HIO_READ_UNTIL_LENGTH) {
  296. // hio_read_until_length
  297. if (ep - sp >= io->read_until_length) {
  298. io->readbuf.head += io->read_until_length;
  299. if (io->readbuf.head == io->readbuf.tail) {
  300. io->readbuf.head = io->readbuf.tail = 0;
  301. }
  302. io->read_flags &= ~HIO_READ_UNTIL_LENGTH;
  303. hio_read_cb(io, (void*)sp, io->read_until_length);
  304. }
  305. } else if (io->read_flags & HIO_READ_UNTIL_DELIM) {
  306. // hio_read_until_delim
  307. const unsigned char* p = (const unsigned char*)buf;
  308. for (int i = 0; i < readbytes; ++i, ++p) {
  309. if (*p == io->read_until_delim) {
  310. int len = p - sp + 1;
  311. io->readbuf.head += len;
  312. if (io->readbuf.head == io->readbuf.tail) {
  313. io->readbuf.head = io->readbuf.tail = 0;
  314. }
  315. io->read_flags &= ~HIO_READ_UNTIL_DELIM;
  316. hio_read_cb(io, (void*)sp, len);
  317. return;
  318. }
  319. }
  320. } else {
  321. // hio_read
  322. io->readbuf.head = io->readbuf.tail = 0;
  323. hio_read_cb(io, (void*)sp, ep - sp);
  324. }
  325. }
  326. if (io->readbuf.head == io->readbuf.tail) {
  327. io->readbuf.head = io->readbuf.tail = 0;
  328. }
  329. // readbuf autosize
  330. if (io->readbuf.tail == io->readbuf.len) {
  331. if (io->readbuf.head == 0) {
  332. // scale up * 2
  333. hio_alloc_readbuf(io, io->readbuf.len * 2);
  334. } else {
  335. hio_memmove_readbuf(io);
  336. }
  337. } else {
  338. size_t small_size = io->readbuf.len / 2;
  339. if (io->readbuf.tail < small_size &&
  340. io->small_readbytes_cnt >= 3) {
  341. // scale down / 2
  342. hio_alloc_readbuf(io, small_size);
  343. }
  344. }
  345. }
  346. void hio_read_cb(hio_t* io, void* buf, int len) {
  347. if (io->read_flags & HIO_READ_ONCE) {
  348. io->read_flags &= ~HIO_READ_ONCE;
  349. hio_read_stop(io);
  350. }
  351. if (io->read_cb && !io->closed) {
  352. // printd("read_cb------\n");
  353. io->read_cb(io, buf, len);
  354. // printd("read_cb======\n");
  355. }
  356. // for readbuf autosize
  357. if (hio_is_alloced_readbuf(io) && io->readbuf.len > READ_BUFSIZE_HIGH_WATER) {
  358. size_t small_size = io->readbuf.len / 2;
  359. if (len < small_size) {
  360. ++io->small_readbytes_cnt;
  361. } else {
  362. io->small_readbytes_cnt = 0;
  363. }
  364. }
  365. }
  366. void hio_write_cb(hio_t* io, const void* buf, int len) {
  367. if (io->write_cb && !io->closed) {
  368. // printd("write_cb------\n");
  369. io->write_cb(io, buf, len);
  370. // printd("write_cb======\n");
  371. }
  372. }
  373. void hio_close_cb(hio_t* io) {
  374. io->connected = 0;
  375. io->closed = 1;
  376. hclose_cb close_cb = io->close_cb;
  377. if (close_cb) {
  378. // printd("close_cb------\n");
  379. close_cb(io);
  380. // printd("close_cb======\n");
  381. }
  382. }
  383. void hio_set_type(hio_t* io, hio_type_e type) {
  384. io->io_type = type;
  385. }
  386. void hio_set_localaddr(hio_t* io, struct sockaddr* addr, int addrlen) {
  387. if (io->localaddr == NULL) {
  388. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  389. }
  390. memcpy(io->localaddr, addr, addrlen);
  391. }
  392. void hio_set_peeraddr (hio_t* io, struct sockaddr* addr, int addrlen) {
  393. if (io->peeraddr == NULL) {
  394. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  395. }
  396. memcpy(io->peeraddr, addr, addrlen);
  397. }
  398. int hio_enable_ssl(hio_t* io) {
  399. io->io_type = HIO_TYPE_SSL;
  400. return 0;
  401. }
  402. bool hio_is_ssl(hio_t* io) {
  403. return io->io_type == HIO_TYPE_SSL;
  404. }
  405. hssl_t hio_get_ssl(hio_t* io) {
  406. return io->ssl;
  407. }
  408. hssl_ctx_t hio_get_ssl_ctx(hio_t* io) {
  409. return io->ssl_ctx;
  410. }
  411. int hio_set_ssl(hio_t* io, hssl_t ssl) {
  412. io->io_type = HIO_TYPE_SSL;
  413. io->ssl = ssl;
  414. return 0;
  415. }
  416. int hio_set_ssl_ctx(hio_t* io, hssl_ctx_t ssl_ctx) {
  417. io->io_type = HIO_TYPE_SSL;
  418. io->ssl_ctx = ssl_ctx;
  419. return 0;
  420. }
  421. int hio_new_ssl_ctx(hio_t* io, hssl_ctx_opt_t* opt) {
  422. hssl_ctx_t ssl_ctx = hssl_ctx_new(opt);
  423. if (ssl_ctx == NULL) return ERR_NEW_SSL_CTX;
  424. io->alloced_ssl_ctx = 1;
  425. return hio_set_ssl_ctx(io, ssl_ctx);
  426. }
  427. int hio_set_hostname(hio_t* io, const char* hostname) {
  428. SAFE_FREE(io->hostname);
  429. io->hostname = strdup(hostname);
  430. return 0;
  431. }
  432. const char* hio_get_hostname(hio_t* io) {
  433. return io->hostname;
  434. }
  435. void hio_del_connect_timer(hio_t* io) {
  436. if (io->connect_timer) {
  437. htimer_del(io->connect_timer);
  438. io->connect_timer = NULL;
  439. io->connect_timeout = 0;
  440. }
  441. }
  442. void hio_del_close_timer(hio_t* io) {
  443. if (io->close_timer) {
  444. htimer_del(io->close_timer);
  445. io->close_timer = NULL;
  446. io->close_timeout = 0;
  447. }
  448. }
  449. void hio_del_read_timer(hio_t* io) {
  450. if (io->read_timer) {
  451. htimer_del(io->read_timer);
  452. io->read_timer = NULL;
  453. io->read_timeout = 0;
  454. }
  455. }
  456. void hio_del_write_timer(hio_t* io) {
  457. if (io->write_timer) {
  458. htimer_del(io->write_timer);
  459. io->write_timer = NULL;
  460. io->write_timeout = 0;
  461. }
  462. }
  463. void hio_del_keepalive_timer(hio_t* io) {
  464. if (io->keepalive_timer) {
  465. htimer_del(io->keepalive_timer);
  466. io->keepalive_timer = NULL;
  467. io->keepalive_timeout = 0;
  468. }
  469. }
  470. void hio_del_heartbeat_timer(hio_t* io) {
  471. if (io->heartbeat_timer) {
  472. htimer_del(io->heartbeat_timer);
  473. io->heartbeat_timer = NULL;
  474. io->heartbeat_interval = 0;
  475. io->heartbeat_fn = NULL;
  476. }
  477. }
  478. void hio_set_connect_timeout(hio_t* io, int timeout_ms) {
  479. io->connect_timeout = timeout_ms;
  480. }
  481. void hio_set_close_timeout(hio_t* io, int timeout_ms) {
  482. io->close_timeout = timeout_ms;
  483. }
  484. static void __read_timeout_cb(htimer_t* timer) {
  485. hio_t* io = (hio_t*)timer->privdata;
  486. uint64_t inactive_ms = (io->loop->cur_hrtime - io->last_read_hrtime) / 1000;
  487. if (inactive_ms + 100 < io->read_timeout) {
  488. htimer_reset(io->read_timer, io->read_timeout - inactive_ms);
  489. } else {
  490. if (io->io_type & HIO_TYPE_SOCKET) {
  491. char localaddrstr[SOCKADDR_STRLEN] = {0};
  492. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  493. hlogw("read timeout [%s] <=> [%s]",
  494. SOCKADDR_STR(io->localaddr, localaddrstr),
  495. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  496. }
  497. io->error = ETIMEDOUT;
  498. hio_close(io);
  499. }
  500. }
  501. void hio_set_read_timeout(hio_t* io, int timeout_ms) {
  502. if (timeout_ms <= 0) {
  503. // del
  504. hio_del_read_timer(io);
  505. return;
  506. }
  507. if (io->read_timer) {
  508. // reset
  509. htimer_reset(io->read_timer, timeout_ms);
  510. } else {
  511. // add
  512. io->read_timer = htimer_add(io->loop, __read_timeout_cb, timeout_ms, 1);
  513. io->read_timer->privdata = io;
  514. }
  515. io->read_timeout = timeout_ms;
  516. }
  517. static void __write_timeout_cb(htimer_t* timer) {
  518. hio_t* io = (hio_t*)timer->privdata;
  519. uint64_t inactive_ms = (io->loop->cur_hrtime - io->last_write_hrtime) / 1000;
  520. if (inactive_ms + 100 < io->write_timeout) {
  521. htimer_reset(io->write_timer, io->write_timeout - inactive_ms);
  522. } else {
  523. if (io->io_type & HIO_TYPE_SOCKET) {
  524. char localaddrstr[SOCKADDR_STRLEN] = {0};
  525. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  526. hlogi("write timeout [%s] <=> [%s]",
  527. SOCKADDR_STR(io->localaddr, localaddrstr),
  528. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  529. }
  530. io->error = ETIMEDOUT;
  531. hio_close(io);
  532. }
  533. }
  534. void hio_set_write_timeout(hio_t* io, int timeout_ms) {
  535. if (timeout_ms <= 0) {
  536. // del
  537. hio_del_write_timer(io);
  538. return;
  539. }
  540. if (io->write_timer) {
  541. // reset
  542. htimer_reset(io->write_timer, timeout_ms);
  543. } else {
  544. // add
  545. io->write_timer = htimer_add(io->loop, __write_timeout_cb, timeout_ms, 1);
  546. io->write_timer->privdata = io;
  547. }
  548. io->write_timeout = timeout_ms;
  549. }
  550. static void __keepalive_timeout_cb(htimer_t* timer) {
  551. hio_t* io = (hio_t*)timer->privdata;
  552. uint64_t last_rw_hrtime = MAX(io->last_read_hrtime, io->last_write_hrtime);
  553. uint64_t inactive_ms = (io->loop->cur_hrtime - last_rw_hrtime) / 1000;
  554. if (inactive_ms + 100 < io->keepalive_timeout) {
  555. htimer_reset(io->keepalive_timer, io->keepalive_timeout - inactive_ms);
  556. } else {
  557. if (io->io_type & HIO_TYPE_SOCKET) {
  558. char localaddrstr[SOCKADDR_STRLEN] = {0};
  559. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  560. hlogw("keepalive timeout [%s] <=> [%s]",
  561. SOCKADDR_STR(io->localaddr, localaddrstr),
  562. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  563. }
  564. io->error = ETIMEDOUT;
  565. hio_close(io);
  566. }
  567. }
  568. void hio_set_keepalive_timeout(hio_t* io, int timeout_ms) {
  569. if (timeout_ms <= 0) {
  570. // del
  571. hio_del_keepalive_timer(io);
  572. return;
  573. }
  574. if (io->keepalive_timer) {
  575. // reset
  576. htimer_reset(io->keepalive_timer, timeout_ms);
  577. } else {
  578. // add
  579. io->keepalive_timer = htimer_add(io->loop, __keepalive_timeout_cb, timeout_ms, 1);
  580. io->keepalive_timer->privdata = io;
  581. }
  582. io->keepalive_timeout = timeout_ms;
  583. }
  584. static void __heartbeat_timer_cb(htimer_t* timer) {
  585. hio_t* io = (hio_t*)timer->privdata;
  586. if (io && io->heartbeat_fn) {
  587. io->heartbeat_fn(io);
  588. }
  589. }
  590. void hio_set_heartbeat(hio_t* io, int interval_ms, hio_send_heartbeat_fn fn) {
  591. if (interval_ms <= 0) {
  592. // del
  593. hio_del_heartbeat_timer(io);
  594. return;
  595. }
  596. if (io->heartbeat_timer) {
  597. // reset
  598. htimer_reset(io->heartbeat_timer, interval_ms);
  599. } else {
  600. // add
  601. io->heartbeat_timer = htimer_add(io->loop, __heartbeat_timer_cb, interval_ms, INFINITE);
  602. io->heartbeat_timer->privdata = io;
  603. }
  604. io->heartbeat_interval = interval_ms;
  605. io->heartbeat_fn = fn;
  606. }
  607. //-----------------iobuf---------------------------------------------
  608. void hio_alloc_readbuf(hio_t* io, int len) {
  609. if (len > io->max_read_bufsize) {
  610. hloge("read bufsize > %u, close it!", io->max_read_bufsize);
  611. io->error = ERR_OVER_LIMIT;
  612. hio_close_async(io);
  613. return;
  614. }
  615. if (hio_is_alloced_readbuf(io)) {
  616. io->readbuf.base = (char*)hv_realloc(io->readbuf.base, len, io->readbuf.len);
  617. } else {
  618. HV_ALLOC(io->readbuf.base, len);
  619. }
  620. io->readbuf.len = len;
  621. io->alloced_readbuf = 1;
  622. io->small_readbytes_cnt = 0;
  623. }
  624. void hio_free_readbuf(hio_t* io) {
  625. if (hio_is_alloced_readbuf(io)) {
  626. HV_FREE(io->readbuf.base);
  627. io->alloced_readbuf = 0;
  628. // reset to loop->readbuf
  629. io->readbuf.base = io->loop->readbuf.base;
  630. io->readbuf.len = io->loop->readbuf.len;
  631. }
  632. }
  633. void hio_memmove_readbuf(hio_t* io) {
  634. fifo_buf_t* buf = &io->readbuf;
  635. if (buf->tail == buf->head) {
  636. buf->head = buf->tail = 0;
  637. return;
  638. }
  639. if (buf->tail > buf->head) {
  640. size_t size = buf->tail - buf->head;
  641. // [head, tail] => [0, tail - head]
  642. memmove(buf->base, buf->base + buf->head, size);
  643. buf->head = 0;
  644. buf->tail = size;
  645. }
  646. }
  647. void hio_set_readbuf(hio_t* io, void* buf, size_t len) {
  648. assert(io && buf && len != 0);
  649. hio_free_readbuf(io);
  650. io->readbuf.base = (char*)buf;
  651. io->readbuf.len = len;
  652. io->readbuf.head = io->readbuf.tail = 0;
  653. io->alloced_readbuf = 0;
  654. }
  655. hio_readbuf_t* hio_get_readbuf(hio_t* io) {
  656. return &io->readbuf;
  657. }
  658. void hio_set_max_read_bufsize (hio_t* io, uint32_t size) {
  659. io->max_read_bufsize = size;
  660. }
  661. void hio_set_max_write_bufsize(hio_t* io, uint32_t size) {
  662. io->max_write_bufsize = size;
  663. }
  664. size_t hio_write_bufsize(hio_t* io) {
  665. return io->write_bufsize;
  666. }
  667. int hio_read_once (hio_t* io) {
  668. io->read_flags |= HIO_READ_ONCE;
  669. return hio_read_start(io);
  670. }
  671. int hio_read_until_length(hio_t* io, unsigned int len) {
  672. if (len == 0) return 0;
  673. if (io->readbuf.tail - io->readbuf.head >= len) {
  674. void* buf = io->readbuf.base + io->readbuf.head;
  675. io->readbuf.head += len;
  676. if (io->readbuf.head == io->readbuf.tail) {
  677. io->readbuf.head = io->readbuf.tail = 0;
  678. }
  679. hio_read_cb(io, buf, len);
  680. return len;
  681. }
  682. io->read_flags = HIO_READ_UNTIL_LENGTH;
  683. io->read_until_length = len;
  684. if (io->readbuf.head > 1024 || io->readbuf.tail - io->readbuf.head < 1024) {
  685. hio_memmove_readbuf(io);
  686. }
  687. // NOTE: prepare readbuf
  688. int need_len = io->readbuf.head + len;
  689. if (hio_is_loop_readbuf(io) ||
  690. io->readbuf.len < need_len) {
  691. hio_alloc_readbuf(io, need_len);
  692. }
  693. return hio_read_once(io);
  694. }
  695. int hio_read_until_delim(hio_t* io, unsigned char delim) {
  696. if (io->readbuf.tail - io->readbuf.head > 0) {
  697. const unsigned char* sp = (const unsigned char*)io->readbuf.base + io->readbuf.head;
  698. const unsigned char* ep = (const unsigned char*)io->readbuf.base + io->readbuf.tail;
  699. const unsigned char* p = sp;
  700. while (p <= ep) {
  701. if (*p == delim) {
  702. int len = p - sp + 1;
  703. io->readbuf.head += len;
  704. if (io->readbuf.head == io->readbuf.tail) {
  705. io->readbuf.head = io->readbuf.tail = 0;
  706. }
  707. hio_read_cb(io, (void*)sp, len);
  708. return len;
  709. }
  710. ++p;
  711. }
  712. }
  713. io->read_flags = HIO_READ_UNTIL_DELIM;
  714. io->read_until_length = delim;
  715. // NOTE: prepare readbuf
  716. if (hio_is_loop_readbuf(io) ||
  717. io->readbuf.len < HLOOP_READ_BUFSIZE) {
  718. hio_alloc_readbuf(io, HLOOP_READ_BUFSIZE);
  719. }
  720. return hio_read_once(io);
  721. }
  722. int hio_read_remain(hio_t* io) {
  723. int remain = io->readbuf.tail - io->readbuf.head;
  724. if (remain > 0) {
  725. void* buf = io->readbuf.base + io->readbuf.head;
  726. io->readbuf.head = io->readbuf.tail = 0;
  727. hio_read_cb(io, buf, remain);
  728. }
  729. return remain;
  730. }
  731. //-----------------unpack---------------------------------------------
  732. void hio_set_unpack(hio_t* io, unpack_setting_t* setting) {
  733. hio_unset_unpack(io);
  734. if (setting == NULL) return;
  735. io->unpack_setting = setting;
  736. if (io->unpack_setting->package_max_length == 0) {
  737. io->unpack_setting->package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
  738. }
  739. if (io->unpack_setting->mode == UNPACK_BY_FIXED_LENGTH) {
  740. assert(io->unpack_setting->fixed_length != 0 &&
  741. io->unpack_setting->fixed_length <= io->unpack_setting->package_max_length);
  742. }
  743. else if (io->unpack_setting->mode == UNPACK_BY_DELIMITER) {
  744. if (io->unpack_setting->delimiter_bytes == 0) {
  745. io->unpack_setting->delimiter_bytes = strlen((char*)io->unpack_setting->delimiter);
  746. }
  747. }
  748. else if (io->unpack_setting->mode == UNPACK_BY_LENGTH_FIELD) {
  749. assert(io->unpack_setting->body_offset >=
  750. io->unpack_setting->length_field_offset +
  751. io->unpack_setting->length_field_bytes);
  752. if (io->unpack_setting->length_field_coding == 0) {
  753. io->unpack_setting->length_field_coding = ENCODE_BY_BIG_ENDIAN;
  754. }
  755. }
  756. // NOTE: unpack must have own readbuf
  757. if (io->unpack_setting->mode == UNPACK_BY_FIXED_LENGTH) {
  758. io->readbuf.len = io->unpack_setting->fixed_length;
  759. } else {
  760. io->readbuf.len = MIN(HLOOP_READ_BUFSIZE, io->unpack_setting->package_max_length);
  761. }
  762. io->max_read_bufsize = io->unpack_setting->package_max_length;
  763. hio_alloc_readbuf(io, io->readbuf.len);
  764. }
  765. void hio_unset_unpack(hio_t* io) {
  766. if (io->unpack_setting) {
  767. io->unpack_setting = NULL;
  768. // NOTE: unpack has own readbuf
  769. hio_free_readbuf(io);
  770. }
  771. }
  772. //-----------------upstream---------------------------------------------
  773. void hio_read_upstream(hio_t* io) {
  774. hio_t* upstream_io = io->upstream_io;
  775. if (upstream_io) {
  776. hio_read(io);
  777. hio_read(upstream_io);
  778. }
  779. }
  780. void hio_read_upstream_on_write_complete(hio_t* io, const void* buf, int writebytes) {
  781. hio_t* upstream_io = io->upstream_io;
  782. if (upstream_io && hio_write_is_complete(io)) {
  783. hio_setcb_write(io, NULL);
  784. hio_read(upstream_io);
  785. }
  786. }
  787. void hio_write_upstream(hio_t* io, void* buf, int bytes) {
  788. hio_t* upstream_io = io->upstream_io;
  789. if (upstream_io) {
  790. int nwrite = hio_write(upstream_io, buf, bytes);
  791. // if (!hio_write_is_complete(upstream_io)) {
  792. if (nwrite >= 0 && nwrite < bytes) {
  793. hio_read_stop(io);
  794. hio_setcb_write(upstream_io, hio_read_upstream_on_write_complete);
  795. }
  796. }
  797. }
  798. void hio_close_upstream(hio_t* io) {
  799. hio_t* upstream_io = io->upstream_io;
  800. if (upstream_io) {
  801. hio_close(upstream_io);
  802. }
  803. }
  804. void hio_setup_upstream(hio_t* io1, hio_t* io2) {
  805. io1->upstream_io = io2;
  806. io2->upstream_io = io1;
  807. }
  808. hio_t* hio_get_upstream(hio_t* io) {
  809. return io->upstream_io;
  810. }
  811. hio_t* hio_setup_tcp_upstream(hio_t* io, const char* host, int port, int ssl) {
  812. hio_t* upstream_io = hio_create_socket(io->loop, host, port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
  813. if (upstream_io == NULL) return NULL;
  814. if (ssl) hio_enable_ssl(upstream_io);
  815. hio_setup_upstream(io, upstream_io);
  816. hio_setcb_read(io, hio_write_upstream);
  817. hio_setcb_read(upstream_io, hio_write_upstream);
  818. hio_setcb_close(io, hio_close_upstream);
  819. hio_setcb_close(upstream_io, hio_close_upstream);
  820. hio_setcb_connect(upstream_io, hio_read_upstream);
  821. hio_connect(upstream_io);
  822. return upstream_io;
  823. }
  824. hio_t* hio_setup_udp_upstream(hio_t* io, const char* host, int port) {
  825. hio_t* upstream_io = hio_create_socket(io->loop, host, port, HIO_TYPE_UDP, HIO_CLIENT_SIDE);
  826. if (upstream_io == NULL) return NULL;
  827. hio_setup_upstream(io, upstream_io);
  828. hio_setcb_read(io, hio_write_upstream);
  829. hio_setcb_read(upstream_io, hio_write_upstream);
  830. hio_read_upstream(io);
  831. return upstream_io;
  832. }