hloop.c 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. #include "hloop.h"
  2. #include "hevent.h"
  3. #include "iowatcher.h"
  4. #include "hdef.h"
  5. #include "hlog.h"
  6. #include "hmath.h"
  7. #define PAUSE_TIME 10 // ms
  8. #define MIN_BLOCK_TIME 1 // ms
  9. #define MAX_BLOCK_TIME 10000 // ms
  10. #define IO_ARRAY_INIT_SIZE 64
  11. static int timer_minheap_compare(const struct heap_node* lhs, const struct heap_node* rhs) {
  12. return TIMER_HEAP_ENTRY(lhs)->timeout < TIMER_HEAP_ENTRY(rhs)->timeout;
  13. }
  14. static int hloop_process_idles(hloop_t* loop) {
  15. int nidles = 0;
  16. struct list_node* node = loop->idles.next;
  17. hidle_t* idle = NULL;
  18. while (node != &loop->idles) {
  19. idle = IDLE_ENTRY(node);
  20. if (idle->destroy) goto destroy;
  21. if (!idle->active) goto next;
  22. if (idle->repeat == 0) {
  23. hidle_del(idle);
  24. //goto next;
  25. goto destroy;
  26. }
  27. if (idle->repeat != INFINITE) {
  28. --idle->repeat;
  29. }
  30. EVENT_PENDING(idle);
  31. ++nidles;
  32. next:
  33. node = node->next;
  34. continue;
  35. destroy:
  36. node = node->next;
  37. list_del(node->prev);
  38. free(idle);
  39. }
  40. return nidles;
  41. }
  42. static int hloop_process_timers(hloop_t* loop) {
  43. int ntimers = 0;
  44. struct list_node* node = loop->timers.next;
  45. htimer_t* timer = NULL;
  46. while (node != &loop->timers) {
  47. timer = TIMER_ENTRY(node);
  48. if (timer->destroy) goto destroy;
  49. if (!timer->active) goto next;
  50. if (timer->repeat == 0) {
  51. htimer_del(timer);
  52. //goto next;
  53. goto destroy;
  54. }
  55. if (loop->cur_hrtime > timer->next_timeout) {
  56. if (timer->repeat != INFINITE) {
  57. --timer->repeat;
  58. }
  59. timer->next_timeout += timer->timeout*1000;
  60. EVENT_PENDING(timer);
  61. ++ntimers;
  62. }
  63. next:
  64. node = node->next;
  65. continue;
  66. destroy:
  67. node = node->next;
  68. list_del(node->prev);
  69. free(timer);
  70. }
  71. return ntimers;
  72. }
  73. static int hloop_process_ios(hloop_t* loop, int timeout) {
  74. int nevents = iowatcher_poll_events(loop, timeout);
  75. if (nevents < 0) {
  76. hloge("poll_events error=%d", -nevents);
  77. }
  78. return nevents < 0 ? 0 : nevents;
  79. }
  80. static int hloop_process_pendings(hloop_t* loop) {
  81. if (loop->npendings == 0) return 0;
  82. hevent_t* prev = NULL;
  83. hevent_t* next = NULL;
  84. int ncbs = 0;
  85. for (int i = HEVENT_PRIORITY_SIZE-1; i >= 0; --i) {
  86. next = loop->pendings[i];
  87. while (next) {
  88. if (next->active && next->cb) {
  89. next->cb(next);
  90. ++ncbs;
  91. }
  92. prev = next;
  93. next = next->pending_next;
  94. prev->pending = 0;
  95. prev->pending_next = NULL;
  96. }
  97. loop->pendings[i] = NULL;
  98. }
  99. loop->npendings = 0;
  100. return ncbs;
  101. }
  102. static int hloop_process_events(hloop_t* loop) {
  103. // ios -> timers -> idles
  104. int nios, ntimers, nidles;
  105. nios = ntimers = nidles = 0;
  106. int blocktime = MAX_BLOCK_TIME;
  107. if (loop->timer_minheap.root) {
  108. // if have timers, blocktime = min_timeout
  109. blocktime = TIMER_HEAP_ENTRY(loop->timer_minheap.root)->timeout;
  110. //if (!list_empty(&loop->idles))
  111. blocktime /= 10;
  112. }
  113. blocktime = LIMIT(MIN_BLOCK_TIME, blocktime, MAX_BLOCK_TIME);
  114. // if you want timer more precise, reduce blocktime
  115. uint64_t last_hrtime = loop->cur_hrtime;
  116. nios = hloop_process_ios(loop, blocktime);
  117. hloop_update_time(loop);
  118. ntimers = hloop_process_timers(loop);
  119. if (loop->npendings == 0) {
  120. loop->idle_time += last_hrtime - loop->cur_hrtime;
  121. // avoid frequent call idles
  122. if (loop->cur_hrtime - loop->last_idle_hrtime > 1e6) {
  123. loop->last_idle_hrtime = loop->cur_hrtime;
  124. nidles= hloop_process_idles(loop);
  125. }
  126. else {
  127. // hloop_process_ios maybe nonblock, so sleep here
  128. msleep(blocktime);
  129. }
  130. }
  131. printd("blocktime=%d nios=%d ntimers=%d nidles=%d nactives=%d npendings=%d\n", blocktime, nios, ntimers, nidles, loop->nactives, loop->npendings);
  132. return hloop_process_pendings(loop);
  133. }
  134. int hloop_init(hloop_t* loop) {
  135. memset(loop, 0, sizeof(hloop_t));
  136. loop->status = HLOOP_STATUS_STOP;
  137. // idles
  138. list_init(&loop->idles);
  139. // timers
  140. list_init(&loop->timers);
  141. heap_init(&loop->timer_minheap, timer_minheap_compare);
  142. // iowatcher
  143. //iowatcher_init(loop);
  144. return 0;
  145. }
  146. void hloop_cleanup(hloop_t* loop) {
  147. // pendings
  148. for (int i = 0; i < HEVENT_PRIORITY_SIZE; ++i) {
  149. loop->pendings[i] = NULL;
  150. }
  151. // idles
  152. struct list_node* node = loop->idles.next;
  153. hidle_t* idle;
  154. while (node != &loop->idles) {
  155. idle = IDLE_ENTRY(node);
  156. node = node->next;
  157. free(idle);
  158. }
  159. list_init(&loop->idles);
  160. // timers
  161. node = loop->timers.next;
  162. htimer_t* timer;
  163. while (node != &loop->timers) {
  164. timer = TIMER_ENTRY(node);
  165. node = node->next;
  166. free(timer);
  167. }
  168. list_init(&loop->timers);
  169. heap_init(&loop->timer_minheap, NULL);
  170. // iowatcher
  171. //iowatcher_cleanup(loop);
  172. };
  173. int hloop_run(hloop_t* loop) {
  174. time(&loop->start_time);
  175. loop->start_hrtime = gethrtime();
  176. loop->loop_cnt = 0;
  177. loop->status = HLOOP_STATUS_RUNNING;
  178. while (loop->status != HLOOP_STATUS_STOP) {
  179. if (loop->status == HLOOP_STATUS_PAUSE) {
  180. msleep(PAUSE_TIME);
  181. hloop_update_time(loop);
  182. continue;
  183. }
  184. ++loop->loop_cnt;
  185. if (loop->nactives == 0) break;
  186. hloop_process_events(loop);
  187. }
  188. loop->status = HLOOP_STATUS_STOP;
  189. loop->end_hrtime = gethrtime();
  190. hloop_cleanup(loop);
  191. return 0;
  192. };
  193. int hloop_stop(hloop_t* loop) {
  194. loop->status = HLOOP_STATUS_STOP;
  195. return 0;
  196. }
  197. int hloop_pause(hloop_t* loop) {
  198. if (loop->status == HLOOP_STATUS_RUNNING) {
  199. loop->status = HLOOP_STATUS_PAUSE;
  200. }
  201. return 0;
  202. }
  203. int hloop_resume(hloop_t* loop) {
  204. if (loop->status == HLOOP_STATUS_PAUSE) {
  205. loop->status = HLOOP_STATUS_RUNNING;
  206. }
  207. return 0;
  208. }
  209. hidle_t* hidle_add(hloop_t* loop, hidle_cb cb, uint32_t repeat) {
  210. hidle_t* idle = (hidle_t*)malloc(sizeof(hidle_t));
  211. memset(idle, 0, sizeof(hidle_t));
  212. idle->event_type = HEVENT_TYPE_IDLE;
  213. idle->repeat = repeat;
  214. list_add(&idle->node, &loop->idles);
  215. EVENT_ADD(loop, idle, cb);
  216. return idle;
  217. }
  218. void hidle_del(hidle_t* idle) {
  219. EVENT_DEL(idle);
  220. }
  221. htimer_t* htimer_add(hloop_t* loop, htimer_cb cb, uint64_t timeout, uint32_t repeat) {
  222. htimer_t* timer = (htimer_t*)malloc(sizeof(htimer_t));
  223. memset(timer, 0, sizeof(htimer_t));
  224. timer->event_type = HEVENT_TYPE_TIMER;
  225. timer->repeat = repeat;
  226. timer->timeout = timeout;
  227. timer->next_timeout = gethrtime() + timeout*1000;
  228. list_add(&timer->node, &loop->timers);
  229. heap_insert(&loop->timer_minheap, &timer->hnode);
  230. EVENT_ADD(loop, timer, cb);
  231. return timer;
  232. }
  233. void htimer_del(htimer_t* timer) {
  234. heap_remove(&timer->loop->timer_minheap, &timer->hnode);
  235. EVENT_DEL(timer);
  236. }
  237. void hio_init(hio_t* io) {
  238. memset(io, 0, sizeof(hio_t));
  239. io->event_type = HEVENT_TYPE_IO;
  240. io->event_index[0] = io->event_index[1] = -1;
  241. // move to hwrite
  242. //write_queue_init(&io->write_queue, 4);;
  243. };
  244. void hio_cleanup(hio_t* io) {
  245. offset_buf_t* pbuf = NULL;
  246. while (!write_queue_empty(&io->write_queue)) {
  247. pbuf = write_queue_front(&io->write_queue);
  248. SAFE_FREE(pbuf->base);
  249. write_queue_pop_front(&io->write_queue);
  250. }
  251. write_queue_cleanup(&io->write_queue);
  252. }
  253. hio_t* hio_add(hloop_t* loop, hio_cb cb, int fd, int events) {
  254. if (loop->ios.maxsize == 0) {
  255. io_array_init(&loop->ios, IO_ARRAY_INIT_SIZE);
  256. }
  257. if (fd > loop->ios.maxsize) {
  258. io_array_resize(&loop->ios, ceil2e(fd));
  259. }
  260. hio_t* io = loop->ios.ptr[fd];
  261. if (io == NULL) {
  262. io = (hio_t*)malloc(sizeof(hio_t));
  263. memset(io, 0, sizeof(hio_t));
  264. loop->ios.ptr[fd] = io;
  265. }
  266. if (!io->active || io->destroy) {
  267. hio_init(io);
  268. EVENT_ADD(loop, io, cb);
  269. }
  270. io->fd = fd;
  271. if (cb) {
  272. io->cb = (hevent_cb)cb;
  273. }
  274. iowatcher_add_event(loop, fd, events);
  275. io->events |= events;
  276. return io;
  277. }
  278. void hio_del(hio_t* io, int events) {
  279. iowatcher_del_event(io->loop, io->fd, events);
  280. io->events &= ~events;
  281. if (io->events == 0) {
  282. hio_cleanup(io);
  283. EVENT_DEL(io);
  284. }
  285. }