hevent.c 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906
  1. #include "hevent.h"
  2. #include "hsocket.h"
  3. #include "hatomic.h"
  4. #include "hlog.h"
  5. #include "herr.h"
  6. #include "unpack.h"
  7. uint64_t hloop_next_event_id() {
  8. static hatomic_t s_id = HATOMIC_VAR_INIT(0);
  9. return ++s_id;
  10. }
  11. uint32_t hio_next_id() {
  12. static hatomic_t s_id = HATOMIC_VAR_INIT(0);
  13. return ++s_id;
  14. }
  15. static void fill_io_type(hio_t* io) {
  16. int type = 0;
  17. socklen_t optlen = sizeof(int);
  18. int ret = getsockopt(io->fd, SOL_SOCKET, SO_TYPE, (char*)&type, &optlen);
  19. printd("getsockopt SO_TYPE fd=%d ret=%d type=%d errno=%d\n", io->fd, ret, type, socket_errno());
  20. if (ret == 0) {
  21. switch (type) {
  22. case SOCK_STREAM: io->io_type = HIO_TYPE_TCP; break;
  23. case SOCK_DGRAM: io->io_type = HIO_TYPE_UDP; break;
  24. case SOCK_RAW: io->io_type = HIO_TYPE_IP; break;
  25. default: io->io_type = HIO_TYPE_SOCKET; break;
  26. }
  27. }
  28. else if (socket_errno() == ENOTSOCK) {
  29. switch (io->fd) {
  30. case 0: io->io_type = HIO_TYPE_STDIN; break;
  31. case 1: io->io_type = HIO_TYPE_STDOUT; break;
  32. case 2: io->io_type = HIO_TYPE_STDERR; break;
  33. default: io->io_type = HIO_TYPE_FILE; break;
  34. }
  35. }
  36. else {
  37. io->io_type = HIO_TYPE_TCP;
  38. }
  39. }
  40. static void hio_socket_init(hio_t* io) {
  41. if ((io->io_type & HIO_TYPE_SOCK_DGRAM) || (io->io_type & HIO_TYPE_SOCK_RAW)) {
  42. // NOTE: sendto multiple peeraddr cannot use io->write_queue
  43. blocking(io->fd);
  44. } else {
  45. nonblocking(io->fd);
  46. }
  47. // fill io->localaddr io->peeraddr
  48. if (io->localaddr == NULL) {
  49. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  50. }
  51. if (io->peeraddr == NULL) {
  52. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  53. }
  54. socklen_t addrlen = sizeof(sockaddr_u);
  55. int ret = getsockname(io->fd, io->localaddr, &addrlen);
  56. printd("getsockname fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
  57. // NOTE: udp peeraddr set by recvfrom/sendto
  58. if (io->io_type & HIO_TYPE_SOCK_STREAM) {
  59. addrlen = sizeof(sockaddr_u);
  60. ret = getpeername(io->fd, io->peeraddr, &addrlen);
  61. printd("getpeername fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
  62. }
  63. }
  64. void hio_init(hio_t* io) {
  65. // alloc localaddr,peeraddr when hio_socket_init
  66. /*
  67. if (io->localaddr == NULL) {
  68. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  69. }
  70. if (io->peeraddr == NULL) {
  71. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  72. }
  73. */
  74. // write_queue init when hwrite try_write failed
  75. // write_queue_init(&io->write_queue, 4);
  76. hrecursive_mutex_init(&io->write_mutex);
  77. }
  78. void hio_ready(hio_t* io) {
  79. if (io->ready) return;
  80. // flags
  81. io->ready = 1;
  82. io->connected = 0;
  83. io->closed = 0;
  84. io->accept = io->connect = io->connectex = 0;
  85. io->recv = io->send = 0;
  86. io->recvfrom = io->sendto = 0;
  87. io->close = 0;
  88. // public:
  89. io->id = hio_next_id();
  90. io->io_type = HIO_TYPE_UNKNOWN;
  91. io->error = 0;
  92. io->events = io->revents = 0;
  93. io->last_read_hrtime = io->last_write_hrtime = io->loop->cur_hrtime;
  94. // readbuf
  95. io->alloced_readbuf = 0;
  96. io->readbuf.base = io->loop->readbuf.base;
  97. io->readbuf.len = io->loop->readbuf.len;
  98. io->readbuf.head = io->readbuf.tail = 0;
  99. io->read_flags = 0;
  100. io->read_until_length = 0;
  101. io->max_read_bufsize = MAX_READ_BUFSIZE;
  102. io->small_readbytes_cnt = 0;
  103. // write_queue
  104. io->write_bufsize = 0;
  105. io->max_write_bufsize = MAX_WRITE_BUFSIZE;
  106. // callbacks
  107. io->read_cb = NULL;
  108. io->write_cb = NULL;
  109. io->close_cb = NULL;
  110. io->accept_cb = NULL;
  111. io->connect_cb = NULL;
  112. // timers
  113. io->connect_timeout = 0;
  114. io->connect_timer = NULL;
  115. io->close_timeout = 0;
  116. io->close_timer = NULL;
  117. io->read_timeout = 0;
  118. io->read_timer = NULL;
  119. io->write_timeout = 0;
  120. io->write_timer = NULL;
  121. io->keepalive_timeout = 0;
  122. io->keepalive_timer = NULL;
  123. io->heartbeat_interval = 0;
  124. io->heartbeat_fn = NULL;
  125. io->heartbeat_timer = NULL;
  126. // upstream
  127. io->upstream_io = NULL;
  128. // unpack
  129. io->unpack_setting = NULL;
  130. // ssl
  131. io->ssl = NULL;
  132. io->ssl_ctx = NULL;
  133. io->alloced_ssl_ctx = 0;
  134. io->hostname = NULL;
  135. // context
  136. io->ctx = NULL;
  137. // private:
  138. #if defined(EVENT_POLL) || defined(EVENT_KQUEUE)
  139. io->event_index[0] = io->event_index[1] = -1;
  140. #endif
  141. #ifdef EVENT_IOCP
  142. io->hovlp = NULL;
  143. #endif
  144. // io_type
  145. fill_io_type(io);
  146. if (io->io_type & HIO_TYPE_SOCKET) {
  147. hio_socket_init(io);
  148. }
  149. #if WITH_RUDP
  150. if ((io->io_type & HIO_TYPE_SOCK_DGRAM) || (io->io_type & HIO_TYPE_SOCK_RAW)) {
  151. rudp_init(&io->rudp);
  152. }
  153. #endif
  154. }
  155. void hio_done(hio_t* io) {
  156. if (!io->ready) return;
  157. io->ready = 0;
  158. hio_del(io, HV_RDWR);
  159. // readbuf
  160. hio_free_readbuf(io);
  161. // write_queue
  162. offset_buf_t* pbuf = NULL;
  163. hrecursive_mutex_lock(&io->write_mutex);
  164. while (!write_queue_empty(&io->write_queue)) {
  165. pbuf = write_queue_front(&io->write_queue);
  166. HV_FREE(pbuf->base);
  167. write_queue_pop_front(&io->write_queue);
  168. }
  169. write_queue_cleanup(&io->write_queue);
  170. hrecursive_mutex_unlock(&io->write_mutex);
  171. #if WITH_RUDP
  172. if ((io->io_type & HIO_TYPE_SOCK_DGRAM) || (io->io_type & HIO_TYPE_SOCK_RAW)) {
  173. rudp_cleanup(&io->rudp);
  174. }
  175. #endif
  176. }
  177. void hio_free(hio_t* io) {
  178. if (io == NULL) return;
  179. hio_close(io);
  180. hrecursive_mutex_destroy(&io->write_mutex);
  181. HV_FREE(io->localaddr);
  182. HV_FREE(io->peeraddr);
  183. HV_FREE(io);
  184. }
  185. bool hio_is_opened(hio_t* io) {
  186. if (io == NULL) return false;
  187. return io->ready == 1 && io->closed == 0;
  188. }
  189. bool hio_is_connected(hio_t* io) {
  190. if (io == NULL) return false;
  191. return io->ready == 1 && io->connected == 1 && io->closed == 0;
  192. }
  193. bool hio_is_closed(hio_t* io) {
  194. if (io == NULL) return true;
  195. return io->ready == 0 && io->closed == 1;
  196. }
  197. uint32_t hio_id (hio_t* io) {
  198. return io->id;
  199. }
  200. int hio_fd(hio_t* io) {
  201. return io->fd;
  202. }
  203. hio_type_e hio_type(hio_t* io) {
  204. return io->io_type;
  205. }
  206. int hio_error(hio_t* io) {
  207. return io->error;
  208. }
  209. int hio_events(hio_t* io) {
  210. return io->events;
  211. }
  212. int hio_revents(hio_t* io) {
  213. return io->revents;
  214. }
  215. struct sockaddr* hio_localaddr(hio_t* io) {
  216. return io->localaddr;
  217. }
  218. struct sockaddr* hio_peeraddr(hio_t* io) {
  219. return io->peeraddr;
  220. }
  221. void hio_set_context(hio_t* io, void* ctx) {
  222. io->ctx = ctx;
  223. }
  224. void* hio_context(hio_t* io) {
  225. return io->ctx;
  226. }
  227. haccept_cb hio_getcb_accept(hio_t* io) {
  228. return io->accept_cb;
  229. }
  230. hconnect_cb hio_getcb_connect(hio_t* io) {
  231. return io->connect_cb;
  232. }
  233. hread_cb hio_getcb_read(hio_t* io) {
  234. return io->read_cb;
  235. }
  236. hwrite_cb hio_getcb_write(hio_t* io) {
  237. return io->write_cb;
  238. }
  239. hclose_cb hio_getcb_close(hio_t* io) {
  240. return io->close_cb;
  241. }
  242. void hio_setcb_accept(hio_t* io, haccept_cb accept_cb) {
  243. io->accept_cb = accept_cb;
  244. }
  245. void hio_setcb_connect(hio_t* io, hconnect_cb connect_cb) {
  246. io->connect_cb = connect_cb;
  247. }
  248. void hio_setcb_read(hio_t* io, hread_cb read_cb) {
  249. io->read_cb = read_cb;
  250. }
  251. void hio_setcb_write(hio_t* io, hwrite_cb write_cb) {
  252. io->write_cb = write_cb;
  253. }
  254. void hio_setcb_close(hio_t* io, hclose_cb close_cb) {
  255. io->close_cb = close_cb;
  256. }
  257. void hio_accept_cb(hio_t* io) {
  258. /*
  259. char localaddrstr[SOCKADDR_STRLEN] = {0};
  260. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  261. printd("accept connfd=%d [%s] <= [%s]\n", io->fd,
  262. SOCKADDR_STR(io->localaddr, localaddrstr),
  263. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  264. */
  265. if (io->accept_cb) {
  266. // printd("accept_cb------\n");
  267. io->accept_cb(io);
  268. // printd("accept_cb======\n");
  269. }
  270. }
  271. void hio_connect_cb(hio_t* io) {
  272. /*
  273. char localaddrstr[SOCKADDR_STRLEN] = {0};
  274. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  275. printd("connect connfd=%d [%s] => [%s]\n", io->fd,
  276. SOCKADDR_STR(io->localaddr, localaddrstr),
  277. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  278. */
  279. io->connected = 1;
  280. if (io->connect_cb) {
  281. // printd("connect_cb------\n");
  282. io->connect_cb(io);
  283. // printd("connect_cb======\n");
  284. }
  285. }
  286. void hio_handle_read(hio_t* io, void* buf, int readbytes) {
  287. #if WITH_KCP
  288. if (io->io_type == HIO_TYPE_KCP) {
  289. hio_read_kcp(io, buf, readbytes);
  290. io->readbuf.head = io->readbuf.tail = 0;
  291. return;
  292. }
  293. #endif
  294. if (io->unpack_setting) {
  295. // hio_set_unpack
  296. hio_unpack(io, buf, readbytes);
  297. } else {
  298. const unsigned char* sp = (const unsigned char*)io->readbuf.base + io->readbuf.head;
  299. const unsigned char* ep = (const unsigned char*)buf + readbytes;
  300. if (io->read_flags & HIO_READ_UNTIL_LENGTH) {
  301. // hio_read_until_length
  302. if (ep - sp >= io->read_until_length) {
  303. io->readbuf.head += io->read_until_length;
  304. if (io->readbuf.head == io->readbuf.tail) {
  305. io->readbuf.head = io->readbuf.tail = 0;
  306. }
  307. io->read_flags &= ~HIO_READ_UNTIL_LENGTH;
  308. hio_read_cb(io, (void*)sp, io->read_until_length);
  309. }
  310. } else if (io->read_flags & HIO_READ_UNTIL_DELIM) {
  311. // hio_read_until_delim
  312. const unsigned char* p = (const unsigned char*)buf;
  313. for (int i = 0; i < readbytes; ++i, ++p) {
  314. if (*p == io->read_until_delim) {
  315. int len = p - sp + 1;
  316. io->readbuf.head += len;
  317. if (io->readbuf.head == io->readbuf.tail) {
  318. io->readbuf.head = io->readbuf.tail = 0;
  319. }
  320. io->read_flags &= ~HIO_READ_UNTIL_DELIM;
  321. hio_read_cb(io, (void*)sp, len);
  322. return;
  323. }
  324. }
  325. } else {
  326. // hio_read
  327. io->readbuf.head = io->readbuf.tail = 0;
  328. hio_read_cb(io, (void*)sp, ep - sp);
  329. }
  330. }
  331. if (io->readbuf.head == io->readbuf.tail) {
  332. io->readbuf.head = io->readbuf.tail = 0;
  333. }
  334. // readbuf autosize
  335. if (io->readbuf.tail == io->readbuf.len) {
  336. if (io->readbuf.head == 0) {
  337. // scale up * 2
  338. hio_alloc_readbuf(io, io->readbuf.len * 2);
  339. } else {
  340. // [head, tail] => [base, tail - head]
  341. memmove(io->readbuf.base, io->readbuf.base + io->readbuf.head, io->readbuf.tail - io->readbuf.head);
  342. }
  343. } else {
  344. size_t small_size = io->readbuf.len / 2;
  345. if (io->readbuf.tail < small_size &&
  346. io->small_readbytes_cnt >= 3) {
  347. // scale down / 2
  348. hio_alloc_readbuf(io, small_size);
  349. }
  350. }
  351. }
  352. void hio_read_cb(hio_t* io, void* buf, int len) {
  353. if (io->read_flags & HIO_READ_ONCE) {
  354. io->read_flags &= ~HIO_READ_ONCE;
  355. hio_read_stop(io);
  356. }
  357. if (io->read_cb) {
  358. // printd("read_cb------\n");
  359. io->read_cb(io, buf, len);
  360. // printd("read_cb======\n");
  361. }
  362. // for readbuf autosize
  363. if (hio_is_alloced_readbuf(io) && io->readbuf.len > READ_BUFSIZE_HIGH_WATER) {
  364. size_t small_size = io->readbuf.len / 2;
  365. if (len < small_size) {
  366. ++io->small_readbytes_cnt;
  367. } else {
  368. io->small_readbytes_cnt = 0;
  369. }
  370. }
  371. }
  372. void hio_write_cb(hio_t* io, const void* buf, int len) {
  373. if (io->write_cb) {
  374. // printd("write_cb------\n");
  375. io->write_cb(io, buf, len);
  376. // printd("write_cb======\n");
  377. }
  378. }
  379. void hio_close_cb(hio_t* io) {
  380. io->connected = 0;
  381. io->closed = 1;
  382. if (io->close_cb) {
  383. // printd("close_cb------\n");
  384. io->close_cb(io);
  385. // printd("close_cb======\n");
  386. }
  387. }
  388. void hio_set_type(hio_t* io, hio_type_e type) {
  389. io->io_type = type;
  390. }
  391. void hio_set_localaddr(hio_t* io, struct sockaddr* addr, int addrlen) {
  392. if (io->localaddr == NULL) {
  393. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  394. }
  395. memcpy(io->localaddr, addr, addrlen);
  396. }
  397. void hio_set_peeraddr (hio_t* io, struct sockaddr* addr, int addrlen) {
  398. if (io->peeraddr == NULL) {
  399. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  400. }
  401. memcpy(io->peeraddr, addr, addrlen);
  402. }
  403. int hio_enable_ssl(hio_t* io) {
  404. io->io_type = HIO_TYPE_SSL;
  405. return 0;
  406. }
  407. bool hio_is_ssl(hio_t* io) {
  408. return io->io_type == HIO_TYPE_SSL;
  409. }
  410. hssl_t hio_get_ssl(hio_t* io) {
  411. return io->ssl;
  412. }
  413. hssl_ctx_t hio_get_ssl_ctx(hio_t* io) {
  414. return io->ssl_ctx;
  415. }
  416. int hio_set_ssl(hio_t* io, hssl_t ssl) {
  417. io->io_type = HIO_TYPE_SSL;
  418. io->ssl = ssl;
  419. return 0;
  420. }
  421. int hio_set_ssl_ctx(hio_t* io, hssl_ctx_t ssl_ctx) {
  422. io->io_type = HIO_TYPE_SSL;
  423. io->ssl_ctx = ssl_ctx;
  424. return 0;
  425. }
  426. int hio_new_ssl_ctx(hio_t* io, hssl_ctx_opt_t* opt) {
  427. hssl_ctx_t ssl_ctx = hssl_ctx_new(opt);
  428. if (ssl_ctx == NULL) return ERR_NEW_SSL_CTX;
  429. io->alloced_ssl_ctx = 1;
  430. return hio_set_ssl_ctx(io, ssl_ctx);
  431. }
  432. int hio_set_hostname(hio_t* io, const char* hostname) {
  433. SAFE_FREE(io->hostname);
  434. io->hostname = strdup(hostname);
  435. return 0;
  436. }
  437. const char* hio_get_hostname(hio_t* io) {
  438. return io->hostname;
  439. }
  440. void hio_del_connect_timer(hio_t* io) {
  441. if (io->connect_timer) {
  442. htimer_del(io->connect_timer);
  443. io->connect_timer = NULL;
  444. io->connect_timeout = 0;
  445. }
  446. }
  447. void hio_del_close_timer(hio_t* io) {
  448. if (io->close_timer) {
  449. htimer_del(io->close_timer);
  450. io->close_timer = NULL;
  451. io->close_timeout = 0;
  452. }
  453. }
  454. void hio_del_read_timer(hio_t* io) {
  455. if (io->read_timer) {
  456. htimer_del(io->read_timer);
  457. io->read_timer = NULL;
  458. io->read_timeout = 0;
  459. }
  460. }
  461. void hio_del_write_timer(hio_t* io) {
  462. if (io->write_timer) {
  463. htimer_del(io->write_timer);
  464. io->write_timer = NULL;
  465. io->write_timeout = 0;
  466. }
  467. }
  468. void hio_del_keepalive_timer(hio_t* io) {
  469. if (io->keepalive_timer) {
  470. htimer_del(io->keepalive_timer);
  471. io->keepalive_timer = NULL;
  472. io->keepalive_timeout = 0;
  473. }
  474. }
  475. void hio_del_heartbeat_timer(hio_t* io) {
  476. if (io->heartbeat_timer) {
  477. htimer_del(io->heartbeat_timer);
  478. io->heartbeat_timer = NULL;
  479. io->heartbeat_interval = 0;
  480. io->heartbeat_fn = NULL;
  481. }
  482. }
  483. void hio_set_connect_timeout(hio_t* io, int timeout_ms) {
  484. io->connect_timeout = timeout_ms;
  485. }
  486. void hio_set_close_timeout(hio_t* io, int timeout_ms) {
  487. io->close_timeout = timeout_ms;
  488. }
  489. static void __read_timeout_cb(htimer_t* timer) {
  490. hio_t* io = (hio_t*)timer->privdata;
  491. uint64_t inactive_ms = (io->loop->cur_hrtime - io->last_read_hrtime) / 1000;
  492. if (inactive_ms + 100 < io->read_timeout) {
  493. htimer_reset(io->read_timer, io->read_timeout - inactive_ms);
  494. } else {
  495. if (io->io_type & HIO_TYPE_SOCKET) {
  496. char localaddrstr[SOCKADDR_STRLEN] = {0};
  497. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  498. hlogw("read timeout [%s] <=> [%s]",
  499. SOCKADDR_STR(io->localaddr, localaddrstr),
  500. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  501. }
  502. io->error = ETIMEDOUT;
  503. hio_close(io);
  504. }
  505. }
  506. void hio_set_read_timeout(hio_t* io, int timeout_ms) {
  507. if (timeout_ms <= 0) {
  508. // del
  509. hio_del_read_timer(io);
  510. return;
  511. }
  512. if (io->read_timer) {
  513. // reset
  514. htimer_reset(io->read_timer, timeout_ms);
  515. } else {
  516. // add
  517. io->read_timer = htimer_add(io->loop, __read_timeout_cb, timeout_ms, 1);
  518. io->read_timer->privdata = io;
  519. }
  520. io->read_timeout = timeout_ms;
  521. }
  522. static void __write_timeout_cb(htimer_t* timer) {
  523. hio_t* io = (hio_t*)timer->privdata;
  524. uint64_t inactive_ms = (io->loop->cur_hrtime - io->last_write_hrtime) / 1000;
  525. if (inactive_ms + 100 < io->write_timeout) {
  526. htimer_reset(io->write_timer, io->write_timeout - inactive_ms);
  527. } else {
  528. if (io->io_type & HIO_TYPE_SOCKET) {
  529. char localaddrstr[SOCKADDR_STRLEN] = {0};
  530. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  531. hlogw("write timeout [%s] <=> [%s]",
  532. SOCKADDR_STR(io->localaddr, localaddrstr),
  533. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  534. }
  535. io->error = ETIMEDOUT;
  536. hio_close(io);
  537. }
  538. }
  539. void hio_set_write_timeout(hio_t* io, int timeout_ms) {
  540. if (timeout_ms <= 0) {
  541. // del
  542. hio_del_write_timer(io);
  543. return;
  544. }
  545. if (io->write_timer) {
  546. // reset
  547. htimer_reset(io->write_timer, timeout_ms);
  548. } else {
  549. // add
  550. io->write_timer = htimer_add(io->loop, __write_timeout_cb, timeout_ms, 1);
  551. io->write_timer->privdata = io;
  552. }
  553. io->write_timeout = timeout_ms;
  554. }
  555. static void __keepalive_timeout_cb(htimer_t* timer) {
  556. hio_t* io = (hio_t*)timer->privdata;
  557. uint64_t last_rw_hrtime = MAX(io->last_read_hrtime, io->last_write_hrtime);
  558. uint64_t inactive_ms = (io->loop->cur_hrtime - last_rw_hrtime) / 1000;
  559. if (inactive_ms + 100 < io->keepalive_timeout) {
  560. htimer_reset(io->keepalive_timer, io->keepalive_timeout - inactive_ms);
  561. } else {
  562. char localaddrstr[SOCKADDR_STRLEN] = {0};
  563. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  564. hlogw("keepalive timeout [%s] <=> [%s]",
  565. SOCKADDR_STR(io->localaddr, localaddrstr),
  566. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  567. io->error = ETIMEDOUT;
  568. hio_close(io);
  569. }
  570. }
  571. void hio_set_keepalive_timeout(hio_t* io, int timeout_ms) {
  572. if (timeout_ms <= 0) {
  573. // del
  574. hio_del_keepalive_timer(io);
  575. return;
  576. }
  577. if (io->keepalive_timer) {
  578. // reset
  579. htimer_reset(io->keepalive_timer, timeout_ms);
  580. } else {
  581. // add
  582. io->keepalive_timer = htimer_add(io->loop, __keepalive_timeout_cb, timeout_ms, 1);
  583. io->keepalive_timer->privdata = io;
  584. }
  585. io->keepalive_timeout = timeout_ms;
  586. }
  587. static void __heartbeat_timer_cb(htimer_t* timer) {
  588. hio_t* io = (hio_t*)timer->privdata;
  589. if (io && io->heartbeat_fn) {
  590. io->heartbeat_fn(io);
  591. }
  592. }
  593. void hio_set_heartbeat(hio_t* io, int interval_ms, hio_send_heartbeat_fn fn) {
  594. if (interval_ms <= 0) {
  595. // del
  596. hio_del_heartbeat_timer(io);
  597. return;
  598. }
  599. if (io->heartbeat_timer) {
  600. // reset
  601. htimer_reset(io->heartbeat_timer, interval_ms);
  602. } else {
  603. // add
  604. io->heartbeat_timer = htimer_add(io->loop, __heartbeat_timer_cb, interval_ms, INFINITE);
  605. io->heartbeat_timer->privdata = io;
  606. }
  607. io->heartbeat_interval = interval_ms;
  608. io->heartbeat_fn = fn;
  609. }
  610. //-----------------iobuf---------------------------------------------
  611. void hio_alloc_readbuf(hio_t* io, int len) {
  612. if (len > io->max_read_bufsize) {
  613. hloge("read bufsize > %u, close it!", io->max_read_bufsize);
  614. io->error = ERR_OVER_LIMIT;
  615. hio_close_async(io);
  616. return;
  617. }
  618. if (hio_is_alloced_readbuf(io)) {
  619. io->readbuf.base = (char*)hv_realloc(io->readbuf.base, len, io->readbuf.len);
  620. } else {
  621. HV_ALLOC(io->readbuf.base, len);
  622. }
  623. io->readbuf.len = len;
  624. io->alloced_readbuf = 1;
  625. io->small_readbytes_cnt = 0;
  626. }
  627. void hio_free_readbuf(hio_t* io) {
  628. if (hio_is_alloced_readbuf(io)) {
  629. HV_FREE(io->readbuf.base);
  630. io->alloced_readbuf = 0;
  631. // reset to loop->readbuf
  632. io->readbuf.base = io->loop->readbuf.base;
  633. io->readbuf.len = io->loop->readbuf.len;
  634. }
  635. }
  636. void hio_set_readbuf(hio_t* io, void* buf, size_t len) {
  637. assert(io && buf && len != 0);
  638. hio_free_readbuf(io);
  639. io->readbuf.base = (char*)buf;
  640. io->readbuf.len = len;
  641. io->readbuf.head = io->readbuf.tail = 0;
  642. io->alloced_readbuf = 0;
  643. }
  644. hio_readbuf_t* hio_get_readbuf(hio_t* io) {
  645. return &io->readbuf;
  646. }
  647. void hio_set_max_read_bufsize (hio_t* io, uint32_t size) {
  648. io->max_read_bufsize = size;
  649. }
  650. void hio_set_max_write_bufsize(hio_t* io, uint32_t size) {
  651. io->max_write_bufsize = size;
  652. }
  653. size_t hio_write_bufsize(hio_t* io) {
  654. return io->write_bufsize;
  655. }
  656. int hio_read_once (hio_t* io) {
  657. io->read_flags |= HIO_READ_ONCE;
  658. return hio_read_start(io);
  659. }
  660. int hio_read_until_length(hio_t* io, unsigned int len) {
  661. if (len == 0) return 0;
  662. if (io->readbuf.tail - io->readbuf.head >= len) {
  663. void* buf = io->readbuf.base + io->readbuf.head;
  664. io->readbuf.head += len;
  665. if (io->readbuf.head == io->readbuf.tail) {
  666. io->readbuf.head = io->readbuf.tail = 0;
  667. }
  668. hio_read_cb(io, buf, len);
  669. return len;
  670. }
  671. io->read_flags = HIO_READ_UNTIL_LENGTH;
  672. io->read_until_length = len;
  673. // NOTE: prepare readbuf
  674. if (hio_is_loop_readbuf(io) ||
  675. io->readbuf.len < len) {
  676. hio_alloc_readbuf(io, len);
  677. }
  678. return hio_read_once(io);
  679. }
  680. int hio_read_until_delim(hio_t* io, unsigned char delim) {
  681. if (io->readbuf.tail - io->readbuf.head > 0) {
  682. const unsigned char* sp = (const unsigned char*)io->readbuf.base + io->readbuf.head;
  683. const unsigned char* ep = (const unsigned char*)io->readbuf.base + io->readbuf.tail;
  684. const unsigned char* p = sp;
  685. while (p <= ep) {
  686. if (*p == delim) {
  687. int len = p - sp + 1;
  688. io->readbuf.head += len;
  689. if (io->readbuf.head == io->readbuf.tail) {
  690. io->readbuf.head = io->readbuf.tail = 0;
  691. }
  692. hio_read_cb(io, (void*)sp, len);
  693. return len;
  694. }
  695. ++p;
  696. }
  697. }
  698. io->read_flags = HIO_READ_UNTIL_DELIM;
  699. io->read_until_length = delim;
  700. // NOTE: prepare readbuf
  701. if (hio_is_loop_readbuf(io) ||
  702. io->readbuf.len < HLOOP_READ_BUFSIZE) {
  703. hio_alloc_readbuf(io, HLOOP_READ_BUFSIZE);
  704. }
  705. return hio_read_once(io);
  706. }
  707. int hio_read_remain(hio_t* io) {
  708. int remain = io->readbuf.tail - io->readbuf.head;
  709. if (remain > 0) {
  710. void* buf = io->readbuf.base + io->readbuf.head;
  711. io->readbuf.head = io->readbuf.tail = 0;
  712. hio_read_cb(io, buf, remain);
  713. }
  714. return remain;
  715. }
  716. //-----------------unpack---------------------------------------------
  717. void hio_set_unpack(hio_t* io, unpack_setting_t* setting) {
  718. hio_unset_unpack(io);
  719. if (setting == NULL) return;
  720. io->unpack_setting = setting;
  721. if (io->unpack_setting->package_max_length == 0) {
  722. io->unpack_setting->package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
  723. }
  724. if (io->unpack_setting->mode == UNPACK_BY_FIXED_LENGTH) {
  725. assert(io->unpack_setting->fixed_length != 0 &&
  726. io->unpack_setting->fixed_length <= io->unpack_setting->package_max_length);
  727. }
  728. else if (io->unpack_setting->mode == UNPACK_BY_DELIMITER) {
  729. if (io->unpack_setting->delimiter_bytes == 0) {
  730. io->unpack_setting->delimiter_bytes = strlen((char*)io->unpack_setting->delimiter);
  731. }
  732. }
  733. else if (io->unpack_setting->mode == UNPACK_BY_LENGTH_FIELD) {
  734. assert(io->unpack_setting->body_offset >=
  735. io->unpack_setting->length_field_offset +
  736. io->unpack_setting->length_field_bytes);
  737. }
  738. // NOTE: unpack must have own readbuf
  739. if (io->unpack_setting->mode == UNPACK_BY_FIXED_LENGTH) {
  740. io->readbuf.len = io->unpack_setting->fixed_length;
  741. } else {
  742. io->readbuf.len = MIN(HLOOP_READ_BUFSIZE, io->unpack_setting->package_max_length);
  743. }
  744. io->max_read_bufsize = io->unpack_setting->package_max_length;
  745. hio_alloc_readbuf(io, io->readbuf.len);
  746. }
  747. void hio_unset_unpack(hio_t* io) {
  748. if (io->unpack_setting) {
  749. io->unpack_setting = NULL;
  750. // NOTE: unpack has own readbuf
  751. hio_free_readbuf(io);
  752. }
  753. }
  754. //-----------------upstream---------------------------------------------
  755. void hio_read_upstream(hio_t* io) {
  756. hio_t* upstream_io = io->upstream_io;
  757. if (upstream_io) {
  758. hio_read(io);
  759. hio_read(upstream_io);
  760. }
  761. }
  762. void hio_write_upstream(hio_t* io, void* buf, int bytes) {
  763. hio_t* upstream_io = io->upstream_io;
  764. if (upstream_io) {
  765. hio_write(upstream_io, buf, bytes);
  766. }
  767. }
  768. void hio_close_upstream(hio_t* io) {
  769. hio_t* upstream_io = io->upstream_io;
  770. if (upstream_io) {
  771. hio_close(upstream_io);
  772. }
  773. }
  774. void hio_setup_upstream(hio_t* io1, hio_t* io2) {
  775. io1->upstream_io = io2;
  776. io2->upstream_io = io1;
  777. }
  778. hio_t* hio_get_upstream(hio_t* io) {
  779. return io->upstream_io;
  780. }
  781. hio_t* hio_setup_tcp_upstream(hio_t* io, const char* host, int port, int ssl) {
  782. hio_t* upstream_io = hio_create_socket(io->loop, host, port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
  783. if (upstream_io == NULL) return NULL;
  784. if (ssl) hio_enable_ssl(upstream_io);
  785. hio_setup_upstream(io, upstream_io);
  786. hio_setcb_read(io, hio_write_upstream);
  787. hio_setcb_read(upstream_io, hio_write_upstream);
  788. hio_setcb_close(io, hio_close_upstream);
  789. hio_setcb_close(upstream_io, hio_close_upstream);
  790. hio_setcb_connect(upstream_io, hio_read_upstream);
  791. hio_connect(upstream_io);
  792. return upstream_io;
  793. }
  794. hio_t* hio_setup_udp_upstream(hio_t* io, const char* host, int port) {
  795. hio_t* upstream_io = hio_create_socket(io->loop, host, port, HIO_TYPE_UDP, HIO_CLIENT_SIDE);
  796. if (upstream_io == NULL) return NULL;
  797. hio_setup_upstream(io, upstream_io);
  798. hio_setcb_read(io, hio_write_upstream);
  799. hio_setcb_read(upstream_io, hio_write_upstream);
  800. hio_read_upstream(io);
  801. return upstream_io;
  802. }