kqueue.c

#include "iowatcher.h"

#ifdef EVENT_KQUEUE
#include "hplatform.h"
#include "hdef.h"
#include <sys/event.h>
#include "hevent.h"

#define EVENTS_INIT_SIZE    64

#define READ_INDEX          0
#define WRITE_INDEX         1
#define EVENT_INDEX(type)   ((type == EVFILT_READ) ? READ_INDEX : WRITE_INDEX)
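
// Per-loop kqueue context:
// `changes` is a persistent changelist in which every watched (fd, filter)
// pair occupies one slot; `events` receives the results of kevent() and is
// kept at the same capacity as `changes`.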
typedef struct kqueue_ctx_s {
    int kqfd;
    int capacity;
    int nchanges;
    struct kevent* changes;
    //int nevents; // nevents == nchanges
    struct kevent* events;
} kqueue_ctx_t;

static void kqueue_ctx_resize(kqueue_ctx_t* kqueue_ctx, int size) {
    int bytes = sizeof(struct kevent) * size;
    kqueue_ctx->changes = (struct kevent*)safe_realloc(kqueue_ctx->changes, bytes);
    kqueue_ctx->events = (struct kevent*)safe_realloc(kqueue_ctx->events, bytes);
    kqueue_ctx->capacity = size;
}

int iowatcher_init(hloop_t* loop) {
    if (loop->iowatcher) return 0;
    kqueue_ctx_t* kqueue_ctx;
    SAFE_ALLOC_SIZEOF(kqueue_ctx);
    kqueue_ctx->kqfd = kqueue();
    kqueue_ctx->capacity = EVENTS_INIT_SIZE;
    kqueue_ctx->nchanges = 0;
    int bytes = sizeof(struct kevent) * kqueue_ctx->capacity;
    SAFE_ALLOC(kqueue_ctx->changes, bytes);
    SAFE_ALLOC(kqueue_ctx->events, bytes);
    loop->iowatcher = kqueue_ctx;
    return 0;
}

int iowatcher_cleanup(hloop_t* loop) {
    if (loop->iowatcher == NULL) return 0;
    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
    close(kqueue_ctx->kqfd);
    SAFE_FREE(kqueue_ctx->changes);
    SAFE_FREE(kqueue_ctx->events);
    SAFE_FREE(loop->iowatcher);
    return 0;
}
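
// io->event_index[READ_INDEX/WRITE_INDEX] records which changelist slot holds
// this fd's read/write filter, or -1 if that filter is not watched.
// __add_event claims a slot if necessary (growing the arrays when full),
// fills in the kevent, and flushes the whole changelist with a zero timeout.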
static int __add_event(hloop_t* loop, int fd, int event) {
    if (loop->iowatcher == NULL) {
        iowatcher_init(loop);
    }
    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
    hio_t* io = loop->ios.ptr[fd];
    int idx = io->event_index[EVENT_INDEX(event)];
    if (idx < 0) {
        io->event_index[EVENT_INDEX(event)] = idx = kqueue_ctx->nchanges;
        kqueue_ctx->nchanges++;
        if (idx == kqueue_ctx->capacity) {
            kqueue_ctx_resize(kqueue_ctx, kqueue_ctx->capacity*2);
        }
        memset(kqueue_ctx->changes+idx, 0, sizeof(struct kevent));
        kqueue_ctx->changes[idx].ident = fd;
    }
    assert(kqueue_ctx->changes[idx].ident == fd);
    kqueue_ctx->changes[idx].filter = event;
    kqueue_ctx->changes[idx].flags = EV_ADD|EV_ENABLE;
    struct timespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = 0;
    kevent(kqueue_ctx->kqfd, kqueue_ctx->changes, kqueue_ctx->nchanges, NULL, 0, &ts);
    return 0;
}
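
// Map the loop-level READ_EVENT/WRITE_EVENT flags to the corresponding kqueue filters.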
int iowatcher_add_event(hloop_t* loop, int fd, int events) {
    if (events & READ_EVENT) {
        __add_event(loop, fd, EVFILT_READ);
    }
    if (events & WRITE_EVENT) {
        __add_event(loop, fd, EVFILT_WRITE);
    }
    return 0;
}
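
// Delete one filter for fd: mark its slot EV_DELETE, swap it with the last
// slot so the changelist stays dense, repoint the swapped entry's event_index,
// flush the changelist (still including the EV_DELETE entry), then shrink it.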
static int __del_event(hloop_t* loop, int fd, int event) {
    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
    if (kqueue_ctx == NULL) return 0;
    hio_t* io = loop->ios.ptr[fd];
    int idx = io->event_index[EVENT_INDEX(event)];
    if (idx < 0) return 0;
    assert(kqueue_ctx->changes[idx].ident == fd);
    kqueue_ctx->changes[idx].flags = EV_DELETE;
    io->event_index[EVENT_INDEX(event)] = -1;
    int lastidx = kqueue_ctx->nchanges - 1;
    if (idx < lastidx) {
        // swap with the last slot to keep the changelist dense
        struct kevent tmp;
        tmp = kqueue_ctx->changes[idx];
        kqueue_ctx->changes[idx] = kqueue_ctx->changes[lastidx];
        kqueue_ctx->changes[lastidx] = tmp;
        // the entry now at idx belongs to another fd: fix that io's event_index
        hio_t* last = loop->ios.ptr[kqueue_ctx->changes[idx].ident];
        if (last) {
            last->event_index[EVENT_INDEX(kqueue_ctx->changes[idx].filter)] = idx;
        }
    }
    struct timespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = 0;
    kevent(kqueue_ctx->kqfd, kqueue_ctx->changes, kqueue_ctx->nchanges, NULL, 0, &ts);
    kqueue_ctx->nchanges--;
    return 0;
}
int iowatcher_del_event(hloop_t* loop, int fd, int events) {
    if (events & READ_EVENT) {
        __del_event(loop, fd, EVFILT_READ);
    }
    if (events & WRITE_EVENT) {
        __del_event(loop, fd, EVFILT_WRITE);
    }
    return 0;
}
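
// Submit the pending changelist and wait up to `timeout` ms (INFINITE waits
// forever). Each returned kevent carries a single filter; translate it into
// the io's revents bits and queue the io for processing with EVENT_PENDING.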
int iowatcher_poll_events(hloop_t* loop, int timeout) {
    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
    if (kqueue_ctx == NULL) return 0;
    if (kqueue_ctx->nchanges == 0) return 0;
    struct timespec ts, *tp;
    if (timeout == INFINITE) {
        tp = NULL;
    }
    else {
        ts.tv_sec = timeout / 1000;
        ts.tv_nsec = (timeout % 1000) * 1000000;
        tp = &ts;
    }
    int nkqueue = kevent(kqueue_ctx->kqfd, kqueue_ctx->changes, kqueue_ctx->nchanges, kqueue_ctx->events, kqueue_ctx->nchanges, tp);
    if (nkqueue < 0) {
        perror("kevent");
        return nkqueue;
    }
    if (nkqueue == 0) return 0;
    int nevents = 0;
    for (int i = 0; i < nkqueue; ++i) {
        if (kqueue_ctx->events[i].flags & EV_ERROR) {
            continue;
        }
        ++nevents;
        int fd = kqueue_ctx->events[i].ident;
        int revents = kqueue_ctx->events[i].filter;
        hio_t* io = loop->ios.ptr[fd];
        if (io) {
            // a kevent reports exactly one filter; map it back to the loop's event bits
            if (revents == EVFILT_READ) {
                io->revents |= READ_EVENT;
            }
            if (revents == EVFILT_WRITE) {
                io->revents |= WRITE_EVENT;
            }
            EVENT_PENDING(io);
        }
        if (nevents == nkqueue) break;
    }
    return nevents;
}
#endif