// hloop.c
#include "hloop.h"
#include "hevent.h"
#include "iowatcher.h"
#include "hdef.h"
#include "hbase.h"
#include "hlog.h"
#include "hmath.h"
#include "htime.h"
#include "hsocket.h"
#include "hthread.h"

#if defined(OS_UNIX) && HAVE_EVENTFD
#include "sys/eventfd.h"
#endif

#define HLOOP_PAUSE_TIME        10      // ms
#define HLOOP_MAX_BLOCK_TIME    100     // ms
#define HLOOP_STAT_TIMEOUT      60000   // ms

#define IO_ARRAY_INIT_SIZE              1024
#define CUSTOM_EVENT_QUEUE_INIT_SIZE    16

#define EVENTFDS_READ_INDEX     0
#define EVENTFDS_WRITE_INDEX    1

static void __hidle_del(hidle_t* idle);
static void __htimer_del(htimer_t* timer);

static int timers_compare(const struct heap_node* lhs, const struct heap_node* rhs) {
    return TIMER_ENTRY(lhs)->next_timeout < TIMER_ENTRY(rhs)->next_timeout;
}
static int hloop_process_idles(hloop_t* loop) {
    int nidles = 0;
    struct list_node* node = loop->idles.next;
    hidle_t* idle = NULL;
    while (node != &loop->idles) {
        idle = IDLE_ENTRY(node);
        node = node->next;
        if (idle->repeat != INFINITE) {
            --idle->repeat;
        }
        if (idle->repeat == 0) {
            // NOTE: Just mark it as destroyed and remove it from the list.
            // Real deletion occurs after hloop_process_pendings.
            __hidle_del(idle);
        }
        EVENT_PENDING(idle);
        ++nidles;
    }
    return nidles;
}
static int __hloop_process_timers(struct heap* timers, uint64_t timeout) {
    int ntimers = 0;
    htimer_t* timer = NULL;
    while (timers->root) {
        // NOTE: the root of the min-heap holds the smallest next_timeout.
        timer = TIMER_ENTRY(timers->root);
        if (timer->next_timeout > timeout) {
            break;
        }
        if (timer->repeat != INFINITE) {
            --timer->repeat;
        }
        if (timer->repeat == 0) {
            // NOTE: Just mark it as destroyed and remove it from the heap.
            // Real deletion occurs after hloop_process_pendings.
            __htimer_del(timer);
        }
        else {
            // NOTE: calculate the next timeout, then re-insert into the heap.
            heap_dequeue(timers);
            if (timer->event_type == HEVENT_TYPE_TIMEOUT) {
                while (timer->next_timeout <= timeout) {
                    timer->next_timeout += (uint64_t)((htimeout_t*)timer)->timeout * 1000;
                }
            }
            else if (timer->event_type == HEVENT_TYPE_PERIOD) {
                hperiod_t* period = (hperiod_t*)timer;
                timer->next_timeout = (uint64_t)cron_next_timeout(period->minute, period->hour, period->day,
                        period->week, period->month) * 1000000;
            }
            heap_insert(timers, &timer->node);
        }
        EVENT_PENDING(timer);
        ++ntimers;
    }
    return ntimers;
}
static int hloop_process_timers(hloop_t* loop) {
    uint64_t now = hloop_now_us(loop);
    int ntimers = __hloop_process_timers(&loop->timers, loop->cur_hrtime);
    ntimers += __hloop_process_timers(&loop->realtimers, now);
    return ntimers;
}
static int hloop_process_ios(hloop_t* loop, int timeout) {
    // NOTE: this calls the IO multiplexing backend (select, poll, epoll, etc.).
    int nevents = iowatcher_poll_events(loop, timeout);
    if (nevents < 0) {
        hlogd("poll_events error=%d", -nevents);
    }
    return nevents < 0 ? 0 : nevents;
}
static int hloop_process_pendings(hloop_t* loop) {
    if (loop->npendings == 0) return 0;

    hevent_t* cur = NULL;
    hevent_t* next = NULL;
    int ncbs = 0;
    // NOTE: invoke event callbacks from high priority to low priority.
    for (int i = HEVENT_PRIORITY_SIZE-1; i >= 0; --i) {
        cur = loop->pendings[i];
        while (cur) {
            next = cur->pending_next;
            if (cur->pending) {
                if (cur->active && cur->cb) {
                    cur->cb(cur);
                    ++ncbs;
                }
                cur->pending = 0;
                // NOTE: Now we can safely delete events marked as destroyed.
                if (cur->destroy) {
                    EVENT_DEL(cur);
                }
            }
            cur = next;
        }
        loop->pendings[i] = NULL;
    }
    loop->npendings = 0;
    return ncbs;
}
// hloop_process_ios -> hloop_process_timers -> hloop_process_idles -> hloop_process_pendings
int hloop_process_events(hloop_t* loop, int timeout_ms) {
    // ios -> timers -> idles
    int nios, ntimers, nidles;
    nios = ntimers = nidles = 0;

    // calc blocktime
    int32_t blocktime_ms = timeout_ms;
    if (loop->ntimers) {
        hloop_update_time(loop);
        int64_t blocktime_us = blocktime_ms * 1000;
        if (loop->timers.root) {
            int64_t min_timeout = TIMER_ENTRY(loop->timers.root)->next_timeout - loop->cur_hrtime;
            blocktime_us = MIN(blocktime_us, min_timeout);
        }
        if (loop->realtimers.root) {
            int64_t min_timeout = TIMER_ENTRY(loop->realtimers.root)->next_timeout - hloop_now_us(loop);
            blocktime_us = MIN(blocktime_us, min_timeout);
        }
        if (blocktime_us < 0) goto process_timers;
        blocktime_ms = blocktime_us / 1000 + 1;
        blocktime_ms = MIN(blocktime_ms, timeout_ms);
    }

    if (loop->nios) {
        nios = hloop_process_ios(loop, blocktime_ms);
    } else {
        hv_msleep(blocktime_ms);
    }
    hloop_update_time(loop);
    // wakeup by hloop_stop
    if (loop->status == HLOOP_STATUS_STOP) {
        return 0;
    }

process_timers:
    if (loop->ntimers) {
        ntimers = hloop_process_timers(loop);
    }

    int npendings = loop->npendings;
    if (npendings == 0) {
        if (loop->nidles) {
            nidles = hloop_process_idles(loop);
        }
    }
    int ncbs = hloop_process_pendings(loop);
    // printd("blocktime=%d nios=%d/%u ntimers=%d/%u nidles=%d/%u nactives=%d npendings=%d ncbs=%d\n",
    //        blocktime, nios, loop->nios, ntimers, loop->ntimers, nidles, loop->nidles,
    //        loop->nactives, npendings, ncbs);
    return ncbs;
}
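
/* Usage sketch: driving a single loop iteration from an external run loop.
 * hloop_process_events is non-static here; this sketch assumes it is also
 * declared in hloop.h. Each call polls IO for at most timeout_ms, then runs
 * due timers, idles and pending callbacks.
 *
 *   // inside some host application's main loop:
 *   // hloop_process_events(loop, 10);   // run one iteration, block <= 10 ms
 */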
static void hloop_stat_timer_cb(htimer_t* timer) {
    hloop_t* loop = timer->loop;
    // hlog_set_level(LOG_LEVEL_DEBUG);
    hlogd("[loop] pid=%ld tid=%ld uptime=%lluus cnt=%llu nactives=%u nios=%u ntimers=%u nidles=%u",
        loop->pid, loop->tid,
        (unsigned long long)loop->cur_hrtime - loop->start_hrtime,
        (unsigned long long)loop->loop_cnt,
        loop->nactives, loop->nios, loop->ntimers, loop->nidles);
}
static void eventfd_read_cb(hio_t* io, void* buf, int readbytes) {
    hloop_t* loop = io->loop;
    hevent_t* pev = NULL;
    hevent_t ev;
    uint64_t count = readbytes;
#if defined(OS_UNIX) && HAVE_EVENTFD
    assert(readbytes == sizeof(count));
    count = *(uint64_t*)buf;
#endif
    for (uint64_t i = 0; i < count; ++i) {
        hmutex_lock(&loop->custom_events_mutex);
        if (event_queue_empty(&loop->custom_events)) {
            goto unlock;
        }
        pev = event_queue_front(&loop->custom_events);
        if (pev == NULL) {
            goto unlock;
        }
        ev = *pev;
        event_queue_pop_front(&loop->custom_events);
        // NOTE: unlock before the callback, to avoid deadlock if hloop_post_event is called inside it.
        hmutex_unlock(&loop->custom_events_mutex);
        if (ev.cb) {
            ev.cb(&ev);
        }
    }
    return;
unlock:
    hmutex_unlock(&loop->custom_events_mutex);
}
static int hloop_create_eventfds(hloop_t* loop) {
#if defined(OS_UNIX) && HAVE_EVENTFD
    int efd = eventfd(0, 0);
    if (efd < 0) {
        hloge("eventfd create failed!");
        return -1;
    }
    loop->eventfds[0] = loop->eventfds[1] = efd;
#elif defined(OS_UNIX) && HAVE_PIPE
    if (pipe(loop->eventfds) != 0) {
        hloge("pipe create failed!");
        return -1;
    }
#else
    if (Socketpair(AF_INET, SOCK_STREAM, 0, loop->eventfds) != 0) {
        hloge("socketpair create failed!");
        return -1;
    }
#endif
    hio_t* io = hread(loop, loop->eventfds[EVENTFDS_READ_INDEX], NULL, 0, eventfd_read_cb);
    io->priority = HEVENT_HIGH_PRIORITY;
    ++loop->intern_nevents;
    return 0;
}
static void hloop_destroy_eventfds(hloop_t* loop) {
#if defined(OS_UNIX) && HAVE_EVENTFD
    // NOTE: eventfd has only one fd
    SAFE_CLOSE(loop->eventfds[0]);
#elif defined(OS_UNIX) && HAVE_PIPE
    SAFE_CLOSE(loop->eventfds[0]);
    SAFE_CLOSE(loop->eventfds[1]);
#else
    // NOTE: do not close the read socket twice; hio_cleanup already closes it.
    // SAFE_CLOSESOCKET(loop->eventfds[EVENTFDS_READ_INDEX]);
    SAFE_CLOSESOCKET(loop->eventfds[EVENTFDS_WRITE_INDEX]);
#endif
    loop->eventfds[0] = loop->eventfds[1] = -1;
}
void hloop_post_event(hloop_t* loop, hevent_t* ev) {
    if (ev->loop == NULL) {
        ev->loop = loop;
    }
    if (ev->event_type == 0) {
        ev->event_type = HEVENT_TYPE_CUSTOM;
    }
    if (ev->event_id == 0) {
        ev->event_id = hloop_next_event_id();
    }

    int nwrite = 0;
    uint64_t count = 1;
    hmutex_lock(&loop->custom_events_mutex);
    if (loop->eventfds[EVENTFDS_WRITE_INDEX] == -1) {
        if (hloop_create_eventfds(loop) != 0) {
            goto unlock;
        }
    }
#if defined(OS_UNIX) && HAVE_EVENTFD
    nwrite = write(loop->eventfds[EVENTFDS_WRITE_INDEX], &count, sizeof(count));
#elif defined(OS_UNIX) && HAVE_PIPE
    nwrite = write(loop->eventfds[EVENTFDS_WRITE_INDEX], "e", 1);
#else
    nwrite = send(loop->eventfds[EVENTFDS_WRITE_INDEX], "e", 1, 0);
#endif
    if (nwrite <= 0) {
        hloge("hloop_post_event failed!");
        goto unlock;
    }
    if (loop->custom_events.maxsize == 0) {
        event_queue_init(&loop->custom_events, CUSTOM_EVENT_QUEUE_INIT_SIZE);
    }
    event_queue_push_back(&loop->custom_events, ev);
unlock:
    hmutex_unlock(&loop->custom_events_mutex);
}
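
/* Usage sketch: posting a custom event to the loop from another thread.
 * hloop_post_event copies *ev into the queue (see eventfd_read_cb above), so a
 * stack-allocated hevent_t is fine; userdata must stay valid until the callback
 * runs. on_custom is an illustrative name, not part of libhv.
 *
 *   static void on_custom(hevent_t* ev) {
 *       printf("custom event: %s\n", (char*)ev->userdata);
 *   }
 *
 *   // in some worker thread:
 *   hevent_t ev;
 *   memset(&ev, 0, sizeof(ev));
 *   ev.cb = on_custom;
 *   ev.userdata = (void*)"hello";    // string literal: lifetime is not a concern
 *   hloop_post_event(loop, &ev);
 */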
static void hloop_init(hloop_t* loop) {
#ifdef OS_WIN
    WSAInit();
#endif
#ifdef SIGPIPE
    // NOTE: if SIGPIPE is not ignored, a second write after the peer has closed
    // will terminate the process with SIGPIPE.
    signal(SIGPIPE, SIG_IGN);
#endif

    loop->status = HLOOP_STATUS_STOP;
    loop->pid = hv_getpid();
    loop->tid = hv_gettid();

    // idles
    list_init(&loop->idles);

    // timers
    heap_init(&loop->timers, timers_compare);
    heap_init(&loop->realtimers, timers_compare);

    // ios
    // NOTE: io_array_init when hio_get -> io_array_resize
    // io_array_init(&loop->ios, IO_ARRAY_INIT_SIZE);

    // readbuf
    // NOTE: alloc readbuf when hio_use_loop_readbuf
    // loop->readbuf.len = HLOOP_READ_BUFSIZE;
    // HV_ALLOC(loop->readbuf.base, loop->readbuf.len);

    // NOTE: iowatcher_init when hio_add -> iowatcher_add_event
    // iowatcher_init(loop);

    // custom_events
    hmutex_init(&loop->custom_events_mutex);
    // NOTE: hloop_create_eventfds when hloop_post_event or hloop_run
    loop->eventfds[0] = loop->eventfds[1] = -1;

    // NOTE: init start_time here, because htimer_add uses it.
    loop->start_ms = gettimeofday_ms();
    loop->start_hrtime = loop->cur_hrtime = gethrtime_us();
}
static void hloop_cleanup(hloop_t* loop) {
    // pendings
    printd("cleanup pendings...\n");
    for (int i = 0; i < HEVENT_PRIORITY_SIZE; ++i) {
        loop->pendings[i] = NULL;
    }

    // ios
    printd("cleanup ios...\n");
    for (int i = 0; i < loop->ios.maxsize; ++i) {
        hio_t* io = loop->ios.ptr[i];
        if (io) {
            hio_free(io);
        }
    }
    io_array_cleanup(&loop->ios);

    // idles
    printd("cleanup idles...\n");
    struct list_node* node = loop->idles.next;
    hidle_t* idle;
    while (node != &loop->idles) {
        idle = IDLE_ENTRY(node);
        node = node->next;
        HV_FREE(idle);
    }
    list_init(&loop->idles);

    // timers
    printd("cleanup timers...\n");
    htimer_t* timer;
    while (loop->timers.root) {
        timer = TIMER_ENTRY(loop->timers.root);
        heap_dequeue(&loop->timers);
        HV_FREE(timer);
    }
    heap_init(&loop->timers, NULL);
    while (loop->realtimers.root) {
        timer = TIMER_ENTRY(loop->realtimers.root);
        heap_dequeue(&loop->realtimers);
        HV_FREE(timer);
    }
    heap_init(&loop->realtimers, NULL);

    // readbuf
    if (loop->readbuf.base && loop->readbuf.len) {
        HV_FREE(loop->readbuf.base);
        loop->readbuf.base = NULL;
        loop->readbuf.len = 0;
    }

    // iowatcher
    iowatcher_cleanup(loop);

    // custom_events
    hmutex_lock(&loop->custom_events_mutex);
    hloop_destroy_eventfds(loop);
    event_queue_cleanup(&loop->custom_events);
    hmutex_unlock(&loop->custom_events_mutex);
    hmutex_destroy(&loop->custom_events_mutex);
}
hloop_t* hloop_new(int flags) {
    hloop_t* loop;
    HV_ALLOC_SIZEOF(loop);
    hloop_init(loop);
    loop->flags |= flags;
    return loop;
}

void hloop_free(hloop_t** pp) {
    if (pp && *pp) {
        hloop_cleanup(*pp);
        HV_FREE(*pp);
        *pp = NULL;
    }
}
// while (loop->status) { hloop_process_events(loop); }
int hloop_run(hloop_t* loop) {
    if (loop == NULL) return -1;
    if (loop->status == HLOOP_STATUS_RUNNING) return -2;

    loop->status = HLOOP_STATUS_RUNNING;
    loop->pid = hv_getpid();
    loop->tid = hv_gettid();

    if (loop->intern_nevents == 0) {
        hmutex_lock(&loop->custom_events_mutex);
        if (loop->eventfds[EVENTFDS_WRITE_INDEX] == -1) {
            hloop_create_eventfds(loop);
        }
        hmutex_unlock(&loop->custom_events_mutex);

#ifdef DEBUG
        htimer_add(loop, hloop_stat_timer_cb, HLOOP_STAT_TIMEOUT, INFINITE);
        ++loop->intern_nevents;
#endif
    }

    while (loop->status != HLOOP_STATUS_STOP) {
        if (loop->status == HLOOP_STATUS_PAUSE) {
            hv_msleep(HLOOP_PAUSE_TIME);
            hloop_update_time(loop);
            continue;
        }
        ++loop->loop_cnt;
        if ((loop->flags & HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS) &&
            loop->nactives <= loop->intern_nevents) {
            break;
        }
        hloop_process_events(loop, HLOOP_MAX_BLOCK_TIME);
        if (loop->flags & HLOOP_FLAG_RUN_ONCE) {
            break;
        }
    }

    loop->status = HLOOP_STATUS_STOP;
    loop->end_hrtime = gethrtime_us();

    if (loop->flags & HLOOP_FLAG_AUTO_FREE) {
        hloop_cleanup(loop);
        HV_FREE(loop);
    }
    return 0;
}
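
/* Usage sketch: the typical loop lifecycle. hloop_run blocks until hloop_stop
 * is called (or until HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS applies). on_tick
 * is an illustrative name.
 *
 *   static void on_tick(htimer_t* timer) {
 *       printf("tick\n");
 *   }
 *
 *   int main() {
 *       hloop_t* loop = hloop_new(0);
 *       htimer_add(loop, on_tick, 1000, INFINITE);   // fire every second
 *       hloop_run(loop);                             // blocks here
 *       hloop_free(&loop);
 *       return 0;
 *   }
 */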
int hloop_wakeup(hloop_t* loop) {
    hevent_t ev;
    memset(&ev, 0, sizeof(ev));
    hloop_post_event(loop, &ev);
    return 0;
}

int hloop_stop(hloop_t* loop) {
    if (hv_gettid() != loop->tid) {
        hloop_wakeup(loop);
    }
    loop->status = HLOOP_STATUS_STOP;
    return 0;
}

int hloop_pause(hloop_t* loop) {
    if (loop->status == HLOOP_STATUS_RUNNING) {
        loop->status = HLOOP_STATUS_PAUSE;
    }
    return 0;
}

int hloop_resume(hloop_t* loop) {
    if (loop->status == HLOOP_STATUS_PAUSE) {
        loop->status = HLOOP_STATUS_RUNNING;
    }
    return 0;
}

hloop_status_e hloop_status(hloop_t* loop) {
    return loop->status;
}
void hloop_update_time(hloop_t* loop) {
    loop->cur_hrtime = gethrtime_us();
    if (hloop_now(loop) != time(NULL)) {
        // system time changed: re-derive start_ms from the monotonic clock
        loop->start_ms = gettimeofday_ms() - (loop->cur_hrtime - loop->start_hrtime) / 1000;
    }
}

uint64_t hloop_now(hloop_t* loop) {
    return loop->start_ms / 1000 + (loop->cur_hrtime - loop->start_hrtime) / 1000000;
}

uint64_t hloop_now_ms(hloop_t* loop) {
    return loop->start_ms + (loop->cur_hrtime - loop->start_hrtime) / 1000;
}

uint64_t hloop_now_us(hloop_t* loop) {
    return loop->start_ms * 1000 + (loop->cur_hrtime - loop->start_hrtime);
}

uint64_t hloop_now_hrtime(hloop_t* loop) {
    return loop->cur_hrtime;
}

uint64_t hio_last_read_time(hio_t* io) {
    hloop_t* loop = io->loop;
    return loop->start_ms + (io->last_read_hrtime - loop->start_hrtime) / 1000;
}

uint64_t hio_last_write_time(hio_t* io) {
    hloop_t* loop = io->loop;
    return loop->start_ms + (io->last_write_hrtime - loop->start_hrtime) / 1000;
}
long hloop_pid(hloop_t* loop) {
    return loop->pid;
}

long hloop_tid(hloop_t* loop) {
    return loop->tid;
}

uint64_t hloop_count(hloop_t* loop) {
    return loop->loop_cnt;
}

uint32_t hloop_nios(hloop_t* loop) {
    return loop->nios;
}

uint32_t hloop_ntimers(hloop_t* loop) {
    return loop->ntimers;
}

uint32_t hloop_nidles(hloop_t* loop) {
    return loop->nidles;
}

uint32_t hloop_nactives(hloop_t* loop) {
    return loop->nactives;
}

void hloop_set_userdata(hloop_t* loop, void* userdata) {
    loop->userdata = userdata;
}

void* hloop_userdata(hloop_t* loop) {
    return loop->userdata;
}
hidle_t* hidle_add(hloop_t* loop, hidle_cb cb, uint32_t repeat) {
    hidle_t* idle;
    HV_ALLOC_SIZEOF(idle);
    idle->event_type = HEVENT_TYPE_IDLE;
    idle->priority = HEVENT_LOWEST_PRIORITY;
    idle->repeat = repeat;
    list_add(&idle->node, &loop->idles);
    EVENT_ADD(loop, idle, cb);
    loop->nidles++;
    return idle;
}

static void __hidle_del(hidle_t* idle) {
    if (idle->destroy) return;
    idle->destroy = 1;
    list_del(&idle->node);
    idle->loop->nidles--;
}

void hidle_del(hidle_t* idle) {
    if (!idle->active) return;
    __hidle_del(idle);
    EVENT_DEL(idle);
}
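
/* Usage sketch: run a callback on loop iterations that have no pending events,
 * a fixed number of times. on_idle is an illustrative name; hidle_cb is assumed
 * to take the hidle_t* as its only argument.
 *
 *   static void on_idle(hidle_t* idle) {
 *       printf("idle\n");
 *   }
 *
 *   hidle_add(loop, on_idle, 10);        // runs 10 times, then auto-removed
 *   hidle_add(loop, on_idle, INFINITE);  // or runs on every idle iteration
 */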
htimer_t* htimer_add(hloop_t* loop, htimer_cb cb, uint32_t timeout_ms, uint32_t repeat) {
    if (timeout_ms == 0) return NULL;
    htimeout_t* timer;
    HV_ALLOC_SIZEOF(timer);
    timer->event_type = HEVENT_TYPE_TIMEOUT;
    timer->priority = HEVENT_HIGHEST_PRIORITY;
    timer->repeat = repeat;
    timer->timeout = timeout_ms;
    hloop_update_time(loop);
    timer->next_timeout = loop->cur_hrtime + (uint64_t)timeout_ms * 1000;
    // NOTE: Limit granularity to 100ms
    if (timeout_ms >= 1000 && timeout_ms % 100 == 0) {
        timer->next_timeout = timer->next_timeout / 100000 * 100000;
    }
    heap_insert(&loop->timers, &timer->node);
    EVENT_ADD(loop, timer, cb);
    loop->ntimers++;
    return (htimer_t*)timer;
}
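
/* Usage sketch: a one-shot timer and a repeating timer. Callback names are
 * illustrative; htimer_cb receives the htimer_t* (see hloop_stat_timer_cb above).
 *
 *   static void on_once(htimer_t* timer)  { printf("fires once after 500 ms\n"); }
 *   static void on_every(htimer_t* timer) { printf("fires every 1000 ms\n"); }
 *
 *   htimer_add(loop, on_once, 500, 1);
 *   htimer_t* t = htimer_add(loop, on_every, 1000, INFINITE);
 *   // later, from the loop thread:
 *   // htimer_del(t);
 */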
void htimer_reset(htimer_t* timer, uint32_t timeout_ms) {
    if (timer->event_type != HEVENT_TYPE_TIMEOUT) {
        return;
    }
    hloop_t* loop = timer->loop;
    htimeout_t* timeout = (htimeout_t*)timer;
    if (timer->destroy) {
        loop->ntimers++;
    } else {
        heap_remove(&loop->timers, &timer->node);
    }
    if (timer->repeat == 0) {
        timer->repeat = 1;
    }
    if (timeout_ms > 0) {
        timeout->timeout = timeout_ms;
    }
    timer->next_timeout = loop->cur_hrtime + (uint64_t)timeout->timeout * 1000;
    // NOTE: Limit granularity to 100ms
    if (timeout->timeout >= 1000 && timeout->timeout % 100 == 0) {
        timer->next_timeout = timer->next_timeout / 100000 * 100000;
    }
    heap_insert(&loop->timers, &timer->node);
    EVENT_RESET(timer);
}
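
/* Usage sketch: htimer_reset as an idle/keepalive watchdog. Each time data
 * arrives, the timer is pushed back, so on_timeout only fires after a quiet
 * period. Names are illustrative.
 *
 *   static htimer_t* s_watchdog = NULL;
 *
 *   static void on_timeout(htimer_t* timer) { printf("no data for 10s\n"); }
 *
 *   static void on_recv(hio_t* io, void* buf, int readbytes) {
 *       htimer_reset(s_watchdog, 0);   // 0 keeps the original 10000 ms timeout
 *   }
 *
 *   // setup:
 *   // s_watchdog = htimer_add(loop, on_timeout, 10000, 1);
 *   // hread(loop, fd, buf, sizeof(buf), on_recv);
 */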
htimer_t* htimer_add_period(hloop_t* loop, htimer_cb cb,
        int8_t minute, int8_t hour, int8_t day,
        int8_t week, int8_t month, uint32_t repeat) {
    if (minute > 59 || hour > 23 || day > 31 || week > 6 || month > 12) {
        return NULL;
    }
    hperiod_t* timer;
    HV_ALLOC_SIZEOF(timer);
    timer->event_type = HEVENT_TYPE_PERIOD;
    timer->priority = HEVENT_HIGH_PRIORITY;
    timer->repeat = repeat;
    timer->minute = minute;
    timer->hour = hour;
    timer->day = day;
    timer->month = month;
    timer->week = week;
    timer->next_timeout = (uint64_t)cron_next_timeout(minute, hour, day, week, month) * 1000000;
    heap_insert(&loop->realtimers, &timer->node);
    EVENT_ADD(loop, timer, cb);
    loop->ntimers++;
    return (htimer_t*)timer;
}
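
/* Usage sketch: a wall-clock (cron-like) timer that fires at minute 0 of every
 * hour. This assumes -1 means "any" for a field, as used by cron_next_timeout;
 * on_hour is an illustrative name.
 *
 *   static void on_hour(htimer_t* timer) { printf("top of the hour\n"); }
 *
 *   // minute=0, hour=-1, day=-1, week=-1, month=-1, repeat=INFINITE
 *   htimer_add_period(loop, on_hour, 0, -1, -1, -1, -1, INFINITE);
 */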
static void __htimer_del(htimer_t* timer) {
    if (timer->destroy) return;
    if (timer->event_type == HEVENT_TYPE_TIMEOUT) {
        heap_remove(&timer->loop->timers, &timer->node);
    } else if (timer->event_type == HEVENT_TYPE_PERIOD) {
        heap_remove(&timer->loop->realtimers, &timer->node);
    }
    timer->loop->ntimers--;
    timer->destroy = 1;
}

void htimer_del(htimer_t* timer) {
    if (!timer->active) return;
    __htimer_del(timer);
    EVENT_DEL(timer);
}
const char* hio_engine() {
#ifdef EVENT_SELECT
    return "select";
#elif defined(EVENT_POLL)
    return "poll";
#elif defined(EVENT_EPOLL)
    return "epoll";
#elif defined(EVENT_KQUEUE)
    return "kqueue";
#elif defined(EVENT_IOCP)
    return "iocp";
#elif defined(EVENT_PORT)
    return "evport";
#else
    return "noevent";
#endif
}
static inline hio_t* __hio_get(hloop_t* loop, int fd) {
    if (fd >= loop->ios.maxsize) {
        int newsize = ceil2e(fd);
        newsize = MAX(newsize, IO_ARRAY_INIT_SIZE);
        io_array_resize(&loop->ios, newsize > fd ? newsize : 2*fd);
    }
    return loop->ios.ptr[fd];
}

hio_t* hio_get(hloop_t* loop, int fd) {
    hio_t* io = __hio_get(loop, fd);
    if (io == NULL) {
        HV_ALLOC_SIZEOF(io);
        hio_init(io);
        io->event_type = HEVENT_TYPE_IO;
        io->loop = loop;
        io->fd = fd;
        loop->ios.ptr[fd] = io;
    }
    if (!io->ready) {
        hio_ready(io);
    }
    return io;
}
void hio_detach(hio_t* io) {
    hloop_t* loop = io->loop;
    int fd = io->fd;
    assert(loop != NULL && fd < loop->ios.maxsize);
    loop->ios.ptr[fd] = NULL;
}

void hio_attach(hloop_t* loop, hio_t* io) {
    int fd = io->fd;
    // NOTE: a closed hio is normally kept around for reuse, but an attached hio
    // cannot be reused, so free any existing hio on this fd to avoid a memory leak.
    hio_t* preio = __hio_get(loop, fd);
    if (preio != NULL && preio != io) {
        hio_free(preio);
    }

    io->loop = loop;
    // NOTE: use the new loop's readbuf
    hio_use_loop_readbuf(io);
    loop->ios.ptr[fd] = io;
}

bool hio_exists(hloop_t* loop, int fd) {
    if (fd >= loop->ios.maxsize) {
        return false;
    }
    return loop->ios.ptr[fd] != NULL;
}
int hio_add(hio_t* io, hio_cb cb, int events) {
    printd("hio_add fd=%d io->events=%d events=%d\n", io->fd, io->events, events);
#ifdef OS_WIN
    // NOTE: the Windows iowatcher does not work on stdio fds
    if (io->fd < 3) return -1;
#endif
    hloop_t* loop = io->loop;
    if (!io->active) {
        EVENT_ADD(loop, io, cb);
        loop->nios++;
    }

    if (!io->ready) {
        hio_ready(io);
    }

    if (cb) {
        io->cb = (hevent_cb)cb;
    }

    if (!(io->events & events)) {
        iowatcher_add_event(loop, io->fd, events);
        io->events |= events;
    }
    return 0;
}

int hio_del(hio_t* io, int events) {
    printd("hio_del fd=%d io->events=%d events=%d\n", io->fd, io->events, events);
#ifdef OS_WIN
    // NOTE: the Windows iowatcher does not work on stdio fds
    if (io->fd < 3) return -1;
#endif
    if (!io->active) return -1;

    if (io->events & events) {
        iowatcher_del_event(io->loop, io->fd, events);
        io->events &= ~events;
    }
    if (io->events == 0) {
        io->loop->nios--;
        // NOTE: not EVENT_DEL here, to avoid freeing the hio
        EVENT_INACTIVE(io);
    }
    return 0;
}
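
/* Usage sketch: watching a descriptor with the low-level API. This assumes
 * HV_READ is the read-event flag from hloop.h and that hio_cb takes the hio_t*
 * (an assumption); the higher-level hread below wraps this pattern.
 *
 *   static void on_readable(hio_t* io) {
 *       // fd is readable; read it yourself, or switch to hio_read/hread
 *   }
 *
 *   hio_t* io = hio_get(loop, fd);
 *   hio_add(io, on_readable, HV_READ);
 *   // ... later:
 *   // hio_del(io, HV_READ);
 */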
static void hio_close_event_cb(hevent_t* ev) {
    hio_t* io = (hio_t*)ev->userdata;
    uint32_t id = (uintptr_t)ev->privdata;
    if (io->id != id) return;
    hio_close(io);
}

int hio_close_async(hio_t* io) {
    hevent_t ev;
    memset(&ev, 0, sizeof(ev));
    ev.cb = hio_close_event_cb;
    ev.userdata = io;
    ev.privdata = (void*)(uintptr_t)io->id;
    hloop_post_event(io->loop, &ev);
    return 0;
}
//------------------high-level apis-------------------------------------------
hio_t* hread(hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
    hio_t* io = hio_get(loop, fd);
    assert(io != NULL);
    if (buf && len) {
        io->readbuf.base = (char*)buf;
        io->readbuf.len = len;
    }
    if (read_cb) {
        io->read_cb = read_cb;
    }
    hio_read(io);
    return io;
}

hio_t* hwrite(hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb) {
    hio_t* io = hio_get(loop, fd);
    assert(io != NULL);
    if (write_cb) {
        io->write_cb = write_cb;
    }
    hio_write(io, buf, len);
    return io;
}

hio_t* haccept(hloop_t* loop, int listenfd, haccept_cb accept_cb) {
    hio_t* io = hio_get(loop, listenfd);
    assert(io != NULL);
    if (accept_cb) {
        io->accept_cb = accept_cb;
    }
    if (hio_accept(io) != 0) return NULL;
    return io;
}

hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb) {
    hio_t* io = hio_get(loop, connfd);
    assert(io != NULL);
    if (connect_cb) {
        io->connect_cb = connect_cb;
    }
    if (hio_connect(io) != 0) return NULL;
    return io;
}

void hclose (hloop_t* loop, int fd) {
    hio_t* io = hio_get(loop, fd);
    assert(io != NULL);
    hio_close(io);
}

hio_t* hrecv (hloop_t* loop, int connfd, void* buf, size_t len, hread_cb read_cb) {
    //hio_t* io = hio_get(loop, connfd);
    //assert(io != NULL);
    //io->recv = 1;
    //if (io->io_type != HIO_TYPE_SSL) {
        //io->io_type = HIO_TYPE_TCP;
    //}
    return hread(loop, connfd, buf, len, read_cb);
}

hio_t* hsend (hloop_t* loop, int connfd, const void* buf, size_t len, hwrite_cb write_cb) {
    //hio_t* io = hio_get(loop, connfd);
    //assert(io != NULL);
    //io->send = 1;
    //if (io->io_type != HIO_TYPE_SSL) {
        //io->io_type = HIO_TYPE_TCP;
    //}
    return hwrite(loop, connfd, buf, len, write_cb);
}

hio_t* hrecvfrom (hloop_t* loop, int sockfd, void* buf, size_t len, hread_cb read_cb) {
    //hio_t* io = hio_get(loop, sockfd);
    //assert(io != NULL);
    //io->recvfrom = 1;
    //io->io_type = HIO_TYPE_UDP;
    return hread(loop, sockfd, buf, len, read_cb);
}

hio_t* hsendto (hloop_t* loop, int sockfd, const void* buf, size_t len, hwrite_cb write_cb) {
    //hio_t* io = hio_get(loop, sockfd);
    //assert(io != NULL);
    //io->sendto = 1;
    //io->io_type = HIO_TYPE_UDP;
    return hwrite(loop, sockfd, buf, len, write_cb);
}
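
/* Usage sketch: echoing a connected socket with the high-level helpers. The
 * hread_cb signature (hio_t*, void*, int) matches eventfd_read_cb above;
 * on_recv and recvbuf are illustrative names.
 *
 *   static char recvbuf[4096];
 *
 *   static void on_recv(hio_t* io, void* buf, int readbytes) {
 *       hio_write(io, buf, readbytes);   // echo back what was read
 *   }
 *
 *   // connfd is an already-connected, non-blocking socket:
 *   hread(loop, connfd, recvbuf, sizeof(recvbuf), on_recv);
 */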
//-----------------top-level apis---------------------------------------------
hio_t* hio_create_socket(hloop_t* loop, const char* host, int port, hio_type_e type, hio_side_e side) {
    int sock_type = (type & HIO_TYPE_SOCK_STREAM) ? SOCK_STREAM :
                    (type & HIO_TYPE_SOCK_DGRAM)  ? SOCK_DGRAM  :
                    (type & HIO_TYPE_SOCK_RAW)    ? SOCK_RAW    : -1;
    if (sock_type == -1) return NULL;
    sockaddr_u addr;
    memset(&addr, 0, sizeof(addr));
    int ret = -1;
#ifdef ENABLE_UDS
    if (port < 0) {
        sockaddr_set_path(&addr, host);
        ret = 0;
    }
#endif
    if (port >= 0) {
        ret = sockaddr_set_ipport(&addr, host, port);
    }
    if (ret != 0) {
        // fprintf(stderr, "unknown host: %s\n", host);
        return NULL;
    }
    int sockfd = socket(addr.sa.sa_family, sock_type, 0);
    if (sockfd < 0) {
        perror("socket");
        return NULL;
    }
    hio_t* io = NULL;
    if (side == HIO_SERVER_SIDE) {
#ifdef OS_UNIX
        so_reuseaddr(sockfd, 1);
        // so_reuseport(sockfd, 1);
#endif
        if (addr.sa.sa_family == AF_INET6) {
            ip_v6only(sockfd, 0);
        }
        if (bind(sockfd, &addr.sa, sockaddr_len(&addr)) < 0) {
            perror("bind");
            closesocket(sockfd);
            return NULL;
        }
        if (sock_type == SOCK_STREAM) {
            if (listen(sockfd, SOMAXCONN) < 0) {
                perror("listen");
                closesocket(sockfd);
                return NULL;
            }
        }
    }
    io = hio_get(loop, sockfd);
    assert(io != NULL);
    io->io_type = type;
    if (side == HIO_SERVER_SIDE) {
        hio_set_localaddr(io, &addr.sa, sockaddr_len(&addr));
        io->priority = HEVENT_HIGH_PRIORITY;
    } else {
        hio_set_peeraddr(io, &addr.sa, sockaddr_len(&addr));
    }
    return io;
}
hio_t* hloop_create_tcp_server (hloop_t* loop, const char* host, int port, haccept_cb accept_cb) {
    hio_t* io = hio_create_socket(loop, host, port, HIO_TYPE_TCP, HIO_SERVER_SIDE);
    if (io == NULL) return NULL;
    hio_setcb_accept(io, accept_cb);
    if (hio_accept(io) != 0) return NULL;
    return io;
}
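
/* Usage sketch: a minimal TCP echo server built on hloop_create_tcp_server.
 * hio_setcb_read is assumed to be the per-io read-callback setter from hloop.h;
 * on_accept and on_recv are illustrative names.
 *
 *   static void on_recv(hio_t* io, void* buf, int readbytes) {
 *       hio_write(io, buf, readbytes);
 *   }
 *
 *   static void on_accept(hio_t* io) {       // io is the accepted connection
 *       hio_setcb_read(io, on_recv);
 *       hio_read(io);
 *   }
 *
 *   hio_t* listenio = hloop_create_tcp_server(loop, "0.0.0.0", 8080, on_accept);
 *   // listenio == NULL means socket/bind/listen failed
 */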
hio_t* hloop_create_tcp_client (hloop_t* loop, const char* host, int port, hconnect_cb connect_cb, hclose_cb close_cb) {
    hio_t* io = hio_create_socket(loop, host, port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
    if (io == NULL) return NULL;
    hio_setcb_connect(io, connect_cb);
    hio_setcb_close(io, close_cb);
    if (hio_connect(io) != 0) return NULL;
    return io;
}
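
/* Usage sketch: an outgoing TCP connection. on_connect and on_close are
 * illustrative names; hconnect_cb and hclose_cb are assumed to take the hio_t*.
 *
 *   static void on_connect(hio_t* io) {
 *       hio_write(io, "hello\n", 6);
 *   }
 *   static void on_close(hio_t* io) {
 *       printf("connection closed\n");
 *   }
 *
 *   hloop_create_tcp_client(loop, "127.0.0.1", 8080, on_connect, on_close);
 */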
hio_t* hloop_create_ssl_server (hloop_t* loop, const char* host, int port, haccept_cb accept_cb) {
    hio_t* io = hio_create_socket(loop, host, port, HIO_TYPE_SSL, HIO_SERVER_SIDE);
    if (io == NULL) return NULL;
    hio_setcb_accept(io, accept_cb);
    if (hio_accept(io) != 0) return NULL;
    return io;
}

hio_t* hloop_create_ssl_client (hloop_t* loop, const char* host, int port, hconnect_cb connect_cb, hclose_cb close_cb) {
    hio_t* io = hio_create_socket(loop, host, port, HIO_TYPE_SSL, HIO_CLIENT_SIDE);
    if (io == NULL) return NULL;
    hio_setcb_connect(io, connect_cb);
    hio_setcb_close(io, close_cb);
    if (hio_connect(io) != 0) return NULL;
    return io;
}

hio_t* hloop_create_udp_server(hloop_t* loop, const char* host, int port) {
    return hio_create_socket(loop, host, port, HIO_TYPE_UDP, HIO_SERVER_SIDE);
}

hio_t* hloop_create_udp_client(hloop_t* loop, const char* host, int port) {
    return hio_create_socket(loop, host, port, HIO_TYPE_UDP, HIO_CLIENT_SIDE);
}
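
/* Usage sketch: a UDP echo server. hloop_create_udp_server only creates and
 * binds the socket; reading is started with hrecvfrom (see above). hio_fd is
 * assumed to be the public fd accessor from hloop.h, and hio_write is assumed
 * to send to the current peer address; names are illustrative.
 *
 *   static char udpbuf[1500];
 *
 *   static void on_dgram(hio_t* io, void* buf, int readbytes) {
 *       hio_write(io, buf, readbytes);   // echo back to the sender
 *   }
 *
 *   hio_t* io = hloop_create_udp_server(loop, "0.0.0.0", 5353);
 *   if (io) {
 *       hrecvfrom(loop, hio_fd(io), udpbuf, sizeof(udpbuf), on_dgram);
 *   }
 */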