Bläddra i källkod

add iowatcher

ithewei 6 år sedan
förälder
incheckning
b05e3cbd1f
23 ändrade filer med 877 tillägg och 694 borttagningar
  1. 9 9
      base/hbuf.h
  2. 10 0
      base/hdef.h
  3. 1 0
      base/hplatform.h
  4. 1 0
      base/hsocket.h
  5. 41 52
      event/epoll.cpp
  6. 0 86
      event/hevent.cpp
  7. 0 44
      event/hevent.h
  8. 121 0
      event/hio.cpp
  9. 16 0
      event/hio.h
  10. 35 109
      event/hloop.cpp
  11. 86 45
      event/hloop.h
  12. 40 0
      event/io_watcher.cpp
  13. 54 0
      event/io_watcher.h
  14. 39 51
      event/kqueue.cpp
  15. 239 0
      event/nio.cpp
  16. 8 8
      event/noevent.cpp
  17. 36 46
      event/poll.cpp
  18. 30 28
      event/select.cpp
  19. 40 63
      examples/client.cpp
  20. 26 66
      examples/server.cpp
  21. 1 1
      http/server/FileCache.h
  22. 40 84
      http/server/http_server.cpp
  23. 4 2
      utils/hmain.cpp

+ 9 - 9
base/hbuf.h

