ithewei 6 年 前
コミット
773253876d
39 ファイル変更1674 行追加1217 行削除
  1. 5 0
      Makefile
  2. 15 0
      base/RAII.cpp
  3. 33 20
      base/array.h
  4. 19 0
      base/hbuf.h
  5. 6 0
      base/hdef.h
  6. 25 7
      base/hlog.c
  7. 8 0
      base/hlog.h
  8. 25 0
      base/hmath.h
  9. 35 3
      base/hmutex.h
  10. 0 14
      base/hsocket.c
  11. 10 5
      base/hsocket.h
  12. 103 0
      base/queue.h
  13. 136 0
      event/epoll.c
  14. 0 118
      event/epoll.cpp
  15. 26 0
      event/evport.c
  16. 54 0
      event/hevent.h
  17. 0 121
      event/hio.cpp
  18. 0 16
      event/hio.h
  19. 316 0
      event/hloop.c
  20. 0 214
      event/hloop.cpp
  21. 118 96
      event/hloop.h
  22. 0 40
      event/io_watcher.cpp
  23. 0 54
      event/io_watcher.h
  24. 71 0
      event/iocp.c
  25. 0 25
      event/iocp.cpp
  26. 33 0
      event/iowatcher.h
  27. 43 34
      event/kqueue.c
  28. 283 0
      event/nio.c
  29. 0 239
      event/nio.cpp
  30. 4 5
      event/noevent.c
  31. 128 0
      event/poll.c
  32. 0 122
      event/poll.cpp
  33. 57 29
      event/select.c
  34. 26 0
      event/wsaio.c
  35. 28 23
      examples/client.cpp
  36. 25 0
      examples/loop.c
  37. 20 15
      examples/server.cpp
  38. 3 3
      h.h
  39. 19 14
      http/server/http_server.cpp

+ 5 - 0
Makefile

@@ -16,6 +16,11 @@ test: prepare
 	cp main.cpp.tmpl $(TMPDIR)/main.cpp
 	$(MAKEF) TARGET=$@ SRCDIRS=". base utils $(TMPDIR)"
 
+loop: prepare
+	-rm $(TMPDIR)/*.o $(TMPDIR)/*.h $(TMPDIR)/*.c $(TMPDIR)/*.cpp
+	cp examples/loop.c $(TMPDIR)/loop.c
+	$(MAKEF) TARGET=$@ SRCDIRS=". base event $(TMPDIR)"
+
 client: prepare
 	-rm $(TMPDIR)/*.o $(TMPDIR)/*.c $(TMPDIR)/*.cpp
 	cp examples/client.cpp $(TMPDIR)/client.cpp

+ 15 - 0
base/RAII.cpp

@@ -0,0 +1,15 @@
+#include "hplatform.h"
+
+#ifdef OS_WIN
+class WsaRAII {
+public:
+    WsaRAII() {
+        WSADATA wsadata;
+        WSAStartup(MAKEWORD(2,2), &wsadata);
+    }
+    ~WsaRAII() {
+        WSACleanup();
+    }
+};
+static WsaRAII s_wsa;
+#endif

+ 33 - 20
base/array.h

@@ -1,6 +1,15 @@
 #ifndef HW_ARRAY_H_
 #define HW_ARRAY_H_
 
+/*
+ * array
+ * at: random access by pos
+ * @effective
+ * push_back,pop_back,del_nomove,swap
+ * @ineffective
+ * add,del
+ */
+
 #include <assert.h> // for assert
 #include <stdlib.h> // for malloc,realloc,free
 #include <string.h> // for memset,memmove
@@ -54,6 +63,10 @@ static inline void atype##_init(atype* p, int maxsize) {\
     p->ptr = (type*)malloc(bytes);\
     memset(p->ptr, 0, bytes);\
 }\
+static inline void atype##_clear(atype* p) {\
+    p->size = 0;\
+    memset(p->ptr, 0, sizeof(type) * p->maxsize);\
+}\
 \
 static inline void atype##_cleanup(atype* p) {\
     if (p->ptr) {\
@@ -74,6 +87,19 @@ static inline void atype##_double_resize(atype* p) {\
     return atype##_resize(p, p->maxsize*2);\
 }\
 \
+static inline void atype##_push_back(atype* p, type* elem) {\
+    if (p->size == p->maxsize) {\
+        atype##_double_resize(p);\
+    }\
+    p->ptr[p->size] = *elem;\
+    p->size++;\
+}\
+\
+static inline void atype##_pop_back(atype* p) {\
+    assert(p->size > 0);\
+    p->size--;\
+}\
+\
 static inline void atype##_add(atype* p, type* elem, int pos) {\
     if (pos < 0) {\
         pos += p->size;\
@@ -100,17 +126,15 @@ static inline void atype##_del(atype* p, int pos) {\
     }\
 }\
 \
-static inline void atype##_push_back(atype* p, type* elem) {\
-    if (p->size == p->maxsize) {\
-        atype##_double_resize(p);\
+static inline void atype##_del_nomove(atype* p, int pos) {\
+    if (pos < 0) {\
+        pos += p->size;\
     }\
-    p->ptr[p->size] = *elem;\
-    p->size++;\
-}\
-\
-static inline void atype##_pop_back(atype* p) {\
-    assert(p->size > 0);\
+    assert(pos >= 0 && pos < p->size);\
     p->size--;\
+    if (pos < p->size) {\
+        p->ptr[pos] = p->ptr[p->size];\
+    }\
 }\
 \
 static inline void atype##_swap(atype* p, int pos1, int pos2) {\
@@ -124,16 +148,5 @@ static inline void atype##_swap(atype* p, int pos1, int pos2) {\
     p->ptr[pos1] = p->ptr[pos2];\
     p->ptr[pos2] = tmp;\
 }\
-\
-static inline void atype##_del_nomove(atype* p, int pos) {\
-    if (pos < 0) {\
-        pos += p->size;\
-    }\
-    assert(pos >= 0 && pos < p->size);\
-    p->size--;\
-    if (pos < p->size) {\
-        p->ptr[pos] = p->ptr[p->size];\
-    }\
-}\
 
 #endif

+ 19 - 0
base/hbuf.h

@@ -24,6 +24,24 @@ typedef struct hbuf_s {
 #endif
 } hbuf_t;
 
+typedef struct offset_buf_s {
+    char*   base;
+    size_t  len;
+    size_t  offset;
+#ifdef __cplusplus
+    offset_buf_s() {
+        base = NULL;
+        len = offset = 0;
+    }
+
+    offset_buf_s(void* data, size_t len) {
+        this->base = (char*)data;
+        this->len = len;
+    }
+#endif
+} offset_buf_t;
+
+#ifdef __cplusplus
 class HBuf : public hbuf_t {
 public:
     HBuf() : hbuf_t() {
@@ -219,5 +237,6 @@ private:
     size_t _tail;
     size_t _size;
 };
+#endif
 
 #endif  // HW_BUF_H_

+ 6 - 0
base/hdef.h

@@ -243,6 +243,12 @@ typedef void (*procedure_t)(void* userdata);
 #endif
 #endif
 
+#ifdef PRINT_DEBUG
+#define printd printf
+#else
+#define printd(...)
+#endif
+
 // __cplusplus
 #ifdef __cplusplus
 

+ 25 - 7
base/hlog.cpp → base/hlog.c

@@ -3,7 +3,6 @@
 #include <stdio.h>
 #include <string.h>
 #include <stdarg.h>
-#include <mutex>
 
 #include "htime.h"  // for get_datetime
 
@@ -15,7 +14,24 @@ static bool     s_logcolor = false;
 static bool     s_fflush   = true;
 static int      s_remain_days = DEFAULT_LOG_REMAIN_DAYS;
 static char     s_logbuf[LOG_BUFSIZE];
-static std::mutex s_mutex; // for thread-safe
+
+// for thread-safe
+#include "hmutex.h"
+#ifdef _MSC_VER
+static hmutex_t  s_mutex;
+static honce_t   s_once = HONCE_INIT;
+static void __mutex_init() {
+    hmutex_init(&s_mutex);
+}
+#define HLOG_LOCK\
+    honce(&s_once, __mutex_init);\
+    hmutex_lock(%s_mutex);
+
+#else
+static hmutex_t s_mutex = PTHREAD_MUTEX_INITIALIZER;
+#define HLOG_LOCK       hmutex_lock(&s_mutex);
+#endif
+#define HLOG_UNLOCK     hmutex_unlock(&s_mutex);
 
 static void ts_logfile(time_t ts, char* buf, int len) {
     struct tm* tm = localtime(&ts);
@@ -29,10 +45,10 @@ static void ts_logfile(time_t ts, char* buf, int len) {
 static FILE* shift_logfile() {
     static FILE*    s_logfp = NULL;
     static char     s_cur_logfile[256] = {0};
-    static time_t   s_last_logfile_ts = time(NULL);
+    static time_t   s_last_logfile_ts = 0;
 
     time_t ts_now = time(NULL);
-    int interval_days = ts_now / SECONDS_PER_DAY - s_last_logfile_ts / SECONDS_PER_DAY;
+    int interval_days = s_last_logfile_ts == 0 ? 0 : (ts_now / SECONDS_PER_DAY - s_last_logfile_ts / SECONDS_PER_DAY);;
     if (s_logfp == NULL || interval_days > 0) {
         // close old logfile
         if (s_logfp) {
@@ -116,10 +132,11 @@ int hlog_printf(int level, const char* fmt, ...) {
     }
 #undef CASE_LOG
 
-    std::lock_guard<std::mutex> locker(s_mutex);
+    HLOG_LOCK
 
     FILE* fp = shift_logfile();
     if (fp == NULL) {
+        HLOG_UNLOCK
         return -20;
     }
 
@@ -142,6 +159,7 @@ int hlog_printf(int level, const char* fmt, ...) {
         fflush(fp);
     }
 
+    HLOG_UNLOCK
     return len;
 }
 
@@ -150,10 +168,10 @@ void hlog_set_fflush(int on) {
 }
 
 void hlog_fflush() {
-    std::lock_guard<std::mutex> locker(s_mutex);
-
+    HLOG_LOCK
     FILE* fp = shift_logfile();
     if (fp) {
         fflush(fp);
     }
+    HLOG_UNLOCK
 }

+ 8 - 0
base/hlog.h

@@ -1,6 +1,10 @@
 #ifndef HW_LOG_H_
 #define HW_LOG_H_
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 /*
  * hlog is thread-safe
  */
@@ -81,4 +85,8 @@ void    hlog_fflush();
 #define LOGF    hlogf
 #endif
 
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
 #endif  // HW_LOG_H_

+ 25 - 0
base/hmath.h

@@ -0,0 +1,25 @@
+#ifndef HW_MATH_H_
+#define HW_MATH_H_
+#include <math.h>
+
+static inline unsigned long floor2e(unsigned long num) {
+    unsigned long n = num;
+    int e = 1;
+    while (n>>=1) ++e;
+    unsigned long ret = 1;
+    while (--e) ret<<=1;
+    return ret;
+}
+
+static inline unsigned long ceil2e(unsigned long num) {
+    // 2**0 = 1
+    if (num == 0)   return 1;
+    unsigned long n = num;
+    int e = 1;
+    while (n>>=1) ++e;
+    unsigned long ret = 1;
+    while (--e) ret<<=1;
+    return ret == num ? ret : ret<<1;
+}
+
+#endif // HW_MATH_H_

+ 35 - 3
base/hmutex.h

@@ -1,10 +1,41 @@
 #ifndef HW_MUTEX_H_
 #define HW_MUTEX_H_
 
-#include <mutex>
-
 #include "hplatform.h"
 
+#ifdef _WIN32
+#define hmutex_t            CRITICAL_SECTION
+#define hmutex_init         InitializeCriticalSection
+#define hmutex_destroy      DeleteCriticalSection
+#define hmutex_lock         EnterCriticalSection
+#define hmutex_unlock       LeaveCriticalSection
+
+#define honce_t             INIT_ONCE
+#define HONCE_INIT          INIT_ONCE_STATIC_INIT
+typedef void (*honce_fn)(void);
+static inline BOOL WINAPI s_once_func(INIT_ONCE* once, PVOID arg, PVOID* _) {
+    honce_fn fn = (honce_fn)arg;
+    fn();
+    return TRUE;
+}
+static inline void honce(INIT_ONCE* once, honce_fn fn) {
+    PVOID dummy = NULL;
+    InitOnceExecuteOnce(once, s_once_func, (PVOID)fn, &dummy);
+}
+#else
+#define hmutex_t            pthread_mutex_t
+#define hmutex_init(mutex)  pthread_mutex_init(mutex, NULL)
+#define hmutex_destroy      pthread_mutex_destroy
+#define hmutex_lock         pthread_mutex_lock
+#define hmutex_unlock       pthread_mutex_unlock
+
+#define honce_t             pthread_once_t
+#define HONCE_INIT          PTHREAD_ONCE_INIT
+#define honce               pthread_once
+#endif
+
+#ifdef __cplusplus
+#include <mutex>
 #ifdef _MSC_VER
 class RWLock {
  public:
@@ -20,7 +51,7 @@ class RWLock {
     SRWLOCK _rwlock;
 };
 #else
-class RWLock{
+class RWLock {
  public:
     RWLock() { pthread_rwlock_init(&_rwlock, NULL); }
     ~RWLock() { pthread_rwlock_destroy(&_rwlock); }
@@ -34,5 +65,6 @@ class RWLock{
     pthread_rwlock_t _rwlock;
 };
 #endif
+#endif
 
 #endif  // HW_MUTEX_H_

+ 0 - 14
base/hsocket.cpp → base/hsocket.c

@@ -1,19 +1,5 @@
 #include "hsocket.h"
 
-#ifdef OS_WIN
-class WinSocketRAII {
-public:
-    WinSocketRAII() {
-        WSADATA wsadata;
-        WSAStartup(MAKEWORD(2,2), &wsadata);
-    }
-    ~WinSocketRAII() {
-        WSACleanup();
-    }
-};
-static WinSocketRAII s_ws;
-#endif
-
 int Listen(int port) {
     // socket -> setsockopt -> bind -> listen
     int listenfd = socket(AF_INET, SOCK_STREAM, 0);

+ 10 - 5
base/hsocket.h

@@ -2,6 +2,9 @@
 #define HW_SOCKET_H_
 
 #include "hplatform.h"
+#include "hdef.h"
+
+BEGIN_EXTERN_C
 
 // socket -> setsockopt -> bind -> listen
 // @return sockfd
@@ -9,7 +12,7 @@ int Listen(int port);
 
 // gethostbyname -> socket -> nonblocking -> connect
 // @return sockfd
-int Connect(const char* host, int port, int nonblock = 0);
+int Connect(const char* host, int port, int nonblock DEFAULT(0));
 
 #ifdef OS_WIN
 typedef int socklen_t;
@@ -32,11 +35,11 @@ static inline int nonblocking(int sockfd) {
 #define NIO_EAGAIN  EAGAIN
 #endif
 
-static inline int tcp_nodelay(int sockfd, int on = 1) {
+static inline int tcp_nodelay(int sockfd, int on DEFAULT(1)) {
     return setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (const char*)&on, sizeof(int));
 }
 
-static inline int tcp_nopush(int sockfd, int on = 1) {
+static inline int tcp_nopush(int sockfd, int on DEFAULT(1)) {
 #ifdef TCP_NOPUSH
     return setsockopt(sockfd, IPPROTO_TCP, TCP_NOPUSH, (const char*)&on, sizeof(int));
 #elif defined(TCP_CORK)
@@ -46,7 +49,7 @@ static inline int tcp_nopush(int sockfd, int on = 1) {
 #endif
 }
 
-static inline int tcp_keepalive(int sockfd, int on = 1, int delay = 60) {
+static inline int tcp_keepalive(int sockfd, int on DEFAULT(1), int delay DEFAULT(60)) {
     if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (const char*)&on, sizeof(int)) != 0) {
         return sockerrno;
     }
@@ -61,8 +64,10 @@ static inline int tcp_keepalive(int sockfd, int on = 1, int delay = 60) {
 #endif
 }
 
-static inline int udp_broadcast(int sockfd, int on = 1) {
+static inline int udp_broadcast(int sockfd, int on DEFAULT(1)) {
     return setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, (const char*)&on, sizeof(int));
 }
 
+END_EXTERN_C
+
 #endif // HW_SOCKET_H_

+ 103 - 0
base/queue.h

@@ -0,0 +1,103 @@
+#ifndef HW_QUEUE_H_
+#define HW_QUEUE_H_
+
+/*
+ * queue
+ * FIFO: push_back,pop_front
+ */
+
+#include <assert.h> // for assert
+#include <stdlib.h> // for malloc,realloc,free
+#include <string.h> // for memset,memmove
+
+// #include <deque>
+// typedef std::deque<type> qtype;
+#define QUEUE_DECL(type, qtype) \
+struct qtype {      \
+    type*   ptr;    \
+    size_t  size;   \
+    size_t  maxsize;\
+    size_t  _offset;\
+};                  \
+typedef struct qtype qtype;\
+\
+static inline type* qtype##_data(qtype* p) {\
+    return p->ptr + p->_offset;\
+}\
+\
+static inline int qtype##_size(qtype* p) {\
+    return p->size;\
+}\
+\
+static inline int qtype##_maxsize(qtype* p) {\
+    return p->maxsize;\
+}\
+\
+static inline int qtype##_empty(qtype* p) {\
+    return p->size == 0;\
+}\
+\
+static inline type* qtype##_front(qtype* p) {\
+    return p->size == 0 ? NULL : p->ptr + p->_offset;\
+}\
+\
+static inline type* qtype##_back(qtype* p) {\
+    return p->size == 0 ? NULL : p->ptr + p->_offset + p->size-1;\
+}\
+\
+static inline void qtype##_init(qtype* p, int maxsize) {\
+    p->_offset = 0;\
+    p->size = 0;\
+    p->maxsize = maxsize;\
+    size_t bytes = sizeof(type) * maxsize;\
+    p->ptr = (type*)malloc(bytes);\
+    memset(p->ptr, 0, bytes);\
+}\
+static inline void qtype##_clear(qtype* p) {\
+    p->_offset = 0;\
+    p->size = 0;\
+    memset(p->ptr, 0, sizeof(type) * p->maxsize);\
+}\
+\
+static inline void qtype##_cleanup(qtype* p) {\
+    if (p->ptr) {\
+        free(p->ptr);\
+        p->ptr = NULL;\
+    }\
+    p->_offset = p->size = p->maxsize = 0;\
+}\
+\
+static inline void qtype##_resize(qtype* p, int maxsize) {\
+    p->maxsize = maxsize;\
+    int bytes = sizeof(type) * maxsize;\
+    p->ptr = (type*)realloc(p->ptr, bytes);\
+}\
+\
+static inline void qtype##_double_resize(qtype* p) {\
+    assert(p->maxsize != 0);\
+    return qtype##_resize(p, p->maxsize*2);\
+}\
+\
+static inline void qtype##_push_back(qtype* p, type* elem) {\
+    if (p->size == p->maxsize) {\
+        qtype##_double_resize(p);\
+    }\
+    else if (p->_offset + p->size == p->maxsize) {\
+        memmove(p->ptr, p->ptr + p->_offset, p->size);\
+        p->_offset = 0;\
+    }\
+    p->ptr[p->_offset + p->size] = *elem;\
+    p->size++;\
+}\
+static inline void qtype##_pop_front(qtype* p) {\
+    assert(p->size > 0);\
+    p->size--;\
+    if (++p->_offset == p->maxsize) p->_offset = 0;\
+}\
+\
+static inline void qtype##_pop_back(qtype* p) {\
+    assert(p->size > 0);\
+    p->size--;\
+}\
+
+#endif // HW_QUEUE_H_

