hloop.c 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642
  1. #include "hloop.h"
  2. #include "hevent.h"
  3. #include "hio.h"
  4. #include "iowatcher.h"
  5. #include "hdef.h"
  6. #include "hlog.h"
  7. #include "hmath.h"
  8. #include "hsocket.h"
  9. #define PAUSE_TIME 10 // ms
  10. #define MAX_BLOCK_TIME 1000 // ms
  11. #define IO_ARRAY_INIT_SIZE 64
  12. static void hio_init(hio_t* io);
  13. static void hio_deinit(hio_t* io);
  14. static void hio_reset(hio_t* io);
  15. static void hio_free(hio_t* io);
  16. static int timers_compare(const struct heap_node* lhs, const struct heap_node* rhs) {
  17. return TIMER_ENTRY(lhs)->next_timeout < TIMER_ENTRY(rhs)->next_timeout;
  18. }
  19. static int hloop_process_idles(hloop_t* loop) {
  20. int nidles = 0;
  21. struct list_node* node = loop->idles.next;
  22. hidle_t* idle = NULL;
  23. while (node != &loop->idles) {
  24. idle = IDLE_ENTRY(node);
  25. if (idle->repeat != INFINITE) {
  26. --idle->repeat;
  27. }
  28. if (idle->repeat == 0) {
  29. hidle_del(idle);
  30. }
  31. EVENT_PENDING(idle);
  32. ++nidles;
  33. node = node->next;
  34. if (!idle->active) {
  35. list_del(node->prev);
  36. }
  37. }
  38. return nidles;
  39. }
  40. static int hloop_process_timers(hloop_t* loop) {
  41. int ntimers = 0;
  42. htimer_t* timer = NULL;
  43. uint64_t now_hrtime = hloop_now_hrtime(loop);
  44. while (loop->timers.root) {
  45. timer = TIMER_ENTRY(loop->timers.root);
  46. if (timer->next_timeout > now_hrtime) {
  47. break;
  48. }
  49. if (timer->repeat != INFINITE) {
  50. --timer->repeat;
  51. }
  52. if (timer->repeat == 0) {
  53. htimer_del(timer);
  54. }
  55. EVENT_PENDING(timer);
  56. ++ntimers;
  57. heap_dequeue(&loop->timers);
  58. if (timer->active) {
  59. if (timer->event_type == HEVENT_TYPE_TIMEOUT) {
  60. timer->next_timeout += ((htimeout_t*)timer)->timeout*1000;
  61. }
  62. else if (timer->event_type == HEVENT_TYPE_PERIOD) {
  63. hperiod_t* period = (hperiod_t*)timer;
  64. timer->next_timeout = calc_next_timeout(period->minute, period->hour, period->day,
  65. period->week, period->month) * 1e6;
  66. }
  67. heap_insert(&loop->timers, &timer->node);
  68. }
  69. }
  70. return ntimers;
  71. }
  72. static int hloop_process_ios(hloop_t* loop, int timeout) {
  73. int nevents = iowatcher_poll_events(loop, timeout);
  74. if (nevents < 0) {
  75. hloge("poll_events error=%d", -nevents);
  76. }
  77. return nevents < 0 ? 0 : nevents;
  78. }
  79. static int hloop_process_pendings(hloop_t* loop) {
  80. if (loop->npendings == 0) return 0;
  81. hevent_t* prev = NULL;
  82. hevent_t* next = NULL;
  83. int ncbs = 0;
  84. for (int i = HEVENT_PRIORITY_SIZE-1; i >= 0; --i) {
  85. next = loop->pendings[i];
  86. while (next) {
  87. if (next->pending && next->cb) {
  88. next->cb(next);
  89. ++ncbs;
  90. }
  91. prev = next;
  92. next = next->pending_next;
  93. prev->pending = 0;
  94. prev->pending_next = NULL;
  95. if (prev->destroy) {
  96. SAFE_FREE(prev);
  97. }
  98. }
  99. loop->pendings[i] = NULL;
  100. }
  101. loop->npendings = 0;
  102. return ncbs;
  103. }
  104. static int hloop_process_events(hloop_t* loop) {
  105. // ios -> timers -> idles
  106. int nios, ntimers, nidles;
  107. nios = ntimers = nidles = 0;
  108. int32_t blocktime = MAX_BLOCK_TIME;
  109. hloop_update_time(loop);
  110. if (loop->timers.root) {
  111. uint64_t next_min_timeout = TIMER_ENTRY(loop->timers.root)->next_timeout;
  112. int64_t blocktime_us = next_min_timeout - hloop_now_hrtime(loop);
  113. if (blocktime_us <= 0) goto process_timers;
  114. blocktime = blocktime_us / 1000;
  115. ++blocktime;
  116. blocktime = MIN(blocktime, MAX_BLOCK_TIME);
  117. }
  118. if (loop->nios) {
  119. nios = hloop_process_ios(loop, blocktime);
  120. }
  121. else {
  122. msleep(blocktime);
  123. }
  124. hloop_update_time(loop);
  125. process_timers:
  126. if (loop->ntimers) {
  127. ntimers = hloop_process_timers(loop);
  128. }
  129. if (loop->npendings == 0) {
  130. if (loop->nidles) {
  131. nidles= hloop_process_idles(loop);
  132. }
  133. }
  134. int ncbs = hloop_process_pendings(loop);
  135. printd("blocktime=%d nios=%d ntimers=%d nidles=%d nactives=%d npendings=%d ncbs=%d\n",
  136. blocktime, nios, ntimers, nidles, loop->nactives, loop->npendings, ncbs);
  137. return ncbs;
  138. }
  139. int hloop_init(hloop_t* loop) {
  140. memset(loop, 0, sizeof(hloop_t));
  141. loop->status = HLOOP_STATUS_STOP;
  142. // idles
  143. list_init(&loop->idles);
  144. // timers
  145. heap_init(&loop->timers, timers_compare);
  146. // ios: init when hio_add
  147. //io_array_init(&loop->ios, IO_ARRAY_INIT_SIZE);
  148. // iowatcher: init when iowatcher_add_event
  149. //iowatcher_init(loop);
  150. // time
  151. time(&loop->start_time);
  152. loop->start_hrtime = loop->cur_hrtime = gethrtime();
  153. return 0;
  154. }
  155. void hloop_cleanup(hloop_t* loop) {
  156. // pendings
  157. printd("cleanup pendings...\n");
  158. for (int i = 0; i < HEVENT_PRIORITY_SIZE; ++i) {
  159. loop->pendings[i] = NULL;
  160. }
  161. // idles
  162. printd("cleanup idles...\n");
  163. struct list_node* node = loop->idles.next;
  164. hidle_t* idle;
  165. while (node != &loop->idles) {
  166. idle = IDLE_ENTRY(node);
  167. node = node->next;
  168. SAFE_FREE(idle);
  169. }
  170. list_init(&loop->idles);
  171. // timers
  172. printd("cleanup timers...\n");
  173. htimer_t* timer;
  174. while (loop->timers.root) {
  175. timer = TIMER_ENTRY(loop->timers.root);
  176. heap_dequeue(&loop->timers);
  177. SAFE_FREE(timer);
  178. }
  179. heap_init(&loop->timers, NULL);
  180. // ios
  181. printd("cleanup ios...\n");
  182. for (int i = 0; i < loop->ios.maxsize; ++i) {
  183. hio_t* io = loop->ios.ptr[i];
  184. if (io) {
  185. if (!(io->io_type&HIO_TYPE_STDIO)) {
  186. hclose(io);
  187. }
  188. hio_free(io);
  189. }
  190. }
  191. io_array_cleanup(&loop->ios);
  192. // iowatcher
  193. iowatcher_cleanup(loop);
  194. }
  195. int hloop_run(hloop_t* loop) {
  196. loop->loop_cnt = 0;
  197. loop->status = HLOOP_STATUS_RUNNING;
  198. while (loop->status != HLOOP_STATUS_STOP) {
  199. if (loop->status == HLOOP_STATUS_PAUSE) {
  200. msleep(PAUSE_TIME);
  201. hloop_update_time(loop);
  202. continue;
  203. }
  204. ++loop->loop_cnt;
  205. if (loop->nactives == 0) break;
  206. hloop_process_events(loop);
  207. }
  208. loop->status = HLOOP_STATUS_STOP;
  209. loop->end_hrtime = gethrtime();
  210. hloop_cleanup(loop);
  211. return 0;
  212. }
  213. int hloop_stop(hloop_t* loop) {
  214. loop->status = HLOOP_STATUS_STOP;
  215. return 0;
  216. }
  217. int hloop_pause(hloop_t* loop) {
  218. if (loop->status == HLOOP_STATUS_RUNNING) {
  219. loop->status = HLOOP_STATUS_PAUSE;
  220. }
  221. return 0;
  222. }
  223. int hloop_resume(hloop_t* loop) {
  224. if (loop->status == HLOOP_STATUS_PAUSE) {
  225. loop->status = HLOOP_STATUS_RUNNING;
  226. }
  227. return 0;
  228. }
  229. hidle_t* hidle_add(hloop_t* loop, hidle_cb cb, uint32_t repeat) {
  230. hidle_t* idle;
  231. SAFE_ALLOC_SIZEOF(idle);
  232. idle->event_type = HEVENT_TYPE_IDLE;
  233. idle->priority = HEVENT_LOWEST_PRIORITY;
  234. idle->repeat = repeat;
  235. list_add(&idle->node, &loop->idles);
  236. EVENT_ADD(loop, idle, cb);
  237. loop->nidles++;
  238. return idle;
  239. }
  240. void hidle_del(hidle_t* idle) {
  241. if (!idle->active) return;
  242. idle->loop->nidles--;
  243. EVENT_DEL(idle);
  244. }
  245. htimer_t* htimer_add(hloop_t* loop, htimer_cb cb, uint64_t timeout, uint32_t repeat) {
  246. if (timeout == 0) return NULL;
  247. htimeout_t* timer;
  248. SAFE_ALLOC_SIZEOF(timer);
  249. timer->event_type = HEVENT_TYPE_TIMEOUT;
  250. timer->priority = HEVENT_HIGHEST_PRIORITY;
  251. timer->repeat = repeat;
  252. timer->timeout = timeout;
  253. hloop_update_time(loop);
  254. timer->next_timeout = hloop_now_hrtime(loop) + timeout*1000;
  255. heap_insert(&loop->timers, &timer->node);
  256. EVENT_ADD(loop, timer, cb);
  257. loop->ntimers++;
  258. return (htimer_t*)timer;
  259. }
  260. void htimer_reset(htimer_t* timer) {
  261. if (timer->event_type != HEVENT_TYPE_TIMEOUT || timer->pending) {
  262. return;
  263. }
  264. hloop_t* loop = timer->loop;
  265. htimeout_t* timeout = (htimeout_t*)timer;
  266. heap_remove(&loop->timers, &timer->node);
  267. timer->next_timeout = hloop_now_hrtime(loop) + timeout->timeout*1000;
  268. heap_insert(&loop->timers, &timer->node);
  269. }
  270. htimer_t* htimer_add_period(hloop_t* loop, htimer_cb cb,
  271. int8_t minute, int8_t hour, int8_t day,
  272. int8_t week, int8_t month, uint32_t repeat) {
  273. if (minute > 59 || hour > 23 || day > 31 || week > 6 || month > 12) {
  274. return NULL;
  275. }
  276. hperiod_t* timer;
  277. SAFE_ALLOC_SIZEOF(timer);
  278. timer->event_type = HEVENT_TYPE_PERIOD;
  279. timer->priority = HEVENT_HIGH_PRIORITY;
  280. timer->repeat = repeat;
  281. timer->minute = minute;
  282. timer->hour = hour;
  283. timer->day = day;
  284. timer->month = month;
  285. timer->week = week;
  286. timer->next_timeout = calc_next_timeout(minute, hour, day, week, month) * 1e6;
  287. heap_insert(&loop->timers, &timer->node);
  288. EVENT_ADD(loop, timer, cb);
  289. loop->ntimers++;
  290. return (htimer_t*)timer;
  291. }
  292. void htimer_del(htimer_t* timer) {
  293. if (!timer->active) return;
  294. timer->loop->ntimers--;
  295. EVENT_DEL(timer);
  296. // NOTE: set timer->next_timeout to handle at next loop
  297. timer->next_timeout = hloop_now_hrtime(timer->loop);
  298. }
  299. void hio_init(hio_t* io) {
  300. memset(io, 0, sizeof(hio_t));
  301. io->event_type = HEVENT_TYPE_IO;
  302. io->event_index[0] = io->event_index[1] = -1;
  303. // write_queue init when hwrite try_write failed
  304. //write_queue_init(&io->write_queue, 4);;
  305. }
  306. static void fill_io_type(hio_t* io) {
  307. int type = 0;
  308. socklen_t optlen = sizeof(int);
  309. int ret = getsockopt(io->fd, SOL_SOCKET, SO_TYPE, (char*)&type, &optlen);
  310. printd("getsockopt SO_TYPE fd=%d ret=%d type=%d errno=%d\n", io->fd, ret, type, socket_errno());
  311. if (ret == 0) {
  312. switch (type) {
  313. case SOCK_STREAM: io->io_type = HIO_TYPE_TCP; break;
  314. case SOCK_DGRAM: io->io_type = HIO_TYPE_UDP; break;
  315. case SOCK_RAW: io->io_type = HIO_TYPE_IP; break;
  316. default: io->io_type = HIO_TYPE_SOCKET; break;
  317. }
  318. }
  319. else if (socket_errno() == ENOTSOCK) {
  320. switch (io->fd) {
  321. case 0: io->io_type = HIO_TYPE_STDIN; break;
  322. case 1: io->io_type = HIO_TYPE_STDOUT; break;
  323. case 2: io->io_type = HIO_TYPE_STDERR; break;
  324. default: io->io_type = HIO_TYPE_FILE; break;
  325. }
  326. }
  327. }
  328. void hio_socket_init(hio_t* io) {
  329. // nonblocking
  330. nonblocking(io->fd);
  331. // fill io->localaddr io->peeraddr
  332. if (io->localaddr == NULL) {
  333. SAFE_ALLOC(io->localaddr, sizeof(struct sockaddr_in6));
  334. }
  335. if (io->peeraddr == NULL) {
  336. io->peeraddrlen = sizeof(struct sockaddr_in6);
  337. SAFE_ALLOC(io->peeraddr, sizeof(struct sockaddr_in6));
  338. }
  339. socklen_t addrlen = sizeof(struct sockaddr_in6);
  340. int ret = getsockname(io->fd, io->localaddr, &addrlen);
  341. printd("getsockname fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
  342. // NOTE:
  343. // tcp_server peeraddr set by accept
  344. // udp_server peeraddr set by recvfrom
  345. // tcp_client/udp_client peeraddr set by hio_setpeeraddr
  346. if (io->io_type == HIO_TYPE_TCP) {
  347. // tcp acceptfd
  348. addrlen = sizeof(struct sockaddr_in6);
  349. ret = getpeername(io->fd, io->peeraddr, &addrlen);
  350. printd("getpeername fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
  351. }
  352. }
  353. void hio_reset(hio_t* io) {
  354. fill_io_type(io);
  355. if (io->io_type & HIO_TYPE_SOCKET) {
  356. hio_socket_init(io);
  357. }
  358. }
  359. void hio_deinit(hio_t* io) {
  360. offset_buf_t* pbuf = NULL;
  361. while (!write_queue_empty(&io->write_queue)) {
  362. pbuf = write_queue_front(&io->write_queue);
  363. SAFE_FREE(pbuf->base);
  364. write_queue_pop_front(&io->write_queue);
  365. }
  366. write_queue_cleanup(&io->write_queue);
  367. io->closed = 0;
  368. io->accept = io->connect = io->connectex = 0;
  369. io->recv = io->send = 0;
  370. io->recvfrom = io->sendto = 0;
  371. io->io_type = HIO_TYPE_UNKNOWN;
  372. io->error = 0;
  373. io->events = io->revents = 0;
  374. io->read_cb = NULL;
  375. io->write_cb = NULL;
  376. io->close_cb = 0;
  377. io->accept_cb = 0;
  378. io->connect_cb = 0;
  379. io->event_index[0] = io->event_index[1] = -1;
  380. io->hovlp = NULL;
  381. }
  382. void hio_free(hio_t* io) {
  383. if (io == NULL) return;
  384. hio_deinit(io);
  385. SAFE_FREE(io->localaddr);
  386. SAFE_FREE(io->peeraddr);
  387. SAFE_FREE(io);
  388. }
  389. hio_t* hio_get(hloop_t* loop, int fd) {
  390. if (loop->ios.maxsize == 0) {
  391. io_array_init(&loop->ios, IO_ARRAY_INIT_SIZE);
  392. }
  393. if (fd >= loop->ios.maxsize) {
  394. int newsize = ceil2e(fd);
  395. io_array_resize(&loop->ios, newsize > fd ? newsize : 2*fd);
  396. }
  397. hio_t* io = loop->ios.ptr[fd];
  398. if (io == NULL) {
  399. SAFE_ALLOC_SIZEOF(io);
  400. hio_init(io);
  401. io->loop = loop;
  402. io->fd = fd;
  403. loop->ios.ptr[fd] = io;
  404. }
  405. return io;
  406. }
  407. int hio_add(hio_t* io, hio_cb cb, int events) {
  408. printd("hio_add fd=%d events=%d\n", io->fd, events);
  409. hloop_t* loop = io->loop;
  410. if (!io->active) {
  411. hio_reset(io);
  412. EVENT_ADD(loop, io, cb);
  413. loop->nios++;
  414. }
  415. if (cb) {
  416. io->cb = (hevent_cb)cb;
  417. }
  418. iowatcher_add_event(loop, io->fd, events);
  419. io->events |= events;
  420. return 0;
  421. }
  422. int hio_del(hio_t* io, int events) {
  423. printd("hio_del fd=%d io->events=%d events=%d\n", io->fd, io->events, events);
  424. if (!io->active) return 0;
  425. iowatcher_del_event(io->loop, io->fd, events);
  426. io->events &= ~events;
  427. if (io->events == 0) {
  428. io->loop->nios--;
  429. // NOTE: not EVENT_DEL, avoid free
  430. EVENT_INACTIVE(io);
  431. hio_deinit(io);
  432. }
  433. return 0;
  434. }
  435. void hio_setlocaladdr(hio_t* io, struct sockaddr* addr, int addrlen) {
  436. if (io->localaddr == NULL) {
  437. SAFE_ALLOC(io->localaddr, sizeof(struct sockaddr_in6));
  438. }
  439. memcpy(io->localaddr, addr, addrlen);
  440. }
  441. void hio_setpeeraddr (hio_t* io, struct sockaddr* addr, int addrlen) {
  442. if (io->peeraddr == NULL) {
  443. io->peeraddrlen = sizeof(struct sockaddr_in6);
  444. SAFE_ALLOC(io->peeraddr, sizeof(struct sockaddr_in6));
  445. }
  446. memcpy(io->peeraddr, addr, addrlen);
  447. }
  448. hio_t* hread(hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
  449. hio_t* io = hio_get(loop, fd);
  450. if (io == NULL) return NULL;
  451. io->readbuf.base = (char*)buf;
  452. io->readbuf.len = len;
  453. if (read_cb) {
  454. io->read_cb = read_cb;
  455. }
  456. hio_read(io);
  457. return io;
  458. }
  459. hio_t* hwrite(hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb) {
  460. hio_t* io = hio_get(loop, fd);
  461. if (io == NULL) return NULL;
  462. if (write_cb) {
  463. io->write_cb = write_cb;
  464. }
  465. hio_write(io, buf, len);
  466. return io;
  467. }
  468. void hclose(hio_t* io) {
  469. printd("close fd=%d\n", io->fd);
  470. if (io->closed) return;
  471. io->closed = 1;
  472. hio_close(io);
  473. if (io->close_cb) {
  474. printd("close_cb------\n");
  475. io->close_cb(io);
  476. printd("close_cb======\n");
  477. }
  478. hio_del(io, ALL_EVENTS);
  479. }
  480. hio_t* haccept(hloop_t* loop, int listenfd, haccept_cb accept_cb) {
  481. hio_t* io = hio_get(loop, listenfd);
  482. if (io == NULL) return NULL;
  483. io->accept = 1;
  484. if (accept_cb) {
  485. io->accept_cb = accept_cb;
  486. }
  487. hio_accept(io);
  488. return io;
  489. }
  490. hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb) {
  491. hio_t* io = hio_get(loop, connfd);
  492. if (io == NULL) return NULL;
  493. io->connect = 1;
  494. if (connect_cb) {
  495. io->connect_cb = connect_cb;
  496. }
  497. hio_connect(io);
  498. return io;
  499. }
  500. hio_t* create_tcp_server (hloop_t* loop, int port, haccept_cb accept_cb) {
  501. int listenfd = Listen(port);
  502. if (listenfd < 0) {
  503. return NULL;
  504. }
  505. hio_t* io = haccept(loop, listenfd, accept_cb);
  506. if (io == NULL) {
  507. closesocket(listenfd);
  508. }
  509. return io;
  510. }
  511. hio_t* create_tcp_client (hloop_t* loop, const char* host, int port, hconnect_cb connect_cb) {
  512. struct sockaddr_in addr;
  513. socklen_t addrlen = sizeof(addr);
  514. memset(&addr, 0, addrlen);
  515. addr.sin_family = AF_INET;
  516. int ret = Resolver(host, (struct sockaddr*)&addr);
  517. if (ret != 0) return NULL;
  518. addr.sin_port = htons(port);
  519. int connfd = socket(AF_INET, SOCK_STREAM, 0);
  520. if (connfd < 0) {
  521. perror("socket");
  522. return NULL;
  523. }
  524. hio_t* io = hio_get(loop, connfd);
  525. if (io == NULL) return NULL;
  526. hio_setpeeraddr(io, (struct sockaddr*)&addr, addrlen);
  527. hconnect(loop, connfd, connect_cb);
  528. return io;
  529. }
  530. hio_t* hrecv (hloop_t* loop, int connfd, void* buf, size_t len, hread_cb read_cb) {
  531. hio_t* io = hio_get(loop, connfd);
  532. if (io == NULL) return NULL;
  533. io->recv = 1;
  534. io->io_type = HIO_TYPE_TCP;
  535. return hread(loop, connfd, buf, len, read_cb);
  536. }
  537. hio_t* hsend (hloop_t* loop, int connfd, const void* buf, size_t len, hwrite_cb write_cb) {
  538. hio_t* io = hio_get(loop, connfd);
  539. if (io == NULL) return NULL;
  540. io->send = 1;
  541. io->io_type = HIO_TYPE_TCP;
  542. return hwrite(loop, connfd, buf, len, write_cb);
  543. }
  544. hio_t* hrecvfrom (hloop_t* loop, int sockfd, void* buf, size_t len, hread_cb read_cb) {
  545. hio_t* io = hio_get(loop, sockfd);
  546. if (io == NULL) return NULL;
  547. io->recvfrom = 1;
  548. io->io_type = HIO_TYPE_UDP;
  549. return hread(loop, sockfd, buf, len, read_cb);
  550. }
  551. hio_t* hsendto (hloop_t* loop, int sockfd, const void* buf, size_t len, hwrite_cb write_cb) {
  552. hio_t* io = hio_get(loop, sockfd);
  553. if (io == NULL) return NULL;
  554. io->sendto = 1;
  555. io->io_type = HIO_TYPE_UDP;
  556. return hwrite(loop, sockfd, buf, len, write_cb);
  557. }
  558. // @server: socket -> bind -> hrecvfrom
  559. hio_t* create_udp_server(hloop_t* loop, int port) {
  560. int bindfd = Bind(port, SOCK_DGRAM);
  561. if (bindfd < 0) {
  562. return NULL;
  563. }
  564. return hio_get(loop, bindfd);
  565. }
  566. // @client: Resolver -> socket -> hio_get -> hio_setpeeraddr
  567. hio_t* create_udp_client(hloop_t* loop, const char* host, int port) {
  568. // IPv4
  569. struct sockaddr_in peeraddr;
  570. socklen_t addrlen = sizeof(peeraddr);
  571. memset(&peeraddr, 0, addrlen);
  572. peeraddr.sin_family = AF_INET;
  573. int ret = Resolver(host, (struct sockaddr*)&peeraddr);
  574. if (ret != 0) return NULL;
  575. peeraddr.sin_port = htons(port);
  576. int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
  577. if (sockfd < 0) {
  578. perror("socket");
  579. return NULL;
  580. }
  581. hio_t* io = hio_get(loop, sockfd);
  582. if (io == NULL) return NULL;
  583. hio_setpeeraddr(io, (struct sockaddr*)&peeraddr, addrlen);
  584. return io;
  585. }