/* kqueue.c */
  1. #include "iowatcher.h"
  2. #ifdef EVENT_KQUEUE
  3. #include "hplatform.h"
  4. #include "hdef.h"
  5. #include <sys/event.h>
  6. #include "hevent.h"
  7. #define EVENTS_INIT_SIZE 64
  8. #define READ_INDEX 0
  9. #define WRITE_INDEX 1
  10. #define EVENT_INDEX(type) ((type == EVFILT_READ) ? READ_INDEX : WRITE_INDEX)
// Per-loop kqueue backend state, stored in hloop_t::iowatcher.
typedef struct kqueue_ctx_s {
    int kqfd;                // descriptor returned by kqueue()
    int capacity;            // allocated entry count of both arrays below
    int nchanges;            // used entries in `changes`
    struct kevent* changes;  // dense change list submitted to kevent()
    //int nevents; // nevents == nchanges
    struct kevent* events;   // receive buffer for triggered events
} kqueue_ctx_t;
  19. static void kqueue_ctx_resize(kqueue_ctx_t* kqueue_ctx, int size) {
  20. int bytes = sizeof(struct kevent) * size;
  21. int oldbytes = sizeof(struct kevent) * kqueue_ctx->capacity;
  22. kqueue_ctx->changes = (struct kevent*)safe_realloc(kqueue_ctx->changes, bytes, oldbytes);
  23. kqueue_ctx->events = (struct kevent*)safe_realloc(kqueue_ctx->events, bytes, oldbytes);
  24. kqueue_ctx->capacity = size;
  25. }
  26. int iowatcher_init(hloop_t* loop) {
  27. if (loop->iowatcher) return 0;
  28. kqueue_ctx_t* kqueue_ctx;
  29. HV_ALLOC_SIZEOF(kqueue_ctx);
  30. kqueue_ctx->kqfd = kqueue();
  31. kqueue_ctx->capacity = EVENTS_INIT_SIZE;
  32. kqueue_ctx->nchanges = 0;
  33. int bytes = sizeof(struct kevent) * kqueue_ctx->capacity;
  34. HV_ALLOC(kqueue_ctx->changes, bytes);
  35. HV_ALLOC(kqueue_ctx->events, bytes);
  36. loop->iowatcher = kqueue_ctx;
  37. return 0;
  38. }
  39. int iowatcher_cleanup(hloop_t* loop) {
  40. if (loop->iowatcher == NULL) return 0;
  41. kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
  42. close(kqueue_ctx->kqfd);
  43. HV_FREE(kqueue_ctx->changes);
  44. HV_FREE(kqueue_ctx->events);
  45. HV_FREE(loop->iowatcher);
  46. return 0;
  47. }
// Register interest in one kevent filter (EVFILT_READ or EVFILT_WRITE) for
// `fd`, then flush the whole change list to the kernel immediately.
// Each (fd, filter) pair owns one slot in kqueue_ctx->changes; the slot
// index is cached in io->event_index[READ_INDEX/WRITE_INDEX] so __del_event
// can find it again.
static int __add_event(hloop_t* loop, int fd, int event) {
    if (loop->iowatcher == NULL) {
        iowatcher_init(loop);
    }
    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
    hio_t* io = loop->ios.ptr[fd];
    int idx = io->event_index[EVENT_INDEX(event)];
    if (idx < 0) {
        // No slot yet: append a fresh change entry and cache its index.
        io->event_index[EVENT_INDEX(event)] = idx = kqueue_ctx->nchanges;
        kqueue_ctx->nchanges++;
        if (idx == kqueue_ctx->capacity) {
            // Must grow before touching changes[idx] — it is one past the
            // old allocation.
            kqueue_ctx_resize(kqueue_ctx, kqueue_ctx->capacity*2);
        }
        memset(kqueue_ctx->changes+idx, 0, sizeof(struct kevent));
        kqueue_ctx->changes[idx].ident = fd;
    }
    assert(kqueue_ctx->changes[idx].ident == fd);
    kqueue_ctx->changes[idx].filter = event;
    kqueue_ctx->changes[idx].flags = EV_ADD|EV_ENABLE;
    // Zero timeout: apply the change list now without waiting for events.
    struct timespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = 0;
    // NOTE(review): kevent()'s return value is ignored, so registration
    // failures go unnoticed here — confirm whether that is intentional.
    kevent(kqueue_ctx->kqfd, kqueue_ctx->changes, kqueue_ctx->nchanges, NULL, 0, &ts);
    return 0;
}
  73. int iowatcher_add_event(hloop_t* loop, int fd, int events) {
  74. if (events & HV_READ) {
  75. __add_event(loop, fd, EVFILT_READ);
  76. }
  77. if (events & HV_WRITE) {
  78. __add_event(loop, fd, EVFILT_WRITE);
  79. }
  80. return 0;
  81. }