+ 136 - 0
event/epoll.c

@@ -0,0 +1,136 @@
+#include "iowatcher.h"
+
+#ifdef EVENT_EPOLL
+#include "hplatform.h"
+#include "hdef.h"
+
+#include <sys/epoll.h>
+
+#include "hevent.h"
+
+#include "array.h"
+#define EVENTS_INIT_SIZE    64
+ARRAY_DECL(struct epoll_event, events);
+
+
+typedef struct epoll_ctx_s {
+    int                 epfd;
+    struct events       events;
+} epoll_ctx_t;
+
+int iowatcher_init(hloop_t* loop) {
+    if (loop->iowatcher) return 0;
+    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)malloc(sizeof(epoll_ctx_t));
+    epoll_ctx->epfd = epoll_create(EVENTS_INIT_SIZE);
+    events_init(&epoll_ctx->events, EVENTS_INIT_SIZE);
+    loop->iowatcher = epoll_ctx;
+    return 0;
+}
+
+int iowatcher_cleanup(hloop_t* loop) {
+    if (loop->iowatcher == NULL) return 0;
+    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->iowatcher;
+    close(epoll_ctx->epfd);
+    events_cleanup(&epoll_ctx->events);
+    SAFE_FREE(loop->iowatcher);
+    return 0;
+}
+
+int iowatcher_add_event(hloop_t* loop, int fd, int events) {
+    if (loop->iowatcher == NULL) {
+        iowatcher_init(loop);
+    }
+    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->iowatcher;
+    hio_t* io = loop->ios.ptr[fd];
+
+    struct epoll_event ee;
+    ee.events = 0;
+    ee.data.fd = fd;
+    // pre events
+    if (io->events & READ_EVENT) {
+        ee.events |= EPOLLIN;
+    }
+    if (io->events & WRITE_EVENT) {
+        ee.events |= EPOLLOUT;
+    }
+    // now events
+    if (events & READ_EVENT) {
+        ee.events |= EPOLLIN;
+    }
+    if (events & WRITE_EVENT) {
+        ee.events |= EPOLLOUT;
+    }
+    int op = io->events == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
+    epoll_ctl(epoll_ctx->epfd, op, fd, &ee);
+    if (op == EPOLL_CTL_ADD) {
+        if (epoll_ctx->events.size == epoll_ctx->events.maxsize) {
+            events_double_resize(&epoll_ctx->events);
+        }
+        epoll_ctx->events.size++;
+    }
+    return 0;
+}
+
+int iowatcher_del_event(hloop_t* loop, int fd, int events) {
+    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->iowatcher;
+    if (epoll_ctx == NULL) return 0;
+    hio_t* io = loop->ios.ptr[fd];
+
+    struct epoll_event ee;
+    ee.events = 0;
+    ee.data.fd = fd;
+    // pre events
+    if (io->events & READ_EVENT) {
+        ee.events |= EPOLLIN;
+    }
+    if (io->events & WRITE_EVENT) {
+        ee.events |= EPOLLOUT;
+    }
+    // now events
+    if (events & READ_EVENT) {
+        ee.events &= ~EPOLLIN;
+    }
+    if (events & WRITE_EVENT) {
+        ee.events &= ~EPOLLOUT;
+    }
+    int op = ee.events == 0 ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
+    epoll_ctl(epoll_ctx->epfd, op, fd, &ee);
+    if (op == EPOLL_CTL_DEL) {
+        epoll_ctx->events.size--;
+    }
+    return 0;
+}
+
+int iowatcher_poll_events(hloop_t* loop, int timeout) {
+    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->iowatcher;
+    if (epoll_ctx == NULL)  return 0;
+    if (epoll_ctx->events.size == 0) return 0;
+    int nepoll = epoll_wait(epoll_ctx->epfd, epoll_ctx->events.ptr, epoll_ctx->events.size, timeout);
+    if (nepoll < 0) {
+        perror("epoll");
+        return nepoll;
+    }
+    if (nepoll == 0) return 0;
+    int nevents = 0;
+    for (int i = 0; i < epoll_ctx->events.size; ++i) {
+        struct epoll_event* ee = epoll_ctx->events.ptr + i;
+        int fd = ee->data.fd;
+        uint32_t revents = ee->events;
+        if (revents) {
+            ++nevents;
+            hio_t* io = loop->ios.ptr[fd];
+            if (io) {
+                if (revents & EPOLLIN) {
+                    io->revents |= READ_EVENT;
+                }
+                if (revents & EPOLLOUT) {
+                    io->revents |= WRITE_EVENT;
+                }
+                EVENT_PENDING(io);
+            }
+        }
+        if (nevents == nepoll) break;
+    }
+    return nevents;
+}
+#endif

+ 0 - 118
event/epoll.cpp

@@ -1,118 +0,0 @@
-#include "io_watcher.h"
-
-#ifdef EVENT_EPOLL
-#include "hio.h"
-#include "hplatform.h"
-#include "hdef.h"
-
-#define INIT_EVENTS_NUM    64
-
-typedef struct epoll_ctx_s {
-    int                 epfd;
-    int                 capacity;
-    int                 nevents;
-    struct epoll_event* events;
-} epoll_ctx_t;
-
-static void epoll_ctx_resize(epoll_ctx_t* epoll_ctx, int size) {
-    int bytes = sizeof(struct epoll_event) * size;
-    epoll_ctx->events = (struct epoll_event*)realloc(epoll_ctx->events, bytes);
-    epoll_ctx->capacity = size;
-}
-
-int iowatcher_init(hloop_t* loop) {
-    if (loop->iowatcher) return 0;
-    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)malloc(sizeof(epoll_ctx_t));
-    epoll_ctx->epfd = epoll_create(INIT_EVENTS_NUM);
-    epoll_ctx->capacity = INIT_EVENTS_NUM;
-    epoll_ctx->nevents = 0;
-    int bytes = sizeof(struct epoll_event) * epoll_ctx->capacity;
-    epoll_ctx->events = (struct epoll_event*)malloc(bytes);
-    memset(epoll_ctx->events, 0, bytes);
-    loop->iowatcher = epoll_ctx;
-    return 0;
-}
-
-int iowatcher_cleanup(hloop_t* loop) {
-    if (loop->iowatcher == NULL) return 0;
-    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->iowatcher;
-    close(epoll_ctx->epfd);
-    SAFE_FREE(epoll_ctx->events);
-    SAFE_FREE(loop->iowatcher);
-    return 0;
-}
-
-int iowatcher_add_event(hio_t* io, int events) {
-    hloop_t* loop = io->loop;
-    if (loop->iowatcher == NULL) {
-        hloop_iowatcher_init(loop);
-    }
-    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->iowatcher;
-    struct epoll_event ee;
-    ee.events = 0;
-    ee.data.fd = io->fd;
-    if (events & READ_EVENT) {
-        ee.events |= EPOLLIN;
-    }
-    if (events & WRITE_EVENT) {
-        ee.events |= EPOLLOUT;
-    }
-    int op = io->events == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
-    epoll_ctl(epoll_ctx->epfd, op, io->fd, &ee);
-    if (op == EPOLL_CTL_ADD) {
-        if (epoll_ctx->nevents == epoll_ctx->capacity) {
-            epoll_ctx_resize(epoll_ctx, epoll_ctx->capacity*2);
-        }
-        epoll_ctx->nevents++;
-    }
-    return 0;
-}
-
-int iowatcher_del_event(hio_t* io, int events) {
-    hloop_t* loop = io->loop;
-    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->iowatcher;
-    if (epoll_ctx == NULL) return 0;
-
-    struct epoll_event ee;
-    ee.events = io->events;
-    ee.data.fd = io->fd;
-    if (events & READ_EVENT) {
-        ee.events &= ~EPOLLIN;
-    }
-    if (events & WRITE_EVENT) {
-        ee.events &= ~EPOLLOUT;
-    }
-    int op = ee.events == 0 ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
-    epoll_ctl(epoll_ctx->epfd, op, io->fd, &ee);
-    if (op == EPOLL_CTL_DEL) {
-        epoll_ctx->nevents--;
-    }
-    return 0;
-}
-
-int iowatcher_poll_events(hloop_t* loop, int timeout) {
-    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->iowatcher;
-    if (epoll_ctx == NULL)  return 0;
-    if (epoll_ctx->nevents == 0) return 0;
-    int nepoll = epoll_wait(epoll_ctx->epfd, epoll_ctx->events, epoll_ctx->nevents, timeout);
-    if (nepoll < 0) {
-        perror("epoll");
-        return nepoll;
-    }
-    if (nepoll == 0) return 0;
-    int nevent = 0;
-    for (int i = 0; i < epoll_ctx->nevents; ++i) {
-        if (nevent == nepoll) break;
-        int fd = epoll_ctx->events[i].data.fd;
-        uint32_t revents = epoll_ctx->events[i].events;
-        if (revents) {
-            ++nevent;
-            hio_t* io = hio_get(loop, fd);
-            if (io == NULL) continue;
-            io->revents = revents;
-            hio_handle_events(io);
-        }
-    }
-    return nevent;
-}
-#endif

+ 26 - 0
event/evport.c

@@ -0,0 +1,26 @@
+#include "iowatcher.h"
+
+#ifdef EVENT_PORT
+
+#include <port.h>
+
+int iowatcher_init(hloop_t* loop) {
+    return 0;
+}
+
+int iowatcher_cleanup(hloop_t* loop) {
+    return 0;
+}
+
+int iowatcher_add_event(hloop_t* loop, int fd, int events) {
+    return 0;
+}
+
+int iowatcher_del_event(hloop_t* loop, int fd, int events) {
+    return 0;
+}
+
+int iowatcher_poll_events(hloop_t* loop, int timeout) {
+    return 0;
+}
+#endif

+ 54 - 0
event/hevent.h

@@ -0,0 +1,54 @@
+#ifndef HW_EVENT_H_
+#define HW_EVENT_H_
+
+#include "hloop.h"
+
+#include "hdef.h"
+
+#define EVENT_ENTRY(p)          container_of(p, hevent_t, pending_node)
+#define IDLE_ENTRY(p)           container_of(p, hidle_t,  node)
+#define TIMER_ENTRY(p)          container_of(p, htimer_t, node)
+#define TIMER_HEAP_ENTRY(p)     container_of(p, htimer_t, hnode)
+
+#define EVENT_ACTIVE(ev) \
+    if (!ev->active) {\
+        ev->active = 1;\
+        ev->loop->nactives++;\
+    }\
+
+#define EVENT_INACTIVE(ev) \
+    if (ev->active) {\
+        ev->active = 0;\
+        ev->loop->nactives--;\
+    }\
+
+#define EVENT_PENDING(ev) \
+    do {\
+        if (!ev->pending) {\
+            ev->pending = 1;\
+            ev->loop->npendings++;\
+            hevent_t** phead = &ev->loop->pendings[HEVENT_PRIORITY_INDEX(ev->priority)];\
+            if (*phead == NULL) {\
+                *phead = (hevent_t*)ev;\
+            } else {\
+                ev->pending_next = *phead;\
+                *phead = (hevent_t*)ev;\
+            }\
+        }\
+    } while(0)
+
+#define EVENT_ADD(loop, ev, cb) \
+    do {\
+        ev->loop = loop;\
+        ev->event_id = ++loop->event_counter;\
+        ev->cb = (hevent_cb)cb;\
+        EVENT_ACTIVE(ev);\
+    } while(0)
+
+#define EVENT_DEL(ev) \
+    do {\
+        EVENT_INACTIVE(ev);\
+        ev->destroy = 1;\
+    } while(0)
+
+#endif // HW_EVENT_H_

