Browse Source

add iocp/overlapio

ithewei 6 years ago
parent
commit
cb28b5b5ff
20 changed files with 685 additions and 136 deletions
  1. 3 3
      Makefile
  2. 1 1
      README.md
  3. 1 0
      base/hplatform.h
  4. 1 1
      base/hsocket.c
  5. 27 9
      base/hstring.cpp
  6. 1 0
      base/hstring.h
  7. 1 1
      etc/httpd.conf
  8. 40 6
      event/hloop.c
  9. 10 4
      event/hloop.h
  10. 27 18
      event/iocp.c
  11. 1 1
      event/iowatcher.h
  12. 67 19
      event/nio.c
  13. 2 0
      event/noevent.c
  14. 420 0
      event/overlapio.c
  15. 30 0
      event/overlapio.h
  16. 4 0
      event/poll.c
  17. 0 26
      event/wsaio.c
  18. 9 16
      examples/client.cpp
  19. 14 18
      examples/server.cpp
  20. 26 13
      http/server/http_server.cpp

+ 3 - 3
Makefile

@@ -3,7 +3,7 @@ TMPDIR=tmp
 
 default: all
 
-all: test client server httpd webbench
+all: test loop client server httpd
 
 clean:
 	$(MAKEF) clean SRCDIRS=". base utils event http http/client http/server examples $(TMPDIR)"
@@ -38,7 +38,7 @@ httpd: prepare
 	cp examples/http_api_test.h $(TMPDIR)/http_api_test.h
 	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/server $(TMPDIR)"
 
-webbench: prepare
+webbench:
 	$(MAKEF) TARGET=$@ SRCDIRS="" SRCS="examples/webbench.c"
 
 # curl
@@ -48,4 +48,4 @@ CURL_SRCS    += examples/curl.cpp base/hstring.cpp
 curl:
 	$(MAKEF) TARGET=$@ SRCDIRS="$(CURL_SRCDIRS)" INCDIRS="$(CURL_INCDIRS)" SRCS="$(CURL_SRCS)" DEFINES="CURL_STATICLIB" LIBS="curl"
 
-.PHONY: clean prepare test client server curl httpd webbench
+.PHONY: clean prepare test loop client server httpd webbench curl

+ 1 - 1
README.md

@@ -62,6 +62,6 @@ hw 是一套跨平台c++工具集,类名以H开头
 - make test: 服务端master-workers model
 - make loop: 事件循环(包含timer、io、idle)
 - make client server:非阻塞socket
-- make httpd: http服务(包含web service和api service),性能测试仅比nginx差一点(估计是nginx accept_mutex规避了惊群效应)
+- make httpd: http服务(包含web service和api service)
 - make curl: 基于libcurl封装http客户端
 - make webbench: http服务压力测试程序

+ 1 - 0
base/hplatform.h

@@ -97,6 +97,7 @@
     #define _CRT_SECURE_NO_WARNINGS
     #define _WINSOCK_DEPRECATED_NO_WARNINGS
     #include <winsock2.h>
+    #include <ws2tcpip.h>   // for inet_pton,inet_ntop
     #include <windows.h>
     #include <process.h>    // for getpid,exec
     #include <direct.h>     // for mkdir,rmdir,chdir,getcwd

+ 1 - 1
base/hsocket.c

