hevent.c 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741
  1. #include "hevent.h"
  2. #include "hsocket.h"
  3. #include "hatomic.h"
  4. #include "hlog.h"
  5. #include "unpack.h"
  6. uint64_t hloop_next_event_id() {
  7. static hatomic_t s_id = HATOMIC_VAR_INIT(0);
  8. return ++s_id;
  9. }
  10. uint32_t hio_next_id() {
  11. static hatomic_t s_id = HATOMIC_VAR_INIT(0);
  12. return ++s_id;
  13. }
  14. static void fill_io_type(hio_t* io) {
  15. int type = 0;
  16. socklen_t optlen = sizeof(int);
  17. int ret = getsockopt(io->fd, SOL_SOCKET, SO_TYPE, (char*)&type, &optlen);
  18. printd("getsockopt SO_TYPE fd=%d ret=%d type=%d errno=%d\n", io->fd, ret, type, socket_errno());
  19. if (ret == 0) {
  20. switch (type) {
  21. case SOCK_STREAM: io->io_type = HIO_TYPE_TCP; break;
  22. case SOCK_DGRAM: io->io_type = HIO_TYPE_UDP; break;
  23. case SOCK_RAW: io->io_type = HIO_TYPE_IP; break;
  24. default: io->io_type = HIO_TYPE_SOCKET; break;
  25. }
  26. }
  27. else if (socket_errno() == ENOTSOCK) {
  28. switch (io->fd) {
  29. case 0: io->io_type = HIO_TYPE_STDIN; break;
  30. case 1: io->io_type = HIO_TYPE_STDOUT; break;
  31. case 2: io->io_type = HIO_TYPE_STDERR; break;
  32. default: io->io_type = HIO_TYPE_FILE; break;
  33. }
  34. }
  35. else {
  36. io->io_type = HIO_TYPE_TCP;
  37. }
  38. }
  39. static void hio_socket_init(hio_t* io) {
  40. if (io->io_type & HIO_TYPE_SOCK_RAW || io->io_type & HIO_TYPE_SOCK_DGRAM) {
  41. // NOTE: sendto multiple peeraddr cannot use io->write_queue
  42. blocking(io->fd);
  43. } else {
  44. nonblocking(io->fd);
  45. }
  46. // fill io->localaddr io->peeraddr
  47. if (io->localaddr == NULL) {
  48. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  49. }
  50. if (io->peeraddr == NULL) {
  51. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  52. }
  53. socklen_t addrlen = sizeof(sockaddr_u);
  54. int ret = getsockname(io->fd, io->localaddr, &addrlen);
  55. printd("getsockname fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
  56. // NOTE: udp peeraddr set by recvfrom/sendto
  57. if (io->io_type & HIO_TYPE_SOCK_STREAM) {
  58. addrlen = sizeof(sockaddr_u);
  59. ret = getpeername(io->fd, io->peeraddr, &addrlen);
  60. printd("getpeername fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
  61. }
  62. }
  63. void hio_init(hio_t* io) {
  64. // alloc localaddr,peeraddr when hio_socket_init
  65. /*
  66. if (io->localaddr == NULL) {
  67. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  68. }
  69. if (io->peeraddr == NULL) {
  70. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  71. }
  72. */
  73. // write_queue init when hwrite try_write failed
  74. // write_queue_init(&io->write_queue, 4);
  75. hrecursive_mutex_init(&io->write_mutex);
  76. }
  77. void hio_ready(hio_t* io) {
  78. if (io->ready) return;
  79. // flags
  80. io->ready = 1;
  81. io->closed = 0;
  82. io->accept = io->connect = io->connectex = 0;
  83. io->recv = io->send = 0;
  84. io->recvfrom = io->sendto = 0;
  85. io->close = 0;
  86. // public:
  87. io->id = hio_next_id();
  88. io->io_type = HIO_TYPE_UNKNOWN;
  89. io->error = 0;
  90. io->events = io->revents = 0;
  91. // readbuf
  92. io->alloced_readbuf = 0;
  93. io->readbuf.base = io->loop->readbuf.base;
  94. io->readbuf.len = io->loop->readbuf.len;
  95. io->readbuf.head = io->readbuf.tail = 0;
  96. io->read_flags = 0;
  97. io->read_until_length = 0;
  98. io->small_readbytes_cnt = 0;
  99. // write_queue
  100. io->write_queue_bytes = 0;
  101. // callbacks
  102. io->read_cb = NULL;
  103. io->write_cb = NULL;
  104. io->close_cb = NULL;
  105. io->accept_cb = NULL;
  106. io->connect_cb = NULL;
  107. // timers
  108. io->connect_timeout = 0;
  109. io->connect_timer = NULL;
  110. io->close_timeout = 0;
  111. io->close_timer = NULL;
  112. io->keepalive_timeout = 0;
  113. io->keepalive_timer = NULL;
  114. io->heartbeat_interval = 0;
  115. io->heartbeat_fn = NULL;
  116. io->heartbeat_timer = NULL;
  117. // upstream
  118. io->upstream_io = NULL;
  119. // unpack
  120. io->unpack_setting = NULL;
  121. // ssl
  122. io->ssl = NULL;
  123. // context
  124. io->ctx = NULL;
  125. // private:
  126. #if defined(EVENT_POLL) || defined(EVENT_KQUEUE)
  127. io->event_index[0] = io->event_index[1] = -1;
  128. #endif
  129. #ifdef EVENT_IOCP
  130. io->hovlp = NULL;
  131. #endif
  132. // io_type
  133. fill_io_type(io);
  134. if (io->io_type & HIO_TYPE_SOCKET) {
  135. hio_socket_init(io);
  136. }
  137. #if WITH_RUDP
  138. if (io->io_type & HIO_TYPE_SOCK_RAW || io->io_type & HIO_TYPE_SOCK_DGRAM) {
  139. rudp_init(&io->rudp);
  140. }
  141. #endif
  142. }
  143. void hio_done(hio_t* io) {
  144. if (!io->ready) return;
  145. io->ready = 0;
  146. hio_del(io, HV_RDWR);
  147. // readbuf
  148. hio_free_readbuf(io);
  149. // write_queue
  150. offset_buf_t* pbuf = NULL;
  151. hrecursive_mutex_lock(&io->write_mutex);
  152. while (!write_queue_empty(&io->write_queue)) {
  153. pbuf = write_queue_front(&io->write_queue);
  154. HV_FREE(pbuf->base);
  155. write_queue_pop_front(&io->write_queue);
  156. }
  157. write_queue_cleanup(&io->write_queue);
  158. hrecursive_mutex_unlock(&io->write_mutex);
  159. #if WITH_RUDP
  160. if (io->io_type & HIO_TYPE_SOCK_RAW || io->io_type & HIO_TYPE_SOCK_DGRAM) {
  161. rudp_cleanup(&io->rudp);
  162. }
  163. #endif
  164. }
  165. void hio_free(hio_t* io) {
  166. if (io == NULL) return;
  167. hio_close(io);
  168. hrecursive_mutex_destroy(&io->write_mutex);
  169. HV_FREE(io->localaddr);
  170. HV_FREE(io->peeraddr);
  171. HV_FREE(io);
  172. }
  173. bool hio_is_opened(hio_t* io) {
  174. if (io == NULL) return false;
  175. return io->ready == 1 && io->closed == 0;
  176. }
  177. bool hio_is_closed(hio_t* io) {
  178. if (io == NULL) return true;
  179. return io->ready == 0 && io->closed == 1;
  180. }
  181. uint32_t hio_id (hio_t* io) {
  182. return io->id;
  183. }
  184. int hio_fd(hio_t* io) {
  185. return io->fd;
  186. }
  187. hio_type_e hio_type(hio_t* io) {
  188. return io->io_type;
  189. }
  190. int hio_error(hio_t* io) {
  191. return io->error;
  192. }
  193. int hio_events(hio_t* io) {
  194. return io->events;
  195. }
  196. int hio_revents(hio_t* io) {
  197. return io->revents;
  198. }
  199. struct sockaddr* hio_localaddr(hio_t* io) {
  200. return io->localaddr;
  201. }
  202. struct sockaddr* hio_peeraddr(hio_t* io) {
  203. return io->peeraddr;
  204. }
  205. void hio_set_context(hio_t* io, void* ctx) {
  206. io->ctx = ctx;
  207. }
  208. void* hio_context(hio_t* io) {
  209. return io->ctx;
  210. }
  211. size_t hio_read_bufsize(hio_t* io) {
  212. return io->readbuf.len;
  213. }
  214. size_t hio_write_bufsize(hio_t* io) {
  215. return io->write_queue_bytes;
  216. }
  217. haccept_cb hio_getcb_accept(hio_t* io) {
  218. return io->accept_cb;
  219. }
  220. hconnect_cb hio_getcb_connect(hio_t* io) {
  221. return io->connect_cb;
  222. }
  223. hread_cb hio_getcb_read(hio_t* io) {
  224. return io->read_cb;
  225. }
  226. hwrite_cb hio_getcb_write(hio_t* io) {
  227. return io->write_cb;
  228. }
  229. hclose_cb hio_getcb_close(hio_t* io) {
  230. return io->close_cb;
  231. }
  232. void hio_setcb_accept(hio_t* io, haccept_cb accept_cb) {
  233. io->accept_cb = accept_cb;
  234. }
  235. void hio_setcb_connect(hio_t* io, hconnect_cb connect_cb) {
  236. io->connect_cb = connect_cb;
  237. }
  238. void hio_setcb_read(hio_t* io, hread_cb read_cb) {
  239. io->read_cb = read_cb;
  240. }
  241. void hio_setcb_write(hio_t* io, hwrite_cb write_cb) {
  242. io->write_cb = write_cb;
  243. }
  244. void hio_setcb_close(hio_t* io, hclose_cb close_cb) {
  245. io->close_cb = close_cb;
  246. }
  247. void hio_accept_cb(hio_t* io) {
  248. /*
  249. char localaddrstr[SOCKADDR_STRLEN] = {0};
  250. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  251. printd("accept connfd=%d [%s] <= [%s]\n", io->fd,
  252. SOCKADDR_STR(io->localaddr, localaddrstr),
  253. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  254. */
  255. if (io->accept_cb) {
  256. // printd("accept_cb------\n");
  257. io->accept_cb(io);
  258. // printd("accept_cb======\n");
  259. }
  260. }
  261. void hio_connect_cb(hio_t* io) {
  262. /*
  263. char localaddrstr[SOCKADDR_STRLEN] = {0};
  264. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  265. printd("connect connfd=%d [%s] => [%s]\n", io->fd,
  266. SOCKADDR_STR(io->localaddr, localaddrstr),
  267. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  268. */
  269. if (io->connect_cb) {
  270. // printd("connect_cb------\n");
  271. io->connect_cb(io);
  272. // printd("connect_cb======\n");
  273. }
  274. }
  275. void hio_handle_read(hio_t* io, void* buf, int readbytes) {
  276. #if WITH_KCP
  277. if (io->io_type == HIO_TYPE_KCP) {
  278. hio_read_kcp(io, buf, readbytes);
  279. return;
  280. }
  281. #endif
  282. if (io->unpack_setting) {
  283. // hio_set_unpack
  284. hio_unpack(io, buf, readbytes);
  285. } else {
  286. const unsigned char* sp = (const unsigned char*)io->readbuf.base + io->readbuf.head;
  287. const unsigned char* ep = (const unsigned char*)buf + readbytes;
  288. if (io->read_flags & HIO_READ_UNTIL_LENGTH) {
  289. // hio_read_until_length
  290. if (ep - sp >= io->read_until_length) {
  291. io->readbuf.head += io->read_until_length;
  292. if (io->readbuf.head == io->readbuf.tail) {
  293. io->readbuf.head = io->readbuf.tail = 0;
  294. }
  295. io->readbuf.head = io->readbuf.tail = 0;
  296. io->read_flags &= ~HIO_READ_UNTIL_LENGTH;
  297. hio_read_cb(io, (void*)sp, io->read_until_length);
  298. }
  299. } else if (io->read_flags & HIO_READ_UNTIL_DELIM) {
  300. // hio_read_until_delim
  301. const unsigned char* p = (const unsigned char*)buf;
  302. for (int i = 0; i < readbytes; ++i, ++p) {
  303. if (*p == io->read_until_delim) {
  304. int len = p - sp + 1;
  305. io->readbuf.head += len;
  306. if (io->readbuf.head == io->readbuf.tail) {
  307. io->readbuf.head = io->readbuf.tail = 0;
  308. }
  309. io->read_flags &= ~HIO_READ_UNTIL_DELIM;
  310. hio_read_cb(io, (void*)sp, len);
  311. return;
  312. }
  313. }
  314. } else {
  315. // hio_read
  316. io->readbuf.head = io->readbuf.tail = 0;
  317. hio_read_cb(io, (void*)sp, ep - sp);
  318. }
  319. }
  320. if (io->readbuf.head == io->readbuf.tail) {
  321. io->readbuf.head = io->readbuf.tail = 0;
  322. }
  323. // readbuf autosize
  324. if (io->readbuf.tail == io->readbuf.len) {
  325. if (io->readbuf.head == 0) {
  326. // scale up * 2
  327. hio_alloc_readbuf(io, io->readbuf.len * 2);
  328. } else {
  329. // [head, tail] => [base, tail - head]
  330. memmove(io->readbuf.base, io->readbuf.base + io->readbuf.head, io->readbuf.tail - io->readbuf.head);
  331. }
  332. } else {
  333. size_t small_size = io->readbuf.len / 2;
  334. if (io->readbuf.tail < small_size &&
  335. io->small_readbytes_cnt >= 3) {
  336. // scale down / 2
  337. hio_alloc_readbuf(io, small_size);
  338. }
  339. }
  340. }
  341. void hio_read_cb(hio_t* io, void* buf, int len) {
  342. if (io->read_flags & HIO_READ_ONCE) {
  343. io->read_flags &= ~HIO_READ_ONCE;
  344. hio_read_stop(io);
  345. }
  346. if (io->read_cb) {
  347. // printd("read_cb------\n");
  348. io->read_cb(io, buf, len);
  349. // printd("read_cb======\n");
  350. }
  351. // for readbuf autosize
  352. if (hio_is_alloced_readbuf(io) && io->readbuf.len > READ_BUFSIZE_HIGH_WATER) {
  353. size_t small_size = io->readbuf.len / 2;
  354. if (len < small_size) {
  355. ++io->small_readbytes_cnt;
  356. } else {
  357. io->small_readbytes_cnt = 0;
  358. }
  359. }
  360. }
  361. void hio_write_cb(hio_t* io, const void* buf, int len) {
  362. if (io->write_cb) {
  363. // printd("write_cb------\n");
  364. io->write_cb(io, buf, len);
  365. // printd("write_cb======\n");
  366. }
  367. }
  368. void hio_close_cb(hio_t* io) {
  369. if (io->close_cb) {
  370. // printd("close_cb------\n");
  371. io->close_cb(io);
  372. // printd("close_cb======\n");
  373. }
  374. }
  375. void hio_set_type(hio_t* io, hio_type_e type) {
  376. io->io_type = type;
  377. }
  378. void hio_set_localaddr(hio_t* io, struct sockaddr* addr, int addrlen) {
  379. if (io->localaddr == NULL) {
  380. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  381. }
  382. memcpy(io->localaddr, addr, addrlen);
  383. }
  384. void hio_set_peeraddr (hio_t* io, struct sockaddr* addr, int addrlen) {
  385. if (io->peeraddr == NULL) {
  386. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  387. }
  388. memcpy(io->peeraddr, addr, addrlen);
  389. }
  390. int hio_enable_ssl(hio_t* io) {
  391. io->io_type = HIO_TYPE_SSL;
  392. return 0;
  393. }
  394. bool hio_is_ssl(hio_t* io) {
  395. return io->io_type == HIO_TYPE_SSL;
  396. }
  397. hssl_t hio_get_ssl(hio_t* io) {
  398. return io->ssl;
  399. }
  400. int hio_set_ssl(hio_t* io, hssl_t ssl) {
  401. io->io_type = HIO_TYPE_SSL;
  402. io->ssl = ssl;
  403. return 0;
  404. }
  405. void hio_set_readbuf(hio_t* io, void* buf, size_t len) {
  406. assert(io && buf && len != 0);
  407. hio_free_readbuf(io);
  408. io->readbuf.base = (char*)buf;
  409. io->readbuf.len = len;
  410. io->readbuf.head = io->readbuf.tail = 0;
  411. io->alloced_readbuf = 0;
  412. }
  413. void hio_del_connect_timer(hio_t* io) {
  414. if (io->connect_timer) {
  415. htimer_del(io->connect_timer);
  416. io->connect_timer = NULL;
  417. io->connect_timeout = 0;
  418. }
  419. }
  420. void hio_del_close_timer(hio_t* io) {
  421. if (io->close_timer) {
  422. htimer_del(io->close_timer);
  423. io->close_timer = NULL;
  424. io->close_timeout = 0;
  425. }
  426. }
  427. void hio_del_keepalive_timer(hio_t* io) {
  428. if (io->keepalive_timer) {
  429. htimer_del(io->keepalive_timer);
  430. io->keepalive_timer = NULL;
  431. io->keepalive_timeout = 0;
  432. }
  433. }
  434. void hio_del_heartbeat_timer(hio_t* io) {
  435. if (io->heartbeat_timer) {
  436. htimer_del(io->heartbeat_timer);
  437. io->heartbeat_timer = NULL;
  438. io->heartbeat_interval = 0;
  439. io->heartbeat_fn = NULL;
  440. }
  441. }
  442. void hio_set_connect_timeout(hio_t* io, int timeout_ms) {
  443. io->connect_timeout = timeout_ms;
  444. }
  445. void hio_set_close_timeout(hio_t* io, int timeout_ms) {
  446. io->close_timeout = timeout_ms;
  447. }
  448. static void __keepalive_timeout_cb(htimer_t* timer) {
  449. hio_t* io = (hio_t*)timer->privdata;
  450. if (io) {
  451. char localaddrstr[SOCKADDR_STRLEN] = {0};
  452. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  453. hlogw("keepalive timeout [%s] <=> [%s]",
  454. SOCKADDR_STR(io->localaddr, localaddrstr),
  455. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  456. io->error = ETIMEDOUT;
  457. hio_close(io);
  458. }
  459. }
  460. void hio_set_keepalive_timeout(hio_t* io, int timeout_ms) {
  461. if (timeout_ms == 0) {
  462. // del
  463. hio_del_keepalive_timer(io);
  464. return;
  465. }
  466. if (io->keepalive_timer) {
  467. // reset
  468. ((struct htimeout_s*)io->keepalive_timer)->timeout = timeout_ms;
  469. htimer_reset(io->keepalive_timer);
  470. } else {
  471. // add
  472. io->keepalive_timer = htimer_add(io->loop, __keepalive_timeout_cb, timeout_ms, 1);
  473. io->keepalive_timer->privdata = io;
  474. }
  475. io->keepalive_timeout = timeout_ms;
  476. }
  477. static void __heartbeat_timer_cb(htimer_t* timer) {
  478. hio_t* io = (hio_t*)timer->privdata;
  479. if (io && io->heartbeat_fn) {
  480. io->heartbeat_fn(io);
  481. }
  482. }
  483. void hio_set_heartbeat(hio_t* io, int interval_ms, hio_send_heartbeat_fn fn) {
  484. if (interval_ms == 0) {
  485. // del
  486. hio_del_heartbeat_timer(io);
  487. return;
  488. }
  489. if (io->heartbeat_timer) {
  490. // reset
  491. ((struct htimeout_s*)io->heartbeat_timer)->timeout = interval_ms;
  492. htimer_reset(io->heartbeat_timer);
  493. } else {
  494. // add
  495. io->heartbeat_timer = htimer_add(io->loop, __heartbeat_timer_cb, interval_ms, INFINITE);
  496. io->heartbeat_timer->privdata = io;
  497. }
  498. io->heartbeat_interval = interval_ms;
  499. io->heartbeat_fn = fn;
  500. }
  501. void hio_alloc_readbuf(hio_t* io, int len) {
  502. if (hio_is_alloced_readbuf(io)) {
  503. io->readbuf.base = (char*)safe_realloc(io->readbuf.base, len, io->readbuf.len);
  504. } else {
  505. HV_ALLOC(io->readbuf.base, len);
  506. }
  507. io->readbuf.len = len;
  508. io->alloced_readbuf = 1;
  509. io->small_readbytes_cnt = 0;
  510. }
  511. void hio_free_readbuf(hio_t* io) {
  512. if (hio_is_alloced_readbuf(io)) {
  513. HV_FREE(io->readbuf.base);
  514. io->alloced_readbuf = 0;
  515. // reset to loop->readbuf
  516. io->readbuf.base = io->loop->readbuf.base;
  517. io->readbuf.len = io->loop->readbuf.len;
  518. }
  519. }
  520. int hio_read_once (hio_t* io) {
  521. io->read_flags |= HIO_READ_ONCE;
  522. return hio_read_start(io);
  523. }
  524. int hio_read_until_length(hio_t* io, unsigned int len) {
  525. if (len == 0) return 0;
  526. if (io->readbuf.tail - io->readbuf.head >= len) {
  527. void* buf = io->readbuf.base + io->readbuf.head;
  528. io->readbuf.head += len;
  529. if (io->readbuf.head == io->readbuf.tail) {
  530. io->readbuf.head = io->readbuf.tail = 0;
  531. }
  532. hio_read_cb(io, buf, len);
  533. return len;
  534. }
  535. io->read_flags = HIO_READ_UNTIL_LENGTH;
  536. io->read_until_length = len;
  537. // NOTE: prepare readbuf
  538. if (hio_is_loop_readbuf(io) ||
  539. io->readbuf.len < len) {
  540. hio_alloc_readbuf(io, len);
  541. }
  542. return hio_read_once(io);
  543. }
  544. int hio_read_until_delim(hio_t* io, unsigned char delim) {
  545. if (io->readbuf.tail - io->readbuf.head > 0) {
  546. const unsigned char* sp = (const unsigned char*)io->readbuf.base + io->readbuf.head;
  547. const unsigned char* ep = (const unsigned char*)io->readbuf.base + io->readbuf.tail;
  548. const unsigned char* p = sp;
  549. while (p <= ep) {
  550. if (*p == delim) {
  551. int len = p - sp + 1;
  552. io->readbuf.head += len;
  553. if (io->readbuf.head == io->readbuf.tail) {
  554. io->readbuf.head = io->readbuf.tail = 0;
  555. }
  556. hio_read_cb(io, (void*)sp, len);
  557. return len;
  558. }
  559. ++p;
  560. }
  561. }
  562. io->read_flags = HIO_READ_UNTIL_DELIM;
  563. io->read_until_length = delim;
  564. // NOTE: prepare readbuf
  565. if (hio_is_loop_readbuf(io) ||
  566. io->readbuf.len < HLOOP_READ_BUFSIZE) {
  567. hio_alloc_readbuf(io, HLOOP_READ_BUFSIZE);
  568. }
  569. return hio_read_once(io);
  570. }
  571. //-----------------unpack---------------------------------------------
  572. void hio_set_unpack(hio_t* io, unpack_setting_t* setting) {
  573. hio_unset_unpack(io);
  574. if (setting == NULL) return;
  575. io->unpack_setting = setting;
  576. if (io->unpack_setting->package_max_length == 0) {
  577. io->unpack_setting->package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
  578. }
  579. if (io->unpack_setting->mode == UNPACK_BY_FIXED_LENGTH) {
  580. assert(io->unpack_setting->fixed_length != 0 &&
  581. io->unpack_setting->fixed_length <= io->unpack_setting->package_max_length);
  582. }
  583. else if (io->unpack_setting->mode == UNPACK_BY_DELIMITER) {
  584. if (io->unpack_setting->delimiter_bytes == 0) {
  585. io->unpack_setting->delimiter_bytes = strlen((char*)io->unpack_setting->delimiter);
  586. }
  587. }
  588. else if (io->unpack_setting->mode == UNPACK_BY_LENGTH_FIELD) {
  589. assert(io->unpack_setting->body_offset >=
  590. io->unpack_setting->length_field_offset +
  591. io->unpack_setting->length_field_bytes);
  592. }
  593. // NOTE: unpack must have own readbuf
  594. if (io->unpack_setting->mode == UNPACK_BY_FIXED_LENGTH) {
  595. io->readbuf.len = io->unpack_setting->fixed_length;
  596. } else {
  597. io->readbuf.len = HLOOP_READ_BUFSIZE;
  598. }
  599. hio_alloc_readbuf(io, io->readbuf.len);
  600. }
  601. void hio_unset_unpack(hio_t* io) {
  602. if (io->unpack_setting) {
  603. io->unpack_setting = NULL;
  604. // NOTE: unpack has own readbuf
  605. hio_free_readbuf(io);
  606. }
  607. }
  608. //-----------------upstream---------------------------------------------
  609. void hio_read_upstream(hio_t* io) {
  610. hio_t* upstream_io = io->upstream_io;
  611. if (upstream_io) {
  612. hio_read(io);
  613. hio_read(upstream_io);
  614. }
  615. }
  616. void hio_write_upstream(hio_t* io, void* buf, int bytes) {
  617. hio_t* upstream_io = io->upstream_io;
  618. if (upstream_io) {
  619. hio_write(upstream_io, buf, bytes);
  620. }
  621. }
  622. void hio_close_upstream(hio_t* io) {
  623. hio_t* upstream_io = io->upstream_io;
  624. if (upstream_io) {
  625. hio_close(upstream_io);
  626. }
  627. }
  628. void hio_setup_upstream(hio_t* io1, hio_t* io2) {
  629. io1->upstream_io = io2;
  630. io2->upstream_io = io1;
  631. hio_setcb_read(io1, hio_write_upstream);
  632. hio_setcb_read(io2, hio_write_upstream);
  633. }
  634. hio_t* hio_get_upstream(hio_t* io) {
  635. return io->upstream_io;
  636. }
  637. hio_t* hio_setup_tcp_upstream(hio_t* io, const char* host, int port, int ssl) {
  638. hio_t* upstream_io = hio_create_socket(io->loop, host, port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
  639. if (upstream_io == NULL) return NULL;
  640. if (ssl) hio_enable_ssl(upstream_io);
  641. hio_setup_upstream(io, upstream_io);
  642. hio_setcb_close(io, hio_close_upstream);
  643. hio_setcb_close(upstream_io, hio_close_upstream);
  644. hconnect(io->loop, upstream_io->fd, hio_read_upstream);
  645. return upstream_io;
  646. }
  647. hio_t* hio_setup_udp_upstream(hio_t* io, const char* host, int port) {
  648. hio_t* upstream_io = hio_create_socket(io->loop, host, port, HIO_TYPE_UDP, HIO_CLIENT_SIDE);
  649. if (upstream_io == NULL) return NULL;
  650. hio_setup_upstream(io, upstream_io);
  651. hio_read_upstream(io);
  652. return upstream_io;
  653. }