+ 0 - 121
event/hio.cpp

@@ -1,121 +0,0 @@
-#include "hio.h"
-#include "io_watcher.h"
-
-void hio_init(hio_t* io) {
-    hloop_t* loop = io->loop;
-    int fd = io->fd;
-    memset(io, 0, sizeof(hio_t));
-    io->event_index[0] = io->event_index[1] = -1;
-    io->loop = loop;
-    io->fd = fd;
-}
-
-hio_t* hio_get(hloop_t* loop, int fd) {
-    auto iter = loop->ios.find(fd);
-    if (iter == loop->ios.end()) {
-        return NULL;
-    }
-    return iter->second;
-}
-
-hio_t* hio_add(hloop_t* loop, int fd) {
-    // first try get
-    hio_t* io = hio_get(loop, fd);
-    if (io == NULL) {
-        // then add
-#ifdef EVENT_SELECT
-        if (loop->ios.size() >= FD_SETSIZE) return NULL;
-#endif
-        io = (hio_t*)malloc(sizeof(hio_t));
-        hio_init(io);
-        io->event_type= HEVENT_TYPE_IO;
-        io->event_id = ++loop->event_counter;
-        loop->ios[fd] = io;
-    }
-    io->loop = loop;
-    io->fd = fd;
-    io->active = 1;
-    return io;
-}
-
-void hio_del(hio_t* io) {
-    iowatcher_del_event(io, ALL_EVENTS);
-    io->events = 0;
-     // no free, just init for reuse
-    hio_init(io);
-}
-
-hio_t* hio_read (hloop_t* loop, int fd, hio_cb revent_cb, void* revent_userdata) {
-    hio_t* io = hio_add(loop, fd);
-    if (io == NULL) return NULL;
-    io->revent_cb = revent_cb;
-    io->revent_userdata = revent_userdata;
-    iowatcher_add_event(io, READ_EVENT);
-    io->events |= READ_EVENT;
-    return io;
-}
-
-hio_t* hio_write  (hloop_t* loop, int fd, hio_cb wevent_cb, void* wevent_userdata) {
-    hio_t* io = hio_add(loop, fd);
-    if (io == NULL) return NULL;
-    io->wevent_cb = wevent_cb;
-    io->wevent_userdata = wevent_userdata;
-    iowatcher_add_event(io, WRITE_EVENT);
-    io->events |= WRITE_EVENT;
-    return io;
-}
-
-#include "hsocket.h"
-hio_t* hio_accept (hloop_t* loop, int listenfd, hio_cb revent_cb, void* revent_userdata) {
-    hio_t* io = hio_read(loop, listenfd, revent_cb, revent_userdata);
-    if (io) {
-        nonblocking(listenfd);
-        io->accept = 1;
-    }
-    return io;
-}
-
-hio_t* hio_connect(hloop_t* loop, int connfd, hio_cb wevent_cb, void* wevent_userdata) {
-    hio_t* io = hio_write(loop, connfd, wevent_cb, wevent_userdata);
-    if (io) {
-        nonblocking(connfd);
-        io->connect = 1;
-    }
-    return io;
-}
-
-static int handle_read_event(hio_t* io) {
-    if (!io->active) return 0;
-    if (io->revent_cb) {
-        io->revent_cb(io, io->revent_userdata);
-    }
-    return 0;
-}
-
-static int handle_write_event(hio_t* io) {
-    if (!io->active) return 0;
-    bool connect_event = io->connect;
-    if (connect_event) {
-        // ONESHOT
-        iowatcher_del_event(io, WRITE_EVENT);
-        io->connect = 0;
-    }
-    if (io->wevent_cb) {
-        io->wevent_cb(io, io->wevent_userdata);
-    }
-    //if (!connect_event && io->write_queue.empty()) {
-        //iowatcher_del_event(io, WRITE_EVENT);
-    //}
-    return 0;
-}
-
-int hio_handle_events(hio_t* io) {
-    if (io->revents & READ_EVENT) {
-        handle_read_event(io);
-    }
-    if (io->revents & WRITE_EVENT) {
-        handle_write_event(io);
-    }
-    io->revents = 0;
-    return 0;
-}

+ 0 - 16
event/hio.h

@@ -1,16 +0,0 @@
-#ifndef HW_IO_H_
-#define HW_IO_H_
-
-#include "hloop.h"
-
-hio_t* hio_get(hloop_t* loop, int fd);
-hio_t* hio_add(hloop_t* loop, int fd);
-void   hio_del(hio_t* io);
-int    hio_handle_events(hio_t* io);
-
-hio_t* hio_read   (hloop_t* loop, int fd, hio_cb revent_cb, void* revent_userdata);
-hio_t* hio_write  (hloop_t* loop, int fd, hio_cb wevent_cb, void* wevent_userdata);
-hio_t* hio_accept (hloop_t* loop, int listenfd, hio_cb revent_cb, void* revent_userdata);
-hio_t* hio_connect(hloop_t* loop, int connfd, hio_cb wevent_cb, void* wevent_userdata);
-
-#endif // HW_IO_H_

+ 316 - 0
event/hloop.c

@@ -0,0 +1,316 @@
+#include "hloop.h"
+#include "hevent.h"
+#include "iowatcher.h"
+
+#include "hdef.h"
+#include "hlog.h"
+#include "hmath.h"
+
+#define PAUSE_TIME              10      // ms
+#define MIN_BLOCK_TIME          1       // ms
+#define MAX_BLOCK_TIME          10000   // ms
+
+#define IO_ARRAY_INIT_SIZE      64
+
+static int timer_minheap_compare(const struct heap_node* lhs, const struct heap_node* rhs) {
+    return TIMER_HEAP_ENTRY(lhs)->timeout < TIMER_HEAP_ENTRY(rhs)->timeout;
+}
+
+static int hloop_process_idles(hloop_t* loop) {
+    int nidles = 0;
+    struct list_node* node = loop->idles.next;
+    hidle_t* idle = NULL;
+    while (node != &loop->idles) {
+        idle = IDLE_ENTRY(node);
+        if (idle->destroy) goto destroy;
+        if (!idle->active) goto next;
+        if (idle->repeat == 0) {
+            hidle_del(idle);
+            //goto next;
+            goto destroy;
+        }
+        if (idle->repeat != INFINITE) {
+            --idle->repeat;
+        }
+        EVENT_PENDING(idle);
+        ++nidles;
+next:
+        node = node->next;
+        continue;
+destroy:
+        node = node->next;
+        list_del(node->prev);
+        free(idle);
+    }
+    return nidles;
+}
+
+static int hloop_process_timers(hloop_t* loop) {
+    int ntimers = 0;
+    struct list_node* node = loop->timers.next;
+    htimer_t* timer = NULL;
+    while (node != &loop->timers) {
+        timer = TIMER_ENTRY(node);
+        if (timer->destroy) goto destroy;
+        if (!timer->active) goto next;
+        if (timer->repeat == 0) {
+            htimer_del(timer);
+            //goto next;
+            goto destroy;
+        }
+        if (loop->cur_hrtime > timer->next_timeout) {
+            if (timer->repeat != INFINITE) {
+                --timer->repeat;
+            }
+            timer->next_timeout += timer->timeout*1000;
+            EVENT_PENDING(timer);
+            ++ntimers;
+        }
+next:
+        node = node->next;
+        continue;
+destroy:
+        node = node->next;
+        list_del(node->prev);
+        free(timer);
+    }
+    return ntimers;
+}
+
+static int hloop_process_ios(hloop_t* loop, int timeout) {
+    int nevents = iowatcher_poll_events(loop, timeout);
+    if (nevents < 0) {
+        hloge("poll_events error=%d", -nevents);
+    }
+    return nevents < 0 ? 0 : nevents;
+}
+
+static int hloop_process_pendings(hloop_t* loop) {
+    if (loop->npendings == 0) return 0;
+
+    hevent_t* prev = NULL;
+    hevent_t* next = NULL;
+    int ncbs = 0;
+    for (int i = HEVENT_PRIORITY_SIZE-1; i >= 0; --i) {
+        next = loop->pendings[i];
+        while (next) {
+            if (next->active && next->cb) {
+                next->cb(next);
+                ++ncbs;
+            }
+            prev = next;
+            next = next->pending_next;
+            prev->pending = 0;
+            prev->pending_next = NULL;
+        }
+        loop->pendings[i] = NULL;
+    }
+    loop->npendings = 0;
+    return ncbs;
+}
+
+static int hloop_process_events(hloop_t* loop) {
+    // ios -> timers -> idles
+    int nios, ntimers, nidles;
+    nios = ntimers = nidles = 0;
+
+    int blocktime = MAX_BLOCK_TIME;
+    if (loop->timer_minheap.root) {
+        // if have timers, blocktime = min_timeout
+        blocktime = TIMER_HEAP_ENTRY(loop->timer_minheap.root)->timeout;
+        //if (!list_empty(&loop->idles))
+            blocktime /= 10;
+    }
+    blocktime = LIMIT(MIN_BLOCK_TIME, blocktime, MAX_BLOCK_TIME);
+    // if you want timer more precise, reduce blocktime
+
+    uint64_t last_hrtime = loop->cur_hrtime;
+    nios = hloop_process_ios(loop, blocktime);
+    hloop_update_time(loop);
+    ntimers = hloop_process_timers(loop);
+    if (loop->npendings == 0) {
+        loop->idle_time += last_hrtime - loop->cur_hrtime;
+        // avoid frequent call idles
+        if (loop->cur_hrtime - loop->last_idle_hrtime > 1e6) {
+            loop->last_idle_hrtime = loop->cur_hrtime;
+            nidles= hloop_process_idles(loop);
+        }
+        else {
+            // hloop_process_ios maybe nonblock, so sleep here
+            msleep(blocktime);
+        }
+    }
+    printd("blocktime=%d nios=%d ntimers=%d nidles=%d nactives=%d npendings=%d\n", blocktime, nios, ntimers, nidles, loop->nactives, loop->npendings);
+    return hloop_process_pendings(loop);
+}
+
+int hloop_init(hloop_t* loop) {
+    memset(loop, 0, sizeof(hloop_t));
+    loop->status = HLOOP_STATUS_STOP;
+    // idles
+    list_init(&loop->idles);
+    // timers
+    list_init(&loop->timers);
+    heap_init(&loop->timer_minheap, timer_minheap_compare);
+    // iowatcher
+    //iowatcher_init(loop);
+    return 0;
+}
+
+void hloop_cleanup(hloop_t* loop) {
+    // pendings
+    for (int i = 0; i < HEVENT_PRIORITY_SIZE; ++i) {
+        loop->pendings[i] = NULL;
+    }
+    // idles
+    struct list_node* node = loop->idles.next;
+    hidle_t* idle;
+    while (node != &loop->idles) {
+        idle = IDLE_ENTRY(node);
+        node = node->next;
+        free(idle);
+    }
+    list_init(&loop->idles);
+    // timers
+    node = loop->timers.next;
+    htimer_t* timer;
+    while (node != &loop->timers) {
+        timer = TIMER_ENTRY(node);
+        node = node->next;
+        free(timer);
+    }
+    list_init(&loop->timers);
+    heap_init(&loop->timer_minheap, NULL);
+    // iowatcher
+    //iowatcher_cleanup(loop);
+};
+
+int hloop_run(hloop_t* loop) {
+    time(&loop->start_time);
+    loop->start_hrtime = gethrtime();
+    loop->loop_cnt = 0;
+    loop->status = HLOOP_STATUS_RUNNING;
+    while (loop->status != HLOOP_STATUS_STOP) {
+        if (loop->status == HLOOP_STATUS_PAUSE) {
+            msleep(PAUSE_TIME);
+            hloop_update_time(loop);
+            continue;
+        }
+        ++loop->loop_cnt;
+        if (loop->nactives == 0) break;
+        hloop_process_events(loop);
+    }
+    loop->status = HLOOP_STATUS_STOP;
+    loop->end_hrtime = gethrtime();
+    hloop_cleanup(loop);
+    return 0;
+};
+
+int hloop_stop(hloop_t* loop) {
+    loop->status = HLOOP_STATUS_STOP;
+    return 0;
+}
+
+int hloop_pause(hloop_t* loop) {
+    if (loop->status == HLOOP_STATUS_RUNNING) {
+        loop->status = HLOOP_STATUS_PAUSE;
+    }
+    return 0;
+}
+
+int hloop_resume(hloop_t* loop) {
+    if (loop->status == HLOOP_STATUS_PAUSE) {
+        loop->status = HLOOP_STATUS_RUNNING;
+    }
+    return 0;
+}
+
+hidle_t* hidle_add(hloop_t* loop, hidle_cb cb, uint32_t repeat) {
+    hidle_t* idle = (hidle_t*)malloc(sizeof(hidle_t));
+    memset(idle, 0, sizeof(hidle_t));
+    idle->event_type = HEVENT_TYPE_IDLE;
+    idle->repeat = repeat;
+    list_add(&idle->node, &loop->idles);
+    EVENT_ADD(loop, idle, cb);
+    return idle;
+}
+
+void hidle_del(hidle_t* idle) {
+    EVENT_DEL(idle);
+}
+
+htimer_t* htimer_add(hloop_t* loop, htimer_cb cb, uint64_t timeout, uint32_t repeat) {
+    htimer_t* timer = (htimer_t*)malloc(sizeof(htimer_t));
+    memset(timer, 0, sizeof(htimer_t));
+    timer->event_type = HEVENT_TYPE_TIMER;
+    timer->repeat = repeat;
+    timer->timeout = timeout;
+    timer->next_timeout = gethrtime() + timeout*1000;
+    list_add(&timer->node, &loop->timers);
+    heap_insert(&loop->timer_minheap, &timer->hnode);
+    EVENT_ADD(loop, timer, cb);
+    return timer;
+}
+
+void htimer_del(htimer_t* timer) {
+    heap_remove(&timer->loop->timer_minheap, &timer->hnode);
+    EVENT_DEL(timer);
+}
+
+void hio_init(hio_t* io) {
+    memset(io, 0, sizeof(hio_t));
+    io->event_type = HEVENT_TYPE_IO;
+    io->event_index[0] = io->event_index[1] = -1;
+    // move to hwrite
+    //write_queue_init(&io->write_queue, 4);;
+};
+
+void hio_cleanup(hio_t* io) {
+    offset_buf_t* pbuf = NULL;
+    while (!write_queue_empty(&io->write_queue)) {
+        pbuf = write_queue_front(&io->write_queue);
+        SAFE_FREE(pbuf->base);
+        write_queue_pop_front(&io->write_queue);
+    }
+    write_queue_cleanup(&io->write_queue);
+}
+
+hio_t* hio_add(hloop_t* loop, hio_cb cb, int fd, int events) {
+    if (loop->ios.maxsize == 0) {
+        io_array_init(&loop->ios, IO_ARRAY_INIT_SIZE);
+    }
+
+    if (fd > loop->ios.maxsize) {
+        io_array_resize(&loop->ios, ceil2e(fd));
+    }
+
+    hio_t* io = loop->ios.ptr[fd];
+    if (io == NULL) {
+        io = (hio_t*)malloc(sizeof(hio_t));
+        memset(io, 0, sizeof(hio_t));
+        loop->ios.ptr[fd] = io;
+    }
+
+    if (!io->active || io->destroy) {
+        hio_init(io);
+        EVENT_ADD(loop, io, cb);
+    }
+
+    io->fd = fd;
+    if (cb) {
+        io->cb = (hevent_cb)cb;
+    }
+    iowatcher_add_event(loop, fd, events);
+    io->events |= events;
+    return io;
+}
+
+void hio_del(hio_t* io, int events) {
+    iowatcher_del_event(io->loop, io->fd, events);
+    io->events &= ~events;
+    if (io->events == 0) {
+        hio_cleanup(io);
+        EVENT_DEL(io);
+    }
+}
+