// Withdraw interest in one kevent filter for `fd`, keeping the change list
// dense via swap-with-last removal. No-op if the filter was not registered.
static int __del_event(hloop_t* loop, int fd, int event) {
    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
    if (kqueue_ctx == NULL) return 0;
    hio_t* io = loop->ios.ptr[fd];
    int idx = io->event_index[EVENT_INDEX(event)];
    if (idx < 0) return 0; // not registered
    assert(kqueue_ctx->changes[idx].ident == fd);
    kqueue_ctx->changes[idx].flags = EV_DELETE;
    io->event_index[EVENT_INDEX(event)] = -1;
    int lastidx = kqueue_ctx->nchanges - 1;
    if (idx < lastidx) {
        // swap: move the EV_DELETE entry to the tail, pull the tail entry
        // into slot idx, then fix the moved entry's cached index.
        struct kevent tmp;
        tmp = kqueue_ctx->changes[idx];
        kqueue_ctx->changes[idx] = kqueue_ctx->changes[lastidx];
        kqueue_ctx->changes[lastidx] = tmp;
        hio_t* last = loop->ios.ptr[kqueue_ctx->changes[idx].ident];
        if (last) {
            last->event_index[EVENT_INDEX(kqueue_ctx->changes[idx].filter)] = idx;
        }
    }
    // Flush with zero timeout while the EV_DELETE entry (now at lastidx) is
    // still counted in nchanges; only then shrink the list.
    struct timespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = 0;
    kevent(kqueue_ctx->kqfd, kqueue_ctx->changes, kqueue_ctx->nchanges, NULL, 0, &ts);
    kqueue_ctx->nchanges--;
    return 0;
}
  110. int iowatcher_del_event(hloop_t* loop, int fd, int events) {
  111. if (events & HV_READ) {
  112. __del_event(loop, fd, EVFILT_READ);
  113. }
  114. if (events & HV_WRITE) {
  115. __del_event(loop, fd, EVFILT_WRITE);
  116. }
  117. return 0;
  118. }
  119. int iowatcher_poll_events(hloop_t* loop, int timeout) {
  120. kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
  121. if (kqueue_ctx == NULL) return 0;
  122. if (kqueue_ctx->nchanges == 0) return 0;
  123. struct timespec ts, *tp;
  124. if (timeout == INFINITE) {
  125. tp = NULL;
  126. }
  127. else {
  128. ts.tv_sec = timeout / 1000;
  129. ts.tv_nsec = (timeout % 1000) * 1000000;
  130. tp = &ts;
  131. }
  132. int nkqueue = kevent(kqueue_ctx->kqfd, kqueue_ctx->changes, kqueue_ctx->nchanges, kqueue_ctx->events, kqueue_ctx->nchanges, tp);
  133. if (nkqueue < 0) {
  134. perror("kevent");
  135. return nkqueue;
  136. }
  137. if (nkqueue == 0) return 0;
  138. int nevents = 0;
  139. for (int i = 0; i < nkqueue; ++i) {
  140. if (kqueue_ctx->events[i].flags & EV_ERROR) {
  141. continue;
  142. }
  143. ++nevents;
  144. int fd = kqueue_ctx->events[i].ident;
  145. int revents = kqueue_ctx->events[i].filter;
  146. hio_t* io = loop->ios.ptr[fd];
  147. if (io) {
  148. if (revents & EVFILT_READ) {
  149. io->revents |= HV_READ;
  150. }
  151. if (revents & EVFILT_WRITE) {
  152. io->revents |= HV_WRITE;
  153. }
  154. EVENT_PENDING(io);
  155. }
  156. if (nevents == nkqueue) break;
  157. }
  158. return nevents;
  159. }
  160. #endif