@@ -39,7 +39,7 @@ int Connect(const char* host, int port, int nonblock) {
     socklen_t addrlen = sizeof(addr);
     memset(&addr, 0, addrlen);
     addr.sin_family = AF_INET;
-    addr.sin_addr.s_addr = inet_addr(host);
+    inet_pton(AF_INET, host, &addr.sin_addr);
     if (addr.sin_addr.s_addr == INADDR_NONE) {
         struct hostent* phe = gethostbyname(host);
         if (phe == NULL)    return -10;

+ 27 - 9
base/hstring.cpp

@@ -30,25 +30,43 @@ char* strlower(char* str) {
     return str;
 }
 
-int vscprintf(const char* fmt, va_list ap) {
+char* strreverse(char* str) {
+    if (str == NULL) return NULL;
+    char* b = str;
+    char* e = str;
+    while(*e) {++e;}
+    --e;
+    char tmp;
+    while (e > b) {
+        tmp = *e;
+        *e = *b;
+        *b = tmp;
+        --e;
+        ++b;
+    }
+    return str;
+}
+
+static inline int vscprintf(const char* fmt, va_list ap) {
     return vsnprintf(NULL, 0, fmt, ap);
 }
 
 string asprintf(const char* fmt, ...) {
     va_list ap;
     va_start(ap, fmt);
-    int len = vscprintf(fmt, ap) + 1;
+    int len = vscprintf(fmt, ap);
     va_end(ap);
-    // must recall va_start in linux
+
+    string str;
+    str.reserve(len+1);
+    // must resize to set str.size
+    str.resize(len);
+    // must recall va_start on unix
     va_start(ap, fmt);
-    char* buf = (char*)malloc(len);
-    memset(buf, 0, len);
-    vsnprintf(buf, len, fmt, ap);
+    vsnprintf((char*)str.data(), len+1, fmt, ap);
     va_end(ap);
 
-    string res(buf);
-    free(buf);
-    return res;
+    return str;
 }
 
 StringList split(const string& str, char delim) {

+ 1 - 0
base/hstring.h

@@ -29,6 +29,7 @@ public:
 
 char* strupper(char* str);
 char* strlower(char* str);
+char* strreverse(char* str);
 
 string asprintf(const char* fmt, ...);
 StringList split(const string& str, char delim);

+ 1 - 1
etc/httpd.conf

@@ -7,7 +7,7 @@ log_remain_days = 3
 
 # worker_processes = 4
 # auto = ncpu
-worker_processes = auto
+worker_processes = 1
 
 # http server
 port = 8080

+ 40 - 6
event/hloop.c

@@ -11,6 +11,10 @@
 
 #define IO_ARRAY_INIT_SIZE      64
 
+static void hio_init(hio_t* io);
+static void hio_deinit(hio_t* io);
+static void hio_free(hio_t* io);
+
 static int timers_compare(const struct heap_node* lhs, const struct heap_node* rhs) {
     return TIMER_ENTRY(lhs)->next_timeout < TIMER_ENTRY(rhs)->next_timeout;
 }
@@ -118,7 +122,7 @@ static int hloop_process_events(hloop_t* loop) {
     int nios, ntimers, nidles;
     nios = ntimers = nidles = 0;
 
-    int64_t blocktime = 0;
+    int32_t blocktime = MAX_BLOCK_TIME;
     hloop_update_time(loop);
     if (loop->timers.root) {
         uint64_t next_min_timeout = TIMER_ENTRY(loop->timers.root)->next_timeout;
@@ -126,9 +130,9 @@ static int hloop_process_events(hloop_t* loop) {
         if (blocktime <= 0) goto process_timers;
         blocktime /= 1000;
         ++blocktime;
+        blocktime = MIN(blocktime, MAX_BLOCK_TIME);
     }
 
-    blocktime = MIN(blocktime, MAX_BLOCK_TIME);
     if (loop->nios) {
         nios = hloop_process_ios(loop, blocktime);
     }
@@ -158,7 +162,9 @@ int hloop_init(hloop_t* loop) {
     list_init(&loop->idles);
     // timers
     heap_init(&loop->timers, timers_compare);
-    // iowatcher
+    // ios: init when hio_add
+    //io_array_init(&loop->ios, IO_ARRAY_INIT_SIZE);
+    // iowatcher: init when iowatcher_add_event
     //iowatcher_init(loop);
     // time
     time(&loop->start_time);
@@ -188,6 +194,14 @@ void hloop_cleanup(hloop_t* loop) {
         free(timer);
     }
     heap_init(&loop->timers, NULL);
+    // ios
+    for (int i = 0; i < loop->ios.maxsize; ++i) {
+        hio_t* io = loop->ios.ptr[i];
+        if (io) {
+            hio_free(io);
+        }
+    }
+    io_array_cleanup(&loop->ios);
     // iowatcher
     iowatcher_cleanup(loop);
 };
@@ -297,11 +311,11 @@ void hio_init(hio_t* io) {
     memset(io, 0, sizeof(hio_t));
     io->event_type = HEVENT_TYPE_IO;
     io->event_index[0] = io->event_index[1] = -1;
-    // move to hwrite
+    // write_queue init when hwrite try_write failed
     //write_queue_init(&io->write_queue, 4);;
 }
 
-void hio_cleanup(hio_t* io) {
+void hio_deinit(hio_t* io) {
     offset_buf_t* pbuf = NULL;
     while (!write_queue_empty(&io->write_queue)) {
         pbuf = write_queue_front(&io->write_queue);
@@ -311,6 +325,14 @@ void hio_cleanup(hio_t* io) {
     write_queue_cleanup(&io->write_queue);
 }
 
+void hio_free(hio_t* io) {
+    if (io == NULL) return;
+    hio_deinit(io);
+    SAFE_FREE(io->localaddr);
+    SAFE_FREE(io->peeraddr);
+    free(io);
+}
+
 hio_t* hio_add(hloop_t* loop, hio_cb cb, int fd, int events) {
     if (loop->ios.maxsize == 0) {
         io_array_init(&loop->ios, IO_ARRAY_INIT_SIZE);
@@ -349,8 +371,20 @@ void hio_del(hio_t* io, int events) {
     io->events &= ~events;
     if (io->events == 0) {
         io->loop->nios--;
-        hio_cleanup(io);
+        hio_deinit(io);
         EVENT_DEL(io);
     }
 }
 
+#include "hsocket.h"
+hio_t* hlisten (hloop_t* loop, int port, haccept_cb accept_cb) {
+    int listenfd = Listen(port);
+    if (listenfd < 0) {
+        return NULL;
+    }
+    hio_t* io = haccept(loop, listenfd, accept_cb);
+    if (io == NULL) {
+        closesocket(listenfd);
+    }
+    return io;
+}

+ 10 - 4
event/hloop.h

@@ -106,12 +106,15 @@ struct hio_s {
     unsigned    accept      :1;
     unsigned    connect     :1;
     unsigned    closed      :1;
+    unsigned    connectex   :1; // for ConnectEx/DisconnectEx
     int         fd;
     int         error;
     int         events;
     int         revents;
-    hbuf_t              readbuf;
-    struct write_queue  write_queue;
+    struct sockaddr*    localaddr;
+    struct sockaddr*    peeraddr;
+    hbuf_t              readbuf;        // for hread
+    struct write_queue  write_queue;    // for hwrite
     // callbacks
     hread_cb    read_cb;
     hwrite_cb   write_cb;
@@ -119,7 +122,8 @@ struct hio_s {
     haccept_cb  accept_cb;
     hconnect_cb connect_cb;
 //private:
-    int         event_index[2];
+    int         event_index[2]; // for poll,kqueue
+    void*       hovlp;          // for iocp/overlapio
 };
 
 typedef enum {
@@ -209,7 +213,9 @@ void        hio_del(hio_t* io, int events DEFAULT(ALL_EVENTS));
 
 // second level apis
 hio_t* haccept  (hloop_t* loop, int listenfd, haccept_cb accept_cb);
-hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb);
+// Listen => haccept
+hio_t* hlisten  (hloop_t* loop, int port,     haccept_cb accept_cb);
+hio_t* hconnect (hloop_t* loop, const char* host, int port, hconnect_cb connect_cb);
 hio_t* hread    (hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb);
 hio_t* hwrite   (hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb DEFAULT(NULL));
 void   hclose   (hio_t* io);

+ 27 - 18
event/iocp.c

@@ -4,6 +4,9 @@
 #include "hplatform.h"
 #include "hdef.h"
 
+#include "hevent.h"
+#include "overlapio.h"
+
 typedef struct iocp_ctx_s {
     HANDLE      iocp;
 } iocp_ctx_t;
@@ -29,11 +32,18 @@ int iowatcher_add_event(hloop_t* loop, int fd, int events) {
         iowatcher_init(loop);
     }
     iocp_ctx_t* iocp_ctx = (iocp_ctx_t*)loop->iowatcher;
-    HANDLE h = CreateIoCompletionPort((HANDLE)fd, iocp_ctx->iocp, (ULONG_PTR)events, 0);
+    hio_t* io = loop->ios.ptr[fd];
+    if (io && io->events == 0 && events != 0) {
+        CreateIoCompletionPort((HANDLE)fd, iocp_ctx->iocp, 0, 0);
+    }
     return 0;
 }
 
 int iowatcher_del_event(hloop_t* loop, int fd, int events) {
+    hio_t* io = loop->ios.ptr[fd];
+    if ((io->events & ~events) == 0) {
+        CancelIo((HANDLE)fd);
+    }
     return 0;
 }
 
@@ -43,29 +53,28 @@ int iowatcher_poll_events(hloop_t* loop, int timeout) {
     DWORD bytes = 0;
     ULONG_PTR key = 0;
     LPOVERLAPPED povlp = NULL;
-    timeout = 3000;
     BOOL bRet = GetQueuedCompletionStatus(iocp_ctx->iocp, &bytes, &key, &povlp, timeout);
     int err = 0;
-    if (bRet == 0) {
-        err = GetLastError();
-    }
-    if (err) {
-        if (err == ERROR_NETNAME_DELETED ||
-            err == ERROR_OPERATION_ABORTED) {
+    if (povlp == NULL) {
+        err = WSAGetLastError();
+        if (err == WAIT_TIMEOUT || ERROR_NETNAME_DELETED || ERROR_OPERATION_ABORTED) {
             return 0;
         }
-        if (povlp == NULL) {
-            if (err == WAIT_TIMEOUT) return 0;
-            return -1;
-        }
-    }
-    if (povlp == NULL) {
-        return -1;
+        return -err;
     }
-    if (key == NULL) {
-        return -1;
+    hoverlapped_t* hovlp = (hoverlapped_t*)povlp;
+    hio_t* io = hovlp->io;
+    if (bRet == FALSE) {
+        err = WSAGetLastError();
+        printd("iocp ret=%d err=%d bytes=%u\n", bRet, err, bytes);
+        // NOTE: when ConnectEx failed, err != 0
+        hovlp->error = err;
     }
-    ULONG_PTR revents = key;
+    // NOTE: when WSASend/WSARecv disconnect, bytes = 0
+    hovlp->bytes = bytes;
+    io->hovlp = hovlp;
+    io->revents |= hovlp->event;
+    EVENT_PENDING(hovlp->io);
     return 1;
 }
 #endif

+ 1 - 1
event/iowatcher.h

@@ -12,7 +12,7 @@
     !defined(EVENT_PORT) &&     \
     !defined(EVENT_NOEVENT)
 #ifdef OS_WIN
-#define EVENT_SELECT
+#define EVENT_IOCP
 #elif defined(OS_LINUX)
 #define EVENT_EPOLL
 #elif defined(OS_MAC)

+ 67 - 19
event/nio.c

@@ -4,14 +4,16 @@
 
 static void nio_accept(hio_t* io) {
     //printd("nio_accept listenfd=%d\n", io->fd);
-    struct sockaddr_in peeraddr;
     socklen_t addrlen;
-    //struct sockaddr_in localaddr;
-    //addrlen = sizeof(struct sockaddr_in);
-    //getsockname(io->fd, (struct sockaddr*)&localaddr, &addrlen);
+    if (io->localaddr == NULL) {
+        io->localaddr = (struct sockaddr*)malloc(sizeof(struct sockaddr_in));
+    }
+    if (io->peeraddr == NULL) {
+        io->peeraddr = (struct sockaddr*)malloc(sizeof(struct sockaddr_in));
+    }
 accept:
     addrlen = sizeof(struct sockaddr_in);
-    int connfd = accept(io->fd, (struct sockaddr*)&peeraddr, &addrlen);
+    int connfd = accept(io->fd, io->peeraddr, &addrlen);
     if (connfd < 0) {
         if (sockerrno == NIO_EAGAIN) {
             //goto accept_done;
@@ -23,11 +25,19 @@ accept:
             goto accept_error;
         }
     }
-    //printd("accept connfd=%d [%s:%d] <= [%s:%d]\n", connfd,
-            //inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
-            //inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
 
+    addrlen = sizeof(struct sockaddr_in);
+    getsockname(connfd, io->localaddr, &addrlen);
     if (io->accept_cb) {
+        struct sockaddr_in* localaddr = (struct sockaddr_in*)io->localaddr;
+        struct sockaddr_in* peeraddr = (struct sockaddr_in*)io->peeraddr;
+        char localip[64];
+        char peerip[64];
+        inet_ntop(AF_INET, &localaddr->sin_addr, localip, sizeof(localip));
+        inet_ntop(AF_INET, &peeraddr->sin_addr, peerip, sizeof(peerip));
+        printd("accept listenfd=%d connfd=%d [%s:%d] <= [%s:%d]\n", io->fd, connfd,
+                localip, ntohs(localaddr->sin_port),
+                peerip, ntohs(peeraddr->sin_port));
         printd("accept_cb------\n");
         io->accept_cb(io, connfd);
         printd("accept_cb======\n");
@@ -42,22 +52,32 @@ accept_error:
 static void nio_connect(hio_t* io) {
     //printd("nio_connect connfd=%d\n", io->fd);
     int state = 0;
-    struct sockaddr_in peeraddr;
     socklen_t addrlen;
+    if (io->localaddr == NULL) {
+        io->localaddr = (struct sockaddr*)malloc(sizeof(struct sockaddr_in));
+        addrlen = sizeof(struct sockaddr_in);
+        getsockname(io->fd, io->localaddr, &addrlen);
+    }
+    if (io->peeraddr == NULL) {
+        io->peeraddr = (struct sockaddr*)malloc(sizeof(struct sockaddr_in));
+    }
     addrlen = sizeof(struct sockaddr_in);
-    int ret = getpeername(io->fd, (struct sockaddr*)&peeraddr, &addrlen);
+    int ret = getpeername(io->fd, io->peeraddr, &addrlen);
     if (ret < 0) {
         io->error = sockerrno;
-        //printd("connect failed: %s: %d\n", strerror(sockerrno), sockerrno);
+        printd("connect failed: %s: %d\n", strerror(sockerrno), sockerrno);
         state = 0;
     }
     else {
-        //struct sockaddr_in localaddr;
-        //addrlen = sizeof(struct sockaddr_in);
-        //getsockname(ioent->fd, (struct sockaddr*)&localaddr, &addrlen);
-        //printd("connect connfd=%d [%s:%d] => [%s:%d]\n", io->fd,
-                //inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
-                //inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
+        struct sockaddr_in* localaddr = (struct sockaddr_in*)io->localaddr;
+        struct sockaddr_in* peeraddr = (struct sockaddr_in*)io->peeraddr;
+        char localip[64];
+        char peerip[64];
+        inet_ntop(AF_INET, &localaddr->sin_addr, localip, sizeof(localip));
+        inet_ntop(AF_INET, &peeraddr->sin_addr, peerip, sizeof(peerip));
+        printd("connect connfd=%d [%s:%d] => [%s:%d]\n", io->fd,
+                localip, ntohs(localaddr->sin_port),
+                peerip, ntohs(peeraddr->sin_port));
         state = 1;
     }
     if (io->connect_cb) {
@@ -65,6 +85,9 @@ static void nio_connect(hio_t* io) {
         io->connect_cb(io, state);
         printd("connect_cb======\n");
     }
+    if (state == 0) {
+        hclose(io);
+    }
 }
 
 static void nio_read(hio_t* io) {
@@ -74,7 +97,11 @@ static void nio_read(hio_t* io) {
     int   len = io->readbuf.len;
 read:
     memset(buf, 0, len);
+#ifdef OS_UNIX
     nread = read(io->fd, buf, len);
+#else
+    nread = recv(io->fd, buf, len, 0);
+#endif
     //printd("read retval=%d\n", nread);
     if (nread < 0) {
         if (sockerrno == NIO_EAGAIN) {
@@ -115,7 +142,11 @@ write:
     offset_buf_t* pbuf = write_queue_front(&io->write_queue);
     char* buf = pbuf->base + pbuf->offset;
     int len = pbuf->len - pbuf->offset;
+#ifdef OS_UNIX
     nwrite = write(io->fd, buf, len);
+#else
+    nwrite = send(io->fd, buf, len, 0);
+#endif
     //printd("write retval=%d\n", nwrite);
     if (nwrite < 0) {
         if (sockerrno == NIO_EAGAIN) {
@@ -191,9 +222,16 @@ hio_t* haccept  (hloop_t* loop, int listenfd, haccept_cb accept_cb) {
     return io;
 }
 
-hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb) {
+hio_t* hconnect (hloop_t* loop, const char* host, int port, hconnect_cb connect_cb) {
+    int connfd = Connect(host, port, 1);
+    if (connfd < 0) {
+        return NULL;
+    }
     hio_t* io = hio_add(loop, hio_handle_events, connfd, WRITE_EVENT);
-    if (io == NULL) return NULL;
+    if (io == NULL) {
+        closesocket(connfd);
+        return NULL;
+    }
     if (connect_cb) {
         io->connect_cb = connect_cb;
     }
@@ -222,7 +260,11 @@ hio_t* hwrite   (hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb w
     int nwrite = 0;
     if (write_queue_empty(&io->write_queue)) {
 try_write:
+#ifdef OS_UNIX
         nwrite = write(fd, buf, len);
+#else
+        nwrite = send(fd, buf, len, 0);
+#endif
         //printd("write retval=%d\n", nwrite);
         if (nwrite < 0) {
             if (sockerrno == NIO_EAGAIN) {
@@ -273,10 +315,16 @@ disconnect:
 void   hclose   (hio_t* io) {
     //printd("close fd=%d\n", io->fd);
     if (io->closed) return;
+#ifdef OS_UNIX
     close(io->fd);
+#else
+    closesocket(io->fd);
+#endif
     io->closed = 1;
     if (io->close_cb) {
+        printd("close_cb------\n");
         io->close_cb(io);
+        printd("close_cb======\n");
     }
     hio_del(io, ALL_EVENTS);
 }

+ 2 - 0
event/noevent.c

@@ -1,6 +1,7 @@
 #include "iowatcher.h"
 
 #ifdef EVENT_NOEVENT
+#include "htime.h"
 int iowatcher_init(hloop_t* loop) {
     return 0;
 }
@@ -18,6 +19,7 @@ int iowatcher_del_event(hloop_t* loop, int fd, int events) {
 }
 
 int iowatcher_poll_events(hloop_t* loop, int timeout) {
+    msleep(timeout);
     return 0;
 }
 

+ 420 - 0
event/overlapio.c

@@ -0,0 +1,420 @@
+#include "iowatcher.h"
+
+#ifdef EVENT_IOCP
+#include "overlapio.h"
+
+#define ACCEPTEX_NUM    10
+
+int post_acceptex(hio_t* listenio, hoverlapped_t* hovlp) {
+    LPFN_ACCEPTEX AcceptEx = NULL;
+    GUID guidAcceptEx = WSAID_ACCEPTEX;
+    DWORD dwbytes = 0;
+    if (WSAIoctl(listenio->fd, SIO_GET_EXTENSION_FUNCTION_POINTER,
+        &guidAcceptEx, sizeof(guidAcceptEx),
+        &AcceptEx, sizeof(AcceptEx),
+        &dwbytes, NULL, NULL) != 0) {
+        return WSAGetLastError();
+    }
+    int connfd = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
+    if (connfd < 0) {
+        return WSAGetLastError();
+    }
+    if (hovlp == NULL) {
+        hovlp = (hoverlapped_t*)malloc(sizeof(hoverlapped_t));
+        memset(hovlp, 0, sizeof(hoverlapped_t));
+        hovlp->buf.len = 20 + (sizeof(SOCKADDR_IN) + 16) * 2;
+        hovlp->buf.buf = (char*)malloc(hovlp->buf.len);
+        memset(hovlp->buf.buf, 0, hovlp->buf.len);
+    }
+    hovlp->fd = connfd;
+    hovlp->event = READ_EVENT;
+    hovlp->io = listenio;
+    if (AcceptEx(listenio->fd, connfd, hovlp->buf.buf, 0, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16,
+        &dwbytes, &hovlp->ovlp) != TRUE) {
+        int err = WSAGetLastError();
+        if (err != ERROR_IO_PENDING) {
+            fprintf(stderr, "AcceptEx error: %d\n", err);
+            return err;
+        }
+    }
+    return 0;
+}
+
+int post_recv(hio_t* io, hoverlapped_t* hovlp) {
+    if (hovlp == NULL) {
+        hovlp = (hoverlapped_t*)malloc(sizeof(hoverlapped_t));
+        memset(hovlp, 0, sizeof(hoverlapped_t));
+    }
+    hovlp->fd = io->fd;
+    hovlp->event = READ_EVENT;
+    hovlp->io = io;
+    hovlp->buf.buf = io->readbuf.base;
+    hovlp->buf.len = io->readbuf.len;
+    memset(hovlp->buf.buf, 0, hovlp->buf.len);
+    DWORD dwbytes = 0;
+    DWORD flags = 0;
+    int ret = WSARecv(io->fd, &hovlp->buf, 1, &dwbytes, &flags, &hovlp->ovlp, NULL);
+    printd("WSARecv ret=%d bytes=%u\n", ret, dwbytes);
+    if (ret != 0) {
+        int err = WSAGetLastError();
+        if (err != ERROR_IO_PENDING) {
+            fprintf(stderr, "WSARecv error: %d\n", err);
+            return err;
+        }
+    }
+    return 0;
+}
+
+static void on_acceptex_complete(hio_t* io) {
+    printd("on_acceptex_complete------\n");
+    hoverlapped_t* hovlp = (hoverlapped_t*)io->hovlp;
+    int listenfd = io->fd;
+    int connfd = hovlp->fd;
+    LPFN_GETACCEPTEXSOCKADDRS GetAcceptExSockaddrs = NULL;
+    GUID guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
+    DWORD dwbytes = 0;
+    if (WSAIoctl(connfd, SIO_GET_EXTENSION_FUNCTION_POINTER,
+        &guidGetAcceptExSockaddrs, sizeof(guidGetAcceptExSockaddrs),
+        &GetAcceptExSockaddrs, sizeof(GetAcceptExSockaddrs),
+        &dwbytes, NULL, NULL) != 0) {
+        return;
+    }
+    struct sockaddr* plocaladdr = NULL;
+    struct sockaddr* ppeeraddr = NULL;
+    socklen_t localaddrlen;
+    socklen_t peeraddrlen;
+    GetAcceptExSockaddrs(hovlp->buf.buf, 0, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16,
+        &plocaladdr, &localaddrlen, &ppeeraddr, &peeraddrlen);
+    if (io->localaddr == NULL) {
+        io->localaddr = (struct sockaddr*)malloc(localaddrlen);
+    }
+    memcpy(io->localaddr, plocaladdr, localaddrlen);
+    if (io->peeraddr == NULL) {
+        io->peeraddr = (struct sockaddr*)malloc(peeraddrlen);
+    }
+    memcpy(io->peeraddr, ppeeraddr, peeraddrlen);
+    if (io->accept_cb) {
+        struct sockaddr_in* localaddr = (struct sockaddr_in*)io->localaddr;
+        struct sockaddr_in* peeraddr = (struct sockaddr_in*)io->peeraddr;
+        char localip[64];
+        char peerip[64];
+        inet_ntop(AF_INET, &localaddr->sin_addr, localip, sizeof(localip));
+        inet_ntop(AF_INET, &peeraddr->sin_addr, peerip, sizeof(peerip));
+        printd("accept listenfd=%d connfd=%d [%s:%d] <= [%s:%d]\n", listenfd, connfd,
+                localip, ntohs(localaddr->sin_port),
+                peerip, ntohs(peeraddr->sin_port));
+        setsockopt(connfd, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (const char*)&listenfd, sizeof(int));
+        printd("accept_cb------\n");
+        io->accept_cb(io, connfd);
+        printd("accept_cb======\n");
+    }
+    post_acceptex(io, hovlp);
+}
+
+static void on_connectex_complete(hio_t* io) {
+    printd("on_connectex_complete------\n");
+    hoverlapped_t* hovlp = (hoverlapped_t*)io->hovlp;
+    int state = hovlp->error == 0 ? 1 : 0;
+    if (state == 1) {
+        if (io->localaddr == NULL) {
+            io->localaddr = (struct sockaddr*)malloc(sizeof(struct sockaddr_in));
+        }
+        if (io->peeraddr == NULL) {
+            io->peeraddr = (struct sockaddr*)malloc(sizeof(struct sockaddr_in));
+        }
+        setsockopt(io->fd, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
+        socklen_t addrlen = sizeof(struct sockaddr_in);
+        getsockname(io->fd, io->localaddr, &addrlen);
+        addrlen = sizeof(struct sockaddr_in);
+        getpeername(io->fd, io->peeraddr, &addrlen);
+        struct sockaddr_in* localaddr = (struct sockaddr_in*)io->localaddr;
+        struct sockaddr_in* peeraddr = (struct sockaddr_in*)io->peeraddr;
+        char localip[64];
+        char peerip[64];
+        inet_ntop(AF_INET, &localaddr->sin_addr, localip, sizeof(localip));
+        inet_ntop(AF_INET, &peeraddr->sin_addr, peerip, sizeof(peerip));
+        printd("connect connfd=%d [%s:%d] => [%s:%d]\n", io->fd,
+                localip, ntohs(localaddr->sin_port),
+                peerip, ntohs(peeraddr->sin_port));
+    }
+    else {
+        io->error = hovlp->error;
+    }
+    if (io->connect_cb) {
+        printd("connect_cb------\n");
+        io->connect_cb(io, state);
+        printd("connect_cb======\n");
+    }
+    SAFE_FREE(io->hovlp);
+    if (state == 0) {
+        hclose(io);
+    }
+}
+
+static void on_wsarecv_complete(hio_t* io) {
+    printd("on_recv_complete------\n");
+    hoverlapped_t* hovlp = (hoverlapped_t*)io->hovlp;
+    if (hovlp->bytes == 0) {
+        io->error = WSAGetLastError();
+        hclose(io);
+        return;
+    }
+
+    if (io->read_cb) {
+        printd("read_cb------\n");
+        io->read_cb(io, hovlp->buf.buf, hovlp->bytes);
+        printd("read_cb======\n");
+    }
+    if (!io->closed) {
+        post_recv(io, hovlp);
+    }
+}
+
+static void on_wsasend_complete(hio_t* io) {
+    printd("on_send_complete------\n");
+    hoverlapped_t* hovlp = (hoverlapped_t*)io->hovlp;
+    if (hovlp->bytes == 0) {
+        io->error = WSAGetLastError();
+        hclose(io);
+        goto end;
+    }
+    if (io->write_cb) {
+        printd("write_cb------\n");
+        io->write_cb(io, hovlp->buf.buf, hovlp->bytes);
+        printd("write_cb======\n");
+    }
+end:
+    if (io->hovlp) {
+        SAFE_FREE(hovlp->buf.buf);
+        free(io->hovlp);
+        io->hovlp = NULL;
+    }
+}
+
+static void hio_handle_events(hio_t* io) {
+    if ((io->events & READ_EVENT) && (io->revents & READ_EVENT)) {
+        if (io->accept) {
+            on_acceptex_complete(io);
+        }
+        else {
+            on_wsarecv_complete(io);
+        }
+    }
+
+    if ((io->events & WRITE_EVENT) && (io->revents & WRITE_EVENT)) {
+        // NOTE: WRITE_EVENT just do once
+        // ONESHOT
+        hio_del(io, WRITE_EVENT);
+        if (io->connect) {
+            io->connect = 0;
+
+            on_connectex_complete(io);
+        }
+        else {
+            on_wsasend_complete(io);
+        }
+    }
+
+    io->revents = 0;
+}
+
+hio_t* haccept  (hloop_t* loop, int listenfd, haccept_cb accept_cb) {
+    hio_t* io = hio_add(loop, hio_handle_events, listenfd, READ_EVENT);
+    if (io == NULL) return NULL;
+    if (accept_cb) {
+        io->accept_cb = accept_cb;
+    }
+    io->accept = 1;
+    nonblocking(listenfd);
+    for (int i = 0; i < ACCEPTEX_NUM; ++i) {
+        post_acceptex(io, NULL);
+    }
+    return io;
+}
+
+hio_t* hconnect (hloop_t* loop, const char* host, int port, hconnect_cb connect_cb) {
+    // gethostbyname -> socket -> bind -> nonblocking -> ConnectEx
+    struct sockaddr_in peeraddr;
+    socklen_t addrlen = sizeof(struct sockaddr_in);
+    memset(&peeraddr, 0, addrlen);
+    peeraddr.sin_family = AF_INET;
+    inet_pton(AF_INET, host, &peeraddr.sin_addr);
+    if (peeraddr.sin_addr.s_addr == INADDR_NONE) {
+        struct hostent* phe = gethostbyname(host);
+        if (phe == NULL)    return NULL;
+        memcpy(&peeraddr.sin_addr, phe->h_addr, phe->h_length);
+    }
+    peeraddr.sin_port = htons(port);
+    int connfd = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
+    if (connfd < 0) {
+        perror("socket");
+        return NULL;
+    }
+    // NOTE: ConnectEx must call bind
+    struct sockaddr_in localaddr;
+    memset(&localaddr, 0, addrlen);
+    localaddr.sin_family = AF_INET;
+    localaddr.sin_addr.s_addr = htonl(INADDR_ANY);
+    localaddr.sin_port = htons(0);
+    if (bind(connfd, (struct sockaddr*)&localaddr, addrlen) < 0) {
+        perror("bind");
+        closesocket(connfd);
+        return NULL;
+    }
+    nonblocking(connfd);
+    hio_t* io = hio_add(loop, hio_handle_events, connfd, WRITE_EVENT);
+    if (io == NULL) {
+        closesocket(connfd);
+        return NULL;
+    }
+    if (connect_cb) {
+        io->connect_cb = connect_cb;
+    }
+    io->connect = 1;
+    // ConnectEx
+    io->connectex = 1;
+    LPFN_CONNECTEX ConnectEx = NULL;
+    GUID guidConnectEx = WSAID_CONNECTEX;
+    DWORD dwbytes;
+    if (WSAIoctl(connfd, SIO_GET_EXTENSION_FUNCTION_POINTER,
+        &guidConnectEx, sizeof(guidConnectEx),
+        &ConnectEx, sizeof(ConnectEx),
+        &dwbytes, NULL, NULL) != 0) {
+        closesocket(connfd);
+        return NULL;
+    }
+    // NOTE: free on_connectex_complete
+    hoverlapped_t* hovlp = (hoverlapped_t*)malloc(sizeof(hoverlapped_t));
+    memset(hovlp, 0, sizeof(hoverlapped_t));
+    hovlp->fd = connfd;
+    hovlp->event = WRITE_EVENT;
+    hovlp->io = io;
+    if (ConnectEx(connfd, (struct sockaddr*)&peeraddr, addrlen, NULL, 0, &dwbytes, &hovlp->ovlp) != TRUE) {
+        int err = WSAGetLastError();
+        if (err != ERROR_IO_PENDING) {
+            fprintf(stderr, "AcceptEx error: %d\n", err);
+            return NULL;
+        }
+    }
+    return io;
+error:
+    closesocket(connfd);
+    return NULL;
+}
+
+hio_t* hread (hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
+    hio_t* io = hio_add(loop, hio_handle_events, fd, READ_EVENT);
+    if (io == NULL) return NULL;
+    io->readbuf.base = (char*)buf;
+    io->readbuf.len = len;
+    if (read_cb) {
+        io->read_cb = read_cb;
+    }
+    post_recv(io, NULL);
+    return io;
+}
+
+hio_t* hwrite (hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb) {
+    hio_t* io = hio_add(loop, hio_handle_events, fd, 0);
+    if (io == NULL) return NULL;
+    if (write_cb) {
+        io->write_cb = write_cb;
+    }
+    int nwrite = 0;
+try_send:
+    nwrite = send(fd, buf, len, 0);
+    //printd("write retval=%d\n", nwrite);
+    if (nwrite < 0) {
+        if (sockerrno == NIO_EAGAIN) {
+            nwrite = 0;
+            goto WSASend;
+        }
+        else {
+            perror("write");
+            io->error = sockerrno;
+            goto write_error;
+        }
+    }
+    if (nwrite == 0) {
+        goto disconnect;
+    }
+    if (write_cb) {
+        printd("try_write_cb------\n");
+        write_cb(io, buf, nwrite);
+        printd("try_write_cb======\n");
+    }
+    if (nwrite == len) {
+        //goto write_done;
+        return io;
+    }
+WSASend:
+    {
+        hoverlapped_t* hovlp = (hoverlapped_t*)malloc(sizeof(hoverlapped_t));
+        memset(hovlp, 0, sizeof(hoverlapped_t));
+        hovlp->fd = fd;
+        hovlp->event = WRITE_EVENT;
+        hovlp->buf.len = len - nwrite;
+        // NOTE: free on_send_complete
+        hovlp->buf.buf = (char*)malloc(hovlp->buf.len);
+        memcpy(hovlp->buf.buf, buf + nwrite, hovlp->buf.len);
+        hovlp->io = io;
+        DWORD dwbytes = 0;
+        DWORD flags = 0;
+        int ret = WSASend(fd, &hovlp->buf, 1, &dwbytes, flags, &hovlp->ovlp, NULL);
+        printd("WSASend ret=%d bytes=%u\n", ret, dwbytes);
+        if (ret != 0) {
+            int err = WSAGetLastError();
+            if (err != ERROR_IO_PENDING) {
+                fprintf(stderr, "WSASend error: %d\n", err);
+                return NULL;
+            }
+        }
+        hio_add(loop, hio_handle_events, fd, WRITE_EVENT);
+        return io;
+    }
+write_error:
+disconnect:
+    hclose(io);
+    return io;
+}
+
+void   hclose   (hio_t* io) {
+    //printd("close fd=%d\n", io->fd);
+    if (io->closed) return;
+#ifdef USE_DISCONNECTEX
+    // DisconnectEx reuse socket
+    if (io->connectex) {
+        io->connectex = 0;
+        LPFN_DISCONNECTEX DisconnectEx = NULL;
+        GUID guidDisconnectEx = WSAID_DISCONNECTEX;
+        DWORD dwbytes;
+        if (WSAIoctl(io->fd, SIO_GET_EXTENSION_FUNCTION_POINTER,
+            &guidDisconnectEx, sizeof(guidDisconnectEx),
+            &DisconnectEx, sizeof(DisconnectEx),
+            &dwbytes, NULL, NULL) != 0) {
+            return;
+        }
+        DisconnectEx(io->fd, NULL, 0, 0);
+    }
+#else
+    closesocket(io->fd);
+#endif
+    io->closed = 1;
+    if (io->hovlp) {
+        hoverlapped_t* hovlp = (hoverlapped_t*)io->hovlp;
+        // NOTE: hread buf provided by caller
+        if (hovlp->buf.buf != io->readbuf.base) {
+            SAFE_FREE(hovlp->buf.buf);
+        }
+        free(io->hovlp);
+        io->hovlp = NULL;
+    }
+    if (io->close_cb) {
+        printd("close_cb------\n");
+        io->close_cb(io);
+        printd("close_cb======\n");
+    }
+    hio_del(io, ALL_EVENTS);
+}
+
+#endif

+ 30 - 0
event/overlapio.h

@@ -0,0 +1,30 @@
+#ifndef HW_OVERLAPPED_H_
+#define HW_OVERLAPPED_H_
+
+#include "iowatcher.h"
+
+#ifdef EVENT_IOCP
+
+#include "hbuf.h"
+#include "hsocket.h"
+#include <mswsock.h>
+#ifdef _MSC_VER
+#pragma comment(lib, "mswsock.lib")
+#endif
+
+typedef struct hoverlapped_s {
+    OVERLAPPED  ovlp;
+    int         fd;
+    int         event;
+    WSABUF      buf;
+    int         bytes;
+    int         error;
+    hio_t*      io;
+} hoverlapped_t;
+
+int post_acceptex(hio_t* listenio, hoverlapped_t* hovlp);
+int post_recv(hio_t* io, hoverlapped_t* hovlp);
+
+#endif
+
+#endif // HW_OVERLAPPED_H_

+ 4 - 0
event/poll.c

@@ -4,6 +4,10 @@
 #include "hplatform.h"
 #include "hdef.h"
 
+#ifdef OS_WIN
+#define poll        WSAPoll
+#endif
+
 #ifdef OS_LINUX
 #include <sys/poll.h>
 #endif

+ 0 - 26
event/wsaio.c

@@ -1,26 +0,0 @@
-#include "iowatcher.h"
-
-#ifdef EVENT_IOCP
-#include "hplatform.h"
-
-hio_t* haccept  (hloop_t* loop, int listenfd, haccept_cb accept_cb) {
-    return NULL;
-}
-
-hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb) {
-    return NULL;
-}
-
-hio_t* hread    (hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
-    return NULL;
-}
-
-hio_t* hwrite   (hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb cb) {
-    return NULL;
-}
-
-void   hclose   (hio_t* io) {
-
-}
-
-#endif

+ 9 - 16
examples/client.cpp

@@ -1,5 +1,4 @@
 #include "hloop.h"
-#include "hsocket.h"
 
 #define RECV_BUFSIZE    4096
 static char readbuf[RECV_BUFSIZE];
@@ -45,15 +44,11 @@ void on_connect(hio_t* io, int state) {
         printf("error=%d:%s\n", io->error, strerror(io->error));
         return;
     }
-    struct sockaddr_in localaddr, peeraddr;
-    socklen_t addrlen;
-    addrlen = sizeof(struct sockaddr_in);
-    getsockname(io->fd, (struct sockaddr*)&localaddr, &addrlen);
-    addrlen = sizeof(struct sockaddr_in);
-    getpeername(io->fd, (struct sockaddr*)&peeraddr, &addrlen);
+    struct sockaddr_in* localaddr = (struct sockaddr_in*)io->localaddr;
+    struct sockaddr_in* peeraddr = (struct sockaddr_in*)io->peeraddr;
     printf("connect connfd=%d [%s:%d] => [%s:%d]\n", io->fd,
-            inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
-            inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
+            inet_ntoa(localaddr->sin_addr), ntohs(localaddr->sin_port),
+            inet_ntoa(peeraddr->sin_addr), ntohs(peeraddr->sin_port));
 
     // NOTE: just on loop, readbuf can be shared.
     hio_t* iostdin = hread(io->loop, 0, readbuf, RECV_BUFSIZE, on_stdin);
@@ -74,17 +69,15 @@ int main(int argc, char** argv) {
     const char* host = argv[1];
     int port = atoi(argv[2]);
 
-    int connfd = Connect(host, port, 1);
-    printf("connfd=%d\n", connfd);
-    if (connfd < 0) {
-        return connfd;
-    }
-
     hloop_t loop;
     hloop_init(&loop);
     //hidle_add(&loop, on_idle, INFINITE);
     //htimer_add(&loop, on_timer, 1000, INFINITE);
-    hconnect(&loop, connfd, on_connect);
+    hio_t* io = hconnect(&loop, host, port, on_connect);
+    if (io == NULL) {
+        return -20;
+    }
+    printf("connfd=%d\n", io->fd);
     hloop_run(&loop);
 
     return 0;

+ 14 - 18
examples/server.cpp

@@ -1,5 +1,4 @@
 #include "hloop.h"
-#include "hsocket.h"
 
 #define RECV_BUFSIZE    4096
 static char readbuf[RECV_BUFSIZE];
@@ -32,17 +31,16 @@ void on_read(hio_t* io, void* buf, int readbytes) {
 
 void on_accept(hio_t* io, int connfd) {
     printf("on_accept listenfd=%d connfd=%d\n", io->fd, connfd);
-    struct sockaddr_in localaddr, peeraddr;
-    socklen_t addrlen;
-    addrlen = sizeof(struct sockaddr_in);
-    getsockname(connfd, (struct sockaddr*)&localaddr, &addrlen);
-    addrlen = sizeof(struct sockaddr_in);
-    getpeername(connfd, (struct sockaddr*)&peeraddr, &addrlen);
-    printf("accept connfd=%d [%s:%d] <= [%s:%d]\n", connfd,
-            inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
-            inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
+    struct sockaddr_in* localaddr = (struct sockaddr_in*)io->localaddr;
+    struct sockaddr_in* peeraddr = (struct sockaddr_in*)io->peeraddr;
+    char localip[64];
+    char peerip[64];
+    inet_ntop(AF_INET, &localaddr->sin_addr, localip, sizeof(localip));
+    inet_ntop(AF_INET, &peeraddr->sin_addr, peerip, sizeof(peerip));
+    printf("accept listenfd=%d connfd=%d [%s:%d] <= [%s:%d]\n", io->fd, connfd,
+            localip, ntohs(localaddr->sin_port),
+            peerip, ntohs(peeraddr->sin_port));
 
-    nonblocking(connfd);
     // one loop can use one readbuf
     hio_t* connio = hread(io->loop, connfd, readbuf, RECV_BUFSIZE, on_read);
     connio->close_cb = on_close;
@@ -55,16 +53,14 @@ int main(int argc, char** argv) {
     }
     int port = atoi(argv[1]);
 
-    int listenfd = Listen(port);
-    printf("listenfd=%d\n", listenfd);
-    if (listenfd < 0) {
-        return listenfd;
-    }
-
     hloop_t loop;
     hloop_init(&loop);
     //hidle_add(&loop, on_idle, INFINITE);
     //htimer_add(&loop, on_timer, 1000, INFINITE);
-    haccept(&loop, listenfd, on_accept);
+    hio_t* io = hlisten(&loop, port, on_accept);
+    if (io == NULL) {
+        return -20;
+    }
+    printf("listenfd=%d\n", io->fd);
     hloop_run(&loop);
 }

+ 26 - 13
http/server/http_server.cpp

@@ -218,25 +218,26 @@ static void on_close(hio_t* io) {
     if (hcu) {
         hlogi("%s", hcu->log.c_str());
         delete hcu;
+        io->userdata = NULL;
     }
 }
 
 static void on_accept(hio_t* io, int connfd) {
     //printf("on_accept listenfd=%d connfd=%d\n", io->fd, connfd);
-    struct sockaddr_in localaddr, peeraddr;
-    socklen_t addrlen;
-    addrlen = sizeof(struct sockaddr_in);
-    getsockname(connfd, (struct sockaddr*)&localaddr, &addrlen);
-    addrlen = sizeof(struct sockaddr_in);
-    getpeername(connfd, (struct sockaddr*)&peeraddr, &addrlen);
-    //printf("accept connfd=%d [%s:%d] <= [%s:%d]\n", connfd,
-            //inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
-            //inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
+    struct sockaddr_in* localaddr = (struct sockaddr_in*)io->localaddr;
+    struct sockaddr_in* peeraddr = (struct sockaddr_in*)io->peeraddr;
+    //char localip[64];
+    char peerip[64];
+    //inet_ntop(AF_INET, &localaddr->sin_addr, localip, sizeof(localip));
+    inet_ntop(AF_INET, &peeraddr->sin_addr, peerip, sizeof(peerip));
+    //printd("accept listenfd=%d connfd=%d [%s:%d] <= [%s:%d]\n", io->fd, connfd,
+            //localip, ntohs(localaddr->sin_port),
+            //peerip, ntohs(peeraddr->sin_port));
     // new http_connect_userdata
     // delete on_close
     http_connect_userdata* hcu = new http_connect_userdata;
     hcu->server = (http_server_t*)io->userdata;
-    hcu->log += asprintf("[%s:%d]", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
+    hcu->log += asprintf("[%s:%d]", peerip, ntohs(peeraddr->sin_port));
 
     nonblocking(connfd);
     HBuf* buf = (HBuf*)io->loop->userdata;
@@ -266,21 +267,27 @@ void handle_cached_files(htimer_t* timer) {
     }
 }
 
+void fflush_log(hidle_t* idle) {
+    hlog_fflush();
+}
+
 static void worker_proc(void* userdata) {
     http_server_t* server = (http_server_t*)userdata;
     int listenfd = server->listenfd;
     hloop_t loop;
     hloop_init(&loop);
-    htimer_t* timer = htimer_add(&loop, handle_cached_files, s_filecache.file_cached_time*1000);
-    timer->userdata = &s_filecache;
     // one loop one readbuf.
     HBuf readbuf;
     readbuf.resize(RECV_BUFSIZE);
     loop.userdata = &readbuf;
     hio_t* listenio = haccept(&loop, listenfd, on_accept);
     listenio->userdata = server;
-    // disable fflush
+    // fflush logfile when idle
     hlog_set_fflush(0);
+    hidle_add(&loop, fflush_log, INFINITE);
+    // timer handle_cached_files
+    htimer_t* timer = htimer_add(&loop, handle_cached_files, s_filecache.file_cached_time*1000);
+    timer->userdata = &s_filecache;
     hloop_run(&loop);
 }
 
@@ -297,6 +304,12 @@ int http_server_run(http_server_t* server, int wait) {
     server->listenfd = Listen(server->port);
     if (server->listenfd < 0) return server->listenfd;
 
+#ifdef OS_WIN
+    if (server->worker_processes > 1) {
+        server->worker_processes = 1;
+    }
+#endif
+
     if (server->worker_processes == 0) {
         worker_proc(server);
     }