// hevent.c
  1. #include "hevent.h"
  2. #include "hsocket.h"
  3. #include "hatomic.h"
  4. #include "hlog.h"
  5. uint64_t hloop_next_event_id() {
  6. static hatomic_t s_id = HATOMIC_VAR_INIT(0);
  7. return ++s_id;
  8. }
  9. uint32_t hio_next_id() {
  10. static hatomic_t s_id = HATOMIC_VAR_INIT(0);
  11. return ++s_id;
  12. }
  13. static void fill_io_type(hio_t* io) {
  14. int type = 0;
  15. socklen_t optlen = sizeof(int);
  16. int ret = getsockopt(io->fd, SOL_SOCKET, SO_TYPE, (char*)&type, &optlen);
  17. printd("getsockopt SO_TYPE fd=%d ret=%d type=%d errno=%d\n", io->fd, ret, type, socket_errno());
  18. if (ret == 0) {
  19. switch (type) {
  20. case SOCK_STREAM: io->io_type = HIO_TYPE_TCP; break;
  21. case SOCK_DGRAM: io->io_type = HIO_TYPE_UDP; break;
  22. case SOCK_RAW: io->io_type = HIO_TYPE_IP; break;
  23. default: io->io_type = HIO_TYPE_SOCKET; break;
  24. }
  25. }
  26. else if (socket_errno() == ENOTSOCK) {
  27. switch (io->fd) {
  28. case 0: io->io_type = HIO_TYPE_STDIN; break;
  29. case 1: io->io_type = HIO_TYPE_STDOUT; break;
  30. case 2: io->io_type = HIO_TYPE_STDERR; break;
  31. default: io->io_type = HIO_TYPE_FILE; break;
  32. }
  33. }
  34. else {
  35. io->io_type = HIO_TYPE_TCP;
  36. }
  37. }
  38. static void hio_socket_init(hio_t* io) {
  39. if (io->io_type & HIO_TYPE_SOCK_RAW || io->io_type & HIO_TYPE_SOCK_DGRAM) {
  40. // NOTE: sendto multiple peeraddr cannot use io->write_queue
  41. blocking(io->fd);
  42. } else {
  43. nonblocking(io->fd);
  44. }
  45. // fill io->localaddr io->peeraddr
  46. if (io->localaddr == NULL) {
  47. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  48. }
  49. if (io->peeraddr == NULL) {
  50. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  51. }
  52. socklen_t addrlen = sizeof(sockaddr_u);
  53. int ret = getsockname(io->fd, io->localaddr, &addrlen);
  54. printd("getsockname fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
  55. // NOTE: udp peeraddr set by recvfrom/sendto
  56. if (io->io_type & HIO_TYPE_SOCK_STREAM) {
  57. addrlen = sizeof(sockaddr_u);
  58. ret = getpeername(io->fd, io->peeraddr, &addrlen);
  59. printd("getpeername fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
  60. }
  61. }
  62. void hio_init(hio_t* io) {
  63. // alloc localaddr,peeraddr when hio_socket_init
  64. /*
  65. if (io->localaddr == NULL) {
  66. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  67. }
  68. if (io->peeraddr == NULL) {
  69. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  70. }
  71. */
  72. // write_queue init when hwrite try_write failed
  73. // write_queue_init(&io->write_queue, 4);
  74. hrecursive_mutex_init(&io->write_mutex);
  75. }
  76. void hio_ready(hio_t* io) {
  77. if (io->ready) return;
  78. // flags
  79. io->ready = 1;
  80. io->closed = 0;
  81. io->accept = io->connect = io->connectex = 0;
  82. io->recv = io->send = 0;
  83. io->recvfrom = io->sendto = 0;
  84. io->close = 0;
  85. // public:
  86. io->id = hio_next_id();
  87. io->io_type = HIO_TYPE_UNKNOWN;
  88. io->error = 0;
  89. io->events = io->revents = 0;
  90. // readbuf
  91. io->alloced_readbuf = 0;
  92. io->readbuf.base = io->loop->readbuf.base;
  93. io->readbuf.len = io->loop->readbuf.len;
  94. io->readbuf.offset = 0;
  95. io->read_once = 0;
  96. io->read_until = 0;
  97. io->small_readbytes_cnt = 0;
  98. // write_queue
  99. io->write_queue_bytes = 0;
  100. // callbacks
  101. io->read_cb = NULL;
  102. io->write_cb = NULL;
  103. io->close_cb = NULL;
  104. io->accept_cb = NULL;
  105. io->connect_cb = NULL;
  106. // timers
  107. io->connect_timeout = 0;
  108. io->connect_timer = NULL;
  109. io->close_timeout = 0;
  110. io->close_timer = NULL;
  111. io->keepalive_timeout = 0;
  112. io->keepalive_timer = NULL;
  113. io->heartbeat_interval = 0;
  114. io->heartbeat_fn = NULL;
  115. io->heartbeat_timer = NULL;
  116. // upstream
  117. io->upstream_io = NULL;
  118. // unpack
  119. io->unpack_setting = NULL;
  120. // ssl
  121. io->ssl = NULL;
  122. // context
  123. io->ctx = NULL;
  124. // private:
  125. #if defined(EVENT_POLL) || defined(EVENT_KQUEUE)
  126. io->event_index[0] = io->event_index[1] = -1;
  127. #endif
  128. #ifdef EVENT_IOCP
  129. io->hovlp = NULL;
  130. #endif
  131. // io_type
  132. fill_io_type(io);
  133. if (io->io_type & HIO_TYPE_SOCKET) {
  134. hio_socket_init(io);
  135. }
  136. #if WITH_RUDP
  137. if (io->io_type & HIO_TYPE_SOCK_RAW || io->io_type & HIO_TYPE_SOCK_DGRAM) {
  138. rudp_init(&io->rudp);
  139. }
  140. #endif
  141. }
  142. void hio_done(hio_t* io) {
  143. if (!io->ready) return;
  144. io->ready = 0;
  145. hio_del(io, HV_RDWR);
  146. // readbuf
  147. hio_free_readbuf(io);
  148. // write_queue
  149. offset_buf_t* pbuf = NULL;
  150. hrecursive_mutex_lock(&io->write_mutex);
  151. while (!write_queue_empty(&io->write_queue)) {
  152. pbuf = write_queue_front(&io->write_queue);
  153. HV_FREE(pbuf->base);
  154. write_queue_pop_front(&io->write_queue);
  155. }
  156. write_queue_cleanup(&io->write_queue);
  157. hrecursive_mutex_unlock(&io->write_mutex);
  158. #if WITH_RUDP
  159. if (io->io_type & HIO_TYPE_SOCK_RAW || io->io_type & HIO_TYPE_SOCK_DGRAM) {
  160. rudp_cleanup(&io->rudp);
  161. }
  162. #endif
  163. }
  164. void hio_free(hio_t* io) {
  165. if (io == NULL) return;
  166. hio_close(io);
  167. hrecursive_mutex_destroy(&io->write_mutex);
  168. HV_FREE(io->localaddr);
  169. HV_FREE(io->peeraddr);
  170. HV_FREE(io);
  171. }
  172. bool hio_is_opened(hio_t* io) {
  173. if (io == NULL) return false;
  174. return io->ready == 1 && io->closed == 0;
  175. }
  176. bool hio_is_closed(hio_t* io) {
  177. if (io == NULL) return true;
  178. return io->ready == 0 && io->closed == 1;
  179. }
  180. uint32_t hio_id (hio_t* io) {
  181. return io->id;
  182. }
  183. int hio_fd(hio_t* io) {
  184. return io->fd;
  185. }
  186. hio_type_e hio_type(hio_t* io) {
  187. return io->io_type;
  188. }
  189. int hio_error(hio_t* io) {
  190. return io->error;
  191. }
  192. int hio_events(hio_t* io) {
  193. return io->events;
  194. }
  195. int hio_revents(hio_t* io) {
  196. return io->revents;
  197. }
  198. struct sockaddr* hio_localaddr(hio_t* io) {
  199. return io->localaddr;
  200. }
  201. struct sockaddr* hio_peeraddr(hio_t* io) {
  202. return io->peeraddr;
  203. }
  204. void hio_set_context(hio_t* io, void* ctx) {
  205. io->ctx = ctx;
  206. }
  207. void* hio_context(hio_t* io) {
  208. return io->ctx;
  209. }
  210. size_t hio_read_bufsize(hio_t* io) {
  211. return io->readbuf.len;
  212. }
  213. size_t hio_write_bufsize(hio_t* io) {
  214. return io->write_queue_bytes;
  215. }
  216. haccept_cb hio_getcb_accept(hio_t* io) {
  217. return io->accept_cb;
  218. }
  219. hconnect_cb hio_getcb_connect(hio_t* io) {
  220. return io->connect_cb;
  221. }
  222. hread_cb hio_getcb_read(hio_t* io) {
  223. return io->read_cb;
  224. }
  225. hwrite_cb hio_getcb_write(hio_t* io) {
  226. return io->write_cb;
  227. }
  228. hclose_cb hio_getcb_close(hio_t* io) {
  229. return io->close_cb;
  230. }
  231. void hio_setcb_accept(hio_t* io, haccept_cb accept_cb) {
  232. io->accept_cb = accept_cb;
  233. }
  234. void hio_setcb_connect(hio_t* io, hconnect_cb connect_cb) {
  235. io->connect_cb = connect_cb;
  236. }
  237. void hio_setcb_read(hio_t* io, hread_cb read_cb) {
  238. io->read_cb = read_cb;
  239. }
  240. void hio_setcb_write(hio_t* io, hwrite_cb write_cb) {
  241. io->write_cb = write_cb;
  242. }
  243. void hio_setcb_close(hio_t* io, hclose_cb close_cb) {
  244. io->close_cb = close_cb;
  245. }
  246. void hio_accept_cb(hio_t* io) {
  247. /*
  248. char localaddrstr[SOCKADDR_STRLEN] = {0};
  249. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  250. printd("accept connfd=%d [%s] <= [%s]\n", io->fd,
  251. SOCKADDR_STR(io->localaddr, localaddrstr),
  252. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  253. */
  254. if (io->accept_cb) {
  255. // printd("accept_cb------\n");
  256. io->accept_cb(io);
  257. // printd("accept_cb======\n");
  258. }
  259. }
  260. void hio_connect_cb(hio_t* io) {
  261. /*
  262. char localaddrstr[SOCKADDR_STRLEN] = {0};
  263. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  264. printd("connect connfd=%d [%s] => [%s]\n", io->fd,
  265. SOCKADDR_STR(io->localaddr, localaddrstr),
  266. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  267. */
  268. if (io->connect_cb) {
  269. // printd("connect_cb------\n");
  270. io->connect_cb(io);
  271. // printd("connect_cb======\n");
  272. }
  273. }
  274. void hio_read_cb(hio_t* io, void* buf, int len) {
  275. if (io->read_cb) {
  276. // printd("read_cb------\n");
  277. io->read_cb(io, buf, len);
  278. // printd("read_cb======\n");
  279. }
  280. // for readbuf autosize
  281. if (hio_is_alloced_readbuf(io) && io->readbuf.len > READ_BUFSIZE_HIGH_WATER) {
  282. size_t small_size = io->readbuf.len / 2;
  283. if (len < small_size) {
  284. ++io->small_readbytes_cnt;
  285. } else {
  286. io->small_readbytes_cnt = 0;
  287. }
  288. }
  289. }
  290. void hio_write_cb(hio_t* io, const void* buf, int len) {
  291. if (io->write_cb) {
  292. // printd("write_cb------\n");
  293. io->write_cb(io, buf, len);
  294. // printd("write_cb======\n");
  295. }
  296. }
  297. void hio_close_cb(hio_t* io) {
  298. if (io->close_cb) {
  299. // printd("close_cb------\n");
  300. io->close_cb(io);
  301. // printd("close_cb======\n");
  302. }
  303. }
  304. void hio_set_type(hio_t* io, hio_type_e type) {
  305. io->io_type = type;
  306. }
  307. void hio_set_localaddr(hio_t* io, struct sockaddr* addr, int addrlen) {
  308. if (io->localaddr == NULL) {
  309. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  310. }
  311. memcpy(io->localaddr, addr, addrlen);
  312. }
  313. void hio_set_peeraddr (hio_t* io, struct sockaddr* addr, int addrlen) {
  314. if (io->peeraddr == NULL) {
  315. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  316. }
  317. memcpy(io->peeraddr, addr, addrlen);
  318. }
  319. int hio_enable_ssl(hio_t* io) {
  320. io->io_type = HIO_TYPE_SSL;
  321. return 0;
  322. }
  323. bool hio_is_ssl(hio_t* io) {
  324. return io->io_type == HIO_TYPE_SSL;
  325. }
  326. hssl_t hio_get_ssl(hio_t* io) {
  327. return io->ssl;
  328. }
  329. int hio_set_ssl(hio_t* io, hssl_t ssl) {
  330. io->io_type = HIO_TYPE_SSL;
  331. io->ssl = ssl;
  332. return 0;
  333. }
  334. void hio_set_readbuf(hio_t* io, void* buf, size_t len) {
  335. assert(io && buf && len != 0);
  336. hio_free_readbuf(io);
  337. io->readbuf.base = (char*)buf;
  338. io->readbuf.len = len;
  339. io->readbuf.offset = 0;
  340. io->alloced_readbuf = 0;
  341. }
  342. void hio_del_connect_timer(hio_t* io) {
  343. if (io->connect_timer) {
  344. htimer_del(io->connect_timer);
  345. io->connect_timer = NULL;
  346. io->connect_timeout = 0;
  347. }
  348. }
  349. void hio_del_close_timer(hio_t* io) {
  350. if (io->close_timer) {
  351. htimer_del(io->close_timer);
  352. io->close_timer = NULL;
  353. io->close_timeout = 0;
  354. }
  355. }
  356. void hio_del_keepalive_timer(hio_t* io) {
  357. if (io->keepalive_timer) {
  358. htimer_del(io->keepalive_timer);
  359. io->keepalive_timer = NULL;
  360. io->keepalive_timeout = 0;
  361. }
  362. }
  363. void hio_del_heartbeat_timer(hio_t* io) {
  364. if (io->heartbeat_timer) {
  365. htimer_del(io->heartbeat_timer);
  366. io->heartbeat_timer = NULL;
  367. io->heartbeat_interval = 0;
  368. io->heartbeat_fn = NULL;
  369. }
  370. }
  371. void hio_set_connect_timeout(hio_t* io, int timeout_ms) {
  372. io->connect_timeout = timeout_ms;
  373. }
  374. void hio_set_close_timeout(hio_t* io, int timeout_ms) {
  375. io->close_timeout = timeout_ms;
  376. }
  377. static void __keepalive_timeout_cb(htimer_t* timer) {
  378. hio_t* io = (hio_t*)timer->privdata;
  379. if (io) {
  380. char localaddrstr[SOCKADDR_STRLEN] = {0};
  381. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  382. hlogw("keepalive timeout [%s] <=> [%s]",
  383. SOCKADDR_STR(io->localaddr, localaddrstr),
  384. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  385. io->error = ETIMEDOUT;
  386. hio_close(io);
  387. }
  388. }
  389. void hio_set_keepalive_timeout(hio_t* io, int timeout_ms) {
  390. if (timeout_ms == 0) {
  391. // del
  392. hio_del_keepalive_timer(io);
  393. return;
  394. }
  395. if (io->keepalive_timer) {
  396. // reset
  397. ((struct htimeout_s*)io->keepalive_timer)->timeout = timeout_ms;
  398. htimer_reset(io->keepalive_timer);
  399. } else {
  400. // add
  401. io->keepalive_timer = htimer_add(io->loop, __keepalive_timeout_cb, timeout_ms, 1);
  402. io->keepalive_timer->privdata = io;
  403. }
  404. io->keepalive_timeout = timeout_ms;
  405. }
  406. static void __heartbeat_timer_cb(htimer_t* timer) {
  407. hio_t* io = (hio_t*)timer->privdata;
  408. if (io && io->heartbeat_fn) {
  409. io->heartbeat_fn(io);
  410. }
  411. }
  412. void hio_set_heartbeat(hio_t* io, int interval_ms, hio_send_heartbeat_fn fn) {
  413. if (interval_ms == 0) {
  414. // del
  415. hio_del_heartbeat_timer(io);
  416. return;
  417. }
  418. if (io->heartbeat_timer) {
  419. // reset
  420. ((struct htimeout_s*)io->heartbeat_timer)->timeout = interval_ms;
  421. htimer_reset(io->heartbeat_timer);
  422. } else {
  423. // add
  424. io->heartbeat_timer = htimer_add(io->loop, __heartbeat_timer_cb, interval_ms, INFINITE);
  425. io->heartbeat_timer->privdata = io;
  426. }
  427. io->heartbeat_interval = interval_ms;
  428. io->heartbeat_fn = fn;
  429. }
  430. void hio_alloc_readbuf(hio_t* io, int len) {
  431. if (hio_is_alloced_readbuf(io)) {
  432. io->readbuf.base = (char*)safe_realloc(io->readbuf.base, len, io->readbuf.len);
  433. } else {
  434. HV_ALLOC(io->readbuf.base, len);
  435. }
  436. io->readbuf.len = len;
  437. io->alloced_readbuf = 1;
  438. }
  439. void hio_free_readbuf(hio_t* io) {
  440. if (hio_is_alloced_readbuf(io)) {
  441. HV_FREE(io->readbuf.base);
  442. io->alloced_readbuf = 0;
  443. // reset to loop->readbuf
  444. io->readbuf.base = io->loop->readbuf.base;
  445. io->readbuf.len = io->loop->readbuf.len;
  446. }
  447. }
  448. int hio_read_once (hio_t* io) {
  449. io->read_once = 1;
  450. return hio_read_start(io);
  451. }
  452. int hio_read_until(hio_t* io, int len) {
  453. io->read_until = len;
  454. // NOTE: prepare readbuf
  455. if (hio_is_loop_readbuf(io) ||
  456. io->readbuf.len < len) {
  457. hio_alloc_readbuf(io, len);
  458. }
  459. return hio_read_once(io);
  460. }
//-----------------unpack---------------------------------------------
  462. void hio_set_unpack(hio_t* io, unpack_setting_t* setting) {
  463. hio_unset_unpack(io);
  464. if (setting == NULL) return;
  465. io->unpack_setting = setting;
  466. if (io->unpack_setting->package_max_length == 0) {
  467. io->unpack_setting->package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
  468. }
  469. if (io->unpack_setting->mode == UNPACK_BY_FIXED_LENGTH) {
  470. assert(io->unpack_setting->fixed_length != 0 &&
  471. io->unpack_setting->fixed_length <= io->unpack_setting->package_max_length);
  472. }
  473. else if (io->unpack_setting->mode == UNPACK_BY_DELIMITER) {
  474. if (io->unpack_setting->delimiter_bytes == 0) {
  475. io->unpack_setting->delimiter_bytes = strlen((char*)io->unpack_setting->delimiter);
  476. }
  477. }
  478. else if (io->unpack_setting->mode == UNPACK_BY_LENGTH_FIELD) {
  479. assert(io->unpack_setting->body_offset >=
  480. io->unpack_setting->length_field_offset +
  481. io->unpack_setting->length_field_bytes);
  482. }
  483. // NOTE: unpack must have own readbuf
  484. if (io->unpack_setting->mode == UNPACK_BY_FIXED_LENGTH) {
  485. io->readbuf.len = io->unpack_setting->fixed_length;
  486. } else {
  487. io->readbuf.len = HLOOP_READ_BUFSIZE;
  488. }
  489. hio_alloc_readbuf(io, io->readbuf.len);
  490. }
  491. void hio_unset_unpack(hio_t* io) {
  492. if (io->unpack_setting) {
  493. io->unpack_setting = NULL;
  494. // NOTE: unpack has own readbuf
  495. hio_free_readbuf(io);
  496. }
  497. }
//-----------------upstream---------------------------------------------
  499. void hio_read_upstream(hio_t* io) {
  500. hio_t* upstream_io = io->upstream_io;
  501. if (upstream_io) {
  502. hio_read(io);
  503. hio_read(upstream_io);
  504. }
  505. }
  506. void hio_write_upstream(hio_t* io, void* buf, int bytes) {
  507. hio_t* upstream_io = io->upstream_io;
  508. if (upstream_io) {
  509. hio_write(upstream_io, buf, bytes);
  510. }
  511. }
  512. void hio_close_upstream(hio_t* io) {
  513. hio_t* upstream_io = io->upstream_io;
  514. if (upstream_io) {
  515. hio_close(upstream_io);
  516. }
  517. }
  518. void hio_setup_upstream(hio_t* io1, hio_t* io2) {
  519. io1->upstream_io = io2;
  520. io2->upstream_io = io1;
  521. hio_setcb_read(io1, hio_write_upstream);
  522. hio_setcb_read(io2, hio_write_upstream);
  523. }
  524. hio_t* hio_get_upstream(hio_t* io) {
  525. return io->upstream_io;
  526. }
  527. hio_t* hio_setup_tcp_upstream(hio_t* io, const char* host, int port, int ssl) {
  528. hio_t* upstream_io = hio_create_socket(io->loop, host, port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
  529. if (upstream_io == NULL) return NULL;
  530. if (ssl) hio_enable_ssl(upstream_io);
  531. hio_setup_upstream(io, upstream_io);
  532. hio_setcb_close(io, hio_close_upstream);
  533. hio_setcb_close(upstream_io, hio_close_upstream);
  534. hconnect(io->loop, upstream_io->fd, hio_read_upstream);
  535. return upstream_io;
  536. }
  537. hio_t* hio_setup_udp_upstream(hio_t* io, const char* host, int port) {
  538. hio_t* upstream_io = hio_create_socket(io->loop, host, port, HIO_TYPE_UDP, HIO_CLIENT_SIDE);
  539. if (upstream_io == NULL) return NULL;
  540. hio_setup_upstream(io, upstream_io);
  541. hio_read_upstream(io);
  542. return upstream_io;
  543. }
  544. #if WITH_RUDP
  545. rudp_entry_t* hio_get_rudp(hio_t* io) {
  546. rudp_entry_t* rudp = rudp_get(&io->rudp, io->peeraddr);
  547. rudp->io = io;
  548. return rudp;
  549. }
  550. static void hio_close_rudp_event_cb(hevent_t* ev) {
  551. rudp_entry_t* entry = (rudp_entry_t*)ev->userdata;
  552. rudp_del(&entry->io->rudp, (struct sockaddr*)&entry->addr);
  553. // rudp_entry_free(entry);
  554. }
  555. int hio_close_rudp(hio_t* io, struct sockaddr* peeraddr) {
  556. if (peeraddr == NULL) peeraddr = io->peeraddr;
  557. // NOTE: do rudp_del for thread-safe
  558. rudp_entry_t* entry = rudp_get(&io->rudp, peeraddr);
  559. // NOTE: just rudp_remove first, do rudp_entry_free async for safe.
  560. // rudp_entry_t* entry = rudp_remove(&io->rudp, peeraddr);
  561. if (entry) {
  562. hevent_t ev;
  563. memset(&ev, 0, sizeof(ev));
  564. ev.cb = hio_close_rudp_event_cb;
  565. ev.userdata = entry;
  566. ev.priority = HEVENT_HIGH_PRIORITY;
  567. hloop_post_event(io->loop, &ev);
  568. }
  569. return 0;
  570. }
  571. #endif
  572. #if WITH_KCP
  573. static kcp_setting_t s_kcp_setting;
  574. static int __kcp_output(const char* buf, int len, ikcpcb* ikcp, void* userdata) {
  575. // printf("ikcp_output len=%d\n", len);
  576. rudp_entry_t* rudp = (rudp_entry_t*)userdata;
  577. assert(rudp != NULL && rudp->io != NULL);
  578. int nsend = sendto(rudp->io->fd, buf, len, 0, &rudp->addr.sa, SOCKADDR_LEN(&rudp->addr));
  579. // printf("sendto nsend=%d\n", nsend);
  580. return nsend;
  581. }
  582. static void __kcp_update_timer_cb(htimer_t* timer) {
  583. rudp_entry_t* rudp = (rudp_entry_t*)timer->privdata;
  584. assert(rudp != NULL && rudp->io != NULL && rudp->kcp.ikcp != NULL);
  585. ikcp_update(rudp->kcp.ikcp, (IUINT32)(rudp->io->loop->cur_hrtime / 1000));
  586. }
  587. int hio_set_kcp(hio_t* io, kcp_setting_t* setting) {
  588. io->io_type = HIO_TYPE_KCP;
  589. io->kcp_setting = setting;
  590. return 0;
  591. }
  592. kcp_t* hio_get_kcp(hio_t* io) {
  593. rudp_entry_t* rudp = hio_get_rudp(io);
  594. assert(rudp != NULL);
  595. kcp_t* kcp = &rudp->kcp;
  596. if (kcp->ikcp != NULL) return kcp;
  597. if (io->kcp_setting == NULL) {
  598. io->kcp_setting = &s_kcp_setting;
  599. }
  600. kcp_setting_t* setting = io->kcp_setting;
  601. assert(io->kcp_setting != NULL);
  602. kcp->ikcp = ikcp_create(setting->conv, rudp);
  603. // printf("ikcp_create ikcp=%p\n", kcp->ikcp);
  604. kcp->ikcp->output = __kcp_output;
  605. if (setting->interval > 0) {
  606. ikcp_nodelay(kcp->ikcp, setting->nodelay, setting->interval, setting->fastresend, setting->nocwnd);
  607. }
  608. if (setting->sndwnd > 0 && setting->rcvwnd > 0) {
  609. ikcp_wndsize(kcp->ikcp, setting->sndwnd, setting->rcvwnd);
  610. }
  611. if (setting->mtu > 0) {
  612. ikcp_setmtu(kcp->ikcp, setting->mtu);
  613. }
  614. if (kcp->update_timer == NULL) {
  615. int update_interval = setting->update_interval;
  616. if (update_interval == 0) {
  617. update_interval = DEFAULT_KCP_UPDATE_INTERVAL;
  618. }
  619. kcp->update_timer = htimer_add(io->loop, __kcp_update_timer_cb, update_interval, INFINITE);
  620. kcp->update_timer->privdata = rudp;
  621. }
  622. // NOTE: alloc kcp->readbuf when hio_read_kcp
  623. return kcp;
  624. }
  625. int hio_write_kcp(hio_t* io, const void* buf, size_t len) {
  626. kcp_t* kcp = hio_get_kcp(io);
  627. int nsend = ikcp_send(kcp->ikcp, (const char*)buf, len);
  628. // printf("ikcp_send len=%d nsend=%d\n", (int)len, nsend);
  629. if (nsend < 0) {
  630. hio_close(io);
  631. }
  632. ikcp_update(kcp->ikcp, (IUINT32)io->loop->cur_hrtime / 1000);
  633. return nsend;
  634. }
  635. int hio_read_kcp (hio_t* io, void* buf, int readbytes) {
  636. kcp_t* kcp = hio_get_kcp(io);
  637. // printf("ikcp_input len=%d\n", readbytes);
  638. ikcp_input(kcp->ikcp, (const char*)buf, readbytes);
  639. if (kcp->readbuf.base == NULL || kcp->readbuf.len == 0) {
  640. kcp->readbuf.len = DEFAULT_KCP_READ_BUFSIZE;
  641. HV_ALLOC(kcp->readbuf.base, kcp->readbuf.len);
  642. }
  643. int ret = 0;
  644. while (1) {
  645. int nrecv = ikcp_recv(kcp->ikcp, kcp->readbuf.base, kcp->readbuf.len);
  646. // printf("ikcp_recv nrecv=%d\n", nrecv);
  647. if (nrecv < 0) break;
  648. hio_read_cb(io, kcp->readbuf.base, nrecv);
  649. ret += nrecv;
  650. }
  651. return ret;
  652. }
  653. #endif