1
0

hevent.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602
  1. #include "hevent.h"
  2. #include "hsocket.h"
  3. #include "hatomic.h"
  4. #include "hlog.h"
  5. uint64_t hloop_next_event_id() {
  6. static hatomic_t s_id = HATOMIC_VAR_INIT(0);
  7. return ++s_id;
  8. }
  9. uint32_t hio_next_id() {
  10. static hatomic_t s_id = HATOMIC_VAR_INIT(0);
  11. return ++s_id;
  12. }
  13. static void fill_io_type(hio_t* io) {
  14. int type = 0;
  15. socklen_t optlen = sizeof(int);
  16. int ret = getsockopt(io->fd, SOL_SOCKET, SO_TYPE, (char*)&type, &optlen);
  17. printd("getsockopt SO_TYPE fd=%d ret=%d type=%d errno=%d\n", io->fd, ret, type, socket_errno());
  18. if (ret == 0) {
  19. switch (type) {
  20. case SOCK_STREAM: io->io_type = HIO_TYPE_TCP; break;
  21. case SOCK_DGRAM: io->io_type = HIO_TYPE_UDP; break;
  22. case SOCK_RAW: io->io_type = HIO_TYPE_IP; break;
  23. default: io->io_type = HIO_TYPE_SOCKET; break;
  24. }
  25. }
  26. else if (socket_errno() == ENOTSOCK) {
  27. switch (io->fd) {
  28. case 0: io->io_type = HIO_TYPE_STDIN; break;
  29. case 1: io->io_type = HIO_TYPE_STDOUT; break;
  30. case 2: io->io_type = HIO_TYPE_STDERR; break;
  31. default: io->io_type = HIO_TYPE_FILE; break;
  32. }
  33. }
  34. else {
  35. io->io_type = HIO_TYPE_TCP;
  36. }
  37. }
  38. static void hio_socket_init(hio_t* io) {
  39. // nonblocking
  40. nonblocking(io->fd);
  41. // fill io->localaddr io->peeraddr
  42. if (io->localaddr == NULL) {
  43. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  44. }
  45. if (io->peeraddr == NULL) {
  46. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  47. }
  48. socklen_t addrlen = sizeof(sockaddr_u);
  49. int ret = getsockname(io->fd, io->localaddr, &addrlen);
  50. printd("getsockname fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
  51. // NOTE:
  52. // tcp_server peeraddr set by accept
  53. // udp_server peeraddr set by recvfrom
  54. // tcp_client/udp_client peeraddr set by hio_setpeeraddr
  55. if (io->io_type == HIO_TYPE_TCP || io->io_type == HIO_TYPE_SSL) {
  56. // tcp acceptfd
  57. addrlen = sizeof(sockaddr_u);
  58. ret = getpeername(io->fd, io->peeraddr, &addrlen);
  59. printd("getpeername fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
  60. }
  61. }
  62. void hio_init(hio_t* io) {
  63. // alloc localaddr,peeraddr when hio_socket_init
  64. /*
  65. if (io->localaddr == NULL) {
  66. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  67. }
  68. if (io->peeraddr == NULL) {
  69. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  70. }
  71. */
  72. // write_queue init when hwrite try_write failed
  73. // write_queue_init(&io->write_queue, 4);
  74. hrecursive_mutex_init(&io->write_mutex);
  75. }
  76. void hio_ready(hio_t* io) {
  77. if (io->ready) return;
  78. // flags
  79. io->ready = 1;
  80. io->closed = 0;
  81. io->accept = io->connect = io->connectex = 0;
  82. io->recv = io->send = 0;
  83. io->recvfrom = io->sendto = 0;
  84. io->close = 0;
  85. // public:
  86. io->id = hio_next_id();
  87. io->io_type = HIO_TYPE_UNKNOWN;
  88. io->error = 0;
  89. io->events = io->revents = 0;
  90. // readbuf
  91. io->alloced_readbuf = 0;
  92. io->readbuf.base = io->loop->readbuf.base;
  93. io->readbuf.len = io->loop->readbuf.len;
  94. io->readbuf.offset = 0;
  95. io->read_once = 0;
  96. io->read_until = 0;
  97. io->small_readbytes_cnt = 0;
  98. // write_queue
  99. io->write_queue_bytes = 0;
  100. // callbacks
  101. io->read_cb = NULL;
  102. io->write_cb = NULL;
  103. io->close_cb = NULL;
  104. io->accept_cb = NULL;
  105. io->connect_cb = NULL;
  106. // timers
  107. io->connect_timeout = 0;
  108. io->connect_timer = NULL;
  109. io->close_timeout = 0;
  110. io->close_timer = NULL;
  111. io->keepalive_timeout = 0;
  112. io->keepalive_timer = NULL;
  113. io->heartbeat_interval = 0;
  114. io->heartbeat_fn = NULL;
  115. io->heartbeat_timer = NULL;
  116. // upstream
  117. io->upstream_io = NULL;
  118. // unpack
  119. io->unpack_setting = NULL;
  120. // private:
  121. io->event_index[0] = io->event_index[1] = -1;
  122. io->hovlp = NULL;
  123. io->ssl = NULL;
  124. // io_type
  125. fill_io_type(io);
  126. if (io->io_type & HIO_TYPE_SOCKET) {
  127. hio_socket_init(io);
  128. }
  129. }
  130. void hio_done(hio_t* io) {
  131. if (!io->ready) return;
  132. io->ready = 0;
  133. hio_del(io, HV_RDWR);
  134. // readbuf
  135. hio_free_readbuf(io);
  136. // write_queue
  137. offset_buf_t* pbuf = NULL;
  138. hrecursive_mutex_lock(&io->write_mutex);
  139. while (!write_queue_empty(&io->write_queue)) {
  140. pbuf = write_queue_front(&io->write_queue);
  141. HV_FREE(pbuf->base);
  142. write_queue_pop_front(&io->write_queue);
  143. }
  144. write_queue_cleanup(&io->write_queue);
  145. hrecursive_mutex_unlock(&io->write_mutex);
  146. }
  147. void hio_free(hio_t* io) {
  148. if (io == NULL) return;
  149. // NOTE: call hio_done to cleanup write_queue
  150. hio_done(io);
  151. // NOTE: call hio_close to call hclose_cb
  152. hio_close(io);
  153. hrecursive_mutex_destroy(&io->write_mutex);
  154. HV_FREE(io->localaddr);
  155. HV_FREE(io->peeraddr);
  156. HV_FREE(io);
  157. }
  158. bool hio_is_opened(hio_t* io) {
  159. if (io == NULL) return false;
  160. return io->ready == 1 && io->closed == 0;
  161. }
  162. bool hio_is_closed(hio_t* io) {
  163. if (io == NULL) return true;
  164. return io->ready == 0 && io->closed == 1;
  165. }
  166. uint32_t hio_id (hio_t* io) {
  167. return io->id;
  168. }
  169. int hio_fd(hio_t* io) {
  170. return io->fd;
  171. }
  172. hio_type_e hio_type(hio_t* io) {
  173. return io->io_type;
  174. }
  175. int hio_error(hio_t* io) {
  176. return io->error;
  177. }
  178. int hio_events(hio_t* io) {
  179. return io->events;
  180. }
  181. int hio_revents(hio_t* io) {
  182. return io->revents;
  183. }
  184. struct sockaddr* hio_localaddr(hio_t* io) {
  185. return io->localaddr;
  186. }
  187. struct sockaddr* hio_peeraddr(hio_t* io) {
  188. return io->peeraddr;
  189. }
  190. void hio_set_context(hio_t* io, void* ctx) {
  191. io->ctx = ctx;
  192. }
  193. void* hio_context(hio_t* io) {
  194. return io->ctx;
  195. }
  196. haccept_cb hio_getcb_accept(hio_t* io) {
  197. return io->accept_cb;
  198. }
  199. hconnect_cb hio_getcb_connect(hio_t* io) {
  200. return io->connect_cb;
  201. }
  202. hread_cb hio_getcb_read(hio_t* io) {
  203. return io->read_cb;
  204. }
  205. hwrite_cb hio_getcb_write(hio_t* io) {
  206. return io->write_cb;
  207. }
  208. hclose_cb hio_getcb_close(hio_t* io) {
  209. return io->close_cb;
  210. }
  211. void hio_setcb_accept(hio_t* io, haccept_cb accept_cb) {
  212. io->accept_cb = accept_cb;
  213. }
  214. void hio_setcb_connect(hio_t* io, hconnect_cb connect_cb) {
  215. io->connect_cb = connect_cb;
  216. }
  217. void hio_setcb_read(hio_t* io, hread_cb read_cb) {
  218. io->read_cb = read_cb;
  219. }
  220. void hio_setcb_write(hio_t* io, hwrite_cb write_cb) {
  221. io->write_cb = write_cb;
  222. }
  223. void hio_setcb_close(hio_t* io, hclose_cb close_cb) {
  224. io->close_cb = close_cb;
  225. }
  226. void hio_accept_cb(hio_t* io) {
  227. /*
  228. char localaddrstr[SOCKADDR_STRLEN] = {0};
  229. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  230. printd("accept connfd=%d [%s] <= [%s]\n", io->fd,
  231. SOCKADDR_STR(io->localaddr, localaddrstr),
  232. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  233. */
  234. if (io->accept_cb) {
  235. // printd("accept_cb------\n");
  236. io->accept_cb(io);
  237. // printd("accept_cb======\n");
  238. }
  239. }
  240. void hio_connect_cb(hio_t* io) {
  241. /*
  242. char localaddrstr[SOCKADDR_STRLEN] = {0};
  243. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  244. printd("connect connfd=%d [%s] => [%s]\n", io->fd,
  245. SOCKADDR_STR(io->localaddr, localaddrstr),
  246. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  247. */
  248. if (io->connect_cb) {
  249. // printd("connect_cb------\n");
  250. io->connect_cb(io);
  251. // printd("connect_cb======\n");
  252. }
  253. }
  254. void hio_read_cb(hio_t* io, void* buf, int len) {
  255. if (io->read_cb) {
  256. // printd("read_cb------\n");
  257. io->read_cb(io, buf, len);
  258. // printd("read_cb======\n");
  259. }
  260. if (hio_is_alloced_readbuf(io) && io->readbuf.len > READ_BUFSIZE_HIGH_WATER) {
  261. // readbuf autosize
  262. size_t small_size = io->readbuf.len / 2;
  263. if (len < small_size) {
  264. if (++io->small_readbytes_cnt == 3) {
  265. io->small_readbytes_cnt = 0;
  266. io->readbuf.base = (char*)safe_realloc(io->readbuf.base, small_size, io->readbuf.len);
  267. io->readbuf.len = small_size;
  268. }
  269. }
  270. }
  271. }
  272. void hio_write_cb(hio_t* io, const void* buf, int len) {
  273. if (io->write_cb) {
  274. // printd("write_cb------\n");
  275. io->write_cb(io, buf, len);
  276. // printd("write_cb======\n");
  277. }
  278. }
  279. void hio_close_cb(hio_t* io) {
  280. if (io->close_cb) {
  281. // printd("close_cb------\n");
  282. io->close_cb(io);
  283. // printd("close_cb======\n");
  284. }
  285. }
  286. void hio_set_type(hio_t* io, hio_type_e type) {
  287. io->io_type = type;
  288. }
  289. void hio_set_localaddr(hio_t* io, struct sockaddr* addr, int addrlen) {
  290. if (io->localaddr == NULL) {
  291. HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
  292. }
  293. memcpy(io->localaddr, addr, addrlen);
  294. }
  295. void hio_set_peeraddr (hio_t* io, struct sockaddr* addr, int addrlen) {
  296. if (io->peeraddr == NULL) {
  297. HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
  298. }
  299. memcpy(io->peeraddr, addr, addrlen);
  300. }
  301. int hio_enable_ssl(hio_t* io) {
  302. io->io_type = HIO_TYPE_SSL;
  303. return 0;
  304. }
  305. bool hio_is_ssl(hio_t* io) {
  306. return io->io_type == HIO_TYPE_SSL;
  307. }
  308. hssl_t hio_get_ssl(hio_t* io) {
  309. return io->ssl;
  310. }
  311. int hio_set_ssl(hio_t* io, hssl_t ssl) {
  312. io->io_type = HIO_TYPE_SSL;
  313. io->ssl = ssl;
  314. return 0;
  315. }
  316. void hio_set_readbuf(hio_t* io, void* buf, size_t len) {
  317. assert(io && buf && len != 0);
  318. hio_free_readbuf(io);
  319. io->readbuf.base = (char*)buf;
  320. io->readbuf.len = len;
  321. io->readbuf.offset = 0;
  322. io->alloced_readbuf = 0;
  323. }
  324. void hio_del_connect_timer(hio_t* io) {
  325. if (io->connect_timer) {
  326. htimer_del(io->connect_timer);
  327. io->connect_timer = NULL;
  328. io->connect_timeout = 0;
  329. }
  330. }
  331. void hio_del_close_timer(hio_t* io) {
  332. if (io->close_timer) {
  333. htimer_del(io->close_timer);
  334. io->close_timer = NULL;
  335. io->close_timeout = 0;
  336. }
  337. }
  338. void hio_del_keepalive_timer(hio_t* io) {
  339. if (io->keepalive_timer) {
  340. htimer_del(io->keepalive_timer);
  341. io->keepalive_timer = NULL;
  342. io->keepalive_timeout = 0;
  343. }
  344. }
  345. void hio_del_heartbeat_timer(hio_t* io) {
  346. if (io->heartbeat_timer) {
  347. htimer_del(io->heartbeat_timer);
  348. io->heartbeat_timer = NULL;
  349. io->heartbeat_interval = 0;
  350. io->heartbeat_fn = NULL;
  351. }
  352. }
  353. void hio_set_connect_timeout(hio_t* io, int timeout_ms) {
  354. io->connect_timeout = timeout_ms;
  355. }
  356. void hio_set_close_timeout(hio_t* io, int timeout_ms) {
  357. io->close_timeout = timeout_ms;
  358. }
  359. static void __keepalive_timeout_cb(htimer_t* timer) {
  360. hio_t* io = (hio_t*)timer->privdata;
  361. if (io) {
  362. char localaddrstr[SOCKADDR_STRLEN] = {0};
  363. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  364. hlogw("keepalive timeout [%s] <=> [%s]",
  365. SOCKADDR_STR(io->localaddr, localaddrstr),
  366. SOCKADDR_STR(io->peeraddr, peeraddrstr));
  367. io->error = ETIMEDOUT;
  368. hio_close(io);
  369. }
  370. }
  371. void hio_set_keepalive_timeout(hio_t* io, int timeout_ms) {
  372. if (timeout_ms == 0) {
  373. // del
  374. hio_del_keepalive_timer(io);
  375. return;
  376. }
  377. if (io->keepalive_timer) {
  378. // reset
  379. ((struct htimeout_s*)io->keepalive_timer)->timeout = timeout_ms;
  380. htimer_reset(io->keepalive_timer);
  381. } else {
  382. // add
  383. io->keepalive_timer = htimer_add(io->loop, __keepalive_timeout_cb, timeout_ms, 1);
  384. io->keepalive_timer->privdata = io;
  385. }
  386. io->keepalive_timeout = timeout_ms;
  387. }
  388. static void __heartbeat_timer_cb(htimer_t* timer) {
  389. hio_t* io = (hio_t*)timer->privdata;
  390. if (io && io->heartbeat_fn) {
  391. io->heartbeat_fn(io);
  392. }
  393. }
  394. void hio_set_heartbeat(hio_t* io, int interval_ms, hio_send_heartbeat_fn fn) {
  395. if (interval_ms == 0) {
  396. // del
  397. hio_del_heartbeat_timer(io);
  398. return;
  399. }
  400. if (io->heartbeat_timer) {
  401. // reset
  402. ((struct htimeout_s*)io->heartbeat_timer)->timeout = interval_ms;
  403. htimer_reset(io->heartbeat_timer);
  404. } else {
  405. // add
  406. io->heartbeat_timer = htimer_add(io->loop, __heartbeat_timer_cb, interval_ms, INFINITE);
  407. io->heartbeat_timer->privdata = io;
  408. }
  409. io->heartbeat_interval = interval_ms;
  410. io->heartbeat_fn = fn;
  411. }
  412. void hio_alloc_readbuf(hio_t* io, int len) {
  413. if (hio_is_alloced_readbuf(io)) {
  414. io->readbuf.base = (char*)safe_realloc(io->readbuf.base, len, io->readbuf.len);
  415. } else {
  416. HV_ALLOC(io->readbuf.base, len);
  417. }
  418. io->readbuf.len = len;
  419. io->alloced_readbuf = 1;
  420. }
  421. void hio_free_readbuf(hio_t* io) {
  422. if (hio_is_alloced_readbuf(io)) {
  423. HV_FREE(io->readbuf.base);
  424. io->alloced_readbuf = 0;
  425. // reset to loop->readbuf
  426. io->readbuf.base = io->loop->readbuf.base;
  427. io->readbuf.len = io->loop->readbuf.len;
  428. }
  429. }
  430. int hio_read_once (hio_t* io) {
  431. io->read_once = 1;
  432. return hio_read_start(io);
  433. }
  434. int hio_read_until(hio_t* io, int len) {
  435. io->read_until = len;
  436. // NOTE: prepare readbuf
  437. if (hio_is_loop_readbuf(io) ||
  438. io->readbuf.len < len) {
  439. hio_alloc_readbuf(io, len);
  440. }
  441. return hio_read_once(io);
  442. }
  443. //-----------------unpack---------------------------------------------
  444. void hio_set_unpack(hio_t* io, unpack_setting_t* setting) {
  445. hio_unset_unpack(io);
  446. if (setting == NULL) return;
  447. io->unpack_setting = setting;
  448. if (io->unpack_setting->package_max_length == 0) {
  449. io->unpack_setting->package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
  450. }
  451. if (io->unpack_setting->mode == UNPACK_BY_FIXED_LENGTH) {
  452. assert(io->unpack_setting->fixed_length != 0 &&
  453. io->unpack_setting->fixed_length <= io->unpack_setting->package_max_length);
  454. }
  455. else if (io->unpack_setting->mode == UNPACK_BY_DELIMITER) {
  456. if (io->unpack_setting->delimiter_bytes == 0) {
  457. io->unpack_setting->delimiter_bytes = strlen((char*)io->unpack_setting->delimiter);
  458. }
  459. }
  460. else if (io->unpack_setting->mode == UNPACK_BY_LENGTH_FIELD) {
  461. assert(io->unpack_setting->body_offset >=
  462. io->unpack_setting->length_field_offset +
  463. io->unpack_setting->length_field_bytes);
  464. }
  465. // NOTE: unpack must have own readbuf
  466. if (io->unpack_setting->mode == UNPACK_BY_FIXED_LENGTH) {
  467. io->readbuf.len = io->unpack_setting->fixed_length;
  468. } else {
  469. io->readbuf.len = HLOOP_READ_BUFSIZE;
  470. }
  471. hio_alloc_readbuf(io, io->readbuf.len);
  472. }
  473. void hio_unset_unpack(hio_t* io) {
  474. if (io->unpack_setting) {
  475. io->unpack_setting = NULL;
  476. // NOTE: unpack has own readbuf
  477. hio_free_readbuf(io);
  478. }
  479. }
  480. //-----------------upstream---------------------------------------------
  481. void hio_read_upstream(hio_t* io) {
  482. hio_t* upstream_io = io->upstream_io;
  483. if (upstream_io) {
  484. hio_read(io);
  485. hio_read(upstream_io);
  486. }
  487. }
  488. void hio_write_upstream(hio_t* io, void* buf, int bytes) {
  489. hio_t* upstream_io = io->upstream_io;
  490. if (upstream_io) {
  491. hio_write(upstream_io, buf, bytes);
  492. }
  493. }
  494. void hio_close_upstream(hio_t* io) {
  495. hio_t* upstream_io = io->upstream_io;
  496. if (upstream_io) {
  497. hio_close(upstream_io);
  498. }
  499. }
  500. void hio_setup_upstream(hio_t* io1, hio_t* io2) {
  501. io1->upstream_io = io2;
  502. io2->upstream_io = io1;
  503. hio_setcb_read(io1, hio_write_upstream);
  504. hio_setcb_read(io2, hio_write_upstream);
  505. }
  506. hio_t* hio_get_upstream(hio_t* io) {
  507. return io->upstream_io;
  508. }
  509. hio_t* hio_setup_tcp_upstream(hio_t* io, const char* host, int port, int ssl) {
  510. hio_t* upstream_io = hio_create(io->loop, host, port, SOCK_STREAM);
  511. if (upstream_io == NULL) return NULL;
  512. if (ssl) hio_enable_ssl(upstream_io);
  513. hio_setup_upstream(io, upstream_io);
  514. hio_setcb_close(io, hio_close_upstream);
  515. hio_setcb_close(upstream_io, hio_close_upstream);
  516. hconnect(io->loop, upstream_io->fd, hio_read_upstream);
  517. return upstream_io;
  518. }
  519. hio_t* hio_setup_udp_upstream(hio_t* io, const char* host, int port) {
  520. hio_t* upstream_io = hio_create(io->loop, host, port, SOCK_DGRAM);
  521. if (upstream_io == NULL) return NULL;
  522. hio_setup_upstream(io, upstream_io);
  523. hio_read_upstream(io);
  524. return upstream_io;
  525. }