+ 0 - 214
event/hloop.cpp

@@ -1,214 +0,0 @@
-#include "hloop.h"
-#include "hio.h"
-#include "io_watcher.h"
-
-#include "hdef.h"
-#include "htime.h"
-
-static void hloop_update_time(hloop_t* loop) {
-    loop->cur_time = gethrtime();
-}
-
-int hloop_init(hloop_t* loop) {
-    loop->status = HLOOP_STATUS_STOP;
-    loop->event_counter = 0;
-    loop->timer_counter = 0;
-    loop->idle_counter = 0;
-    loop->min_timer_timeout = INFINITE;
-    loop->iowatcher = NULL;
-    //hloop_iowatcher_init(loop);
-    return 0;
-}
-
-void hloop_cleanup(hloop_t* loop) {
-    for (auto& pair : loop->timers) {
-        SAFE_FREE(pair.second);
-    }
-    loop->timers.clear();
-    for (auto& pair : loop->idles) {
-        SAFE_FREE(pair.second);
-    }
-    loop->idles.clear();
-    for (auto& pair : loop->ios) {
-        hio_t* io = pair.second;
-        hio_del(io);
-        SAFE_FREE(io);
-    }
-    loop->ios.clear();
-    hloop_iowatcher_cleanup(loop);
-}
-
-int hloop_handle_timers(hloop_t* loop) {
-    int ntimer = 0;
-    auto iter = loop->timers.begin();
-    while (iter != loop->timers.end()) {
-        htimer_t* timer = iter->second;
-        if (timer->destroy) goto destroy;
-        if (!timer->active) goto next;
-        if (timer->repeat == 0) goto destroy;
-        if (timer->next_timeout < loop->cur_time) {
-            ++ntimer;
-            if (timer->cb) {
-                timer->cb(timer, timer->userdata);
-            }
-            timer->next_timeout += timer->timeout*1000;
-            if (timer->repeat != INFINITE) {
-                --timer->repeat;
-            }
-        }
-next:
-        ++iter;
-        continue;
-destroy:
-        free(timer);
-        iter = loop->timers.erase(iter);
-    }
-    return ntimer;
-}
-
-int hloop_handle_idles(hloop_t* loop) {
-    int nidle = 0;
-    auto iter = loop->idles.begin();
-    while (iter != loop->idles.end()) {
-        hidle_t* idle = iter->second;
-        if (idle->destroy)  goto destroy;
-        if (!idle->active)  goto next;
-        if (idle->repeat == 0) goto destroy;
-        ++nidle;
-        if (idle->cb) {
-            idle->cb(idle, idle->userdata);
-        }
-        if (idle->repeat != INFINITE) {
-            --idle->repeat;
-        }
-next:
-        ++iter;
-        continue;
-destroy:
-        free(idle);
-        iter = loop->idles.erase(iter);
-    }
-    return nidle;
-}
-
-#define PAUSE_SLEEP_TIME        10      // ms
-#define MIN_POLL_TIMEOUT        1       // ms
-#define MAX_POLL_TIMEOUT        1000    // ms
-int hloop_run(hloop_t* loop) {
-    int ntimer, nio, nidle;
-    int poll_timeout;
-
-    loop->start_time = gethrtime();
-    loop->status = HLOOP_STATUS_RUNNING;
-    loop->loop_cnt = 0;
-    while (loop->status != HLOOP_STATUS_STOP) {
-        hloop_update_time(loop);
-        if (loop->status == HLOOP_STATUS_PAUSE) {
-            msleep(PAUSE_SLEEP_TIME);
-            continue;
-        }
-        ++loop->loop_cnt;
-        // timers -> events -> idles
-        ntimer = nio = nidle = 0;
-        poll_timeout = INFINITE;
-        if (loop->timers.size() != 0) {
-            ntimer = hloop_handle_timers(loop);
-            poll_timeout = MAX(MIN_POLL_TIMEOUT, loop->min_timer_timeout/10);
-        }
-        if (loop->ios.size() == 0 || loop->idles.size() != 0) {
-            poll_timeout = MIN(poll_timeout, MAX_POLL_TIMEOUT);
-        }
-        if (loop->ios.size() != 0) {
-            nio = hloop_handle_ios(loop, poll_timeout);
-        }
-        else {
-            msleep(poll_timeout);
-        }
-        if (ntimer == 0 && nio == 0 && loop->idles.size() != 0) {
-            nidle = hloop_handle_idles(loop);
-        }
-        //printf("loop_cnt=%lu ntimer=%d nio=%d nidle=%d\n", loop->loop_cnt, ntimer, nio, nidle);
-    }
-    loop->status = HLOOP_STATUS_STOP;
-    loop->end_time = gethrtime();
-    hloop_cleanup(loop);
-    return 0;
-}
-
-int hloop_stop(hloop_t* loop) {
-    loop->status = HLOOP_STATUS_STOP;
-    return 0;
-}
-
-int hloop_pause(hloop_t* loop) {
-    if (loop->status == HLOOP_STATUS_RUNNING) {
-        loop->status = HLOOP_STATUS_PAUSE;
-    }
-    return 0;
-}
-
-int hloop_resume(hloop_t* loop) {
-    if (loop->status == HLOOP_STATUS_PAUSE) {
-        loop->status = HLOOP_STATUS_RUNNING;
-    }
-    return 0;
-}
-
-htimer_t* htimer_add(hloop_t* loop, htimer_cb cb, void* userdata, uint64_t timeout, uint32_t repeat) {
-    htimer_t* timer = (htimer_t*)malloc(sizeof(htimer_t));
-    memset(timer, 0, sizeof(htimer_t));
-    timer->event_type = HEVENT_TYPE_TIMER;
-    timer->event_id = ++loop->event_counter;
-    timer->loop = loop;
-    timer->timer_id = ++loop->timer_counter;
-    timer->cb = cb;
-    timer->userdata = userdata;
-    timer->timeout = timeout;
-    timer->repeat = repeat;
-    timer->next_timeout = gethrtime() + timeout*1000;
-    timer->active = 1;
-    loop->timers[timer->timer_id] = timer;
-    loop->min_timer_timeout = MIN(timeout, loop->min_timer_timeout);
-    return timer;
-}
-
-void htimer_del(htimer_t* timer) {
-    timer->active = 0;
-    timer->destroy = 1;
-}
-
-void htimer_del(hloop_t* loop, uint32_t timer_id) {
-    auto iter = loop->timers.find(timer_id);
-    if (iter != loop->timers.end()) {
-        htimer_t* timer = iter->second;
-        htimer_del(timer);
-    }
-}
-
-hidle_t* hidle_add(hloop_t* loop, hidle_cb cb, void* userdata, uint32_t repeat) {
-    hidle_t* idle = (hidle_t*)malloc(sizeof(hidle_t));
-    memset(idle, 0, sizeof(hidle_t));
-    idle->event_type = HEVENT_TYPE_IDLE;
-    idle->event_id = ++loop->event_counter;
-    idle->loop = loop;
-    idle->idle_id = ++loop->idle_counter;
-    idle->cb = cb;
-    idle->userdata = userdata;
-    idle->repeat = repeat;
-    idle->active = 1;
-    loop->idles[idle->idle_id] = idle;
-    return idle;
-}
-
-void hidle_del(hidle_t* idle) {
-    idle->active = 0;
-    idle->destroy = 1;
-}
-
-void hidle_del(hloop_t* loop, uint32_t idle_id) {
-    auto iter = loop->idles.find(idle_id);
-    if (iter != loop->idles.end()) {
-        hidle_t* idle = iter->second;
-        hidle_del(idle);
-    }
-}

+ 118 - 96
event/hloop.h

@@ -3,131 +3,142 @@
 
 #include "hdef.h"
 
-#include <map>
-#include <list>
+BEGIN_EXTERN_C
+
+#include "htime.h"
+#include "array.h"
+#include "list.h"
+#include "heap.h"
+#include "queue.h"
+#include "hbuf.h"
 
 typedef struct hloop_s  hloop_t;
 typedef struct hevent_s hevent_t;
-typedef struct htimer_s htimer_t;
+
 typedef struct hidle_s  hidle_t;
+typedef struct htimer_s htimer_t;
 typedef struct hio_s    hio_t;
 
-typedef void (*hevent_cb)   (hevent_t* ev,      void* userdata);
-typedef void (*htimer_cb)   (htimer_t* timer,   void* userdata);
-typedef void (*hidle_cb)    (hidle_t* idle,     void* userdata);
-typedef void (*hio_cb)      (hio_t* io,         void* userdata);
+typedef void (*hevent_cb)   (hevent_t* ev);
+typedef void (*hidle_cb)    (hidle_t* idle);
+typedef void (*htimer_cb)   (htimer_t* timer);
+typedef void (*hio_cb)      (hio_t* io);
 
-typedef void (*hread_cb)    (hio_t* io, void* buf, int readbytes, void* userdata);
-typedef void (*hwrite_cb)   (hio_t* io, const void* buf, int writebytes, void* userdata);
-typedef void (*hclose_cb)   (hio_t* io, void* userdata);
-typedef void (*haccept_cb)  (hio_t* io, int connfd, void* userdata);
-typedef void (*hconnect_cb) (hio_t* io, int state,  void* userdata);
-
-typedef enum {
-    HLOOP_STATUS_STOP,
-    HLOOP_STATUS_RUNNING,
-    HLOOP_STATUS_PAUSE
-} hloop_status_e;
-
-struct hloop_s {
-    hloop_status_e status;
-    uint64_t    start_time;
-    uint64_t    end_time;
-    uint64_t    cur_time;
-    uint64_t    loop_cnt;
-    var         custom_data;
-//private:
-    uint64_t                    event_counter;
-    // timers
-    // timer_id => timer
-    uint32_t                    timer_counter;
-    std::map<int, htimer_t*>    timers;
-    uint32_t                    min_timer_timeout;
-    // idles
-    // hidle_id => idle
-    uint32_t                    idle_counter;
-    std::map<int, hidle_t*>     idles;
-    // ios
-    // fd => io
-    std::map<int, hio_t*>       ios;
-    void*                       iowatcher;
-};
+typedef void (*haccept_cb)  (hio_t* io, int connfd);
+typedef void (*hconnect_cb) (hio_t* io, int state);
+typedef void (*hread_cb)    (hio_t* io, void* buf, int readbytes);
+typedef void (*hwrite_cb)   (hio_t* io, const void* buf, int writebytes);
+typedef void (*hclose_cb)   (hio_t* io);
 
 typedef enum {
     HEVENT_TYPE_NONE    = 0,
-    HEVENT_TYPE_TIMER   = 0x0001,
-    HEVENT_TYPE_IDLE    = 0x0002,
+    HEVENT_TYPE_IDLE    = 0x0001,
+    HEVENT_TYPE_TIMER   = 0x0002,
     HEVENT_TYPE_IO      = 0x0004,
 } hevent_type_e;
 
-#define HEVENT_FIELDS               \
-    hloop_t*        loop;           \
-    hevent_type_e   event_type;     \
-    uint64_t        event_id;       \
-    int             priority;       \
-    var             custom_data;
+#define HEVENT_LOWEST_PRIORITY     -10
+#define HEVENT_LOW_PRIORITY        -5
+#define HEVENT_NORMAL_PRIORITY      0
+#define HEVENT_HIGH_PRIORITY        5
+#define HEVENT_HIGHEST_PRIORITY     10
+#define HEVENT_PRIORITY_SIZE  (HEVENT_HIGHEST_PRIORITY-HEVENT_LOWEST_PRIORITY+1)
+#define HEVENT_PRIORITY_INDEX(priority) (priority-HEVENT_LOWEST_PRIORITY)
 
 #define HEVENT_FLAGS        \
     unsigned    destroy :1; \
     unsigned    active  :1; \
     unsigned    pending :1;
 
+#define HEVENT_FIELDS                   \
+    hloop_t*            loop;           \
+    hevent_type_e       event_type;     \
+    uint64_t            event_id;       \
+    hevent_cb           cb;             \
+    void*               userdata;       \
+    int                 priority;       \
+    struct hevent_s*    pending_next;   \
+    HEVENT_FLAGS
+
 struct hevent_s {
     HEVENT_FIELDS
-//private:
-    HEVENT_FLAGS
 };
 
-struct htimer_s {
+struct hidle_s {
     HEVENT_FIELDS
-    uint32_t    timer_id;
-    uint32_t    timeout;
     uint32_t    repeat;
-    htimer_cb   cb;
-    void*       userdata;
 //private:
-    uint64_t    next_timeout;
-    HEVENT_FLAGS
+    struct list_node node;
 };
 
-struct hidle_s {
+struct htimer_s {
     HEVENT_FIELDS
-    uint32_t    idle_id;
     uint32_t    repeat;
-    hidle_cb    cb;
-    void*       userdata;
+    uint32_t    timeout;
 //private:
-    HEVENT_FLAGS
+    uint64_t    next_timeout;
+    struct list_node node;
+    struct heap_node hnode;
 };
 
+QUEUE_DECL(offset_buf_t, write_queue);
+
 struct hio_s {
     HEVENT_FIELDS
+    unsigned    accept      :1;
+    unsigned    connect     :1;
+    unsigned    closed      :1;
     int         fd;
     int         error;
-    char*       readbuf;
-    int         readbuflen;
+    int         events;
+    int         revents;
+    hbuf_t              readbuf;
+    struct write_queue  write_queue;
     // callbacks
     hread_cb    read_cb;
-    void*       read_userdata;
     hwrite_cb   write_cb;
-    void*       write_userdata;
     hclose_cb   close_cb;
-    void*       close_userdata;
     haccept_cb  accept_cb;
-    void*       accept_userdata;
     hconnect_cb connect_cb;
-    void*       connect_userdata;
 //private:
-    hio_cb      revent_cb;
-    void*       revent_userdata;
-    hio_cb      wevent_cb;
-    void*       wevent_userdata;
     int         event_index[2];
-    int         events;
-    int         revents;
-    HEVENT_FLAGS
-    unsigned    accept      :1;
-    unsigned    connect     :1;
+};
+
+typedef enum {
+    HLOOP_STATUS_STOP,
+    HLOOP_STATUS_RUNNING,
+    HLOOP_STATUS_PAUSE
+} hloop_status_e;
+
+ARRAY_DECL(hio_t*, io_array);
+
+struct hloop_s {
+    hloop_status_e status;
+    time_t      start_time;
+    // hrtime: us
+    uint64_t    start_hrtime;
+    uint64_t    end_hrtime;
+    uint64_t    cur_hrtime;
+    uint64_t    loop_cnt;
+    void*       userdata;
+//private:
+    // events
+    uint64_t                    event_counter;
+    uint32_t                    nevents;
+    uint32_t                    nactives;
+    uint32_t                    npendings;
+    // pendings: with priority as array.index
+    hevent_t*                   pendings[HEVENT_PRIORITY_SIZE];
+    // idles
+    struct list_head            idles;
+    uint64_t                    idle_time;
+    uint64_t                    last_idle_hrtime;
+    // timers
+    struct list_head            timers;
+    struct heap                 timer_minheap;
+    // ios: with fd as array.index
+    struct io_array             ios;
+    void*                       iowatcher;
 };
 
 // loop
@@ -138,27 +149,38 @@ int hloop_stop(hloop_t* loop);
 int hloop_pause(hloop_t* loop);
 int hloop_resume(hloop_t* loop);
 
-// timer
-// @param timeout: unit(ms)
-htimer_t*   htimer_add(hloop_t* loop, htimer_cb cb, void* userdata, uint64_t timeout, uint32_t repeat = INFINITE);
-void        htimer_del(hloop_t* loop, uint32_t timer_id);
-void        htimer_del(htimer_t* timer);
+static inline void hloop_update_time(hloop_t* loop) {
+    loop->cur_hrtime = gethrtime();
+}
+
+static inline time_t hloop_now(hloop_t* loop) {
+    return loop->start_time + (loop->cur_hrtime - loop->start_hrtime) / 1000000;
+}
 
 // idle
-hidle_t*    hidle_add(hloop_t* loop, hidle_cb cb, void* userdata, uint32_t repeat = INFINITE);
-void        hidle_del(hloop_t* loop, uint32_t idle_id);
+hidle_t*    hidle_add(hloop_t* loop, hidle_cb cb, uint32_t repeat DEFAULT(INFINITE));
 void        hidle_del(hidle_t* idle);
 
+// timer
+// @param timeout: unit(ms)
+htimer_t*   htimer_add(hloop_t* loop, htimer_cb cb, uint64_t timeout, uint32_t repeat DEFAULT(INFINITE));
+void        htimer_del(htimer_t* timer);
+
 // io
-hio_t* haccept  (hloop_t* loop, int listenfd, haccept_cb accept_cb, void* accept_userdata,
-                    hclose_cb close_cb = NULL, void* close_userdata = NULL);
-hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb, void* connect_userdata,
-                    hclose_cb close_cb = NULL, void* close_userdata = NULL);
-hio_t* hread    (hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb, void* read_userdata,
-                    hclose_cb close_cb = NULL, void* close_userdata = NULL);
-hio_t* hwrite   (hloop_t* loop, int fd, const void* buf, size_t len,
-                    hwrite_cb write_cb = NULL, void* write_userdata = NULL,
-                    hclose_cb close_cb = NULL, void* close_userdata = NULL);
+// frist level apis
+#define READ_EVENT  0x0001
+#define WRITE_EVENT 0x0004
+#define ALL_EVENTS  READ_EVENT|WRITE_EVENT
+hio_t*      hio_add(hloop_t* loop, hio_cb cb, int fd, int events DEFAULT(READ_EVENT));
+void        hio_del(hio_t* io, int events DEFAULT(ALL_EVENTS));
+
+// second level apis
+hio_t* haccept  (hloop_t* loop, int listenfd, haccept_cb accept_cb);
+hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb);
+hio_t* hread    (hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb);
+hio_t* hwrite   (hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb DEFAULT(NULL));
 void   hclose   (hio_t* io);
 