@@ -8,7 +8,7 @@
 #include "hdef.h"
 
 typedef struct hbuf_s {
-    uint8_t* base;
+    char*  base;
     size_t len;
 
 #ifdef __cplusplus
@@ -18,7 +18,7 @@ typedef struct hbuf_s {
     }
 
     hbuf_s(void* data, size_t len) {
-        this->base = (uint8_t*)data;
+        this->base = (char*)data;
         this->len  = len;
     }
 #endif
@@ -60,11 +60,11 @@ public:
         if (cap == len) return;
 
         if (base == NULL) {
-            base = (uint8_t*)malloc(cap);
+            base = (char*)malloc(cap);
             memset(base, 0, cap);
         }
         else {
-            base = (uint8_t*)realloc(base, cap);
+            base = (char*)realloc(base, cap);
         }
         len = cap;
         cleanup_ = true;
@@ -91,13 +91,13 @@ public:
     HVLBuf(size_t cap) : HBuf(cap) {_offset = _size = 0;}
     virtual ~HVLBuf() {}
 
-    uint8_t* data() { return base+_offset; }
+    char* data() { return base+_offset; }
     size_t size() { return _size; }
 
     void push_front(void* ptr, size_t len) {
         if (len > this->len - _size) {
             this->len = MAX(this->len, len)*2;
-            base = (uint8_t*)realloc(base, this->len);
+            base = (char*)realloc(base, this->len);
         }
 
         if (_offset < len) {
@@ -114,7 +114,7 @@ public:
     void push_back(void* ptr, size_t len) {
         if (len > this->len - _size) {
             this->len = MAX(this->len, len)*2;
-            base = (uint8_t*)realloc(base, this->len);
+            base = (char*)realloc(base, this->len);
         }
         else if (len > this->len - _offset - _size) {
             // move => start
@@ -176,8 +176,8 @@ public:
     HRingBuf(size_t cap) : HBuf(cap) {_head = _tail = _size = 0;}
     virtual ~HRingBuf() {}
 
-    uint8_t* alloc(size_t len) {
-        uint8_t* ret = NULL;
+    char* alloc(size_t len) {
+        char* ret = NULL;
         if (_head < _tail || _size == 0) {
             // [_tail, this->len) && [0, _head)
             if (this->len - _tail >= len) {

+ 10 - 0
base/hdef.h

@@ -12,6 +12,16 @@ typedef int                 BOOL;
 
 typedef void*               handle;
 
+typedef union {
+    bool        b;
+    char        ch;
+    char*       str;
+    long long   num;
+    float       f;
+    double      lf;
+    void*       ptr;
+} var;
+
 #ifdef _MSC_VER
 typedef int pid_t;
 typedef int gid_t;

+ 1 - 0
base/hplatform.h

@@ -144,6 +144,7 @@ typedef unsigned __int16    uint16_t;
 typedef unsigned __int32    uint32_t;
 typedef unsigned __int64    uint64_t;
 #else
+#include <stdbool.h>
 #include <stdint.h>
 #endif
 

+ 1 - 0
base/hsocket.h

@@ -21,6 +21,7 @@ inline int nonblocking(int sockfd) {
     unsigned long nb = 1;
     return ioctlsocket(sockfd, FIONBIO, &nb);
 }
+#define poll        WSAPoll
 #define sockerrno   WSAGetLastError()
 #define NIO_EAGAIN  WSAEWOULDBLOCK
 #else

+ 41 - 52
event/epoll.cpp

@@ -1,11 +1,8 @@
-#include "hevent.h"
+#include "io_watcher.h"
 
 #ifdef EVENT_EPOLL
+#include "hio.h"
 #include "hplatform.h"
-#ifdef OS_LINUX
-#include <sys/epoll.h>
-#endif
-
 #include "hdef.h"
 
 #define INIT_EVENTS_NUM    64
@@ -23,8 +20,8 @@ static void epoll_ctx_resize(epoll_ctx_t* epoll_ctx, int size) {
     epoll_ctx->capacity = size;
 }
 
-int _event_init(hloop_t* loop) {
-    if (loop->event_ctx) return 0;
+int iowatcher_init(hloop_t* loop) {
+    if (loop->iowatcher) return 0;
     epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)malloc(sizeof(epoll_ctx_t));
     epoll_ctx->epfd = epoll_create(INIT_EVENTS_NUM);
     epoll_ctx->capacity = INIT_EVENTS_NUM;
@@ -32,36 +29,36 @@ int _event_init(hloop_t* loop) {
     int bytes = sizeof(struct epoll_event) * epoll_ctx->capacity;
     epoll_ctx->events = (struct epoll_event*)malloc(bytes);
     memset(epoll_ctx->events, 0, bytes);
-    loop->event_ctx = epoll_ctx;
+    loop->iowatcher = epoll_ctx;
     return 0;
 }
 
-int _event_cleanup(hloop_t* loop) {
-    if (loop->event_ctx == NULL) return 0;
-    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->event_ctx;
+int iowatcher_cleanup(hloop_t* loop) {
+    if (loop->iowatcher == NULL) return 0;
+    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->iowatcher;
     close(epoll_ctx->epfd);
     SAFE_FREE(epoll_ctx->events);
-    SAFE_FREE(loop->event_ctx);
+    SAFE_FREE(loop->iowatcher);
     return 0;
 }
 
-int _add_event(hevent_t* event, int type) {
-    hloop_t* loop = event->loop;
-    if (loop->event_ctx == NULL) {
-        hloop_event_init(loop);
+int iowatcher_add_event(hio_t* io, int events) {
+    hloop_t* loop = io->loop;
+    if (loop->iowatcher == NULL) {
+        hloop_iowatcher_init(loop);
     }
-    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->event_ctx;
-    int op = event->events == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
-    if (type & READ_EVENT) {
-        event->events |= EPOLLIN;
+    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->iowatcher;
+    struct epoll_event ee;
+    ee.events = 0;
+    ee.data.fd = io->fd;
+    if (events & READ_EVENT) {
+        ee.events |= EPOLLIN;
     }
-    if (type & WRITE_EVENT) {
-        event->events |= EPOLLOUT;
+    if (events & WRITE_EVENT) {
+        ee.events |= EPOLLOUT;
     }
-    struct epoll_event ee;
-    ee.events = event->events;
-    ee.data.fd = event->fd;
-    epoll_ctl(epoll_ctx->epfd, op, event->fd, &ee);
+    int op = io->events == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
+    epoll_ctl(epoll_ctx->epfd, op, io->fd, &ee);
     if (op == EPOLL_CTL_ADD) {
         if (epoll_ctx->nevents == epoll_ctx->capacity) {
             epoll_ctx_resize(epoll_ctx, epoll_ctx->capacity*2);
@@ -71,31 +68,30 @@ int _add_event(hevent_t* event, int type) {
     return 0;
 }
 
-int _del_event(hevent_t* event, int type) {
-    hloop_t* loop = event->loop;
-    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->event_ctx;
+int iowatcher_del_event(hio_t* io, int events) {
+    hloop_t* loop = io->loop;
+    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->iowatcher;
     if (epoll_ctx == NULL) return 0;
 
-    if (event->events == 0) return 0;
-    if (type & READ_EVENT) {
-        event->events &= ~EPOLLIN;
+    struct epoll_event ee;
+    ee.events = io->events;
+    ee.data.fd = io->fd;
+    if (events & READ_EVENT) {
+        ee.events &= ~EPOLLIN;
     }
-    if (type & WRITE_EVENT) {
-        event->events &= ~EPOLLOUT;
+    if (events & WRITE_EVENT) {
+        ee.events &= ~EPOLLOUT;
     }
-    int op = event->events == 0 ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
-    struct epoll_event ee;
-    ee.events = event->events;
-    ee.data.fd = event->fd;
-    epoll_ctl(epoll_ctx->epfd, op, event->fd, &ee);
+    int op = ee.events == 0 ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
+    epoll_ctl(epoll_ctx->epfd, op, io->fd, &ee);
     if (op == EPOLL_CTL_DEL) {
         epoll_ctx->nevents--;
     }
     return 0;
 }
 
-int _handle_events(hloop_t* loop, int timeout) {
-    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->event_ctx;
+int iowatcher_poll_events(hloop_t* loop, int timeout) {
+    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->iowatcher;
     if (epoll_ctx == NULL)  return 0;
     if (epoll_ctx->nevents == 0) return 0;
     int nepoll = epoll_wait(epoll_ctx->epfd, epoll_ctx->events, epoll_ctx->nevents, timeout);
@@ -111,17 +107,10 @@ int _handle_events(hloop_t* loop, int timeout) {
         uint32_t revents = epoll_ctx->events[i].events;
         if (revents) {
             ++nevent;
-            auto iter = loop->events.find(fd);
-            if (iter == loop->events.end()) {
-                continue;
-            }
-            hevent_t* event = iter->second;
-            if (revents & EPOLLIN) {
-                _on_read(event);
-            }
-            if (revents & EPOLLOUT) {
-                _on_write(event);
-            }
+            hio_t* io = hio_get(loop, fd);
+            if (io == NULL) continue;
+            io->revents = revents;
+            hio_handle_events(io);
         }
     }
     return nevent;

+ 0 - 86
event/hevent.cpp

@@ -1,86 +0,0 @@
-#include "hevent.h"
-
-#include "hdef.h"
-#include "hlog.h"
-#include "hsocket.h"
-
-int _on_read(hevent_t* event) {
-    event->readable = 1;
-    //if (event->accept) {
-    //}
-    if (event->read_cb) {
-        event->read_cb(event, event->read_userdata);
-    }
-    event->readable = 0;
-    return 0;
-}
-
-int _on_write(hevent_t* event) {
-    // ONESHOT
-    _del_event(event, WRITE_EVENT);
-    event->writeable = 1;
-    //if (event->connect) {
-    //}
-    if (event->write_cb) {
-        event->write_cb(event, event->read_userdata);
-    }
-    event->writeable = 0;
-    return 0;
-}
-
-int hloop_event_init(hloop_t* loop) {
-    return _event_init(loop);
-}
-
-int hloop_event_cleanup(hloop_t* loop) {
-    return _event_cleanup(loop);
-}
-
-int hloop_add_event(hevent_t* event, int type) {
-    return _add_event(event, type);
-}
-
-int hloop_del_event(hevent_t* event, int type) {
-    return _del_event(event, type);
-}
-
-static void remove_bad_fds(hloop_t* loop) {
-    int error = 0;
-    socklen_t optlen = sizeof(int);
-    int ret = 0;
-    auto iter = loop->events.begin();
-    while (iter != loop->events.end()) {
-        int fd = iter->first;
-        ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*)&error, &optlen);
-        if (ret < 0 || error != 0) {
-            hloge("getsockopt fd=%d retval=%d SO_ERROR=%d", fd, ret, error);
-            hloop_del_event(iter->second);
-            iter = loop->events.erase(iter);
-            continue;
-        }
-        ++iter;
-    }
-}
-
-int hloop_handle_events(hloop_t* loop, int timeout) {
-    /*
-    // remove destroy events
-    hevent_t* event = NULL;
-    auto iter = loop->events.begin();
-    while (iter != loop->events.end()) {
-        event = iter->second;
-        if (event->destroy) {
-            SAFE_FREE(event);
-            iter = loop->events.erase(iter);
-            continue;
-        }
-        ++iter;
-    }
-    */
-    int nevent = _handle_events(loop, timeout);
-    if (nevent < 0) {
-        printf("handle_events error=%d\n", -nevent);
-        remove_bad_fds(loop);
-    }
-    return nevent;
-}

+ 0 - 44
event/hevent.h

@@ -1,44 +0,0 @@
-#ifndef __HW_EVNET_H_
-#define __HW_EVNET_H_
-
-#include "hloop.h"
-#include "hplatform.h"
-
-#define READ_EVENT  0x0001
-#define WRITE_EVENT 0x0004
-
-#define READ_INDEX  0
-#define WRITE_INDEX 1
-#define EVENT_INDEX(type) ((type == READ_EVENT) ? READ_INDEX : WRITE_INDEX)
-
-int hloop_event_init(hloop_t* loop);
-int hloop_event_cleanup(hloop_t* loop);
-int hloop_add_event(hevent_t* event, int type = READ_EVENT|WRITE_EVENT);
-int hloop_del_event(hevent_t* event, int type = READ_EVENT|WRITE_EVENT);
-int hloop_handle_events(hloop_t* loop, int timeout = INFINITE);
-
-int _on_read(hevent_t* event);
-int _on_write(hevent_t* event);
-
-#if !defined(EVENT_SELECT) && !defined(EVENT_POLL) && !defined(EVENT_EPOLL) && \
-    !defined(EVENT_IOCP) && !defined(EVENT_KQUEUE) && !defined(EVENT_NOEVENT)
-#ifdef OS_WIN
-//#define EVENT_IOCP
-#define EVENT_SELECT
-#elif defined(OS_LINUX)
-#define EVENT_EPOLL
-#elif defined(OS_MAC)
-#define EVENT_KQUEUE
-#elif defined(OS_BSD)
-#define EVENT_KQUEUE
-#else
-#define EVENT_SELECT
-#endif
-#endif
-int _event_init(hloop_t* loop);
-int _event_cleanup(hloop_t* loop);
-int _add_event(hevent_t* event, int type);
-int _del_event(hevent_t* event, int type);
-int _handle_events(hloop_t* loop, int timeout);
-
-#endif // __HW_EVNET_H_

+ 121 - 0
event/hio.cpp

@@ -0,0 +1,121 @@
+#include "hio.h"
+#include "io_watcher.h"
+
+void hio_init(hio_t* io) {
+    hloop_t* loop = io->loop;
+    int fd = io->fd;
+    memset(io, 0, sizeof(hio_t));
+    io->event_index[0] = io->event_index[1] = -1;
+    io->loop = loop;
+    io->fd = fd;
+}
+
+hio_t* hio_get(hloop_t* loop, int fd) {
+    auto iter = loop->ios.find(fd);
+    if (iter == loop->ios.end()) {
+        return NULL;
+    }
+    return iter->second;
+}
+
+hio_t* hio_add(hloop_t* loop, int fd) {
+    // first try get
+    hio_t* io = hio_get(loop, fd);
+    if (io == NULL) {
+        // then add
+#ifdef EVENT_SELECT
+        if (loop->ios.size() >= FD_SETSIZE) return NULL;
+#endif
+        io = (hio_t*)malloc(sizeof(hio_t));
+        hio_init(io);
+        io->event_type= HEVENT_TYPE_IO;
+        io->event_id = ++loop->event_counter;
+        loop->ios[fd] = io;
+    }
+    io->loop = loop;
+    io->fd = fd;
+    io->active = 1;
+    return io;
+}
+
+void hio_del(hio_t* io) {
+    iowatcher_del_event(io, ALL_EVENTS);
+    io->events = 0;
+     // no free, just init for reuse
+    hio_init(io);
+}
+
+hio_t* hio_read (hloop_t* loop, int fd, hio_cb revent_cb, void* revent_userdata) {
+    hio_t* io = hio_add(loop, fd);
+    if (io == NULL) return NULL;
+    io->revent_cb = revent_cb;
+    io->revent_userdata = revent_userdata;
+    iowatcher_add_event(io, READ_EVENT);
+    io->events |= READ_EVENT;
+    return io;
+}
+
+hio_t* hio_write  (hloop_t* loop, int fd, hio_cb wevent_cb, void* wevent_userdata) {
+    hio_t* io = hio_add(loop, fd);
+    if (io == NULL) return NULL;
+    io->wevent_cb = wevent_cb;
+    io->wevent_userdata = wevent_userdata;
+    iowatcher_add_event(io, WRITE_EVENT);
+    io->events |= WRITE_EVENT;
+    return io;
+}
+
+#include "hsocket.h"
+hio_t* hio_accept (hloop_t* loop, int listenfd, hio_cb revent_cb, void* revent_userdata) {
+    hio_t* io = hio_read(loop, listenfd, revent_cb, revent_userdata);
+    if (io) {
+        nonblocking(listenfd);
+        io->accept = 1;
+    }
+    return io;
+}
+
+hio_t* hio_connect(hloop_t* loop, int connfd, hio_cb wevent_cb, void* wevent_userdata) {
+    hio_t* io = hio_write(loop, connfd, wevent_cb, wevent_userdata);
+    if (io) {
+        nonblocking(connfd);
+        io->connect = 1;
+    }
+    return io;
+}
+
+static int handle_read_event(hio_t* io) {
+    if (!io->active) return 0;
+    if (io->revent_cb) {
+        io->revent_cb(io, io->revent_userdata);
+    }
+    return 0;
+}
+
+static int handle_write_event(hio_t* io) {
+    if (!io->active) return 0;
+    bool connect_event = io->connect;
+    if (connect_event) {
+        // ONESHOT
+        iowatcher_del_event(io, WRITE_EVENT);
+        io->connect = 0;
+    }
+    if (io->wevent_cb) {
+        io->wevent_cb(io, io->wevent_userdata);
+    }
+    //if (!connect_event && io->write_queue.empty()) {
+        //iowatcher_del_event(io, WRITE_EVENT);
+    //}
+    return 0;
+}
+
+int hio_handle_events(hio_t* io) {
+    if (io->revents & READ_EVENT) {
+        handle_read_event(io);
+    }
+    if (io->revents & WRITE_EVENT) {
+        handle_write_event(io);
+    }
+    io->revents = 0;
+    return 0;
+}

+ 16 - 0
event/hio.h

@@ -0,0 +1,16 @@
+#ifndef HW_IO_H_
+#define HW_IO_H_
+
+#include "hloop.h"
+
+hio_t* hio_get(hloop_t* loop, int fd);
+hio_t* hio_add(hloop_t* loop, int fd);
+void   hio_del(hio_t* io);
+int    hio_handle_events(hio_t* io);
+
+hio_t* hio_read   (hloop_t* loop, int fd, hio_cb revent_cb, void* revent_userdata);
+hio_t* hio_write  (hloop_t* loop, int fd, hio_cb wevent_cb, void* wevent_userdata);
+hio_t* hio_accept (hloop_t* loop, int listenfd, hio_cb revent_cb, void* revent_userdata);
+hio_t* hio_connect(hloop_t* loop, int connfd, hio_cb wevent_cb, void* wevent_userdata);
+
+#endif // HW_IO_H_

+ 35 - 109
event/hloop.cpp

@@ -1,8 +1,9 @@
 #include "hloop.h"
+#include "hio.h"
+#include "io_watcher.h"
 
 #include "hdef.h"
 #include "htime.h"
-#include "hevent.h"
 
 static void hloop_update_time(hloop_t* loop) {
     loop->cur_time = gethrtime();
@@ -10,12 +11,12 @@ static void hloop_update_time(hloop_t* loop) {
 
 int hloop_init(hloop_t* loop) {
     loop->status = HLOOP_STATUS_STOP;
+    loop->event_counter = 0;
     loop->timer_counter = 0;
     loop->idle_counter = 0;
     loop->min_timer_timeout = INFINITE;
-    loop->event_ctx = NULL;
-    // hloop_event_init when add_event first
-    // hloop_event_init(loop);
+    loop->iowatcher = NULL;
+    //hloop_iowatcher_init(loop);
     return 0;
 }
 
@@ -28,13 +29,13 @@ void hloop_cleanup(hloop_t* loop) {
         SAFE_FREE(pair.second);
     }
     loop->idles.clear();
-    for (auto& pair : loop->events) {
-        hevent_t* event = pair.second;
-        hloop_del_event(event);
-        SAFE_FREE(event);
+    for (auto& pair : loop->ios) {
+        hio_t* io = pair.second;
+        hio_del(io);
+        SAFE_FREE(io);
     }
-    loop->events.clear();
-    hloop_event_cleanup(loop);
+    loop->ios.clear();
+    hloop_iowatcher_cleanup(loop);
 }
 
 int hloop_handle_timers(hloop_t* loop) {
@@ -43,7 +44,7 @@ int hloop_handle_timers(hloop_t* loop) {
     while (iter != loop->timers.end()) {
         htimer_t* timer = iter->second;
         if (timer->destroy) goto destroy;
-        if (timer->disable) goto next;
+        if (!timer->active) goto next;
         if (timer->repeat == 0) goto destroy;
         if (timer->next_timeout < loop->cur_time) {
             ++ntimer;
@@ -71,7 +72,7 @@ int hloop_handle_idles(hloop_t* loop) {
     while (iter != loop->idles.end()) {
         hidle_t* idle = iter->second;
         if (idle->destroy)  goto destroy;
-        if (idle->disable)  goto next;
+        if (!idle->active)  goto next;
         if (idle->repeat == 0) goto destroy;
         ++nidle;
         if (idle->cb) {
@@ -91,11 +92,11 @@ destroy:
 }
 
 #define PAUSE_SLEEP_TIME        10      // ms
-#define MIN_EVENT_TIMEOUT       1       // ms
-#define MAX_EVENT_TIMEOUT       1000    // ms
+#define MIN_POLL_TIMEOUT        1       // ms
+#define MAX_POLL_TIMEOUT        1000    // ms
 int hloop_run(hloop_t* loop) {
-    int ntimer, nevent, nidle;
-    int event_timeout;
+    int ntimer, nio, nidle;
+    int poll_timeout;
 
     loop->start_time = gethrtime();
     loop->status = HLOOP_STATUS_RUNNING;
@@ -108,25 +109,25 @@ int hloop_run(hloop_t* loop) {
         }
         ++loop->loop_cnt;
         // timers -> events -> idles
-        ntimer = nevent = nidle = 0;
-        event_timeout = INFINITE;
+        ntimer = nio = nidle = 0;
+        poll_timeout = INFINITE;
         if (loop->timers.size() != 0) {
             ntimer = hloop_handle_timers(loop);
-            event_timeout = MAX(MIN_EVENT_TIMEOUT, loop->min_timer_timeout/10);
+            poll_timeout = MAX(MIN_POLL_TIMEOUT, loop->min_timer_timeout/10);
         }
-        if (loop->events.size() == 0 || loop->idles.size() != 0) {
-            event_timeout = MIN(event_timeout, MAX_EVENT_TIMEOUT);
+        if (loop->ios.size() == 0 || loop->idles.size() != 0) {
+            poll_timeout = MIN(poll_timeout, MAX_POLL_TIMEOUT);
         }
-        if (loop->events.size() != 0) {
-            nevent = hloop_handle_events(loop, event_timeout);
+        if (loop->ios.size() != 0) {
+            nio = hloop_handle_ios(loop, poll_timeout);
         }
         else {
-            msleep(event_timeout);
+            msleep(poll_timeout);
         }
-        if (ntimer == 0 && nevent == 0 && loop->idles.size() != 0) {
+        if (ntimer == 0 && nio == 0 && loop->idles.size() != 0) {
             nidle = hloop_handle_idles(loop);
         }
-        //printf("loop_cnt=%lu ntimer=%d nevent=%d nidle=%d\n", loop->loop_cnt, ntimer, nevent, nidle);
+        //printf("loop_cnt=%lu ntimer=%d nio=%d nidle=%d\n", loop->loop_cnt, ntimer, nio, nidle);
     }
     loop->status = HLOOP_STATUS_STOP;
     loop->end_time = gethrtime();
@@ -156,6 +157,8 @@ int hloop_resume(hloop_t* loop) {
 htimer_t* htimer_add(hloop_t* loop, htimer_cb cb, void* userdata, uint64_t timeout, uint32_t repeat) {
     htimer_t* timer = (htimer_t*)malloc(sizeof(htimer_t));
     memset(timer, 0, sizeof(htimer_t));
+    timer->event_type = HEVENT_TYPE_TIMER;
+    timer->event_id = ++loop->event_counter;
     timer->loop = loop;
     timer->timer_id = ++loop->timer_counter;
     timer->cb = cb;
@@ -163,12 +166,14 @@ htimer_t* htimer_add(hloop_t* loop, htimer_cb cb, void* userdata, uint64_t timeo
     timer->timeout = timeout;
     timer->repeat = repeat;
     timer->next_timeout = gethrtime() + timeout*1000;
+    timer->active = 1;
     loop->timers[timer->timer_id] = timer;
     loop->min_timer_timeout = MIN(timeout, loop->min_timer_timeout);
     return timer;
 }
 
 void htimer_del(htimer_t* timer) {
+    timer->active = 0;
     timer->destroy = 1;
 }
 
@@ -183,16 +188,20 @@ void htimer_del(hloop_t* loop, uint32_t timer_id) {
 hidle_t* hidle_add(hloop_t* loop, hidle_cb cb, void* userdata, uint32_t repeat) {
     hidle_t* idle = (hidle_t*)malloc(sizeof(hidle_t));
     memset(idle, 0, sizeof(hidle_t));
+    idle->event_type = HEVENT_TYPE_IDLE;
+    idle->event_id = ++loop->event_counter;
     idle->loop = loop;
     idle->idle_id = ++loop->idle_counter;
     idle->cb = cb;
     idle->userdata = userdata;
     idle->repeat = repeat;
+    idle->active = 1;
     loop->idles[idle->idle_id] = idle;
     return idle;
 }
 
 void hidle_del(hidle_t* idle) {
+    idle->active = 0;
     idle->destroy = 1;
 }
 
@@ -203,86 +212,3 @@ void hidle_del(hloop_t* loop, uint32_t idle_id) {
         hidle_del(idle);
     }
 }
-
-hevent_t* hevent_add(hloop_t* loop, int fd) {
-#ifdef EVENT_SELECT
-    if (loop->events.size() >= FD_SETSIZE) return NULL;
-#endif
-    hevent_t* event = (hevent_t*)malloc(sizeof(hevent_t));
-    memset(event, 0, sizeof(hevent_t));
-    event->loop = loop;
-    event->fd = fd;
-    event->event_index[0] = -1;
-    event->event_index[1] = -1;
-    event->events = 0;
-    loop->events[fd] = event;
-    return event;
-}
-
-hevent_t* hevent_get(hloop_t* loop, int fd) {
-    auto iter = loop->events.find(fd);
-    if (iter != loop->events.end()) {
-        return iter->second;
-    }
-    return NULL;
-}
-
-hevent_t* hevent_get_or_add(hloop_t* loop, int fd) {
-    hevent_t* event = hevent_get(loop, fd);
-    if (event)  {
-        event->destroy = 0;
-        event->disable = 0;
-        return event;
-    }
-    return hevent_add(loop, fd);
-}
-
-void hevent_del(hevent_t* event) {
-    event->destroy = 1;
-    hloop_del_event(event, READ_EVENT|WRITE_EVENT);
-}
-
-void hevent_del(hloop_t* loop, int fd) {
-    auto iter = loop->events.find(fd);
-    if (iter != loop->events.end()) {
-        hevent_del(iter->second);
-    }
-}
-
-hevent_t* hevent_read(hloop_t* loop, int fd, hevent_cb cb, void* userdata) {
-    hevent_t* event = hevent_get_or_add(loop, fd);
-    if (event == NULL) return NULL;
-    event->read_cb = cb;
-    event->read_userdata = userdata;
-    hloop_add_event(event, READ_EVENT);
-    return event;
-}
-
-hevent_t* hevent_write(hloop_t* loop, int fd, hevent_cb cb, void* userdata) {
-    hevent_t* event = hevent_get_or_add(loop, fd);
-    if (event == NULL) return NULL;
-    event->write_cb = cb;
-    event->write_userdata = userdata;
-    hloop_add_event(event, WRITE_EVENT);
-    return event;
-}
-
-#include "hsocket.h"
-hevent_t* hevent_accept(hloop_t* loop, int listenfd, hevent_cb cb, void* userdata) {
-    hevent_t* event = hevent_read(loop, listenfd, cb, userdata);
-    if (event) {
-        nonblocking(listenfd);
-        event->accept = 1;
-    }
-    return event;
-}
-
-hevent_t* hevent_connect(hloop_t* loop, int connfd, hevent_cb cb, void* userdata) {
-    hevent_t* event = hevent_write(loop, connfd, cb, userdata);
-    if (event) {
-        nonblocking(connfd);
-        event->connect = 1;
-    }
-    return event;
-}
-

+ 86 - 45
event/hloop.h

@@ -1,20 +1,27 @@
 #ifndef HW_LOOP_H_
 #define HW_LOOP_H_
 
-#include <map>
+#include "hdef.h"
 
-#ifndef INFINITE
-#define INFINITE    (uint32_t)-1
-#endif
+#include <map>
+#include <list>
 
 typedef struct hloop_s  hloop_t;
+typedef struct hevent_s hevent_t;
 typedef struct htimer_s htimer_t;
 typedef struct hidle_s  hidle_t;
-typedef struct hevent_s hevent_t;
+typedef struct hio_s    hio_t;
+
+typedef void (*hevent_cb)   (hevent_t* ev,      void* userdata);
+typedef void (*htimer_cb)   (htimer_t* timer,   void* userdata);
+typedef void (*hidle_cb)    (hidle_t* idle,     void* userdata);
+typedef void (*hio_cb)      (hio_t* io,         void* userdata);
 
-typedef void (*htimer_cb)(htimer_t* timer, void* userdata);
-typedef void (*hidle_cb)(hidle_t* idle, void* userdata);
-typedef void (*hevent_cb)(hevent_t* event, void* userdata);
+typedef void (*hread_cb)    (hio_t* io, void* buf, int readbytes, void* userdata);
+typedef void (*hwrite_cb)   (hio_t* io, const void* buf, int writebytes, void* userdata);
+typedef void (*hclose_cb)   (hio_t* io, void* userdata);
+typedef void (*haccept_cb)  (hio_t* io, int connfd, void* userdata);
+typedef void (*hconnect_cb) (hio_t* io, int state,  void* userdata);
 
 typedef enum {
     HLOOP_STATUS_STOP,
@@ -28,67 +35,99 @@ struct hloop_s {
     uint64_t    end_time;
     uint64_t    cur_time;
     uint64_t    loop_cnt;
+    var         custom_data;
+//private:
+    uint64_t                    event_counter;
     // timers
-    uint32_t                    timer_counter;
     // timer_id => timer
+    uint32_t                    timer_counter;
     std::map<int, htimer_t*>    timers;
     uint32_t                    min_timer_timeout;
     // idles
-    uint32_t                    idle_counter;
     // hidle_id => idle
+    uint32_t                    idle_counter;
     std::map<int, hidle_t*>     idles;
-    // events
-    // fd => event
-    std::map<int, hevent_t*>    events;
-    void*                       event_ctx; // private
+    // ios
+    // fd => io
+    std::map<int, hio_t*>       ios;
+    void*                       iowatcher;
+};
+
+typedef enum {
+    HEVENT_TYPE_NONE    = 0,
+    HEVENT_TYPE_TIMER   = 0x0001,
+    HEVENT_TYPE_IDLE    = 0x0002,
+    HEVENT_TYPE_IO      = 0x0004,
+} hevent_type_e;
+
+#define HEVENT_FIELDS               \
+    hloop_t*        loop;           \
+    hevent_type_e   event_type;     \
+    uint64_t        event_id;       \
+    int             priority;       \
+    var             custom_data;
+
+#define HEVENT_FLAGS        \
+    unsigned    destroy :1; \
+    unsigned    active  :1; \
+    unsigned    pending :1;
+
+struct hevent_s {
+    HEVENT_FIELDS
+//private:
+    HEVENT_FLAGS
 };
 
 struct htimer_s {
-    hloop_t*    loop;
+    HEVENT_FIELDS
     uint32_t    timer_id;
     uint32_t    timeout;
     uint32_t    repeat;
     htimer_cb   cb;
     void*       userdata;
 //private:
-    unsigned    destroy     :1;
-    unsigned    disable     :1;
     uint64_t    next_timeout;
+    HEVENT_FLAGS
 };
 
 struct hidle_s {
-    hloop_t*    loop;
+    HEVENT_FIELDS
     uint32_t    idle_id;
     uint32_t    repeat;
     hidle_cb    cb;
     void*       userdata;
 //private:
-    unsigned    destroy     :1;
-    unsigned    disable     :1;
+    HEVENT_FLAGS
 };
 
-typedef union {
-    void*       ptr;
-    uint32_t    u32;
-    uint64_t    u64;
-} hevent_data_e;
-
-struct hevent_s {
-    hloop_t*    loop;
+struct hio_s {
+    HEVENT_FIELDS
     int         fd;
-    hevent_cb   read_cb;
+    int         error;
+    char*       readbuf;
+    int         readbuflen;
+    // callbacks
+    hread_cb    read_cb;
     void*       read_userdata;
-    hevent_cb   write_cb;
+    hwrite_cb   write_cb;
     void*       write_userdata;
+    hclose_cb   close_cb;
+    void*       close_userdata;
+    haccept_cb  accept_cb;
+    void*       accept_userdata;
+    hconnect_cb connect_cb;
+    void*       connect_userdata;
 //private:
-    unsigned    destroy     :1;
-    unsigned    disable     :1;
+    hio_cb      revent_cb;
+    void*       revent_userdata;
+    hio_cb      wevent_cb;
+    void*       wevent_userdata;
+    int         event_index[2];
+    int         events;
+    int         revents;
+    HEVENT_FLAGS
     unsigned    accept      :1;
     unsigned    connect     :1;
-    unsigned    readable    :1;
-    unsigned    writeable   :1;
-    int         event_index[2]; // for poll,kqueue
-    int         events;      // for epoll
 };
 
 // loop
@@ -110,14 +149,16 @@ hidle_t*    hidle_add(hloop_t* loop, hidle_cb cb, void* userdata, uint32_t repea
 void        hidle_del(hloop_t* loop, uint32_t idle_id);
 void        hidle_del(hidle_t* idle);
 
-// event
-// NOTE: READ_EVENT is FOREVER
-// NOTE: WRITE_EVENT is ONESHOT
-hevent_t* hevent_accept(hloop_t* loop, int listenfd, hevent_cb on_accept, void* userdata);
-hevent_t* hevent_connect(hloop_t* loop, int connfd, hevent_cb on_connect, void* userdata);
-hevent_t* hevent_read(hloop_t* loop, int fd, hevent_cb on_readable, void* userdata);
-hevent_t* hevent_write(hloop_t* loop, int fd, hevent_cb on_writeable, void* userdata);
-void      hevent_del(hloop_t* loop, int fd);
-void      hevent_del(hevent_t* event);
+// io
+hio_t* haccept  (hloop_t* loop, int listenfd, haccept_cb accept_cb, void* accept_userdata,
+                    hclose_cb close_cb = NULL, void* close_userdata = NULL);
+hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb, void* connect_userdata,
+                    hclose_cb close_cb = NULL, void* close_userdata = NULL);
+hio_t* hread    (hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb, void* read_userdata,
+                    hclose_cb close_cb = NULL, void* close_userdata = NULL);
+hio_t* hwrite   (hloop_t* loop, int fd, const void* buf, size_t len,
+                    hwrite_cb write_cb = NULL, void* write_userdata = NULL,
+                    hclose_cb close_cb = NULL, void* close_userdata = NULL);
+void   hclose   (hio_t* io);
 
 #endif // HW_LOOP_H_

+ 40 - 0
event/io_watcher.cpp

@@ -0,0 +1,40 @@
+#include "io_watcher.h"
+#include "hio.h"
+
+#include "hdef.h"
+#include "hlog.h"
+#include "hsocket.h"
+
+int hloop_iowatcher_init(hloop_t* loop) {
+    return iowatcher_init(loop);
+}
+
+int hloop_iowatcher_cleanup(hloop_t* loop) {
+    return iowatcher_cleanup(loop);
+}
+
+static void remove_bad_fds(hloop_t* loop) {
+    int error = 0;
+    socklen_t optlen = sizeof(int);
+    int ret = 0;
+    auto iter = loop->ios.begin();
+    while (iter != loop->ios.end()) {
+        int fd = iter->first;
+        ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*)&error, &optlen);
+        if (ret < 0 || error != 0) {
+            hloge("getsockopt fd=%d retval=%d SO_ERROR=%d", fd, ret, error);
+            hio_del(iter->second);
+            continue;
+        }
+        ++iter;
+    }
+}
+
+int hloop_handle_ios(hloop_t* loop, int timeout) {
+    int nevent = iowatcher_poll_events(loop, timeout);
+    if (nevent < 0) {
+        hloge("poll_events error=%d", -nevent);
+        remove_bad_fds(loop);
+    }
+    return nevent;
+}

+ 54 - 0
event/io_watcher.h

@@ -0,0 +1,54 @@
+#ifndef IO_WATCHER_H_
+#define IO_WATCHER_H_
+
+#include "hloop.h"
+
+int hloop_iowatcher_init(hloop_t* loop);
+int hloop_iowatcher_cleanup(hloop_t* loop);
+int hloop_handle_ios(hloop_t* loop, int timeout);
+
+#include "hplatform.h"
+#if !defined(EVENT_SELECT) && !defined(EVENT_POLL) && !defined(EVENT_EPOLL) && \
+    !defined(EVENT_IOCP) && !defined(EVENT_KQUEUE) && !defined(EVENT_NOEVENT)
+#ifdef OS_WIN
+#define EVENT_IOCP
+#elif defined(OS_LINUX)
+#define EVENT_EPOLL
+#elif defined(OS_MAC)
+#define EVENT_KQUEUE
+#elif defined(OS_BSD)
+#define EVENT_KQUEUE
+#else
+#define EVENT_SELECT
+#endif
+#endif
+
+#if defined(EVENT_POLL)
+#include <sys/poll.h>
+#define READ_EVENT  POLLIN
+#define WRITE_EVENT POLLOUT
+#elif defined(EVENT_EPOLL)
+#include <sys/epoll.h>
+#define READ_EVENT  EPOLLIN
+#define WRITE_EVENT EPOLLOUT
+#elif defined(EVENT_KQUEUE)
+#include <sys/event.h>
+#define READ_EVENT  EVFILT_READ
+#define WRITE_EVENT EVFILT_WRITE
+#else
+#define READ_EVENT  0x0001
+#define WRITE_EVENT 0x0004
+#endif
+
+#define ALL_EVENTS  READ_EVENT|WRITE_EVENT
+
+#define READ_INDEX  0
+#define WRITE_INDEX 1
+#define EVENT_INDEX(type) ((type == READ_EVENT) ? READ_INDEX : WRITE_INDEX)
+int iowatcher_init(hloop_t* loop);
+int iowatcher_cleanup(hloop_t* loop);
+int iowatcher_add_event(hio_t* fd, int events);
+int iowatcher_del_event(hio_t* fd, int events);
+int iowatcher_poll_events(hloop_t* loop, int timeout);
+
+#endif

+ 39 - 51
event/kqueue.cpp

@@ -1,11 +1,7 @@
-#include "hevent.h"
+#include "io_watcher.h"
 
 #ifdef EVENT_KQUEUE
 #include "hplatform.h"
-#if defined(OS_MAC) || defined(OS_BSD)
-#include <sys/event.h>
-#endif
-
 #include "hdef.h"
 
 #define INIT_EVENTS_NUM     64
@@ -26,8 +22,8 @@ static void kqueue_ctx_resize(kqueue_ctx_t* kqueue_ctx, int size) {
     kqueue_ctx->capacity = size;
 }
 
-int _event_init(hloop_t* loop) {
-    if (loop->event_ctx) return 0;
+int iowatcher_init(hloop_t* loop) {
+    if (loop->iowatcher) return 0;
     kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)malloc(sizeof(kqueue_ctx_t));
     kqueue_ctx->kqfd = kqueue();
     kqueue_ctx->capacity = INIT_EVENTS_NUM;
@@ -37,41 +33,41 @@ int _event_init(hloop_t* loop) {
     memset(kqueue_ctx->changes, 0, bytes);
     kqueue_ctx->events = (struct kevent*)malloc(bytes);
     memset(kqueue_ctx->events, 0, bytes);
-    loop->event_ctx = kqueue_ctx;
+    loop->iowatcher = kqueue_ctx;
     return 0;
 }
 
-int _event_cleanup(hloop_t* loop) {
-    if (loop->event_ctx == NULL) return 0;
-    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->event_ctx;
+int iowatcher_cleanup(hloop_t* loop) {
+    if (loop->iowatcher == NULL) return 0;
+    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
     close(kqueue_ctx->kqfd);
     SAFE_FREE(kqueue_ctx->changes);
     SAFE_FREE(kqueue_ctx->events);
-    SAFE_FREE(loop->event_ctx);
+    SAFE_FREE(loop->iowatcher);
     return 0;
 }
 
-static int __add_event(hevent_t* event, int type) {
-    hloop_t* loop = event->loop;
-    if (loop->event_ctx == NULL) {
-        hloop_event_init(loop);
+static int __add_event(hio_t* io, int event) {
+    hloop_t* loop = io->loop;
+    if (loop->iowatcher == NULL) {
+        hloop_iowatcher_init(loop);
     }
-    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->event_ctx;
-    int idx = event->event_index[EVENT_INDEX(type)];
+    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
+    int idx = io->event_index[EVENT_INDEX(event)];
     if (idx < 0) {
-        event->event_index[EVENT_INDEX(type)] = idx = kqueue_ctx->nchanges;
+        io->event_index[EVENT_INDEX(event)] = idx = kqueue_ctx->nchanges;
         kqueue_ctx->nchanges++;
         if (idx == kqueue_ctx->capacity) {
             kqueue_ctx_resize(kqueue_ctx, kqueue_ctx->capacity*2);
         }
         memset(kqueue_ctx->changes+idx, 0, sizeof(struct kevent));
-        kqueue_ctx->changes[idx].ident = event->fd;
+        kqueue_ctx->changes[idx].ident = io->fd;
     }
-    assert(kqueue_ctx->changes[idx].ident == event->fd);
-    if (type & READ_EVENT) {
+    assert(kqueue_ctx->changes[idx].ident == io->fd);
+    if (events & READ_EVENT) {
         kqueue_ctx->changes[idx].filter = EVFILT_READ;
     }
-    else if (type & WRITE_EVENT) {
+    else if (events & WRITE_EVENT) {
         kqueue_ctx->changes[idx].filter = EVFILT_WRITE;
     }
     kqueue_ctx->changes[idx].flags = EV_ADD|EV_ENABLE;
@@ -82,25 +78,25 @@ static int __add_event(hevent_t* event, int type) {
     return 0;
 }
 
-int _add_event(hevent_t* event, int type) {
-    if (type & READ_EVENT) {
+int iowatcher_add_event(hio_t* io, int events) {
+    if (events & READ_EVENT) {
         __add_event(event, READ_EVENT);
     }
-    if (type & WRITE_EVENT) {
+    if (events & WRITE_EVENT) {
         __add_event(event, WRITE_EVENT);
     }
     return 0;
 }
 
-static int __del_event(hevent_t* event, int type) {
-    hloop_t* loop = event->loop;
-    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->event_ctx;
+static int __del_event(hio_t* io, int event) {
+    hloop_t* loop = io->loop;
+    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
     if (kqueue_ctx == NULL) return 0;
-    int idx = event->event_index[EVENT_INDEX(type)];
+    int idx = io->event_index[EVENT_INDEX(event)];
     if (idx < 0) return 0;
-    assert(kqueue_ctx->changes[idx].ident == event->fd);
+    assert(kqueue_ctx->changes[idx].ident == io->fd);
     kqueue_ctx->changes[idx].flags = EV_DELETE;
-    event->event_index[EVENT_INDEX(type)] = -1;
+    io->event_index[EVENT_INDEX(event)] = -1;
     int lastidx = kqueue_ctx->nchanges - 1;
     if (idx < lastidx) {
         // swap
@@ -121,18 +117,18 @@ static int __del_event(hevent_t* event, int type) {
     return 0;
 }
 
-int _del_event(hevent_t* event, int type) {
-    if (type & READ_EVENT) {
-        __del_event(event, READ_EVENT);
+int iowatcher_del_event(hio_t* io, int events) {
+    if (events & READ_EVENT) {
+        __del_event(io, READ_EVENT);
     }
-    if (type & WRITE_EVENT) {
-        __del_event(event, WRITE_EVENT);
+    if (events & WRITE_EVENT) {
+        __del_event(io, WRITE_EVENT);
     }
     return 0;
 }
 
-int _handle_events(hloop_t* loop, int timeout) {
-    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->event_ctx;
+int iowatcher_poll_events(hloop_t* loop, int timeout) {
+    kqueue_ctx_t* kqueue_ctx = (kqueue_ctx_t*)loop->iowatcher;
     if (kqueue_ctx == NULL) return 0;
     if (kqueue_ctx->nchanges == 0) return 0;
     struct timespec ts, *tp;
@@ -159,19 +155,11 @@ int _handle_events(hloop_t* loop, int timeout) {
         ++nevent;
         int fd = kqueue_ctx->events[i].ident;
         int revent = kqueue_ctx->events[i].filter;
-        auto iter = loop->events.find(fd);
-        if (iter == loop->events.end()) {
-            continue;
-        }
-        hevent_t* event = iter->second;
-        if (revent == EVFILT_READ) {
-            _on_read(event);
-        }
-        else if (revent == EVFILT_WRITE) {
-            _on_write(event);
-        }
+        hio_t* io = hio_get(loop, fd);
+        if (io == NULL) continue;
+        io->revents = revent;
+        hio_handle_events(io);
     }
-
     return nevent;
 }
 #endif

+ 239 - 0
event/nio.cpp

@@ -0,0 +1,239 @@
+#include "hloop.h"
+#include "hio.h"
+#include "hsocket.h"
+
+static void on_accept(hio_t* io, void* userdata) {
+    //printf("on_accept listenfd=%d\n", io->fd);
+    struct sockaddr_in peeraddr;
+    socklen_t addrlen;
+    //struct sockaddr_in localaddr;
+    //addrlen = sizeof(struct sockaddr_in);
+    //getsockname(io->fd, (struct sockaddr*)&localaddr, &addrlen);
+accept:
+    addrlen = sizeof(struct sockaddr_in);
+    int connfd = accept(io->fd, (struct sockaddr*)&peeraddr, &addrlen);
+    if (connfd < 0) {
+        if (sockerrno == NIO_EAGAIN) {
+            //goto accept_done;
+            return;
+        }
+        else {
+            perror("accept");
+            goto accept_error;
+        }
+    }
+    //printf("accept connfd=%d [%s:%d] <= [%s:%d]\n", connfd,
+            //inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
+            //inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
+
+    if (io->accept_cb) {
+        io->accept_cb(io, connfd, io->accept_userdata);
+    }
+
+    goto accept;
+
+accept_error:
+    hclose(io);
+}
+
+static void on_connect(hio_t* io, void* userdata) {
+    //printf("on_connect connfd=%d\n", io->fd);
+    int state = 0;
+    struct sockaddr_in peeraddr;
+    socklen_t addrlen;
+    addrlen = sizeof(struct sockaddr_in);
+    int ret = getpeername(io->fd, (struct sockaddr*)&peeraddr, &addrlen);
+    if (ret < 0) {
+        //printf("connect failed: %s: %d\n", strerror(sockerrno), sockerrno);
+        state = 0;
+    }
+    else {
+        //struct sockaddr_in localaddr;
+        //addrlen = sizeof(struct sockaddr_in);
+        //getsockname(ioent->fd, (struct sockaddr*)&localaddr, &addrlen);
+        //printf("connect connfd=%d [%s:%d] => [%s:%d]\n", io->fd,
+                //inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
+                //inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
+        state = 1;
+    }
+    if (io->connect_cb) {
+        io->connect_cb(io, state, io->connect_userdata);
+    }
+}
+
+static void on_readable(hio_t* io, void* userdata) {
+    //printf("on_read fd=%d\n", io->fd);
+    int nread;
+    void* buf = io->readbuf;
+    int   len = io->readbuflen;
+read:
+    memset(buf, 0, len);
+    nread = read(io->fd, buf, len);
+    //printf("read retval=%d\n", nread);
+    if (nread < 0) {
+        if (sockerrno == NIO_EAGAIN) {
+            //goto read_done;
+            return;
+        }
+        else {
+            perror("read");
+            goto read_error;
+        }
+    }
+    if (nread == 0) {
+        goto disconnect;
+    }
+    //printf("> %s\n", buf);
+    if (io->read_cb) {
+        io->read_cb(io, io->readbuf, nread, io->read_userdata);
+    }
+    if (nread == len) {
+        goto read;
+    }
+    return;
+read_error:
+disconnect:
+    hclose(io);
+}
+
+static void on_writeable(hio_t* io, void* userdata) {
+    printf("on_write fd=%d\n", io->fd);
+    /*
+    int nwrite;
+write:
+    if (io->write_queue.empty()) {
+        return;
+    }
+    pbuf = io->write_queue.front();
+    nwrite = write(ioent->fd, buf, len);
+    if (nwrite < 0) {
+        if (nwrite == NIO_EAGAIN) {
+            //goto write_done;
+            return;
+        }
+        else {
+        }
+    }
+    if (io->write_cb) {
+        io->write_cb(io, nwrite);
+    }
+    if (nwrite == len) {
+        io->write_queue.pop_front();
+        goto write;
+    }
+    //pbuf->buf += nwrite;
+    return;
+write_error:
+disconnect:
+    hclose(ioent);
+    */
+}
+
+hio_t* haccept  (hloop_t* loop, int listenfd, haccept_cb accept_cb, void* accept_userdata,
+                    hclose_cb close_cb, void* close_userdata) {
+    hio_t* io = hio_accept(loop, listenfd, on_accept, NULL);
+    if (io) {
+        io->accept_cb = accept_cb;
+        io->accept_userdata = accept_userdata;
+        if (close_cb) {
+            io->close_cb = close_cb;
+        }
+        if (close_userdata) {
+            io->close_userdata = close_userdata;
+        }
+    }
+    return io;
+}
+
+hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb, void* connect_userdata,
+                    hclose_cb close_cb, void* close_userdata) {
+    hio_t* io = hio_connect(loop, connfd, on_connect, NULL);
+    if (io) {
+        io->connect_cb = connect_cb;
+        io->connect_userdata = connect_userdata;
+        if (close_cb) {
+            io->close_cb = close_cb;
+        }
+        if (close_userdata) {
+            io->close_userdata = close_userdata;
+        }
+    }
+    return io;
+}
+
+hio_t* hread    (hloop_t* loop, int fd, void* readbuf, size_t readbuflen, hread_cb read_cb, void* read_userdata,
+                    hclose_cb close_cb, void* close_userdata) {
+    hio_t* io = hio_read(loop, fd, on_readable, NULL);
+    if (io) {
+        io->readbuf = (char*)readbuf;
+        io->readbuflen = readbuflen;
+        io->read_cb = read_cb;
+        io->read_userdata = read_userdata;
+        if (close_cb) {
+            io->close_cb = close_cb;
+        }
+        if (close_userdata) {
+            io->close_userdata = close_userdata;
+        }
+    }
+    return io;
+}
+
+hio_t* hwrite   (hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb, void* write_userdata,
+                    hclose_cb close_cb, void* close_userdata) {
+    hio_t* io = hio_add(loop, fd);
+    if (io == NULL) return NULL;
+    io->write_cb = write_cb;
+    io->write_userdata = write_userdata;
+    if (close_cb) {
+        io->close_cb = close_cb;
+    }
+    if (close_userdata) {
+        io->close_userdata = close_userdata;
+    }
+    int nwrite;
+    if (1) {
+    //if (io->write_queue.empty()) {
+try_write:
+        nwrite = write(fd, buf, len);
+        if (nwrite < 0) {
+            if (sockerrno == NIO_EAGAIN) {
+                nwrite = 0;
+                goto push_queue;
+            }
+            else {
+                perror("write");
+                goto write_error;
+            }
+            goto write_error;
+        }
+        if (nwrite == 0) {
+            goto disconnect;
+        }
+        if (write_cb) {
+            write_cb(io, buf, nwrite, io->write_userdata);
+        }
+        if (nwrite == len) {
+            //goto write_done;
+            return io;
+        }
+    }
+push_queue:
+    printf("write retval=%d buflen=%ld\n", nwrite, len);
+    //ioent->write_queue.push(buf+nwrite, len-nwrite);
+    //hioent_write(loop, fd, on_writeable, NULL);
+    return io;
+write_error:
+disconnect:
+    hclose(io);
+    return io;
+}
+
+void hclose(hio_t* io) {
+    //printf("close fd=%d\n", io->fd);
+    close(io->fd);
+    if (io->close_cb) {
+        io->close_cb(io, io->close_userdata);
+    }
+    hio_del(io);
+}

+ 8 - 8
event/noevent.cpp

@@ -1,25 +1,25 @@
-#include "hevent.h"
+#include "io_watcher.h"
 
 #ifdef EVENT_NOEVENT
-int _event_init(hloop_t* loop) {
-    loop->event_ctx = NULL;
+#include "htime.h"
+int iowatcher_init(hloop_t* loop) {
     return 0;
 }
 
-int _event_cleanup(hloop_t* loop) {
-    loop->event_ctx = NULL;
+int iowatcher_cleanup(hloop_t* loop) {
     return 0;
 }
 
-int _add_event(hevent_t* event, int type) {
+int iowatcher_add_event(hio_t* fd, int events) {
     return 0;
 }
 
-int _del_event(hevent_t* event, int type) {
+int iowatcher_del_event(hio_t* fd, int events) {
     return 0;
 }
 
-int _handle_events(hloop_t* loop, int timeout) {
+int iowatcher_poll_events(hloop_t* loop, int timeout) {
+    msleep(timeout);
     return 0;
 }
 #endif

+ 36 - 46
event/poll.cpp

@@ -1,12 +1,9 @@
-#include "hevent.h"
+#include "io_watcher.h"
 
 #ifdef EVENT_POLL
 #include "hplatform.h"
-#ifdef OS_LINUX
-#include <sys/poll.h>
-#endif
-
 #include "hdef.h"
+#include "hio.h"
 
 #define INIT_FDS_NUM    64
 
@@ -22,74 +19,74 @@ static void poll_ctx_resize(poll_ctx_t* poll_ctx, int size) {
     poll_ctx->capacity = size;
 }
 
-int _event_init(hloop_t* loop) {
-    if (loop->event_ctx)   return 0;
+int iowatcher_init(hloop_t* loop) {
+    if (loop->iowatcher)   return 0;
     poll_ctx_t* poll_ctx = (poll_ctx_t*)malloc(sizeof(poll_ctx_t));
     poll_ctx->capacity = INIT_FDS_NUM;
     poll_ctx->nfds = 0;
     int bytes = sizeof(struct pollfd) * poll_ctx->capacity;
     poll_ctx->fds = (struct pollfd*)malloc(bytes);
     memset(poll_ctx->fds, 0, bytes);
-    loop->event_ctx = poll_ctx;
+    loop->iowatcher = poll_ctx;
     return 0;
 }
 
-int _event_cleanup(hloop_t* loop) {
-    if (loop->event_ctx == NULL)   return 0;
-    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->event_ctx;
+int iowatcher_cleanup(hloop_t* loop) {
+    if (loop->iowatcher == NULL)   return 0;
+    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->iowatcher;
     SAFE_FREE(poll_ctx->fds);
-    SAFE_FREE(loop->event_ctx);
+    SAFE_FREE(loop->iowatcher);
     return 0;
 }
 
-int _add_event(hevent_t* event, int type) {
-    hloop_t* loop = event->loop;
-    if (loop->event_ctx == NULL) {
-        hloop_event_init(loop);
+int iowatcher_add_event(hio_t* io, int events) {
+    hloop_t* loop = io->loop;
+    if (loop->iowatcher == NULL) {
+        hloop_iowatcher_init(loop);
     }
-    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->event_ctx;
-    int idx = event->event_index[0];
+    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->iowatcher;
+    int idx = io->event_index[0];
     if (idx < 0) {
-        event->event_index[0] = idx = poll_ctx->nfds;
+        io->event_index[0] = idx = poll_ctx->nfds;
         poll_ctx->nfds++;
         if (idx == poll_ctx->capacity) {
             poll_ctx_resize(poll_ctx, poll_ctx->capacity*2);
         }
-        poll_ctx->fds[idx].fd = event->fd;
+        poll_ctx->fds[idx].fd = io->fd;
         poll_ctx->fds[idx].events = 0;
         poll_ctx->fds[idx].revents = 0;
     }
-    assert(poll_ctx->fds[idx].fd == event->fd);
-    if (type & READ_EVENT) {
+    assert(poll_ctx->fds[idx].fd == io->fd);
+    if (events & READ_EVENT) {
         poll_ctx->fds[idx].events |= POLLIN;
     }
-    if (type & WRITE_EVENT) {
+    if (events & WRITE_EVENT) {
         poll_ctx->fds[idx].events |= POLLOUT;
     }
     return 0;
 }
 
-int _del_event(hevent_t* event, int type) {
-    hloop_t* loop = event->loop;
-    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->event_ctx;
+int iowatcher_del_event(hio_t* io, int events) {
+    hloop_t* loop = io->loop;
+    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->iowatcher;
     if (poll_ctx == NULL)  return 0;
 
-    int idx = event->event_index[0];
+    int idx = io->event_index[0];
     if (idx < 0) return 0;
-    assert(poll_ctx->fds[idx].fd == event->fd);
-    if (type & READ_EVENT) {
+    assert(poll_ctx->fds[idx].fd == io->fd);
+    if (events & READ_EVENT) {
         poll_ctx->fds[idx].events &= ~POLLIN;
     }
-    if (type & WRITE_EVENT) {
+    if (events & WRITE_EVENT) {
         poll_ctx->fds[idx].events &= ~POLLOUT;
     }
     if (poll_ctx->fds[idx].events == 0) {
-        event->event_index[0] = -1;
+        io->event_index[0] = -1;
         poll_ctx->nfds--;
         if (idx < poll_ctx->nfds) {
             poll_ctx->fds[idx] = poll_ctx->fds[poll_ctx->nfds];
-            auto iter = loop->events.find(poll_ctx->fds[idx].fd);
-            if (iter != loop->events.end()) {
+            auto iter = loop->ios.find(poll_ctx->fds[idx].fd);
+            if (iter != loop->ios.end()) {
                 iter->second->event_index[0] = idx;
             }
         }
@@ -97,8 +94,8 @@ int _del_event(hevent_t* event, int type) {
     return 0;
 }
 
-int _handle_events(hloop_t* loop, int timeout) {
-    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->event_ctx;
+int iowatcher_poll_events(hloop_t* loop, int timeout) {
+    poll_ctx_t* poll_ctx = (poll_ctx_t*)loop->iowatcher;
     if (poll_ctx == NULL)  return 0;
     if (poll_ctx->nfds == 0)   return 0;
     int npoll = poll(poll_ctx->fds, poll_ctx->nfds, timeout);
@@ -114,17 +111,10 @@ int _handle_events(hloop_t* loop, int timeout) {
         short revents = poll_ctx->fds[i].revents;
         if (revents) {
             ++nevent;
-            auto iter = loop->events.find(fd);
-            if (iter == loop->events.end()) {
-                continue;
-            }
-            hevent_t* event = iter->second;
-            if (revents & POLLIN) {
-                _on_read(event);
-            }
-            if (revents & POLLOUT) {
-                _on_write(event);
-            }
+            hio_t* io = hio_get(loop, fd);
+            if (io == NULL) continue;
+            io->revents = revents;
+            hio_handle_events(io);
         }
     }
     return nevent;

+ 30 - 28
event/select.cpp

@@ -1,4 +1,4 @@
-#include "hevent.h"
+#include "io_watcher.h"
 
 #ifdef EVENT_SELECT
 #include "hplatform.h"
@@ -7,6 +7,7 @@
 #endif
 
 #include "hdef.h"
+#include "hio.h"
 
 typedef struct select_ctx_s {
     int max_fd;
@@ -16,40 +17,40 @@ typedef struct select_ctx_s {
     int nwrite;
 } select_ctx_t;
 
-int _event_init(hloop_t* loop) {
-    if (loop->event_ctx) return 0;
+int iowatcher_init(hloop_t* loop) {
+    if (loop->iowatcher) return 0;
     select_ctx_t* select_ctx = (select_ctx_t*)malloc(sizeof(select_ctx_t));
     select_ctx->max_fd = -1;
     FD_ZERO(&select_ctx->readfds);
     FD_ZERO(&select_ctx->writefds);
     select_ctx->nread = 0;
     select_ctx->nwrite = 0;
-    loop->event_ctx = select_ctx;
+    loop->iowatcher = select_ctx;
     return 0;
 }
 
-int _event_cleanup(hloop_t* loop) {
-    SAFE_FREE(loop->event_ctx);
+int iowatcher_cleanup(hloop_t* loop) {
+    SAFE_FREE(loop->iowatcher);
     return 0;
 }
 
-int _add_event(hevent_t* event, int type) {
-    hloop_t* loop = event->loop;
-    if (loop->event_ctx == NULL) {
-        hloop_event_init(loop);
+int iowatcher_add_event(hio_t* io, int events) {
+    hloop_t* loop = io->loop;
+    if (loop->iowatcher == NULL) {
+        hloop_iowatcher_init(loop);
     }
-    select_ctx_t* select_ctx = (select_ctx_t*)loop->event_ctx;
-    int fd = event->fd;
+    select_ctx_t* select_ctx = (select_ctx_t*)loop->iowatcher;
+    int fd = io->fd;
     if (fd > select_ctx->max_fd) {
         select_ctx->max_fd = fd;
     }
-    if (type & READ_EVENT) {
+    if (events & READ_EVENT) {
         if (!FD_ISSET(fd, &select_ctx->readfds)) {
             FD_SET(fd, &select_ctx->readfds);
             select_ctx->nread++;
         }
     }
-    if (type & WRITE_EVENT) {
+    if (events & WRITE_EVENT) {
         if (!FD_ISSET(fd, &select_ctx->writefds)) {
             FD_SET(fd, &select_ctx->writefds);
             select_ctx->nwrite++;
@@ -58,21 +59,21 @@ int _add_event(hevent_t* event, int type) {
     return 0;
 }
 
-int _del_event(hevent_t* event, int type) {
-    hloop_t* loop = event->loop;
-    select_ctx_t* select_ctx = (select_ctx_t*)loop->event_ctx;
+int iowatcher_del_event(hio_t* io, int events) {
+    hloop_t* loop = io->loop;
+    select_ctx_t* select_ctx = (select_ctx_t*)loop->iowatcher;
     if (select_ctx == NULL)    return 0;
-    int fd = event->fd;
+    int fd = io->fd;
     if (fd == select_ctx->max_fd) {
         select_ctx->max_fd = -1;
     }
-    if (type & READ_EVENT) {
+    if (events & READ_EVENT) {
         if (FD_ISSET(fd, &select_ctx->readfds)) {
             FD_CLR(fd, &select_ctx->readfds);
             select_ctx->nread--;
         }
     }
-    if (type & WRITE_EVENT) {
+    if (events & WRITE_EVENT) {
         if (FD_ISSET(fd, &select_ctx->writefds)) {
             FD_CLR(fd, &select_ctx->writefds);
             select_ctx->nwrite--;
@@ -81,8 +82,8 @@ int _del_event(hevent_t* event, int type) {
     return 0;
 }
 
-int _handle_events(hloop_t* loop, int timeout) {
-    select_ctx_t* select_ctx = (select_ctx_t*)loop->event_ctx;
+int iowatcher_poll_events(hloop_t* loop, int timeout) {
+    select_ctx_t* select_ctx = (select_ctx_t*)loop->iowatcher;
     if (select_ctx == NULL)    return 0;
     if (select_ctx->nread == 0 && select_ctx->nwrite == 0) {
         return 0;
@@ -91,7 +92,7 @@ int _handle_events(hloop_t* loop, int timeout) {
     fd_set  readfds = select_ctx->readfds;
     fd_set  writefds = select_ctx->writefds;
     if (max_fd == -1) {
-        for (auto& pair : loop->events) {
+        for (auto& pair : loop->ios) {
             int fd = pair.first;
             if (fd > max_fd) {
                 max_fd = fd;
@@ -122,19 +123,20 @@ int _handle_events(hloop_t* loop, int timeout) {
     }
     if (nselect == 0)   return 0;
     int nevent = 0;
-    auto iter = loop->events.begin();
-    while (iter != loop->events.end()) {
+    auto iter = loop->ios.begin();
+    while (iter != loop->ios.end()) {
         if (nevent == nselect) break;
         int fd = iter->first;
-        hevent_t* event = iter->second;
+        hio_t* io = iter->second;
         if (FD_ISSET(fd, &readfds)) {
             ++nevent;
-            _on_read(event);
+            io->revents |= READ_EVENT;
         }
         if (FD_ISSET(fd, &writefds)) {
             ++nevent;
-            _on_write(event);
+            io->revents |= WRITE_EVENT;
         }
+        hio_handle_events(io);
         ++iter;
     }
     return nevent;

+ 40 - 63
examples/client.cpp

@@ -1,10 +1,9 @@
 #include "hloop.h"
-
-#include "htime.h"
+#include "hio.h"
 #include "hsocket.h"
 
-#define RECV_BUFSIZE    8192
-#define SEND_BUFSIZE    8192
+#define RECV_BUFSIZE    4096
+static char readbuf[RECV_BUFSIZE];
 
 void on_timer(htimer_t* timer, void* userdata) {
     static int cnt = 0;
@@ -16,72 +15,50 @@ void on_idle(hidle_t* idle, void* userdata) {
     printf("on_idle idle_id=%d cnt=%d\n", idle->idle_id, ++cnt);
 }
 
-void on_read(hevent_t* event, void* userdata) {
-    printf("on_read fd=%d\n", event->fd);
-    char recvbuf[RECV_BUFSIZE] = {0};
-    int nrecv;
-recv:
-    memset(recvbuf, 0, sizeof(recvbuf));
-    nrecv = recv(event->fd, recvbuf, sizeof(recvbuf), 0);
-    printf("recv retval=%d\n", nrecv);
-    if (nrecv < 0) {
-        if (sockerrno == NIO_EAGAIN) {
-            //goto recv_done;
-            return;
-        }
-        else {
-            perror("recv");
-            goto recv_error;
-        }
-    }
-    if (nrecv == 0) {
-        goto disconnect;
-    }
-    printf("> %s\n", recvbuf);
-    if (nrecv == sizeof(recvbuf)) {
-        goto recv;
-    }
-recv_done:
-    return;
-recv_error:
-disconnect:
-    printf("closesocket fd=%d\n", event->fd);
-    closesocket(event->fd);
-    hevent_del(event);
+void on_write(hio_t* io, const void* buf, int writebytes, void* userdata) {
+    printf("on_write fd=%d writebytes=%d\n", io->fd, writebytes);
 }
 
-void on_stdin(hevent_t* event, void* userdata) {
-    printf("on_stdin fd=%d\n", event->fd);
-    int connfd = (int)(long)userdata;
-    char sendbuf[RECV_BUFSIZE] = {0};
-    int nread, nsend;
-read:
-    memset(sendbuf, 0, sizeof(sendbuf));
-    nread = read(0, sendbuf, sizeof(sendbuf));
-send:
-    nsend = send(connfd, sendbuf, nread, 0);
-    printf("send retval=%d\n", nsend);
-    printf("< %s\n", sendbuf);
+void on_stdin(hio_t* io, void* buf, int readbytes, void* userdata) {
+    printf("on_stdin fd=%d readbytes=%d\n", io->fd, readbytes);
+    printf("> %s\n", io->readbuf);
+
+    hio_t* iosock = (hio_t*)io->read_userdata;
+    hwrite(iosock->loop, iosock->fd, io->readbuf, readbytes, on_write, NULL);
 }
 
-void on_connect(hevent_t* event, void* userdata) {
-    printf("on_connect connfd=%d\n", event->fd);
-    struct sockaddr_in localaddr,peeraddr;
-    socklen_t addrlen = sizeof(struct sockaddr_in);
-    int ret = getpeername(event->fd, (struct sockaddr*)&peeraddr, &addrlen);
-    if (ret < 0) {
-        printf("connect failed: %s: %d\n", strerror(sockerrno), sockerrno);
-        closesocket(event->fd);
-        return;
-    }
+void on_read(hio_t* io, void* buf, int readbytes, void* userdata) {
+    printf("on_read fd=%d readbytes=%d\n", io->fd, readbytes);
+    printf("< %s\n", io->readbuf);
+    printf(">>");
+    fflush(stdout);
+}
+
+void on_close(hio_t* io, void* userdata) {
+    printf("on_close fd=%d\n", io->fd);
+    hio_t* iostdin = (hio_t*)userdata;
+    hio_del(iostdin);
+}
+
+void on_connect(hio_t* io, int state, void* userdata) {
+    printf("on_connect fd=%d state=%d\n", io->fd, state);
+    if (state == 0) return;
+    struct sockaddr_in localaddr, peeraddr;
+    socklen_t addrlen;
+    addrlen = sizeof(struct sockaddr_in);
+    getsockname(io->fd, (struct sockaddr*)&localaddr, &addrlen);
     addrlen = sizeof(struct sockaddr_in);
-    getsockname(event->fd, (struct sockaddr*)&localaddr, &addrlen);
-    printf("connect connfd=%d [%s:%d] => [%s:%d]\n", event->fd,
+    getpeername(io->fd, (struct sockaddr*)&peeraddr, &addrlen);
+    printf("connect connfd=%d [%s:%d] => [%s:%d]\n", io->fd,
             inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
             inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
 
-    hevent_read(event->loop, event->fd, on_read, NULL);
-    hevent_read(event->loop, 0, on_stdin, (void*)(long)event->fd);
+    // NOTE: just on loop, readbuf can be shared.
+    hio_t* iostdin = hread(io->loop, 0, readbuf, RECV_BUFSIZE, on_stdin, io);
+    hread(io->loop, io->fd, readbuf, RECV_BUFSIZE, on_read, NULL, on_close, iostdin);
+
+    printf(">>");
+    fflush(stdout);
 }
 
 int main(int argc, char** argv) {
@@ -102,7 +79,7 @@ int main(int argc, char** argv) {
     hloop_init(&loop);
     //hidle_add(&loop, on_idle, NULL);
     //htimer_add(&loop, on_timer, NULL, 1000, INFINITE);
-    hevent_connect(&loop, connfd, on_connect, NULL);
+    hconnect(&loop, connfd, on_connect, NULL);
     hloop_run(&loop);
 
     return 0;

+ 26 - 66
examples/server.cpp

@@ -1,9 +1,8 @@
 #include "hloop.h"
-
-#include "htime.h"
 #include "hsocket.h"
 
-#define RECV_BUFSIZE    8192
+#define RECV_BUFSIZE    4096
+static char readbuf[RECV_BUFSIZE];
 
 void on_timer(htimer_t* timer, void* userdata) {
     static int cnt = 0;
@@ -15,76 +14,37 @@ void on_idle(hidle_t* idle, void* userdata) {
     printf("on_idle idle_id=%d cnt=%d\n", idle->idle_id, ++cnt);
 }
 
-void on_read(hevent_t* event, void* userdata) {
-    printf("on_read fd=%d\n", event->fd);
-    char recvbuf[RECV_BUFSIZE] = {0};
-    int nrecv, nsend;
-recv:
-    memset(recvbuf, 0, sizeof(recvbuf));
-    nrecv = recv(event->fd, recvbuf, sizeof(recvbuf), 0);
-    printf("recv retval=%d\n", nrecv);
-    if (nrecv < 0) {
-        if (sockerrno == NIO_EAGAIN) {
-            goto recv_done;
-        }
-        else {
-            perror("recv");
-            goto recv_error;
-        }
-    }
-    if (nrecv == 0) {
-        goto disconnect;
-    }
-    printf("> %s\n", recvbuf);
-    if (nrecv == sizeof(recvbuf)) {
-        goto recv;
-    }
+void on_close(hio_t* io, void* userdata) {
+    printf("on_close fd=%d\n", io->fd);
+}
 
-recv_done:
-send:
-    static const char* http_response = "HTTP/1.1 200 OK\r\n\r\n";
-    nsend = send(event->fd, http_response, strlen(http_response), 0);
-    printf("send retval=%d\n", nsend);
-    printf("< %s\n", http_response);
-    return;
+void on_write(hio_t* io, const void* buf, int writebytes, void* userdata) {
+    printf("on_write fd=%d writebytes=%d\n", io->fd, writebytes);
+}
 
-recv_error:
-disconnect:
-    printf("closesocket fd=%d\n", event->fd);
-    closesocket(event->fd);
-    hevent_del(event);
+void on_read(hio_t* io, void* buf, int readbytes, void* userdata) {
+    printf("on_read fd=%d readbytes=%d\n", io->fd, readbytes);
+    printf("< %s\n", io->readbuf);
+    // echo
+    printf("> %s\n", io->readbuf);
+    hwrite(io->loop, io->fd, io->readbuf, readbytes, on_write, NULL);
 }
 
-void on_accept(hevent_t* event, void* userdata) {
-    printf("on_accept listenfd=%d\n", event->fd);
+void on_accept(hio_t* io, int connfd, void* userdata) {
+    printf("on_accept listenfd=%d connfd=%d\n", io->fd, connfd);
     struct sockaddr_in localaddr, peeraddr;
-    socklen_t addrlen = sizeof(struct sockaddr_in);
-    getsockname(event->fd, (struct sockaddr*)&localaddr, &addrlen);
-accept:
+    socklen_t addrlen;
     addrlen = sizeof(struct sockaddr_in);
-    int connfd = accept(event->fd, (struct sockaddr*)&peeraddr, &addrlen);
-    if (connfd < 0) {
-        if (sockerrno == NIO_EAGAIN) {
-            //goto accept_done;
-            return;
-        }
-        else {
-            perror("accept");
-            goto accept_error;
-        }
-    }
-    printf("accept connfd=%d [%s:%d] => [%s:%d]\n", connfd,
-            inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port),
-            inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port));
+    getsockname(connfd, (struct sockaddr*)&localaddr, &addrlen);
+    addrlen = sizeof(struct sockaddr_in);
+    getpeername(connfd, (struct sockaddr*)&peeraddr, &addrlen);
+    printf("accept connfd=%d [%s:%d] <= [%s:%d]\n", connfd,
+            inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
+            inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
 
     nonblocking(connfd);
-    hevent_read(event->loop, connfd, on_read, NULL);
-
-    goto accept;
-
-accept_error:
-    closesocket(event->fd);
-    hevent_del(event);
+    // one loop can use one readbuf
+    hread(io->loop, connfd, readbuf, RECV_BUFSIZE, on_read, NULL, on_close, NULL);
 }
 
 int main(int argc, char** argv) {
@@ -104,6 +64,6 @@ int main(int argc, char** argv) {
     hloop_init(&loop);
     //hidle_add(&loop, on_idle, NULL);
     //htimer_add(&loop, on_timer, NULL, 1000, INFINITE);
-    hevent_accept(&loop, listenfd, on_accept, NULL);
+    haccept(&loop, listenfd, on_accept, NULL);
     hloop_run(&loop);
 }

+ 1 - 1
http/server/FileCache.h

@@ -91,7 +91,7 @@ public:
             strftime(fc->last_modified, sizeof(fc->last_modified), "%a, %d %b %Y %H:%M:%S GMT", gmtime(&tt));
             MD5_CTX md5_ctx;
             MD5Init(&md5_ctx);
-            MD5Update(&md5_ctx, fc->filebuf.base, fc->filebuf.len);
+            MD5Update(&md5_ctx, (unsigned char*)fc->filebuf.base, fc->filebuf.len);
             unsigned char digital[16];
             MD5Final(digital, &md5_ctx);
             char* md5 = fc->etag;

+ 40 - 84
http/server/http_server.cpp

@@ -3,6 +3,7 @@
 #include "h.h"
 #include "hmain.h"
 #include "hloop.h"
+#include "hbuf.h"
 
 #include "HttpParser.h"
 #include "FileCache.h"
@@ -77,38 +78,22 @@ struct http_connect_userdata {
     }
 };
 
-static void on_read(hevent_t* event, void* userdata) {
-    //printf("on_read fd=%d\n", event->fd);
+static void on_read(hio_t* io, void* buf, int readbytes, void* userdata) {
+    //printf("on_read fd=%d readbytes=%d\n", io->fd, readbytes);
     http_connect_userdata* hcu = (http_connect_userdata*)userdata;
     HttpService* service = hcu->server->service;
     HttpRequest* req = &hcu->req;
     HttpResponse* res = &hcu->res;
-    char recvbuf[RECV_BUFSIZE] = {0};
-    int ret, nrecv, nparse, nsend;
-recv:
+    int ret, nparse;
+    char* recvbuf = (char*)buf;
+    int nrecv = readbytes;
     // recv -> http_parser -> http_request -> http_request_handler -> http_response -> send
-    nrecv = recv(event->fd, recvbuf, sizeof(recvbuf), 0);
-    //printf("recv retval=%d\n", nrecv);
-    if (nrecv < 0) {
-        if (sockerrno == NIO_EAGAIN) {
-            //goto recv_done;
-            return;
-        }
-        else {
-            perror("recv");
-            hcu->log += asprintf("recv: %s", strerror(errno));
-            goto recv_error;
-        }
-    }
-    if (nrecv == 0) {
-        hcu->log += "disconnect";
-        goto disconnect;
-    }
     //printf("%s\n", recvbuf);
     nparse = hcu->parser.execute(recvbuf, nrecv);
     if (nparse != nrecv || hcu->parser.get_errno() != HPE_OK) {
         hcu->log += asprintf("http parser error: %s", http_errno_description(hcu->parser.get_errno()));
-        goto parser_error;
+        hclose(io);
+        return;
     }
     if (hcu->parser.get_state() == HP_MESSAGE_COMPLETE) {
         http_api_handler api = NULL;
@@ -218,78 +203,45 @@ recv:
         }
 
         // send header
-        nsend = send(event->fd, header.c_str(), header.size(), 0);
-        if (nsend != header.size()) {
-            hcu->log += asprintf("send header: %s", strerror(errno));
-            goto send_error;
-        }
+        hwrite(io->loop, io->fd, header.c_str(), header.size());
         // send body
         if (!send_in_one_packet && content_length != 0) {
-            //queue send ?
-            //if (content_length > SEND_BUFSIZE) {
-            //}
-            nsend = send(event->fd, content, content_length, 0);
-            if (nsend != res->body.size()) {
-                hcu->log += asprintf("send body: %s", strerror(errno));
-                goto send_error;
-            }
+            hwrite(io->loop, io->fd, content, content_length);
         }
         hcu->log += asprintf("=>[%d %s]", res->status_code, http_status_str(res->status_code));
         hlogi("%s", hcu->log.c_str());
-        goto end;
-    }
-    if (nrecv == sizeof(recvbuf)) {
-        goto recv;
+        hclose(io);
     }
-    goto end;
+}
 
-recv_error:
-disconnect:
-parser_error:
-send_error:
-    hloge("%s", hcu->log.c_str());
-end:
-    closesocket(event->fd);
-    hevent_del(event);
-    delete hcu;
+static void on_close(hio_t* io, void* userdata) {
+    http_connect_userdata* hcu = (http_connect_userdata*)userdata;
+    if (hcu) {
+        hlogi("%s", hcu->log.c_str());
+        delete hcu;
+    }
 }
 
-static void on_accept(hevent_t* event, void* userdata) {
-    //printf("on_accept listenfd=%d\n", event->fd);
+static void on_accept(hio_t* io, int connfd, void* userdata) {
+    //printf("on_accept listenfd=%d connfd=%d\n", io->fd, connfd);
     struct sockaddr_in localaddr, peeraddr;
-    socklen_t addrlen = sizeof(struct sockaddr_in);
-    getsockname(event->fd, (struct sockaddr*)&localaddr, &addrlen);
-accept:
+    socklen_t addrlen;
     addrlen = sizeof(struct sockaddr_in);
-    int connfd = accept(event->fd, (struct sockaddr*)&peeraddr, &addrlen);
-    if (connfd < 0) {
-        if (sockerrno == NIO_EAGAIN) {
-            //goto accept_done;
-            return;
-        }
-        else {
-            perror("accept");
-            hloge("accept failed: %s: %d", strerror(sockerrno), sockerrno);
-            goto accept_error;
-        }
-    }
-
-    {
-        // new http_connect
-        // delete on on_read
-        http_connect_userdata* hcu = new http_connect_userdata;
-        hcu->server = (http_server_t*)userdata;
-        hcu->log += asprintf("[%s:%d]", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
-
-        nonblocking(connfd);
-        hevent_read(event->loop, connfd, on_read, hcu);
-    }
-
-    goto accept;
+    getsockname(connfd, (struct sockaddr*)&localaddr, &addrlen);
+    addrlen = sizeof(struct sockaddr_in);
+    getpeername(connfd, (struct sockaddr*)&peeraddr, &addrlen);
+    //printf("accept connfd=%d [%s:%d] <= [%s:%d]\n", connfd,
+            //inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
+            //inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
+    // new http_connect_userdata
+    // delete on_close
+    http_connect_userdata* hcu = new http_connect_userdata;
+    hcu->server = (http_server_t*)userdata;
+    hcu->log += asprintf("[%s:%d]", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
 
-accept_error:
-    closesocket(event->fd);
-    hevent_del(event);
+    nonblocking(connfd);
+    HBuf* buf = (HBuf*)io->loop->custom_data.ptr;
+    hread(io->loop, connfd, buf->base, buf->len, on_read, hcu, on_close, hcu);
 }
 
 void handle_cached_files(htimer_t* timer, void* userdata) {
@@ -319,7 +271,11 @@ static void worker_proc(void* userdata) {
     hloop_t loop;
     hloop_init(&loop);
     htimer_add(&loop, handle_cached_files, &s_filecache, s_filecache.file_cached_time*1000);
-    hevent_accept(&loop, listenfd, on_accept, server);
+    // one loop one readbuf.
+    HBuf readbuf;
+    readbuf.resize(RECV_BUFSIZE);
+    loop.custom_data.ptr = &readbuf;
+    haccept(&loop, listenfd, on_accept, server);
     hloop_run(&loop);
 }
 

+ 4 - 2
utils/hmain.cpp

@@ -396,9 +396,11 @@ static HANDLE s_hEventReload = NULL;
 void WINAPI on_timer(UINT uTimerID, UINT uMsg, DWORD_PTR dwUser, DWORD_PTR dw1, DWORD_PTR dw2) {
     DWORD ret = WaitForSingleObject(s_hEventTerm, 0);
     if (ret == WAIT_OBJECT_0) {
-        timeKillEvent(uTimerID);
         hlogi("pid=%d recv event [TERM]", getpid());
-        exit(0);
+        if (getpid_from_pidfile() == getpid()) {
+            timeKillEvent(uTimerID);
+            exit(0);
+        }
     }
 
     ret = WaitForSingleObject(s_hEventReload, 0);