ithewei 6 년 전
부모
커밋
fc1d487a96
6개의 변경된 파일174개의 추가작업 그리고 11개의 파일을 삭제
  1. 1 0
      event/hevent.cpp
  2. 6 0
      event/hevent.h
  3. 3 1
      event/hloop.cpp
  4. 3 1
      event/hloop.h
  5. 156 4
      event/kqueue.cpp
  6. 5 5
      event/poll.cpp

+ 1 - 0
event/hevent.cpp

@@ -16,6 +16,7 @@ int _on_read(hevent_t* event) {
 }
 
 int _on_write(hevent_t* event) {
+    // ONESHOT
     _del_event(event, WRITE_EVENT);
     event->writeable = 1;
     //if (event->connect) {

+ 6 - 0
event/hevent.h

@@ -7,6 +7,10 @@
 #define READ_EVENT  0x0001
 #define WRITE_EVENT 0x0004
 
+#define READ_INDEX  0
+#define WRITE_INDEX 1
+#define EVENT_INDEX(type) ((type == READ_EVENT) ? READ_INDEX : WRITE_INDEX)
+
 int hloop_event_init(hloop_t* loop);
 int hloop_event_cleanup(hloop_t* loop);
 int hloop_add_event(hevent_t* event, int type = READ_EVENT|WRITE_EVENT);
@@ -23,6 +27,8 @@ int _on_write(hevent_t* event);
 #define EVENT_SELECT
 #elif defined(OS_LINUX)
 #define EVENT_EPOLL
+#elif defined(OS_MAC)
+#define EVENT_KQUEUE
 #elif defined(OS_BSD)
 #define EVENT_KQUEUE
 #else

+ 3 - 1
event/hloop.cpp

@@ -212,7 +212,9 @@ hevent_t* hevent_add(hloop_t* loop, int fd) {
     memset(event, 0, sizeof(hevent_t));
     event->loop = loop;
     event->fd = fd;
-    event->event_index = -1;
+    event->event_index[0] = -1;
+    event->event_index[1] = -1;
+    event->events = 0;
     loop->events[fd] = event;
     return event;
 }

+ 3 - 1
event/hloop.h

@@ -87,7 +87,7 @@ struct hevent_s {
     unsigned    connect     :1;
     unsigned    readable    :1;
     unsigned    writeable   :1;
-    int         event_index; // for poll
+    int         event_index[2]; // for poll,kqueue
     int         events;      // for epoll
 };
 
@@ -111,6 +111,8 @@ void        hidle_del(hloop_t* loop, uint32_t idle_id);
 void        hidle_del(hidle_t* idle);
 
 // event
+// NOTE: READ_EVENT is FOREVER
+// NOTE: WRITE_EVENT is ONESHOT
 hevent_t* hevent_accept(hloop_t* loop, int listenfd, hevent_cb on_accept, void* userdata);
 hevent_t* hevent_connect(hloop_t* loop, int connfd, hevent_cb on_connect, void* userdata);
 hevent_t* hevent_read(hloop_t* loop, int fd, hevent_cb on_readable, void* userdata);

+ 156 - 4
event/kqueue.cpp

@@ -1,25 +1,177 @@
 #include "hevent.h"
 
 #ifdef EVENT_KQUEUE
+#include "hplatform.h"
+#if defined(OS_MAC) || defined(OS_BSD)
+#include <sys/event.h>
+#endif
+
+#include "hdef.h"
+
+#define INIT_EVENTS_NUM     64
+
+typedef struct kqueue_ctx_s {
+    int kqfd;
+    int capacity;
+    int nchanges;
+    struct kevent* changes;
+    //int nevents; // nevents == nchanges
+    struct kevent* events;
+} kqueue_ctx_t;
+
+static void kqueue_ctx_resize(kqueue_ctx_t* kqueue_ctx, int size) {
+    int bytes = sizeof(kevent) * size;
+    kqueue_ctx->changes = (struct kevent*)realloc(kqueue_ctx->changes, bytes);
+    kqueue_ctx->events = (struct kevent*)realloc(kqueue_ctx->events, bytes);
+    kqueue_ctx->capacity = size;
+}
+
 int _event_init(hloop_t* loop) {
-    loop->event_ctx = NULL;
+    if (loop->event_ctx) return 0;
+    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)malloc(sizeof(kqueue_ctx_t));
+    kqueue_ctx->kqfd = kqueue();
+    kqueue_ctx->capacity = INIT_EVENTS_NUM;
+    kqueue_ctx->nchanges = 0;
+    int bytes = sizeof(struct kevent) * kqueue_ctx->capacity;
+    kqueue_ctx->changes = (struct kevent*)malloc(bytes);
+    memset(kqueue_ctx->changes, 0, bytes);
+    kqueue_ctx->events = (struct kevent*)malloc(bytes);
+    memset(kqueue_ctx->events, 0, bytes);
+    loop->event_ctx = kqueue_ctx;
     return 0;
 }
 
 int _event_cleanup(hloop_t* loop) {
-    loop->event_ctx = NULL;
+    if (loop->event_ctx = NULL) return 0;
+    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->event_ctx;
+    close(kqueue_ctx->kqfd);
+    SAFE_FREE(kqueue_ctx->changes);
+    SAFE_FREE(kqueue_ctx->events);
+    SAFE_FREE(loop->event_ctx);
+    return 0;
+}
+
+static int __add_event(hevent_t* event, int type) {
+    hloop_t* loop = event->loop;
+    if (loop->event_ctx == NULL) {
+        hloop_event_init(loop);
+    }
+    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->event_ctx;
+    int idx = event->event_index[EVENT_INDEX(type)];
+    if (idx < 0) {
+        event->event_index[EVENT_INDEX(type)] = idx = kqueue_ctx->nchanges;
+        kqueue_ctx->nchanges++;
+        if (idx == kqueue_ctx->capacity) {
+            kqueue_ctx_resize(kqueue_ctx, kqueue_ctx->capacity*2);
+        }
+        memset(kqueue_ctx->changes+idx, 0, sizeof(struct kevent));
+        kqueue_ctx->ident = event->fd;
+    }
+    assert(kqueue->changes[idx].ident == event->fd);
+    if (type & READ_EVENT) {
+        kqueue_ctx->changes[idx].filter = EVFILT_READ;
+    }
+    else if (type & WRITE_EVENT) {
+        kqueue_ctx->changes[idx].filter = EVFILT_WRITE;
+    }
+    kqueue_ctx->changes[idx].flags = EV_ADD|EV_ENABLE;
+    struct timespec ts;
+    ts.tv_sec = 0;
+    ts.tv_nsec = 0;
+    kevent(kqueue_ctx->kqfd, kqueue_ctx->changes, kqueue->nchanges, NULL, 0, &ts);
     return 0;
 }
 
 int _add_event(hevent_t* event, int type) {
+    if (type & READ_EVENT) {
+        __add_event(READ_EVENT);
+    }
+    if (type & WRITE_EVENT) {
+        __add_event(WRITE_EVENT);
+    }
     return 0;
 }
 