+END_EXTERN_C
+
 #endif // HW_LOOP_H_

+ 0 - 40
event/io_watcher.cpp

@@ -1,40 +0,0 @@
-#include "io_watcher.h"
-#include "hio.h"
-
-#include "hdef.h"
-#include "hlog.h"
-#include "hsocket.h"
-
-int hloop_iowatcher_init(hloop_t* loop) {
-    return iowatcher_init(loop);
-}
-
-int hloop_iowatcher_cleanup(hloop_t* loop) {
-    return iowatcher_cleanup(loop);
-}
-
-static void remove_bad_fds(hloop_t* loop) {
-    int error = 0;
-    socklen_t optlen = sizeof(int);
-    int ret = 0;
-    auto iter = loop->ios.begin();
-    while (iter != loop->ios.end()) {
-        int fd = iter->first;
-        ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*)&error, &optlen);
-        if (ret < 0 || error != 0) {
-            hloge("getsockopt fd=%d retval=%d SO_ERROR=%d", fd, ret, error);
-            hio_del(iter->second);
-            continue;
-        }
-        ++iter;
-    }
-}
-
-int hloop_handle_ios(hloop_t* loop, int timeout) {
-    int nevent = iowatcher_poll_events(loop, timeout);
-    if (nevent < 0) {
-        hloge("poll_events error=%d", -nevent);
-        remove_bad_fds(loop);
-    }
-    return nevent;
-}

+ 0 - 54
event/io_watcher.h

@@ -1,54 +0,0 @@
-#ifndef IO_WATCHER_H_
-#define IO_WATCHER_H_
-
-#include "hloop.h"
-
-int hloop_iowatcher_init(hloop_t* loop);
-int hloop_iowatcher_cleanup(hloop_t* loop);
-int hloop_handle_ios(hloop_t* loop, int timeout);
-
-#include "hplatform.h"
-#if !defined(EVENT_SELECT) && !defined(EVENT_POLL) && !defined(EVENT_EPOLL) && \
-    !defined(EVENT_IOCP) && !defined(EVENT_KQUEUE) && !defined(EVENT_NOEVENT)
-#ifdef OS_WIN
-#define EVENT_IOCP
-#elif defined(OS_LINUX)
-#define EVENT_EPOLL
-#elif defined(OS_MAC)
-#define EVENT_KQUEUE
-#elif defined(OS_BSD)
-#define EVENT_KQUEUE
-#else
-#define EVENT_SELECT
-#endif
-#endif
-
-#if defined(EVENT_POLL)
-#include <sys/poll.h>
-#define READ_EVENT  POLLIN
-#define WRITE_EVENT POLLOUT
-#elif defined(EVENT_EPOLL)
-#include <sys/epoll.h>
-#define READ_EVENT  EPOLLIN
-#define WRITE_EVENT EPOLLOUT
-#elif defined(EVENT_KQUEUE)
-#include <sys/event.h>
-#define READ_EVENT  EVFILT_READ
-#define WRITE_EVENT EVFILT_WRITE
-#else
-#define READ_EVENT  0x0001
-#define WRITE_EVENT 0x0004
-#endif
-
-#define ALL_EVENTS  READ_EVENT|WRITE_EVENT
-
-#define READ_INDEX  0
-#define WRITE_INDEX 1
-#define EVENT_INDEX(type) ((type == READ_EVENT) ? READ_INDEX : WRITE_INDEX)
-int iowatcher_init(hloop_t* loop);
-int iowatcher_cleanup(hloop_t* loop);
-int iowatcher_add_event(hio_t* fd, int events);
-int iowatcher_del_event(hio_t* fd, int events);
-int iowatcher_poll_events(hloop_t* loop, int timeout);
-
-#endif

+ 71 - 0
event/iocp.c

@@ -0,0 +1,71 @@
+#include "iowatcher.h"
+
+#ifdef EVENT_IOCP
+#include "hplatform.h"
+#include "hdef.h"
+
+typedef struct iocp_ctx_s {
+    HANDLE      iocp;
+} iocp_ctx_t;
+
+int iowatcher_init(hloop_t* loop) {
+    if (loop->iowatcher)    return 0;
+    iocp_ctx_t* iocp_ctx = (iocp_ctx_t*)malloc(sizeof(iocp_ctx_t));
+    iocp_ctx->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+    loop->iowatcher = iocp_ctx;
+    return 0;
+}
+
+int iowatcher_cleanup(hloop_t* loop) {
+    if (loop->iowatcher == NULL) return 0;
+    iocp_ctx_t* iocp_ctx = (iocp_ctx_t*)loop->iowatcher;
+    CloseHandle(iocp_ctx->iocp);
+    SAFE_FREE(loop->iowatcher);
+    return 0;
+}
+
+int iowatcher_add_event(hloop_t* loop, int fd, int events) {
+    if (loop->iowatcher == NULL) {
+        iowatcher_init(loop);
+    }
+    iocp_ctx_t* iocp_ctx = (iocp_ctx_t*)loop->iowatcher;
+    HANDLE h = CreateIoCompletionPort((HANDLE)fd, iocp_ctx->iocp, (ULONG_PTR)events, 0);
+    return 0;
+}
+
+int iowatcher_del_event(hloop_t* loop, int fd, int events) {
+    return 0;
+}
+
+int iowatcher_poll_events(hloop_t* loop, int timeout) {
+    if (loop->iowatcher == NULL) return 0;
+    iocp_ctx_t* iocp_ctx = (iocp_ctx_t*)loop->iowatcher;
+    DWORD bytes = 0;
+    ULONG_PTR key = 0;
+    LPOVERLAPPED povlp = NULL;
+    timeout = 3000;
+    BOOL bRet = GetQueuedCompletionStatus(iocp_ctx->iocp, &bytes, &key, &povlp, timeout);
+    int err = 0;
+    if (bRet == 0) {
+        err = GetLastError();
+    }
+    if (err) {
+        if (err == ERROR_NETNAME_DELETED ||
+            err == ERROR_OPERATION_ABORTED) {
+            return 0;
+        }
+        if (povlp == NULL) {
+            if (err == WAIT_TIMEOUT) return 0;
+            return -1;
+        }
+    }
+    if (povlp == NULL) {
+        return -1;
+    }
+    if (key == NULL) {
+        return -1;
+    }
+    ULONG_PTR revents = key;
+    return 1;
+}
+#endif

+ 0 - 25
event/iocp.cpp

@@ -1,25 +0,0 @@
-#include "hevent.h"
-
-#ifdef EVENT_IOCP
-int _event_init(hloop_t* loop) {
-    loop->event_ctx = NULL;
-    return 0;
-}
-
-int _event_cleanup(hloop_t* loop) {
-    loop->event_ctx = NULL;
-    return 0;
-}
-
-int _add_event(hevent_t* event, int type) {
-    return 0;
-}
-
-int _del_event(hevent_t* event, int type) {
-    return 0;
-}
-
-int _handle_events(hloop_t* loop, int timeout) {
-    return 0;
-}
-#endif

+ 33 - 0
event/iowatcher.h

@@ -0,0 +1,33 @@
+#ifndef IO_WATCHER_H_
+#define IO_WATCHER_H_
+
+#include "hloop.h"
+
+#include "hplatform.h"
+#if !defined(EVENT_SELECT) &&   \
+    !defined(EVENT_EPOLL) &&    \
+    !defined(EVENT_POLL) &&     \
+    !defined(EVENT_KQUEUE) &&   \
+    !defined(EVENT_IOCP) &&     \
+    !defined(EVENT_PORT) &&     \
+    !defined(EVENT_NOEVENT)
+#ifdef OS_WIN
+#define EVENT_SELECT
+#elif defined(OS_LINUX)
+#define EVENT_EPOLL
+#elif defined(OS_MAC)
+#define EVENT_KQUEUE
+#elif defined(OS_BSD)
+#define EVENT_KQUEUE
+#else
+#define EVENT_SELECT
+#endif
+#endif
+
+int iowatcher_init(hloop_t* loop);
+int iowatcher_cleanup(hloop_t* loop);
+int iowatcher_add_event(hloop_t* loop, int fd, int events);
+int iowatcher_del_event(hloop_t* loop, int fd, int events);
+int iowatcher_poll_events(hloop_t* loop, int timeout);
+
+#endif

+ 43 - 34
event/kqueue.cpp → event/kqueue.c

@@ -1,10 +1,18 @@
-#include "io_watcher.h"
+#include "iowatcher.h"
 
 #ifdef EVENT_KQUEUE
 #include "hplatform.h"
 #include "hdef.h"
 
