// kqueue.cpp -- kqueue-based I/O watcher backend (compiled only when EVENT_KQUEUE is defined).

  1. #include "io_watcher.h"
  2. #ifdef EVENT_KQUEUE
  3. #include "hplatform.h"
  4. #include "hdef.h"
  5. #define INIT_EVENTS_NUM 64
  // Per-loop bookkeeping for the kqueue backend.
  //
  // `changes` and `events` are two parallel heap arrays that always have room
  // for `capacity` entries; the first `nchanges` slots of `changes` describe
  // the currently registered (fd, filter) pairs.
  struct kqueue_ctx_s {
      int             kqfd;      // descriptor returned by kqueue()
      int             capacity;  // number of kevent slots allocated in each array
      int             nchanges;  // number of slots of `changes` currently in use
      struct kevent*  changes;   // change list handed to kevent()
      // There is deliberately no separate `nevents` counter: the poll call
      // offers `nchanges` slots of `events` as the output list.
      struct kevent*  events;    // output list filled in by kevent() when polling
  };
  typedef struct kqueue_ctx_s kqueue_ctx_t;
  14. static void kqueue_ctx_resize(kqueue_ctx_t* kqueue_ctx, int size) {
  15. int bytes = sizeof(struct kevent) * size;
  16. kqueue_ctx->changes = (struct kevent*)realloc(kqueue_ctx->changes, bytes);
  17. kqueue_ctx->events = (struct kevent*)realloc(kqueue_ctx->events, bytes);
  18. kqueue_ctx->capacity = size;
  19. }
  20. int iowatcher_init(hloop_t* loop) {
  21. if (loop->iowatcher) return 0;
  22. kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)malloc(sizeof(kqueue_ctx_t));
  23. kqueue_ctx->kqfd = kqueue();
  24. kqueue_ctx->capacity = INIT_EVENTS_NUM;
  25. kqueue_ctx->nchanges = 0;
  26. int bytes = sizeof(struct kevent) * kqueue_ctx->capacity;
  27. kqueue_ctx->changes = (struct kevent*)malloc(bytes);
  28. memset(kqueue_ctx->changes, 0, bytes);
  29. kqueue_ctx->events = (struct kevent*)malloc(bytes);
  30. memset(kqueue_ctx->events, 0, bytes);
  31. loop->iowatcher = kqueue_ctx;
  32. return 0;
  33. }
  34. int iowatcher_cleanup(hloop_t* loop) {
  35. if (loop->iowatcher == NULL) return 0;
  36. kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
  37. close(kqueue_ctx->kqfd);
  38. SAFE_FREE(kqueue_ctx->changes);
  39. SAFE_FREE(kqueue_ctx->events);
  40. SAFE_FREE(loop->iowatcher);
  41. return 0;
  42. }
  43. static int __add_event(hio_t* io, int event) {
  44. hloop_t* loop = io->loop;
  45. if (loop->iowatcher == NULL) {
  46. hloop_iowatcher_init(loop);
  47. }
  48. kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
  49. int idx = io->event_index[EVENT_INDEX(event)];
  50. if (idx < 0) {
  51. io->event_index[EVENT_INDEX(event)] = idx = kqueue_ctx->nchanges;
  52. kqueue_ctx->nchanges++;
  53. if (idx == kqueue_ctx->capacity) {
  54. kqueue_ctx_resize(kqueue_ctx, kqueue_ctx->capacity*2);
  55. }
  56. memset(kqueue_ctx->changes+idx, 0, sizeof(struct kevent));
  57. kqueue_ctx->changes[idx].ident = io->fd;
  58. }
  59. assert(kqueue_ctx->changes[idx].ident == io->fd);
  60. if (events & READ_EVENT) {
  61. kqueue_ctx->changes[idx].filter = EVFILT_READ;
  62. }
  63. else if (events & WRITE_EVENT) {
  64. kqueue_ctx->changes[idx].filter = EVFILT_WRITE;
  65. }
  66. kqueue_ctx->changes[idx].flags = EV_ADD|EV_ENABLE;
  67. struct timespec ts;
  68. ts.tv_sec = 0;
  69. ts.tv_nsec = 0;
  70. kevent(kqueue_ctx->kqfd, kqueue_ctx->changes, kqueue_ctx->nchanges, NULL, 0, &ts);
  71. return 0;
  72. }
  73. int iowatcher_add_event(hio_t* io, int events) {
  74. if (events & READ_EVENT) {
  75. __add_event(event, READ_EVENT);
  76. }
  77. if (events & WRITE_EVENT) {
  78. __add_event(event, WRITE_EVENT);
  79. }
  80. return 0;
  81. }
  82. static int __del_event(hio_t* io, int event) {
  83. hloop_t* loop = io->loop;
  84. kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
  85. if (kqueue_ctx == NULL) return 0;
  86. int idx = io->event_index[EVENT_INDEX(event)];
  87. if (idx < 0) return 0;
  88. assert(kqueue_ctx->changes[idx].ident == io->fd);
  89. kqueue_ctx->changes[idx].flags = EV_DELETE;
  90. io->event_index[EVENT_INDEX(event)] = -1;
  91. int lastidx = kqueue_ctx->nchanges - 1;
  92. if (idx < lastidx) {
  93. // swap
  94. struct kevent tmp;
  95. tmp = kqueue_ctx->changes[idx];
  96. kqueue_ctx->changes[idx] = kqueue_ctx->changes[lastidx];
  97. kqueue_ctx->changes[lastidx] = tmp;
  98. auto iter = loop->events.find(kqueue_ctx->changes[idx].ident);
  99. if (iter != loop->events.end()) {
  100. iter->second->event_index[kqueue_ctx->changes[idx].filter == EVFILT_READ ? READ_INDEX : WRITE_INDEX] = idx;
  101. }
  102. }
  103. struct timespec ts;
  104. ts.tv_sec = 0;
  105. ts.tv_nsec = 0;
  106. kevent(kqueue_ctx->kqfd, kqueue_ctx->changes, kqueue_ctx->nchanges, NULL, 0, &ts);
  107. kqueue_ctx->nchanges--;
  108. return 0;
  109. }
  110. int iowatcher_del_event(hio_t* io, int events) {
  111. if (events & READ_EVENT) {
  112. __del_event(io, READ_EVENT);
  113. }
  114. if (events & WRITE_EVENT) {
  115. __del_event(io, WRITE_EVENT);
  116. }
  117. return 0;
  118. }
  119. int iowatcher_poll_events(hloop_t* loop, int timeout) {
  120. kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
  121. if (kqueue_ctx == NULL) return 0;
  122. if (kqueue_ctx->nchanges == 0) return 0;
  123. struct timespec ts, *tp;
  124. if (timeout == INFINITE) {
  125. tp = NULL;
  126. }
  127. else {
  128. ts.tv_sec = timeout / 1000;
  129. ts.tv_nsec = (timeout % 1000) * 1000000;
  130. tp = &ts;
  131. }
  132. int nkqueue = kevent(kqueue_ctx->kqfd, kqueue_ctx->changes, kqueue_ctx->nchanges, kqueue_ctx->events, kqueue_ctx->nchanges, tp);
  133. if (nkqueue < 0) {
  134. perror("kevent");
  135. return nkqueue;
  136. }
  137. if (nkqueue == 0) return 0;
  138. int nevent = 0;
  139. for (int i = 0; i < nkqueue; ++i) {
  140. if (nevent == nkqueue) break;
  141. if (kqueue_ctx->events[i].flags & EV_ERROR) {
  142. continue;
  143. }
  144. ++nevent;
  145. int fd = kqueue_ctx->events[i].ident;
  146. int revent = kqueue_ctx->events[i].filter;
  147. hio_t* io = hio_get(loop, fd);
  148. if (io == NULL) continue;
  149. io->revents = revent;
  150. hio_handle_events(io);
  151. }
  152. return nevent;
  153. }
  154. #endif