-int _del_event(hevent_t* event, int type) {
+static int __del_event(hevent_t* event, int type) {
+    hloop_t* loop = event->loop;
+    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->event_ctx;
+    if (kqueue_ctx == NULL) return 0;
+    int idx = event->event_index[EVENT_INDEX(type)];
+    if (idx < 0) return 0;
+    assert(kqueue_ctx->changes[idx].ident == event->fd);
+    kqueue_ctx->changes[idx].flags = EV_DELETE;
+    event->index = -1;
+    kqueue_ctx->nchanges--;
+    if (idx < kqueue_ctx->nchanges) {
+        // swap
+        struct event tmp;
+        tmp = kqueue_ctx->changes[idx];
+        kqueue_ctx->changes[idx] = kqueue_ctx->changes[kqueue_ctx->nchanges];
+        kqueue_ctx->changes[kqueue_ctx->nchanges] = tmp;
+        auto iter = loop->events.find(kqueue_ctx->changes[idx].ident);
+        if (iter != loop->events.end()) {
+            iter->second->event_index[kqueue_ctx->changes[idx].filter == EVFILT_READ ? READ_INDEX : WRITE_INDEX] = idx;
+        }
+    }
+    struct timespec ts;
+    ts.tv_sec = 0;
+    ts.tv_nsec = 0;
+    kevent(kqueue_ctx->kqfd, kqueue_ctx->changes, kqueue_ctx->nchanges, NULL, 0, &ts);
     return 0;
 }
 
-int _handle_events(hloop_t* loop, int timeout) {
+int _del_event(hevent* event, int type) {
+    if (type & READ_EVENT) {
+        __del_event(READ_EVENT);
+    }
+    if (type & WRITE_EVENT) {
+        __del_event(WRITE_EVENT);
+    }
     return 0;
 }
+
+int _handle_events(hloop_t* loop, int timeout) {
+    hloop_t* loop = event->loop;
+    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->event_ctx;
+    if (kqueue_ctx == NULL) return 0;
+    if (kqueue_ctx->nchanges == 0) return 0;
+    struct timespec ts, *tp;
+    if (timeout == INFINITE) {
+        tp = NULL;
+    }
+    else {
+        ts.tv_sec = timeout / 1000;
+        ts.tv_nsec = (timeout % 1000) * 1000000;
+        tp = &ts;
+    }
+    int nkqueue = kevent(kqueue_ctx->kqfd, kqueue_ctx->changes, kqueue_ctx->nchanges, kqueue_ctx->events, kqueue_ctx->nchanges, tp);
+    if (nkqueue < 0) {
+        perror("kevent");
+        return nkqueue;
+    }
+    if (nkqueue == 0) return 0;
+    int nevent = 0;
+    for (int i = 0; i < nkqueue; ++i) {
+        if (nevent == nkqueue) break;
+        if (kqueue_ctx->events[i].flags & EV_ERROR) {
+            continue;
+        }
+        ++nevent;
+        int fd = kqueue_ctx->events[i].ident;
+        int revent = kqueue_ctx->events[i].filter;
+        auto iter = loop->events.find(fd);
+        if (iter == loop->events.end()) {
+            continue;
+        }
+        hevent_t* event = iter->second;
+        if (revent == EVFILT_READ) {
+            _on_read(event);
+        }
+        else if (revent == EVFILT_WRITE) {
+            _on_write(event);
+        }
+    }
+
+    return nevent;
+}
 #endif

+ 5 - 5
event/poll.cpp

@@ -48,9 +48,9 @@ int _add_event(hevent_t* event, int type) {
         hloop_event_init(loop);
     }
     poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->event_ctx;
-    int idx = event->event_index;
+    int idx = event->event_index[0];
     if (idx < 0) {
-        event->event_index = idx = poll_ctx->nfds;
+        event->event_index[0] = idx = poll_ctx->nfds;
         poll_ctx->nfds++;
         if (idx == poll_ctx->capacity) {
             poll_ctx_resize(poll_ctx, poll_ctx->capacity*2);
@@ -74,7 +74,7 @@ int _del_event(hevent_t* event, int type) {
     poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->event_ctx;
     if (poll_ctx == NULL)  return 0;
 
-    int idx = event->event_index;
+    int idx = event->event_index[0];
     if (idx < 0) return 0;
     assert(poll_ctx->fds[idx].fd == event->fd);
     if (type & READ_EVENT) {
@@ -84,13 +84,13 @@ int _del_event(hevent_t* event, int type) {
         poll_ctx->fds[idx].events &= ~POLLOUT;
     }
     if (poll_ctx->fds[idx].events == 0) {
-        event->event_index = -1;
+        event->event_index[0] = -1;
         poll_ctx->nfds--;
         if (idx < poll_ctx->nfds) {
             poll_ctx->fds[idx] = poll_ctx->fds[poll_ctx->nfds];
             auto iter = loop->events.find(poll_ctx->fds[idx].fd);
             if (iter != loop->events.end()) {
-                iter->second->event_index = idx;
+                iter->second->event_index[0] = idx;
             }
         }
     }