-#define INIT_EVENTS_NUM     64
+#include <sys/event.h>
+
+#include "hevent.h"
+
+#define EVENTS_INIT_SIZE     64
+
+#define READ_INDEX  0
+#define WRITE_INDEX 1
+#define EVENT_INDEX(type) ((type == EVFILT_READ) ? READ_INDEX : WRITE_INDEX)
 
 typedef struct kqueue_ctx_s {
     int kqfd;
@@ -26,7 +34,7 @@ int iowatcher_init(hloop_t* loop) {
     if (loop->iowatcher) return 0;
     kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)malloc(sizeof(kqueue_ctx_t));
     kqueue_ctx->kqfd = kqueue();
-    kqueue_ctx->capacity = INIT_EVENTS_NUM;
+    kqueue_ctx->capacity = EVENTS_INIT_SIZE;
     kqueue_ctx->nchanges = 0;
     int bytes = sizeof(struct kevent) * kqueue_ctx->capacity;
     kqueue_ctx->changes = (struct kevent*)malloc(bytes);
@@ -47,12 +55,12 @@ int iowatcher_cleanup(hloop_t* loop) {
     return 0;
 }
 
-static int __add_event(hio_t* io, int event) {
-    hloop_t* loop = io->loop;
+static int __add_event(hloop_t* loop, int fd, int event) {
     if (loop->iowatcher == NULL) {
-        hloop_iowatcher_init(loop);
+        iowatcher_init(loop);
     }
     kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
+    struct hio_t* io = loop->ios.ptr[fd];
     int idx = io->event_index[EVENT_INDEX(event)];
     if (idx < 0) {
         io->event_index[EVENT_INDEX(event)] = idx = kqueue_ctx->nchanges;
@@ -61,15 +69,10 @@ static int __add_event(hio_t* io, int event) {
             kqueue_ctx_resize(kqueue_ctx, kqueue_ctx->capacity*2);
         }
         memset(kqueue_ctx->changes+idx, 0, sizeof(struct kevent));
-        kqueue_ctx->changes[idx].ident = io->fd;
-    }
-    assert(kqueue_ctx->changes[idx].ident == io->fd);
-    if (events & READ_EVENT) {
-        kqueue_ctx->changes[idx].filter = EVFILT_READ;
-    }
-    else if (events & WRITE_EVENT) {
-        kqueue_ctx->changes[idx].filter = EVFILT_WRITE;
+        kqueue_ctx->changes[idx].ident = fd;
     }
+    assert(kqueue_ctx->changes[idx].ident == fd);
+    kqueue_ctx->changes[idx].filter = event;
     kqueue_ctx->changes[idx].flags = EV_ADD|EV_ENABLE;
     struct timespec ts;
     ts.tv_sec = 0;
@@ -78,23 +81,23 @@ static int __add_event(hio_t* io, int event) {
     return 0;
 }
 
-int iowatcher_add_event(hio_t* io, int events) {
+int iowatcher_add_event(hloop_t* loop, int fd, int events) {
     if (events & READ_EVENT) {
-        __add_event(event, READ_EVENT);
+        __add_event(loop, fd, EVFILT_READ);
     }
     if (events & WRITE_EVENT) {
-        __add_event(event, WRITE_EVENT);
+        __add_event(loop, fd, EVFILT_WRITE);
     }
     return 0;
 }
 
-static int __del_event(hio_t* io, int event) {
-    hloop_t* loop = io->loop;
+static int __del_event(hloop_t* loop, int fd, int event) {
     kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
     if (kqueue_ctx == NULL) return 0;
+    struct hio_t* io = loop->ios.ptr[fd];
     int idx = io->event_index[EVENT_INDEX(event)];
     if (idx < 0) return 0;
-    assert(kqueue_ctx->changes[idx].ident == io->fd);
+    assert(kqueue_ctx->changes[idx].ident == fd);
     kqueue_ctx->changes[idx].flags = EV_DELETE;
     io->event_index[EVENT_INDEX(event)] = -1;
     int lastidx = kqueue_ctx->nchanges - 1;
@@ -104,9 +107,9 @@ static int __del_event(hio_t* io, int event) {
         tmp = kqueue_ctx->changes[idx];
         kqueue_ctx->changes[idx] = kqueue_ctx->changes[lastidx];
         kqueue_ctx->changes[lastidx] = tmp;
-        auto iter = loop->events.find(kqueue_ctx->changes[idx].ident);
-        if (iter != loop->events.end()) {
-            iter->second->event_index[kqueue_ctx->changes[idx].filter == EVFILT_READ ? READ_INDEX : WRITE_INDEX] = idx;
+        hio_t* last = kqueue_ctx->changes[idx].ident;
+        if (last) {
+            last->event_index[EVENT_INDEX(kqueue_ctx->chages[idx].filter)] = idx;
         }
     }
     struct timespec ts;
@@ -117,12 +120,12 @@ static int __del_event(hio_t* io, int event) {
     return 0;
 }
 
-int iowatcher_del_event(hio_t* io, int events) {
+int iowatcher_del_event(hloop_t* loop, int fd, int events) {
     if (events & READ_EVENT) {
-        __del_event(io, READ_EVENT);
+        __del_event(loop, fd, EVFILT_READ);
     }
     if (events & WRITE_EVENT) {
-        __del_event(io, WRITE_EVENT);
+        __del_event(loop, fd, EVFILT_WRITE);
     }
     return 0;
 }
@@ -146,19 +149,25 @@ int iowatcher_poll_events(hloop_t* loop, int timeout) {
         return nkqueue;
     }
     if (nkqueue == 0) return 0;
-    int nevent = 0;
+    int nevents = 0;
     for (int i = 0; i < nkqueue; ++i) {
-        if (nevent == nkqueue) break;
         if (kqueue_ctx->events[i].flags & EV_ERROR) {
             continue;
         }
-        ++nevent;
+        ++nevents;
         int fd = kqueue_ctx->events[i].ident;
-        int revent = kqueue_ctx->events[i].filter;
-        hio_t* io = hio_get(loop, fd);
-        if (io == NULL) continue;
-        io->revents = revent;
-        hio_handle_events(io);
+        int revents = kqueue_ctx->events[i].filter;
+        hio_t* io = loop->ios.ptr[fd];
+        if (io) {
+            if (revents | EVFILT_READ) {
+                io->revents |= EVFILT_READ;
+            }
+            if (revents | EVFILT_WRITE) {
+                io->revents |= EVFILT_WRITE;
+            }
+            EVENT_PENDING(io);
+        }
+        if (nevents == nkqueue) break;
     }
     return nevent;
 }

+ 283 - 0
event/nio.c

@@ -0,0 +1,283 @@
+#include "iowatcher.h"
+#ifndef EVENT_IOCP
+#include "hsocket.h"
+
+static void nio_accept(hio_t* io) {
+    //printd("nio_accept listenfd=%d\n", io->fd);
+    struct sockaddr_in peeraddr;
+    socklen_t addrlen;
+    //struct sockaddr_in localaddr;
+    //addrlen = sizeof(struct sockaddr_in);
+    //getsockname(io->fd, (struct sockaddr*)&localaddr, &addrlen);
+accept:
+    addrlen = sizeof(struct sockaddr_in);
+    int connfd = accept(io->fd, (struct sockaddr*)&peeraddr, &addrlen);
+    if (connfd < 0) {
+        if (sockerrno == NIO_EAGAIN) {
+            //goto accept_done;
+            return;
+        }
+        else {
+            io->error = sockerrno;
+            perror("accept");
+            goto accept_error;
+        }
+    }
+    //printd("accept connfd=%d [%s:%d] <= [%s:%d]\n", connfd,
+            //inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
+            //inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
+
+    if (io->accept_cb) {
+        printd("accept_cb------\n");
+        io->accept_cb(io, connfd);
+        printd("accept_cb======\n");
+    }
+
+    goto accept;
+
+accept_error:
+    hclose(io);
+}
+
+static void nio_connect(hio_t* io) {
+    //printd("nio_connect connfd=%d\n", io->fd);
+    int state = 0;
+    struct sockaddr_in peeraddr;
+    socklen_t addrlen;
+    addrlen = sizeof(struct sockaddr_in);
+    int ret = getpeername(io->fd, (struct sockaddr*)&peeraddr, &addrlen);
+    if (ret < 0) {
+        io->error = sockerrno;
+        //printd("connect failed: %s: %d\n", strerror(sockerrno), sockerrno);
+        state = 0;
+    }
+    else {
+        //struct sockaddr_in localaddr;
+        //addrlen = sizeof(struct sockaddr_in);
+        //getsockname(ioent->fd, (struct sockaddr*)&localaddr, &addrlen);
+        //printd("connect connfd=%d [%s:%d] => [%s:%d]\n", io->fd,
+                //inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
+                //inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
+        state = 1;
+    }
+    if (io->connect_cb) {
+        printd("connect_cb------\n");
+        io->connect_cb(io, state);
+        printd("connect_cb======\n");
+    }
+}
+
+static void nio_read(hio_t* io) {
+    //printd("nio_read fd=%d\n", io->fd);
+    int nread;
+    void* buf = io->readbuf.base;
+    int   len = io->readbuf.len;
+read:
+    memset(buf, 0, len);
+    nread = read(io->fd, buf, len);
+    //printd("read retval=%d\n", nread);
+    if (nread < 0) {
+        if (sockerrno == NIO_EAGAIN) {
+            //goto read_done;
+            return;
+        }
+        else {
+            io->error = sockerrno;
+            perror("read");
+            goto read_error;
+        }
+    }
+    if (nread == 0) {
+        goto disconnect;
+    }
+    //printd("> %s\n", buf);
+    if (io->read_cb) {
+        printd("read_cb------\n");
+        io->read_cb(io, buf, nread);
+        printd("read_cb======\n");
+    }
+    if (nread == len) {
+        goto read;
+    }
+    return;
+read_error:
+disconnect:
+    hclose(io);
+}
+
+static void nio_write(hio_t* io) {
+    //printd("nio_write fd=%d\n", io->fd);
+    int nwrite = 0;
+write:
+    if (write_queue_empty(&io->write_queue)) {
+        return;
+    }
+    offset_buf_t* pbuf = write_queue_front(&io->write_queue);
+    char* buf = pbuf->base + pbuf->offset;
+    int len = pbuf->len - pbuf->offset;
+    nwrite = write(io->fd, buf, len);
+    //printd("write retval=%d\n", nwrite);
+    if (nwrite < 0) {
+        if (sockerrno == NIO_EAGAIN) {
+            //goto write_done;
+            return;
+        }
+        else {
+            io->error = sockerrno;
+            perror("write");
+            goto write_error;
+        }
+    }
+    if (nwrite == 0) {
+        goto disconnect;
+    }
+    if (io->write_cb) {
+        printd("write_cb------\n");
+        io->write_cb(io, buf, nwrite);
+        printd("write_cb======\n");
+    }
+    pbuf->offset += nwrite;
+    if (nwrite == len) {
+        SAFE_FREE(pbuf->base);
+        write_queue_pop_front(&io->write_queue);
+        // write next
+        goto write;
+    }
+    return;
+write_error:
+disconnect:
+    hclose(io);
+}
+
+static void hio_handle_events(hio_t* io) {
+    if ((io->events & READ_EVENT) && (io->revents & READ_EVENT)) {
+        if (io->accept) {
+            nio_accept(io);
+        }
+        else {
+            nio_read(io);
+        }
+    }
+
+    if ((io->events & WRITE_EVENT) && (io->revents & WRITE_EVENT)) {
+        if (io->connect) {
+            // NOTE: connect just do once
+            // ONESHOT
+            hio_del(io, WRITE_EVENT);
+            io->connect = 0;
+
+            nio_connect(io);
+        }
+        else {
+            nio_write(io);
+            // NOTE: del WRITE_EVENT, if write_queue empty
+            if (write_queue_empty(&io->write_queue)) {
+                hio_del(io, WRITE_EVENT);
+            }
+        }
+    }
+
+    io->revents = 0;
+}
+
+hio_t* haccept  (hloop_t* loop, int listenfd, haccept_cb accept_cb) {
+    hio_t* io = hio_add(loop, hio_handle_events, listenfd, READ_EVENT);
+    if (io == NULL) return NULL;
+    if (accept_cb) {
+        io->accept_cb = accept_cb;
+    }
+    io->accept = 1;
+    nonblocking(listenfd);
+    return io;
+}
+
+hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb) {
+    hio_t* io = hio_add(loop, hio_handle_events, connfd, WRITE_EVENT);
+    if (io == NULL) return NULL;
+    if (connect_cb) {
+        io->connect_cb = connect_cb;
+    }
+    io->connect = 1;
+    nonblocking(connfd);
+    return io;
+}
+
+hio_t* hread    (hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
+    hio_t* io = hio_add(loop, hio_handle_events, fd, READ_EVENT);
+    if (io == NULL) return NULL;
+    io->readbuf.base = (char*)buf;
+    io->readbuf.len = len;
+    if (read_cb) {
+        io->read_cb = read_cb;
+    }
+    return io;
+}
+
+hio_t* hwrite   (hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb) {
+    hio_t* io = hio_add(loop, hio_handle_events, fd, 0);
+    if (io == NULL) return NULL;
+    if (write_cb) {
+        io->write_cb = write_cb;
+    }
+    int nwrite = 0;
+    if (write_queue_empty(&io->write_queue)) {
+try_write:
+        nwrite = write(fd, buf, len);
+        //printd("write retval=%d\n", nwrite);
+        if (nwrite < 0) {
+            if (sockerrno == NIO_EAGAIN) {
+                nwrite = 0;
+                goto enqueue;
+            }
+            else {
+                perror("write");
+                io->error = sockerrno;
+                goto write_error;
+            }
+        }
+        if (nwrite == 0) {
+            goto disconnect;
+        }
+        if (write_cb) {
+            printd("try_write_cb------\n");
+            write_cb(io, buf, nwrite);
+            printd("try_write_cb======\n");
+        }
+        if (nwrite == len) {
+            //goto write_done;
+            return io;
+        }
+        hio_add(loop, hio_handle_events, fd, WRITE_EVENT);
+    }
+enqueue:
+    if (nwrite < len) {
+        offset_buf_t rest;
+        rest.len = len;
+        rest.offset = nwrite;
+        // NOTE: free in nio_write;
+        rest.base = (char*)malloc(rest.len);
+        if (rest.base == NULL) return io;
+        memcpy(rest.base, (char*)buf, rest.len);
+        if (io->write_queue.maxsize == 0) {
+            write_queue_init(&io->write_queue, 4);
+        }
+        write_queue_push_back(&io->write_queue, &rest);
+    }
+    return io;
+write_error:
+disconnect:
+    hclose(io);
+    return io;
+}
+
+void   hclose   (hio_t* io) {
+    //printd("close fd=%d\n", io->fd);
+    if (io->closed) return;
+    close(io->fd);
+    io->closed = 1;
+    if (io->close_cb) {
+        io->close_cb(io);
+    }
+    hio_del(io, ALL_EVENTS);
+}
+#endif

+ 0 - 239
event/nio.cpp

@@ -1,239 +0,0 @@
-#include "hloop.h"
-#include "hio.h"
-#include "hsocket.h"
-
-static void on_accept(hio_t* io, void* userdata) {
-    //printf("on_accept listenfd=%d\n", io->fd);
-    struct sockaddr_in peeraddr;
-    socklen_t addrlen;
-    //struct sockaddr_in localaddr;
-    //addrlen = sizeof(struct sockaddr_in);
-    //getsockname(io->fd, (struct sockaddr*)&localaddr, &addrlen);
-accept:
-    addrlen = sizeof(struct sockaddr_in);
-    int connfd = accept(io->fd, (struct sockaddr*)&peeraddr, &addrlen);
-    if (connfd < 0) {
-        if (sockerrno == NIO_EAGAIN) {
-            //goto accept_done;
-            return;
-        }
-        else {
-            perror("accept");
-            goto accept_error;
-        }
-    }
-    //printf("accept connfd=%d [%s:%d] <= [%s:%d]\n", connfd,
-            //inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
-            //inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
-
-    if (io->accept_cb) {
-        io->accept_cb(io, connfd, io->accept_userdata);
-    }
-
-    goto accept;
-
-accept_error:
-    hclose(io);
-}
-
-static void on_connect(hio_t* io, void* userdata) {
-    //printf("on_connect connfd=%d\n", io->fd);
-    int state = 0;
-    struct sockaddr_in peeraddr;
-    socklen_t addrlen;
-    addrlen = sizeof(struct sockaddr_in);
-    int ret = getpeername(io->fd, (struct sockaddr*)&peeraddr, &addrlen);
-    if (ret < 0) {
-        //printf("connect failed: %s: %d\n", strerror(sockerrno), sockerrno);
-        state = 0;
-    }
-    else {
-        //struct sockaddr_in localaddr;
-        //addrlen = sizeof(struct sockaddr_in);
-        //getsockname(ioent->fd, (struct sockaddr*)&localaddr, &addrlen);
-        //printf("connect connfd=%d [%s:%d] => [%s:%d]\n", io->fd,
-                //inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
-                //inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
-        state = 1;
-    }
-    if (io->connect_cb) {
-        io->connect_cb(io, state, io->connect_userdata);
-    }
-}
-
-static void on_readable(hio_t* io, void* userdata) {
-    //printf("on_read fd=%d\n", io->fd);
-    int nread;
-    void* buf = io->readbuf;
-    int   len = io->readbuflen;
-read:
-    memset(buf, 0, len);
-    nread = read(io->fd, buf, len);
-    //printf("read retval=%d\n", nread);
-    if (nread < 0) {
-        if (sockerrno == NIO_EAGAIN) {
-            //goto read_done;
-            return;
-        }
-        else {
-            perror("read");
-            goto read_error;
-        }
-    }
-    if (nread == 0) {
-        goto disconnect;
-    }
-    //printf("> %s\n", buf);
-    if (io->read_cb) {
-        io->read_cb(io, io->readbuf, nread, io->read_userdata);
-    }
-    if (nread == len) {
-        goto read;
-    }
-    return;
-read_error:
-disconnect:
-    hclose(io);
-}
-
-static void on_writeable(hio_t* io, void* userdata) {
-    printf("on_write fd=%d\n", io->fd);
-    /*
-    int nwrite;
-write:
-    if (io->write_queue.empty()) {
-        return;
-    }
-    pbuf = io->write_queue.front();
-    nwrite = write(ioent->fd, buf, len);
-    if (nwrite < 0) {
-        if (nwrite == NIO_EAGAIN) {
-            //goto write_done;
-            return;
-        }
-        else {
-        }
-    }
-    if (io->write_cb) {
-        io->write_cb(io, nwrite);
-    }
-    if (nwrite == len) {
-        io->write_queue.pop_front();
-        goto write;
-    }
-    //pbuf->buf += nwrite;
-    return;
-write_error:
-disconnect:
-    hclose(ioent);
-    */
-}
-
-hio_t* haccept  (hloop_t* loop, int listenfd, haccept_cb accept_cb, void* accept_userdata,
-                    hclose_cb close_cb, void* close_userdata) {
-    hio_t* io = hio_accept(loop, listenfd, on_accept, NULL);
-    if (io) {
-        io->accept_cb = accept_cb;
-        io->accept_userdata = accept_userdata;
-        if (close_cb) {
-            io->close_cb = close_cb;
-        }
-        if (close_userdata) {
-            io->close_userdata = close_userdata;
-        }
-    }
-    return io;
-}
-
-hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb, void* connect_userdata,
-                    hclose_cb close_cb, void* close_userdata) {
-    hio_t* io = hio_connect(loop, connfd, on_connect, NULL);
-    if (io) {
-        io->connect_cb = connect_cb;
-        io->connect_userdata = connect_userdata;
-        if (close_cb) {
-            io->close_cb = close_cb;
-        }
-        if (close_userdata) {
-            io->close_userdata = close_userdata;
-        }
-    }
-    return io;
-}
-
-hio_t* hread    (hloop_t* loop, int fd, void* readbuf, size_t readbuflen, hread_cb read_cb, void* read_userdata,
-                    hclose_cb close_cb, void* close_userdata) {
-    hio_t* io = hio_read(loop, fd, on_readable, NULL);
-    if (io) {
-        io->readbuf = (char*)readbuf;
-        io->readbuflen = readbuflen;
-        io->read_cb = read_cb;
-        io->read_userdata = read_userdata;
-        if (close_cb) {
-            io->close_cb = close_cb;
-        }
-        if (close_userdata) {
-            io->close_userdata = close_userdata;
-        }
-    }
-    return io;
-}
-
-hio_t* hwrite   (hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb, void* write_userdata,
-                    hclose_cb close_cb, void* close_userdata) {
-    hio_t* io = hio_add(loop, fd);
-    if (io == NULL) return NULL;
-    io->write_cb = write_cb;
-    io->write_userdata = write_userdata;
-    if (close_cb) {
-        io->close_cb = close_cb;
-    }
-    if (close_userdata) {
-        io->close_userdata = close_userdata;
-    }
-    int nwrite;
-    if (1) {
-    //if (io->write_queue.empty()) {
-try_write:
-        nwrite = write(fd, buf, len);
-        if (nwrite < 0) {
-            if (sockerrno == NIO_EAGAIN) {
-                nwrite = 0;
-                goto push_queue;
-            }
-            else {
-                perror("write");
-                goto write_error;
-            }
-            goto write_error;
-        }
-        if (nwrite == 0) {
-            goto disconnect;
-        }
-        if (write_cb) {
-            write_cb(io, buf, nwrite, io->write_userdata);
-        }
-        if (nwrite == len) {
-            //goto write_done;
-            return io;
-        }
-    }
-push_queue:
-    printf("write retval=%d buflen=%ld\n", nwrite, len);
-    //ioent->write_queue.push(buf+nwrite, len-nwrite);
-    //hioent_write(loop, fd, on_writeable, NULL);
-    return io;
-write_error:
-disconnect:
-    hclose(io);
-    return io;
-}
-
-void hclose(hio_t* io) {
-    //printf("close fd=%d\n", io->fd);
-    close(io->fd);
-    if (io->close_cb) {
-        io->close_cb(io, io->close_userdata);
-    }
-    hio_del(io);
-}

+ 4 - 5
event/noevent.cpp → event/noevent.c

@@ -1,7 +1,6 @@
-#include "io_watcher.h"
+#include "iowatcher.h"
 
 #ifdef EVENT_NOEVENT
-#include "htime.h"
 int iowatcher_init(hloop_t* loop) {
     return 0;
 }
@@ -10,16 +9,16 @@ int iowatcher_cleanup(hloop_t* loop) {
     return 0;
 }
 
-int iowatcher_add_event(hio_t* fd, int events) {
+int iowatcher_add_event(hloop_t* loop, int fd, int events) {
     return 0;
 }
 
-int iowatcher_del_event(hio_t* fd, int events) {
+int iowatcher_del_event(hloop_t* loop, int fd, int events) {
     return 0;
 }
 
 int iowatcher_poll_events(hloop_t* loop, int timeout) {
-    msleep(timeout);
     return 0;
 }
+
 #endif

+ 128 - 0
event/poll.c

@@ -0,0 +1,128 @@
+#include "iowatcher.h"
+
+#ifdef EVENT_POLL
+#include "hplatform.h"
+#include "hdef.h"
+
+#ifdef OS_LINUX
+#include <sys/poll.h>
+#endif
+
+#include "hevent.h"
+
+#include "array.h"
+#define FDS_INIT_SIZE   64
+ARRAY_DECL(struct pollfd, pollfds);
+
+typedef struct poll_ctx_s {
+    int            capacity;
+    struct pollfds fds;
+} poll_ctx_t;
+
+int iowatcher_init(hloop_t* loop) {
+    if (loop->iowatcher)   return 0;
+    poll_ctx_t* poll_ctx = (poll_ctx_t*)malloc(sizeof(poll_ctx_t));
+    pollfds_init(&poll_ctx->fds, FDS_INIT_SIZE);
+    loop->iowatcher = poll_ctx;
+    return 0;
+}
+
+int iowatcher_cleanup(hloop_t* loop) {
+    if (loop->iowatcher == NULL)   return 0;
+    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->iowatcher;
+    pollfds_cleanup(&poll_ctx->fds);
+    SAFE_FREE(loop->iowatcher);
+    return 0;
+}
+
+int iowatcher_add_event(hloop_t* loop, int fd, int events) {
+    if (loop->iowatcher == NULL) {
+        iowatcher_init(loop);
+    }
+    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->iowatcher;
+    hio_t* io = loop->ios.ptr[fd];
+    int idx = io->event_index[0];
+    struct pollfd* pfd = NULL;
+    if (idx < 0) {
+        io->event_index[0] = idx = poll_ctx->fds.size;
+        if (idx == poll_ctx->fds.maxsize) {
+            pollfds_double_resize(&poll_ctx->fds);
+        }
+        poll_ctx->fds.size++;
+        pfd = poll_ctx->fds.ptr + idx;
+        pfd->fd = fd;
+        pfd->events = 0;
+        pfd->revents = 0;
+    }
+    else {
+        pfd = poll_ctx->fds.ptr + idx;
+        assert(pfd->fd == fd);
+    }
+    if (events & READ_EVENT) {
+        pfd->events |= POLLIN;
+    }
+    if (events & WRITE_EVENT) {
+        pfd->events |= POLLOUT;
+    }
+    return 0;
+}
+
+int iowatcher_del_event(hloop_t* loop, int fd, int events) {
+    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->iowatcher;
+    if (poll_ctx == NULL)  return 0;
+    hio_t* io = loop->ios.ptr[fd];
+
+    int idx = io->event_index[0];
+    if (idx < 0) return 0;
+    struct pollfd* pfd = poll_ctx->fds.ptr + idx;
+    assert(pfd->fd == fd);
+    if (events & READ_EVENT) {
+        pfd->events &= ~POLLIN;
+    }
+    if (events & WRITE_EVENT) {
+        pfd->events &= ~POLLOUT;
+    }
+    if (pfd->events == 0) {
+        pollfds_del_nomove(&poll_ctx->fds, idx);
+        // NOTE: correct event_idex
+        if (idx < poll_ctx->fds.size) {
+            hio_t* last = loop->ios.ptr[poll_ctx->fds.ptr[idx].fd];
+            last->event_index[0] = idx;
+        }
+        io->event_index[0] = -1;
+    }
+    return 0;
+}
+
+int iowatcher_poll_events(hloop_t* loop, int timeout) {
+    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->iowatcher;
+    if (poll_ctx == NULL)  return 0;
+    if (poll_ctx->fds.size == 0)   return 0;
+    int npoll = poll(poll_ctx->fds.ptr, poll_ctx->fds.size, timeout);
+    if (npoll < 0) {
+        perror("poll");
+        return npoll;
+    }
+    if (npoll == 0) return 0;
+    int nevents = 0;
+    for (int i = 0; i < poll_ctx->fds.size; ++i) {
+        int fd = poll_ctx->fds.ptr[i].fd;
+        short revents = poll_ctx->fds.ptr[i].revents;
+        if (revents) {
+            ++nevents;
+            hio_t* io = loop->ios.ptr[fd];
+            if (io) {
+                if (revents | POLLIN) {
+                    io->revents |= READ_EVENT;
+                }
+                if (revents | POLLOUT) {
+                    io->revents |= WRITE_EVENT;
+                }
+                EVENT_PENDING(io);
+            }
+        }
+        if (nevents == npoll) break;
+    }
+    return nevents;
+}
+#endif

+ 0 - 122
event/poll.cpp

@@ -1,122 +0,0 @@
-#include "io_watcher.h"
-
-#ifdef EVENT_POLL
-#include "hplatform.h"
-#include "hdef.h"
-#include "hio.h"
-
-#define INIT_FDS_NUM    64
-
-typedef struct poll_ctx_s {
-    int            capacity;
-    int            nfds;
-    struct pollfd* fds;
-} poll_ctx_t;
-
-static void poll_ctx_resize(poll_ctx_t* poll_ctx, int size) {
-    int bytes = sizeof(struct pollfd) * size;
-    poll_ctx->fds = (struct pollfd*)realloc(poll_ctx->fds, bytes);
-    poll_ctx->capacity = size;
-}
-
-int iowatcher_init(hloop_t* loop) {
-    if (loop->iowatcher)   return 0;
-    poll_ctx_t* poll_ctx = (poll_ctx_t*)malloc(sizeof(poll_ctx_t));
-    poll_ctx->capacity = INIT_FDS_NUM;
-    poll_ctx->nfds = 0;
-    int bytes = sizeof(struct pollfd) * poll_ctx->capacity;
-    poll_ctx->fds = (struct pollfd*)malloc(bytes);
-    memset(poll_ctx->fds, 0, bytes);
-    loop->iowatcher = poll_ctx;
-    return 0;
-}
-
-int iowatcher_cleanup(hloop_t* loop) {
-    if (loop->iowatcher == NULL)   return 0;
-    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->iowatcher;
-    SAFE_FREE(poll_ctx->fds);
-    SAFE_FREE(loop->iowatcher);
-    return 0;
-}
-
-int iowatcher_add_event(hio_t* io, int events) {
-    hloop_t* loop = io->loop;
-    if (loop->iowatcher == NULL) {
-        hloop_iowatcher_init(loop);
-    }
-    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->iowatcher;
-    int idx = io->event_index[0];
-    if (idx < 0) {
-        io->event_index[0] = idx = poll_ctx->nfds;
-        poll_ctx->nfds++;
-        if (idx == poll_ctx->capacity) {
-            poll_ctx_resize(poll_ctx, poll_ctx->capacity*2);
-        }
-        poll_ctx->fds[idx].fd = io->fd;
-        poll_ctx->fds[idx].events = 0;
-        poll_ctx->fds[idx].revents = 0;
-    }
-    assert(poll_ctx->fds[idx].fd == io->fd);
-    if (events & READ_EVENT) {
-        poll_ctx->fds[idx].events |= POLLIN;
-    }
-    if (events & WRITE_EVENT) {
-        poll_ctx->fds[idx].events |= POLLOUT;
-    }
-    return 0;
-}
-
-int iowatcher_del_event(hio_t* io, int events) {
-    hloop_t* loop = io->loop;
-    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->iowatcher;
-    if (poll_ctx == NULL)  return 0;
-
-    int idx = io->event_index[0];
-    if (idx < 0) return 0;
-    assert(poll_ctx->fds[idx].fd == io->fd);
-    if (events & READ_EVENT) {
-        poll_ctx->fds[idx].events &= ~POLLIN;
-    }
-    if (events & WRITE_EVENT) {
-        poll_ctx->fds[idx].events &= ~POLLOUT;
-    }
-    if (poll_ctx->fds[idx].events == 0) {
-        io->event_index[0] = -1;
-        poll_ctx->nfds--;
-        if (idx < poll_ctx->nfds) {
-            poll_ctx->fds[idx] = poll_ctx->fds[poll_ctx->nfds];
-            auto iter = loop->ios.find(poll_ctx->fds[idx].fd);
-            if (iter != loop->ios.end()) {
-                iter->second->event_index[0] = idx;
-            }
-        }
-    }
-    return 0;
-}
-
-int iowatcher_poll_events(hloop_t* loop, int timeout) {
-    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->iowatcher;
-    if (poll_ctx == NULL)  return 0;
-    if (poll_ctx->nfds == 0)   return 0;
-    int npoll = poll(poll_ctx->fds, poll_ctx->nfds, timeout);
-    if (npoll < 0) {
-        perror("poll");
-        return npoll;
-    }
-    if (npoll == 0) return 0;
-    int nevent = 0;
-    for (int i = 0; i < poll_ctx->nfds; ++i) {
-        if (nevent == npoll) break;
-        int fd = poll_ctx->fds[i].fd;
-        short revents = poll_ctx->fds[i].revents;
-        if (revents) {
-            ++nevent;
-            hio_t* io = hio_get(loop, fd);
-            if (io == NULL) continue;
-            io->revents = revents;
-            hio_handle_events(io);
-        }
-    }
-    return nevent;
-}
-#endif

+ 57 - 29
event/select.cpp → event/select.c

@@ -1,4 +1,4 @@
-#include "io_watcher.h"
+#include "iowatcher.h"
 
 #ifdef EVENT_SELECT
 #include "hplatform.h"
@@ -7,7 +7,8 @@
 #endif
 
 #include "hdef.h"
-#include "hio.h"
+#include "hevent.h"
+#include "hsocket.h"
 
 typedef struct select_ctx_s {
     int max_fd;
@@ -34,13 +35,11 @@ int iowatcher_cleanup(hloop_t* loop) {
     return 0;
 }
 
-int iowatcher_add_event(hio_t* io, int events) {
-    hloop_t* loop = io->loop;
+int iowatcher_add_event(hloop_t* loop, int fd, int events) {
     if (loop->iowatcher == NULL) {
-        hloop_iowatcher_init(loop);
+        iowatcher_init(loop);
     }
     select_ctx_t* select_ctx = (select_ctx_t*)loop->iowatcher;
-    int fd = io->fd;
     if (fd > select_ctx->max_fd) {
         select_ctx->max_fd = fd;
     }
@@ -59,11 +58,9 @@ int iowatcher_add_event(hio_t* io, int events) {
     return 0;
 }
 
-int iowatcher_del_event(hio_t* io, int events) {
-    hloop_t* loop = io->loop;
+int iowatcher_del_event(hloop_t* loop, int fd, int events) {
     select_ctx_t* select_ctx = (select_ctx_t*)loop->iowatcher;
     if (select_ctx == NULL)    return 0;
-    int fd = io->fd;
     if (fd == select_ctx->max_fd) {
         select_ctx->max_fd = -1;
     }
@@ -82,6 +79,38 @@ int iowatcher_del_event(hio_t* io, int events) {
     return 0;
 }
 
+static int find_max_active_fd(hloop_t* loop) {
+    hio_t* io = NULL;
+    for (int i = loop->ios.maxsize-1; i >= 0; --i) {
+        io = loop->ios.ptr[i];
+        if (io && io->active && io->events) return i;
+    }
+    return -1;
+}
+
+static int remove_bad_fds(hloop_t* loop) {
+    select_ctx_t* select_ctx = (select_ctx_t*)loop->iowatcher;
+    if (select_ctx == NULL)    return 0;
+    int badfds = 0;
+    int error = 0;
+    socklen_t optlen = sizeof(error);
+    for (int fd = 0; fd <= select_ctx->max_fd; ++fd) {
+        if (FD_ISSET(fd, &select_ctx->readfds) ||
+            FD_ISSET(fd, &select_ctx->writefds)) {
+            error = 0;
+            optlen = sizeof(int);
+            if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*)&error, &optlen) < 0 || error != 0) {
+                ++badfds;
+                hio_t* io = loop->ios.ptr[fd];
+                if (io) {
+                    hio_del(io, ALL_EVENTS);
+                }
+            }
+        }
+    }
+    return badfds;
+}
+
 int iowatcher_poll_events(hloop_t* loop, int timeout) {
     select_ctx_t* select_ctx = (select_ctx_t*)loop->iowatcher;
     if (select_ctx == NULL)    return 0;
@@ -92,13 +121,7 @@ int iowatcher_poll_events(hloop_t* loop, int timeout) {
     fd_set  readfds = select_ctx->readfds;
     fd_set  writefds = select_ctx->writefds;
     if (max_fd == -1) {
-        for (auto& pair : loop->ios) {
-            int fd = pair.first;
-            if (fd > max_fd) {
-                max_fd = fd;
-            }
-        }
-        select_ctx->max_fd = max_fd;
+        select_ctx->max_fd = max_fd = find_max_active_fd(loop);
     }
     struct timeval tv, *tp;
     if (timeout == INFINITE) {
@@ -117,28 +140,33 @@ int iowatcher_poll_events(hloop_t* loop, int timeout) {
         if (errno == EBADF) {
             perror("select");
 #endif
+            remove_bad_fds(loop);
             return -EBADF;
         }
         return nselect;
     }
     if (nselect == 0)   return 0;
-    int nevent = 0;
-    auto iter = loop->ios.begin();
-    while (iter != loop->ios.end()) {
-        if (nevent == nselect) break;
-        int fd = iter->first;
-        hio_t* io = iter->second;
+    int nevents = 0;
+    int revents = 0;
+    for (int fd = 0; fd <= max_fd; ++fd) {
+        revents = 0;
         if (FD_ISSET(fd, &readfds)) {
-            ++nevent;
-            io->revents |= READ_EVENT;
+            ++nevents;
+            revents |= READ_EVENT;
         }
         if (FD_ISSET(fd, &writefds)) {
-            ++nevent;
-            io->revents |= WRITE_EVENT;
+            ++nevents;
+            revents |= WRITE_EVENT;
+        }
+        if (revents) {
+            hio_t* io = loop->ios.ptr[fd];
+            if (io) {
+                io->revents = revents;
+                EVENT_PENDING(io);
+            }
         }
-        hio_handle_events(io);
-        ++iter;
+        if (nevents == nselect) break;
     }
-    return nevent;
+    return nevents;
 }
 #endif

+ 26 - 0
event/wsaio.c

@@ -0,0 +1,26 @@
+#include "iowatcher.h"
+
+#ifdef EVENT_IOCP
+#include "hplatform.h"
+
+hio_t* haccept  (hloop_t* loop, int listenfd, haccept_cb accept_cb) {
+    return NULL;
+}
+
+hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb) {
+    return NULL;
+}
+
+hio_t* hread    (hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
+    return NULL;
+}
+
+hio_t* hwrite   (hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb cb) {
+    return NULL;
+}
+
+void   hclose   (hio_t* io) {
+
+}
+
+#endif

+ 28 - 23
examples/client.cpp

@@ -1,48 +1,50 @@
 #include "hloop.h"
-#include "hio.h"
 #include "hsocket.h"
 
 #define RECV_BUFSIZE    4096
 static char readbuf[RECV_BUFSIZE];
 
-void on_timer(htimer_t* timer, void* userdata) {
+void on_timer(htimer_t* timer) {
     static int cnt = 0;
-    printf("on_timer timer_id=%d time=%luus cnt=%d\n", timer->timer_id, timer->loop->cur_time, ++cnt);
+    printf("on_timer timer_id=%lu time=%lus cnt=%d\n", timer->event_id, hloop_now(timer->loop), ++cnt);
 }
 
-void on_idle(hidle_t* idle, void* userdata) {
+void on_idle(hidle_t* idle) {
     static int cnt = 0;
-    printf("on_idle idle_id=%d cnt=%d\n", idle->idle_id, ++cnt);
+    printf("on_idle idle_id=%lu cnt=%d\n", idle->event_id, ++cnt);
 }
 
-void on_write(hio_t* io, const void* buf, int writebytes, void* userdata) {
+void on_write(hio_t* io, const void* buf, int writebytes) {
     printf("on_write fd=%d writebytes=%d\n", io->fd, writebytes);
 }
 
-void on_stdin(hio_t* io, void* buf, int readbytes, void* userdata) {
+void on_stdin(hio_t* io, void* buf, int readbytes) {
     printf("on_stdin fd=%d readbytes=%d\n", io->fd, readbytes);
-    printf("> %s\n", io->readbuf);
+    printf("> %s\n", buf);
 
-    hio_t* iosock = (hio_t*)io->read_userdata;
-    hwrite(iosock->loop, iosock->fd, io->readbuf, readbytes, on_write, NULL);
+    hio_t* iosock = (hio_t*)io->userdata;
+    hwrite(iosock->loop, iosock->fd, buf, readbytes, on_write);
 }
 
-void on_read(hio_t* io, void* buf, int readbytes, void* userdata) {
+void on_read(hio_t* io, void* buf, int readbytes) {
     printf("on_read fd=%d readbytes=%d\n", io->fd, readbytes);
-    printf("< %s\n", io->readbuf);
+    printf("< %s\n", buf);
     printf(">>");
     fflush(stdout);
 }
 
-void on_close(hio_t* io, void* userdata) {
-    printf("on_close fd=%d\n", io->fd);
-    hio_t* iostdin = (hio_t*)userdata;
-    hio_del(iostdin);
+void on_close(hio_t* io) {
+    printf("on_close fd=%d error=%d\n", io->fd, io->error);
+    hio_t* iostdin = (hio_t*)io->userdata;
+    hio_del(iostdin, READ_EVENT);
 }
 
-void on_connect(hio_t* io, int state, void* userdata) {
+void on_connect(hio_t* io, int state) {
     printf("on_connect fd=%d state=%d\n", io->fd, state);
-    if (state == 0) return;
+    if (state == 0) {
+        printf("error=%d:%s\n", io->error, strerror(io->error));
+        return;
+    }
     struct sockaddr_in localaddr, peeraddr;
     socklen_t addrlen;
     addrlen = sizeof(struct sockaddr_in);
@@ -54,8 +56,11 @@ void on_connect(hio_t* io, int state, void* userdata) {
             inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
 
     // NOTE: just on loop, readbuf can be shared.
-    hio_t* iostdin = hread(io->loop, 0, readbuf, RECV_BUFSIZE, on_stdin, io);
-    hread(io->loop, io->fd, readbuf, RECV_BUFSIZE, on_read, NULL, on_close, iostdin);
+    hio_t* iostdin = hread(io->loop, 0, readbuf, RECV_BUFSIZE, on_stdin);
+    iostdin->userdata = io;
+    hio_t* iosock = hread(io->loop, io->fd, readbuf, RECV_BUFSIZE, on_read);
+    iosock->close_cb = on_close;
+    iosock->userdata = iostdin;
 
     printf(">>");
     fflush(stdout);
@@ -77,9 +82,9 @@ int main(int argc, char** argv) {
 
     hloop_t loop;
     hloop_init(&loop);
-    //hidle_add(&loop, on_idle, NULL);
-    //htimer_add(&loop, on_timer, NULL, 1000, INFINITE);
-    hconnect(&loop, connfd, on_connect, NULL);
+    //hidle_add(&loop, on_idle, INFINITE);
+    //htimer_add(&loop, on_timer, 1000, INFINITE);
+    hconnect(&loop, connfd, on_connect);
     hloop_run(&loop);
 
     return 0;

+ 25 - 0
examples/loop.c

@@ -0,0 +1,25 @@
+#include "hloop.h"
+
+void on_idle(hidle_t* idle) {
+    printf("on_idle: event_id=%lu\tpriority=%d\tuserdata=%ld\n", idle->event_id, idle->priority, (long)idle->userdata);
+}
+
+void on_timer(htimer_t* timer) {
+    printf("on_timer: event_id=%lu\tpriority=%d\tuserdata=%ld\ttime=%lus\thrtime=%luus\n",
+        timer->event_id, timer->priority, (long)timer->userdata, hloop_now(timer->loop), timer->loop->cur_hrtime);
+}
+
+int main() {
+    hloop_t loop;
+    hloop_init(&loop);
+    for (int i = HEVENT_LOWEST_PRIORITY; i <= HEVENT_HIGHEST_PRIORITY; ++i) {
+        hidle_t* idle = hidle_add(&loop, on_idle, 10);
+        idle->priority = i;
+    }
+    for (int i = 1; i <= 10; ++i) {
+        htimer_t* timer = htimer_add(&loop, on_timer, i*1000, i);
+        timer->userdata = (void*)i;
+    }
+    hloop_run(&loop);
+    return 0;
+}

+ 20 - 15
examples/server.cpp

@@ -4,33 +4,37 @@
 #define RECV_BUFSIZE    4096
 static char readbuf[RECV_BUFSIZE];
 
-void on_timer(htimer_t* timer, void* userdata) {
+void on_timer(htimer_t* timer) {
     static int cnt = 0;
-    printf("on_timer timer_id=%d time=%luus cnt=%d\n", timer->timer_id, timer->loop->cur_time, ++cnt);
+    printf("on_timer timer_id=%lu time=%lus cnt=%d\n", timer->event_id, hloop_now(timer->loop), ++cnt);
 }
 
-void on_idle(hidle_t* idle, void* userdata) {
+void on_idle(hidle_t* idle) {
     static int cnt = 0;
-    printf("on_idle idle_id=%d cnt=%d\n", idle->idle_id, ++cnt);
+    printf("on_idle idle_id=%lu cnt=%d\n", idle->event_id, ++cnt);
 }
 
-void on_close(hio_t* io, void* userdata) {
-    printf("on_close fd=%d\n", io->fd);
+void on_write(hio_t* io, const void* buf, int writebytes) {
+    printf("on_write fd=%d writebytes=%d\n", io->fd, writebytes);
+}
+
+void on_close(hio_t* io) {
+    printf("on_close fd=%d error=%d\n", io->fd, io->error);
 }
 
 void on_write(hio_t* io, const void* buf, int writebytes, void* userdata) {
     printf("on_write fd=%d writebytes=%d\n", io->fd, writebytes);
 }
 
-void on_read(hio_t* io, void* buf, int readbytes, void* userdata) {
+void on_read(hio_t* io, void* buf, int readbytes) {
     printf("on_read fd=%d readbytes=%d\n", io->fd, readbytes);
-    printf("< %s\n", io->readbuf);
+    printf("< %s\n", buf);
     // echo
-    printf("> %s\n", io->readbuf);
-    hwrite(io->loop, io->fd, io->readbuf, readbytes, on_write, NULL);
+    printf("> %s\n", buf);
+    hwrite(io->loop, io->fd, buf, readbytes, on_write);
 }
 
-void on_accept(hio_t* io, int connfd, void* userdata) {
+void on_accept(hio_t* io, int connfd) {
     printf("on_accept listenfd=%d connfd=%d\n", io->fd, connfd);
     struct sockaddr_in localaddr, peeraddr;
     socklen_t addrlen;
@@ -44,7 +48,8 @@ void on_accept(hio_t* io, int connfd, void* userdata) {
 
     nonblocking(connfd);
     // one loop can use one readbuf
-    hread(io->loop, connfd, readbuf, RECV_BUFSIZE, on_read, NULL, on_close, NULL);
+    hio_t* connio = hread(io->loop, connfd, readbuf, RECV_BUFSIZE, on_read);
+    connio->close_cb = on_close;
 }
 
 int main(int argc, char** argv) {
@@ -62,8 +67,8 @@ int main(int argc, char** argv) {
 
     hloop_t loop;
     hloop_init(&loop);
-    //hidle_add(&loop, on_idle, NULL);
-    //htimer_add(&loop, on_timer, NULL, 1000, INFINITE);
-    haccept(&loop, listenfd, on_accept, NULL);
+    //hidle_add(&loop, on_idle, INFINITE);
+    //htimer_add(&loop, on_timer, 1000, INFINITE);
+    haccept(&loop, listenfd, on_accept);
     hloop_run(&loop);
 }

+ 3 - 3
h.h

@@ -14,13 +14,14 @@
 // c
 #include "hsysinfo.h"
 #include "hproc.h"
-
+#include "hmath.h"
 #include "htime.h"
 #include "herr.h"
+#include "hlog.h"
+#include "hmutex.h"
 
 // cpp
 #ifdef __cplusplus
-#include "hlog.h"
 #include "hstring.h"
 #include "hsocket.h"
 
@@ -30,7 +31,6 @@
 #include "hbuf.h"
 #include "hfile.h"
 #include "hscope.h"
-#include "hmutex.h"
 #include "hthread.h"
 #include "hthreadpool.h"
 #endif

+ 19 - 14
http/server/http_server.cpp

@@ -78,9 +78,9 @@ struct http_connect_userdata {
     }
 };
 
-static void on_read(hio_t* io, void* buf, int readbytes, void* userdata) {
+static void on_read(hio_t* io, void* buf, int readbytes) {
     //printf("on_read fd=%d readbytes=%d\n", io->fd, readbytes);
-    http_connect_userdata* hcu = (http_connect_userdata*)userdata;
+    http_connect_userdata* hcu = (http_connect_userdata*)io->userdata;
     HttpService* service = hcu->server->service;
     HttpRequest* req = &hcu->req;
     HttpResponse* res = &hcu->res;
@@ -209,20 +209,19 @@ static void on_read(hio_t* io, void* buf, int readbytes, void* userdata) {
             hwrite(io->loop, io->fd, content, content_length);
         }
         hcu->log += asprintf("=>[%d %s]", res->status_code, http_status_str(res->status_code));
-        hlogi("%s", hcu->log.c_str());
         hclose(io);
     }
 }
 
-static void on_close(hio_t* io, void* userdata) {
-    http_connect_userdata* hcu = (http_connect_userdata*)userdata;
+static void on_close(hio_t* io) {
+    http_connect_userdata* hcu = (http_connect_userdata*)io->userdata;
     if (hcu) {
         hlogi("%s", hcu->log.c_str());
         delete hcu;
     }
 }
 
-static void on_accept(hio_t* io, int connfd, void* userdata) {
+static void on_accept(hio_t* io, int connfd) {
     //printf("on_accept listenfd=%d connfd=%d\n", io->fd, connfd);
     struct sockaddr_in localaddr, peeraddr;
     socklen_t addrlen;
@@ -236,16 +235,18 @@ static void on_accept(hio_t* io, int connfd, void* userdata) {
     // new http_connect_userdata
     // delete on_close
     http_connect_userdata* hcu = new http_connect_userdata;
-    hcu->server = (http_server_t*)userdata;
+    hcu->server = (http_server_t*)io->userdata;
     hcu->log += asprintf("[%s:%d]", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
 
     nonblocking(connfd);
-    HBuf* buf = (HBuf*)io->loop->custom_data.ptr;
-    hread(io->loop, connfd, buf->base, buf->len, on_read, hcu, on_close, hcu);
+    HBuf* buf = (HBuf*)io->loop->userdata;
+    hio_t* connio = hread(io->loop, connfd, buf->base, buf->len, on_read);
+    connio->close_cb = on_close;
+    connio->userdata = hcu;
 }
 
-void handle_cached_files(htimer_t* timer, void* userdata) {
-    FileCache* pfc = (FileCache*)userdata;
+void handle_cached_files(htimer_t* timer) {
+    FileCache* pfc = (FileCache*)timer->userdata;
     if (pfc == NULL) {
         htimer_del(timer);
         return;
@@ -270,12 +271,16 @@ static void worker_proc(void* userdata) {
     int listenfd = server->listenfd;
     hloop_t loop;
     hloop_init(&loop);
-    htimer_add(&loop, handle_cached_files, &s_filecache, s_filecache.file_cached_time*1000);
+    htimer_t* timer = htimer_add(&loop, handle_cached_files, s_filecache.file_cached_time*1000);
+    timer->userdata = &s_filecache;
     // one loop one readbuf.
     HBuf readbuf;
     readbuf.resize(RECV_BUFSIZE);
-    loop.custom_data.ptr = &readbuf;
-    haccept(&loop, listenfd, on_accept, server);
+    loop.userdata = &readbuf;
+    hio_t* listenio = haccept(&loop, listenfd, on_accept);
+    listenio->userdata = server;
+    // disable fflush
+    hlog_set_fflush(0);
     hloop_run(&loop);
 }