浏览代码

add nio udp

ithewei 6 年之前
父节点
当前提交
b71faa3284
共有 14 个文件被更改,包括 523 次插入404 次删除
  1. 11 6
      Makefile
  2. 4 2
      README.md
  3. 41 33
      base/hsocket.c
  4. 13 2
      base/hsocket.h
  5. 15 0
      event/hio.h
  6. 231 12
      event/hloop.c
  7. 50 11
      event/hloop.h
  8. 18 0
      event/iowatcher.h
  9. 77 85
      event/nio.c
  10. 3 3
      event/nlog.c
  11. 53 91
      event/overlapio.c
  12. 0 88
      examples/client.cpp
  13. 0 63
      examples/server.cpp
  14. 7 8
      http/server/http_server.cpp

+ 11 - 6
Makefile

@@ -3,7 +3,7 @@ TMPDIR=tmp
 
 default: all
 
-all: test ping loop client server httpd
+all: test ping loop tcp udp nc httpd
 
 clean:
 	$(MAKEF) clean SRCDIRS=". base utils event http http/client http/server examples $(TMPDIR)"
@@ -25,14 +25,19 @@ loop: prepare
 	cp examples/loop.c $(TMPDIR)
 	$(MAKEF) TARGET=$@ SRCDIRS=". base event $(TMPDIR)"
 
-client: prepare
+tcp: prepare
 	-rm $(TMPDIR)/*.o $(TMPDIR)/*.h $(TMPDIR)/*.c $(TMPDIR)/*.cpp
-	cp examples/client.cpp $(TMPDIR)
+	cp examples/tcp.c $(TMPDIR)
 	$(MAKEF) TARGET=$@ SRCDIRS=". base event $(TMPDIR)"
 
-server: prepare
+udp: prepare
 	-rm $(TMPDIR)/*.o $(TMPDIR)/*.h $(TMPDIR)/*.c $(TMPDIR)/*.cpp
-	cp examples/server.cpp $(TMPDIR)
+	cp examples/udp.c $(TMPDIR)
+	$(MAKEF) TARGET=$@ SRCDIRS=". base event $(TMPDIR)"
+
+nc: prepare
+	-rm $(TMPDIR)/*.o $(TMPDIR)/*.h $(TMPDIR)/*.c $(TMPDIR)/*.cpp
+	cp examples/nc.c $(TMPDIR)
 	$(MAKEF) TARGET=$@ SRCDIRS=". base event $(TMPDIR)"
 
 httpd: prepare
@@ -50,4 +55,4 @@ CURL_SRCS    += examples/curl.cpp base/hstring.cpp base/hbase.c
 curl:
 	$(MAKEF) TARGET=$@ SRCDIRS="$(CURL_SRCDIRS)" INCDIRS="$(CURL_INCDIRS)" SRCS="$(CURL_SRCS)" DEFINES="CURL_STATICLIB" LIBS="curl"
 
-.PHONY: clean prepare test ping loop client server httpd webbench curl
+.PHONY: clean prepare test ping loop tcp udp nc httpd webbench curl

+ 4 - 2
README.md

@@ -1,6 +1,6 @@
 ## Intro
 
-hw 是一套跨平台c++工具集,类名以H开头
+hw 是一套跨平台c/c++基础组件,函数名/类名以h/H开头
 
 ## platform
 
@@ -61,7 +61,9 @@ hw 是一套跨平台c++工具集,类名以H开头
 - make all
 - make test: 服务端master-workers model
 - make loop: 事件循环(包含timer、io、idle)
-- make client server:非阻塞socket
+- make tcp:  tcp server
+- make udp:  udp server
+- make nc:   network client
 - make httpd: http服务(包含web service和api service)
 - make curl: 基于libcurl封装http客户端
 - make webbench: http服务压力测试程序

+ 41 - 33
base/hsocket.c

@@ -17,10 +17,10 @@ char *socket_strerror(int err) {
 #endif
 }
 
-int Listen(int port) {
-    // socket -> setsockopt -> bind -> listen
-    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
-    if (listenfd < 0) {
+int Bind(int port, int type) {
+    // socket -> setsockopt -> bind
+    int sockfd = socket(AF_INET, type, 0);
+    if (sockfd < 0) {
         perror("socket");
         return -socket_errno();
     }
@@ -28,7 +28,7 @@ int Listen(int port) {
     socklen_t addrlen = sizeof(localaddr);
     // NOTE: SO_REUSEADDR means that you can reuse sockaddr of TIME_WAIT status
     int reuseaddr = 1;
-    if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuseaddr, sizeof(int)) < 0) {
+    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuseaddr, sizeof(int)) < 0) {
         perror("setsockopt");
         goto error;
     }
@@ -36,34 +36,52 @@ int Listen(int port) {
     localaddr.sin_family = AF_INET;
     localaddr.sin_addr.s_addr = htonl(INADDR_ANY);
     localaddr.sin_port = htons(port);
-    if (bind(listenfd, (struct sockaddr*)&localaddr, addrlen) < 0) {
+    if (bind(sockfd, (struct sockaddr*)&localaddr, addrlen) < 0) {
         perror("bind");
         goto error;
     }
-    if (listen(listenfd, SOMAXCONN) < 0) {
+    return sockfd;
+error:
+    closesocket(sockfd);
+    return socket_errno() > 0 ? -socket_errno() : -1;
+}
+
+int Listen(int port) {
+    int sockfd = Bind(port, SOCK_STREAM);
+    if (sockfd < 0) return sockfd;
+    if (listen(sockfd, SOMAXCONN) < 0) {
         perror("listen");
         goto error;
     }
-    return listenfd;
+    return sockfd;
 error:
-    closesocket(listenfd);
+    closesocket(sockfd);
     return socket_errno() > 0 ? -socket_errno() : -1;
 }
 
+int Resolver(const char* host, struct sockaddr* addr) {
+    // IPv4
+    struct sockaddr_in* addr4 = (struct sockaddr_in*)addr;
+    addr4->sin_family = AF_INET;
+    if (inet_pton(AF_INET, host, &addr4->sin_addr) == 1) {
+        return 0; // host is ip, so easy ;)
+    }
+    struct hostent* phe = gethostbyname(host);
+    if (phe == NULL) {
+        printd("unknown host %s\n", host);
+        return -h_errno;
+    }
+    memcpy(&addr4->sin_addr, phe->h_addr_list[0], phe->h_length);
+    return 0;
+}
+
 int Connect(const char* host, int port, int nonblock) {
-    // gethostbyname -> socket -> nonblocking -> connect
+    // Resolver -> socket -> nonblocking -> connect
     struct sockaddr_in peeraddr;
     socklen_t addrlen = sizeof(peeraddr);
     memset(&peeraddr, 0, addrlen);
-    peeraddr.sin_family = AF_INET;
-    inet_pton(peeraddr.sin_family, host, &peeraddr.sin_addr);
-    if (peeraddr.sin_addr.s_addr == 0 ||
-        peeraddr.sin_addr.s_addr == INADDR_NONE) {
-        struct hostent* phe = gethostbyname(host);
-        if (phe == NULL)    return -h_errno;
-        peeraddr.sin_family = phe->h_addrtype;
-        memcpy(&peeraddr.sin_addr, phe->h_addr_list[0], phe->h_length);
-    }
+    int ret = Resolver(host, (struct sockaddr*)&peeraddr);
+    if (ret != 0) return ret;
     peeraddr.sin_port = htons(port);
     int connfd = socket(AF_INET, SOCK_STREAM, 0);
     if (connfd < 0) {
@@ -73,7 +91,7 @@ int Connect(const char* host, int port, int nonblock) {
     if (nonblock) {
         nonblocking(connfd);
     }
-    int ret = connect(connfd, (struct sockaddr*)&peeraddr, addrlen);
+    ret = connect(connfd, (struct sockaddr*)&peeraddr, addrlen);
 #ifdef OS_WIN
     if (ret < 0 && socket_errno() != WSAEWOULDBLOCK) {
 #else
@@ -114,18 +132,8 @@ int Ping(const char* host, int cnt) {
     struct sockaddr_in peeraddr;
     socklen_t addrlen = sizeof(peeraddr);
     memset(&peeraddr, 0, addrlen);
-    peeraddr.sin_family = AF_INET;
-    inet_pton(peeraddr.sin_family, host, &peeraddr.sin_addr);
-    if (peeraddr.sin_addr.s_addr == 0 ||
-        peeraddr.sin_addr.s_addr == INADDR_NONE) {
-        struct hostent* phe = gethostbyname(host);
-        if (phe == NULL) {
-            printd("unknown host %s\n", host);
-            return -h_errno;
-        }
-        peeraddr.sin_family = phe->h_addrtype;
-        memcpy(&peeraddr.sin_addr, phe->h_addr_list[0], phe->h_length);
-    }
+    int ret = Resolver(host, (struct sockaddr*)&peeraddr);
+    if (ret != 0) return ret;
     inet_ntop(peeraddr.sin_family, &peeraddr.sin_addr, ip, sizeof(ip));
     int sockfd = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP);
     if (sockfd < 0) {
@@ -137,7 +145,7 @@ int Ping(const char* host, int cnt) {
     }
 
     timeout = PING_TIMEOUT;
-    int ret = so_sndtimeo(sockfd, timeout);
+    ret = so_sndtimeo(sockfd, timeout);
     if (ret < 0) {
         perror("setsockopt");
         goto error;

+ 13 - 2
base/hsocket.h

@@ -19,11 +19,20 @@ static inline int socket_errno() {
 }
 char* socket_strerror(int err);
 
-// socket -> setsockopt -> bind -> listen
+// socket -> setsockopt -> bind
+// @param type: SOCK_STREAM(tcp) SOCK_DGRAM(udp)
+// @return sockfd
+int Bind(int port, int type DEFAULT(SOCK_STREAM));
+
+// Bind -> listen
 // @return sockfd
 int Listen(int port);
 
-// gethostbyname -> socket -> nonblocking -> connect
+// @param host: domain or ip
+// @retval 0:succeed
+int Resolver(const char* host, struct sockaddr* addr);
+
+// Resolver -> socket -> nonblocking -> connect
 // @return sockfd
 int Connect(const char* host, int port, int nonblock DEFAULT(0));
 
@@ -47,6 +56,8 @@ static inline int nonblocking(int sockfd) {
 #define EAGAIN      WSAEWOULDBLOCK
 #undef  EINPROGRESS
 #define EINPROGRESS WSAEINPROGRESS
+#undef  ENOTSOCK
+#define ENOTSOCK    WSAENOTSOCK
 #else
 #define blocking(s)     fcntl(s, F_SETFL, fcntl(s, F_GETFL) & ~O_NONBLOCK)
 #define nonblocking(s)  fcntl(s, F_SETFL, fcntl(s, F_GETFL) |  O_NONBLOCK)

+ 15 - 0
event/hio.h

@@ -0,0 +1,15 @@
+#ifndef HW_HIO_H_
+#define HW_HIO_H_
+
+#include "hloop.h"
+
+//int hio_read (hio_t* io, void* buf, size_t len);
+//@see io->readbuf
+int hio_read (hio_t* io);
+int hio_write(hio_t* io, const void* buf, size_t len);
+int hio_close(hio_t* io);
+
+int hio_accept (hio_t* io);
+int hio_connect(hio_t* io);
+
+#endif // HW_HIO_H_

+ 231 - 12
event/hloop.c

@@ -1,16 +1,17 @@
 #include "hloop.h"
 #include "hevent.h"
+#include "hio.h"
 #include "iowatcher.h"
 
 #include "hdef.h"
 #include "hlog.h"
 #include "hmath.h"
+#include "hsocket.h"
 
 #define PAUSE_TIME              10          // ms
 #define MAX_BLOCK_TIME          1000        // ms
 
 #define IO_ARRAY_INIT_SIZE      64
-
 static void hio_init(hio_t* io);
 static void hio_deinit(hio_t* io);
 static void hio_reset(hio_t* io);
@@ -332,7 +333,11 @@ void hio_init(hio_t* io) {
 }
 
 void hio_reset(hio_t* io) {
-    io->accept = io->connect = io->closed = 0;
+    io->closed = 0;
+    io->accept = io->connect = io->connectex = 0;
+    io->recv = io->send = 0;
+    io->recvfrom = io->sendto = 0;
+    io->io_type = HIO_TYPE_UNKNOWN;
     io->error = 0;
     io->events = io->revents = 0;
     io->read_cb = NULL;
@@ -362,8 +367,53 @@ void hio_free(hio_t* io) {
     SAFE_FREE(io);
 }
 
-hio_t* hio_add(hloop_t* loop, hio_cb cb, int fd, int events) {
-    printd("hio_add fd=%d events=%d\n", fd, events);
+static void fill_io_type(hio_t* io) {
+    int type = 0;
+    socklen_t optlen = sizeof(int);
+    int ret = getsockopt(io->fd, SOL_SOCKET, SO_TYPE, (char*)&type, &optlen);
+    printd("getsockopt SO_TYPE fd=%d ret=%d type=%d errno=%d\n", io->fd, ret, type, socket_errno());
+    if (ret == 0) {
+        switch (type) {
+        case SOCK_STREAM:   io->io_type = HIO_TYPE_TCP; break;
+        case SOCK_DGRAM:    io->io_type = HIO_TYPE_UDP; break;
+        case SOCK_RAW:      io->io_type = HIO_TYPE_IP;  break;
+        default: io->io_type = HIO_TYPE_SOCKET;         break;
+        }
+        // nonblocking
+        nonblocking(io->fd);
+        // fill io->localaddr io->peeraddr
+        if (io->localaddr == NULL) {
+            SAFE_ALLOC(io->localaddr, sizeof(struct sockaddr_in6));
+        }
+        if (io->peeraddr == NULL) {
+            io->peeraddrlen = sizeof(struct sockaddr_in6);
+            SAFE_ALLOC(io->peeraddr, sizeof(struct sockaddr_in6));
+        }
+        socklen_t addrlen = sizeof(struct sockaddr_in6);
+        ret = getsockname(io->fd, io->localaddr, &addrlen);
+        printd("getsockname fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
+        // NOTE:
+        // tcp_server peeraddr set by accept
+        // udp_server peeraddr set by recvfrom
+        // tcp_client/udp_client peeraddr set by hio_setpeeraddr
+        if (io->io_type == HIO_TYPE_TCP) {
+            // tcp acceptfd
+            addrlen = sizeof(struct sockaddr_in6);
+            ret = getpeername(io->fd, io->peeraddr, &addrlen);
+            printd("getpeername fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
+        }
+    }
+    else if (socket_errno() == ENOTSOCK) {
+        switch (io->fd) {
+        case 0: io->io_type = HIO_TYPE_STDIN;   break;
+        case 1: io->io_type = HIO_TYPE_STDOUT;  break;
+        case 2: io->io_type = HIO_TYPE_STDERR;  break;
+        default: io->io_type = HIO_TYPE_FILE;   break;
+        }
+    }
+}
+
+hio_t* hio_get(hloop_t* loop, int fd) {
     if (loop->ios.maxsize == 0) {
         io_array_init(&loop->ios, IO_ARRAY_INIT_SIZE);
     }
@@ -376,8 +426,10 @@ hio_t* hio_add(hloop_t* loop, hio_cb cb, int fd, int events) {
     hio_t* io = loop->ios.ptr[fd];
     if (io == NULL) {
         SAFE_ALLOC_SIZEOF(io);
-        loop->ios.ptr[fd] = io;
         hio_init(io);
+        io->loop = loop;
+        io->fd = fd;
+        loop->ios.ptr[fd] = io;
     }
 
     if (io->destroy) {
@@ -385,23 +437,38 @@ hio_t* hio_add(hloop_t* loop, hio_cb cb, int fd, int events) {
         hio_reset(io);
     }
 
+    if (io->io_type == HIO_TYPE_UNKNOWN) {
+        // NOTE: fill io_type: this is important
+        fill_io_type(io);
+    }
+
+    return io;
+}
+
+int hio_add(hio_t* io, hio_cb cb, int events) {
+    printd("hio_add fd=%d events=%d\n", io->fd, events);
+    hloop_t* loop = io->loop;
     if (!io->active) {
         EVENT_ADD(loop, io, cb);
         loop->nios++;
+        // NOTE: fill io_type: this is important
+        if (io->io_type == HIO_TYPE_UNKNOWN) {
+            fill_io_type(io);
+        }
     }
 
-    io->fd = fd;
     if (cb) {
         io->cb = (hevent_cb)cb;
     }
-    iowatcher_add_event(loop, fd, events);
+
+    iowatcher_add_event(loop, io->fd, events);
     io->events |= events;
-    return io;
+    return 0;
 }
 
-void hio_del(hio_t* io, int events) {
+int hio_del(hio_t* io, int events) {
     printd("hio_del fd=%d io->events=%d events=%d\n", io->fd, io->events, events);
-    if (io->destroy) return;
+    if (io->destroy) return 0;
     iowatcher_del_event(io->loop, io->fd, events);
     io->events &= ~events;
     if (io->events == 0) {
@@ -409,10 +476,82 @@ void hio_del(hio_t* io, int events) {
         EVENT_DEL(io);
         hio_deinit(io);
     }
+    return 0;
 }
 
-#include "hsocket.h"
-hio_t* hlisten (hloop_t* loop, int port, haccept_cb accept_cb) {
+void hio_setlocaladdr(hio_t* io, struct sockaddr* addr, int addrlen) {
+    if (io->localaddr == NULL) {
+        SAFE_ALLOC(io->localaddr, sizeof(struct sockaddr_in6));
+    }
+    memcpy(io->localaddr, addr, addrlen);
+}
+
+void hio_setpeeraddr (hio_t* io, struct sockaddr* addr, int addrlen) {
+    if (io->peeraddr == NULL) {
+        io->peeraddrlen = sizeof(struct sockaddr_in6);
+        SAFE_ALLOC(io->peeraddr, sizeof(struct sockaddr_in6));
+    }
+    memcpy(io->peeraddr, addr, addrlen);
+}
+
+hio_t* hread(hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
+    hio_t* io = hio_get(loop, fd);
+    if (io == NULL) return NULL;
+    io->readbuf.base = (char*)buf;
+    io->readbuf.len = len;
+    if (read_cb) {
+        io->read_cb = read_cb;
+    }
+    hio_read(io);
+    return io;
+}
+
+hio_t* hwrite(hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb) {
+    hio_t* io = hio_get(loop, fd);
+    if (io == NULL) return NULL;
+    if (write_cb) {
+        io->write_cb = write_cb;
+    }
+    hio_write(io, buf, len);
+    return io;
+}
+
+void hclose(hio_t* io) {
+    printd("close fd=%d\n", io->fd);
+    if (io->closed) return;
+    io->closed = 1;
+    hio_close(io);
+    if (io->close_cb) {
+        printd("close_cb------\n");
+        io->close_cb(io);
+        printd("close_cb======\n");
+    }
+    hio_del(io, ALL_EVENTS);
+}
+
+hio_t* haccept(hloop_t* loop, int listenfd, haccept_cb accept_cb) {
+    hio_t* io = hio_get(loop, listenfd);
+    if (io == NULL) return NULL;
+    io->accept = 1;
+    if (accept_cb) {
+        io->accept_cb = accept_cb;
+    }
+    hio_accept(io);
+    return io;
+}
+
+hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb) {
+    hio_t* io = hio_get(loop, connfd);
+    if (io == NULL) return NULL;
+    io->connect = 1;
+    if (connect_cb) {
+        io->connect_cb = connect_cb;
+    }
+    hio_connect(io);
+    return io;
+}
+
+hio_t* create_tcp_server (hloop_t* loop, int port, haccept_cb accept_cb) {
     int listenfd = Listen(port);
     if (listenfd < 0) {
         return NULL;
@@ -423,3 +562,83 @@ hio_t* hlisten (hloop_t* loop, int port, haccept_cb accept_cb) {
     }
     return io;
 }
+
+hio_t* create_tcp_client (hloop_t* loop, const char* host, int port, hconnect_cb connect_cb) {
+    struct sockaddr_in addr;
+    socklen_t addrlen = sizeof(addr);
+    memset(&addr, 0, addrlen);
+    addr.sin_family = AF_INET;
+    int ret = Resolver(host, (struct sockaddr*)&addr);
+    if (ret != 0) return NULL;
+    addr.sin_port = htons(port);
+    int connfd = socket(AF_INET, SOCK_STREAM, 0);
+    if (connfd < 0) {
+        perror("socket");
+        return NULL;
+    }
+    hio_t* io = hio_get(loop, connfd);
+    if (io == NULL) return NULL;
+    hio_setpeeraddr(io, (struct sockaddr*)&addr, addrlen);
+    hconnect(loop, connfd, connect_cb);
+    return io;
+}
+
+hio_t* hrecv (hloop_t* loop, int connfd, void* buf, size_t len, hread_cb read_cb) {
+    hio_t* io = hio_get(loop, connfd);
+    if (io == NULL) return NULL;
+    io->recv = 1;
+    return hread(loop, connfd, buf, len, read_cb);
+}
+
+hio_t* hsend (hloop_t* loop, int connfd, const void* buf, size_t len, hwrite_cb write_cb) {
+    hio_t* io = hio_get(loop, connfd);
+    if (io == NULL) return NULL;
+    io->send = 1;
+    return hwrite(loop, connfd, buf, len, write_cb);
+}
+
+hio_t* hrecvfrom (hloop_t* loop, int sockfd, void* buf, size_t len, hread_cb read_cb) {
+    hio_t* io = hio_get(loop, sockfd);
+    if (io == NULL) return NULL;
+    io->recvfrom = 1;
+    return hread(loop, sockfd, buf, len, read_cb);
+}
+
+hio_t* hsendto (hloop_t* loop, int sockfd, const void* buf, size_t len, hwrite_cb write_cb) {
+    hio_t* io = hio_get(loop, sockfd);
+    if (io == NULL) return NULL;
+    io->sendto = 1;
+    return hwrite(loop, sockfd, buf, len, write_cb);
+}
+
+// @server: socket -> bind -> hrecvfrom
+hio_t* create_udp_server(hloop_t* loop, int port) {
+    int bindfd = Bind(port, SOCK_DGRAM);
+    if (bindfd < 0) {
+        return NULL;
+    }
+    return hio_get(loop, bindfd);
+}
+
+// @client: Resolver -> socket -> hio_get -> hio_setpeeraddr
+hio_t* create_udp_client(hloop_t* loop, const char* host, int port) {
+    // IPv4
+    struct sockaddr_in peeraddr;
+    socklen_t addrlen = sizeof(peeraddr);
+    memset(&peeraddr, 0, addrlen);
+    peeraddr.sin_family = AF_INET;
+    int ret = Resolver(host, (struct sockaddr*)&peeraddr);
+    if (ret != 0) return NULL;
+    peeraddr.sin_port = htons(port);
+
+    int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+    if (sockfd < 0) {
+        perror("socket");
+        return NULL;
+    }
+
+    hio_t* io = hio_get(loop, sockfd);
+    if (io == NULL) return NULL;
+    hio_setpeeraddr(io, (struct sockaddr*)&peeraddr, addrlen);
+    return io;
+}

+ 50 - 11
event/hloop.h

@@ -1,6 +1,5 @@
 #ifndef HW_LOOP_H_
-#define HW_LOOP_H_
-
+#define HW_LOOP_H_ 
 #include "hdef.h"
 
 BEGIN_EXTERN_C
@@ -101,18 +100,37 @@ struct hperiod_s {
 
 QUEUE_DECL(offset_buf_t, write_queue);
 
+typedef enum {
+    HIO_TYPE_UNKNOWN = 0,
+    HIO_TYPE_STDIN   = 0x00000001,
+    HIO_TYPE_STDOUT  = 0x00000002,
+    HIO_TYPE_STDERR  = 0x00000004,
+    HIO_TYPE_STDIO   = HIO_TYPE_STDIN|HIO_TYPE_STDOUT|HIO_TYPE_STDERR,
+    HIO_TYPE_FILE    = 0x00000010,
+    HIO_TYPE_TCP     = 0x00000100,
+    HIO_TYPE_UDP     = 0x00001000,
+    HIO_TYPE_IP      = 0x00010000,
+    HIO_TYPE_SOCKET  = HIO_TYPE_TCP|HIO_TYPE_UDP|HIO_TYPE_IP
+} hio_type_e;
+
 struct hio_s {
     HEVENT_FIELDS
+    unsigned    closed      :1;
     unsigned    accept      :1;
     unsigned    connect     :1;
-    unsigned    closed      :1;
     unsigned    connectex   :1; // for ConnectEx/DisconnectEx
+    unsigned    recv        :1;
+    unsigned    send        :1;
+    unsigned    recvfrom    :1;
+    unsigned    sendto      :1;
     int         fd;
+    hio_type_e  io_type;
     int         error;
     int         events;
     int         revents;
     struct sockaddr*    localaddr;
     struct sockaddr*    peeraddr;
+    int                 peeraddrlen;    // for WSARecvFrom
     hbuf_t              readbuf;        // for hread
     struct write_queue  write_queue;    // for hwrite
     // callbacks
@@ -205,22 +223,43 @@ void        htimer_del(htimer_t* timer);
 void        htimer_reset(htimer_t* timer);
 
 // io
-// frist level apis
+// low-level api
 #define READ_EVENT  0x0001
 #define WRITE_EVENT 0x0004
 #define ALL_EVENTS  READ_EVENT|WRITE_EVENT
-hio_t*      hio_add(hloop_t* loop, hio_cb cb, int fd, int events DEFAULT(READ_EVENT));
-void        hio_del(hio_t* io, int events DEFAULT(ALL_EVENTS));
+hio_t* hio_get(hloop_t* loop, int fd);
+int    hio_add(hio_t* io, hio_cb cb, int events DEFAULT(READ_EVENT));
+int    hio_del(hio_t* io, int events DEFAULT(ALL_EVENTS));
 
-// second level apis
-hio_t* haccept  (hloop_t* loop, int listenfd, haccept_cb accept_cb);
-// Listen => haccept
-hio_t* hlisten  (hloop_t* loop, int port,     haccept_cb accept_cb);
-hio_t* hconnect (hloop_t* loop, const char* host, int port, hconnect_cb connect_cb);
+void hio_setlocaladdr(hio_t* io, struct sockaddr* addr, int addrlen);
+void hio_setpeeraddr (hio_t* io, struct sockaddr* addr, int addrlen);
+
+// high-level api
 hio_t* hread    (hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb);
 hio_t* hwrite   (hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb DEFAULT(NULL));
 void   hclose   (hio_t* io);
 
+// tcp
+hio_t* haccept  (hloop_t* loop, int listenfd, haccept_cb accept_cb);
+hio_t* hconnect (hloop_t* loop, int connfd,   hconnect_cb connect_cb);
+hio_t* hrecv    (hloop_t* loop, int connfd, void* buf, size_t len, hread_cb read_cb);
+hio_t* hsend    (hloop_t* loop, int connfd, const void* buf, size_t len, hwrite_cb write_cb DEFAULT(NULL));
+
+// @tcp_server: socket -> bind -> listen -> haccept
+hio_t* create_tcp_server (hloop_t* loop, int port, haccept_cb accept_cb);
+// @tcp_client: resolver -> socket -> hio_get -> hio_setpeeraddr -> hconnect
+hio_t* create_tcp_client (hloop_t* loop, const char* host, int port, hconnect_cb connect_cb);
+
+// udp/ip
+// NOTE: recvfrom/sendto struct sockaddr* addr save in io->peeraddr
+hio_t* hrecvfrom (hloop_t* loop, int sockfd, void* buf, size_t len, hread_cb read_cb);
+hio_t* hsendto   (hloop_t* loop, int sockfd, const void* buf, size_t len, hwrite_cb write_cb DEFAULT(NULL));
+
+// @udp_server: socket -> bind -> hio_get
+hio_t* create_udp_server(hloop_t* loop, int port);
+// @udp_client: resolver -> socket -> hio_get -> hio_setpeeraddr
+hio_t* create_udp_client(hloop_t* loop, const char* host, int port);
+
 END_EXTERN_C
 
 #endif // HW_LOOP_H_

+ 18 - 0
event/iowatcher.h

@@ -24,6 +24,24 @@
 #endif
 #endif
 
+static inline const char* iowatcher_name() {
+#ifdef EVENT_SELECT
+    return  "select";
+#elif defined(EVENT_POLL)
+    return  "poll";
+#elif defined(EVENT_EPOLL)
+    return  "epoll";
+#elif defined(EVENT_KQUEUE)
+    return  "kqueue";
+#elif defined(EVENT_IOCP)
+    return  "iocp";
+#elif defined(EVENT_PORT)
+    return  "evport";
+#else
+    return  "noevent";
+#endif
+}
+
 int iowatcher_init(hloop_t* loop);
 int iowatcher_cleanup(hloop_t* loop);
 int iowatcher_add_event(hloop_t* loop, int fd, int events);

+ 77 - 85
event/nio.c

@@ -1,16 +1,11 @@
 #include "iowatcher.h"
 #ifndef EVENT_IOCP
+#include "hio.h"
 #include "hsocket.h"
 
 static void nio_accept(hio_t* io) {
     //printd("nio_accept listenfd=%d\n", io->fd);
     socklen_t addrlen;
-    if (io->localaddr == NULL) {
-        SAFE_ALLOC(io->localaddr, sizeof(struct sockaddr_in6));
-    }
-    if (io->peeraddr == NULL) {
-        SAFE_ALLOC(io->peeraddr, sizeof(struct sockaddr_in6));
-    }
 accept:
     addrlen = sizeof(struct sockaddr_in6);
     int connfd = accept(io->fd, io->peeraddr, &addrlen);
@@ -25,9 +20,9 @@ accept:
             goto accept_error;
         }
     }
-
     addrlen = sizeof(struct sockaddr_in6);
     getsockname(connfd, io->localaddr, &addrlen);
+
     if (io->accept_cb) {
         char localaddrstr[INET6_ADDRSTRLEN+16] = {0};
         char peeraddrstr[INET6_ADDRSTRLEN+16] = {0};
@@ -48,16 +43,7 @@ accept_error:
 static void nio_connect(hio_t* io) {
     //printd("nio_connect connfd=%d\n", io->fd);
     int state = 0;
-    socklen_t addrlen;
-    if (io->localaddr == NULL) {
-        SAFE_ALLOC(io->localaddr, sizeof(struct sockaddr_in6));
-        addrlen = sizeof(struct sockaddr_in6);
-        getsockname(io->fd, io->localaddr, &addrlen);
-    }
-    if (io->peeraddr == NULL) {
-        SAFE_ALLOC(io->peeraddr, sizeof(struct sockaddr_in6));
-    }
-    addrlen = sizeof(struct sockaddr_in6);
+    socklen_t addrlen = sizeof(struct sockaddr_in6);
     int ret = getpeername(io->fd, io->peeraddr, &addrlen);
     if (ret < 0) {
         io->error = socket_errno();
@@ -65,6 +51,8 @@ static void nio_connect(hio_t* io) {
         state = 0;
     }
     else {
+        addrlen = sizeof(struct sockaddr_in6);
+        getsockname(io->fd, io->localaddr, &addrlen);
         char localaddrstr[INET6_ADDRSTRLEN+16] = {0};
         char peeraddrstr[INET6_ADDRSTRLEN+16] = {0};
         printd("connect connfd=%d [%s] => [%s]\n", io->fd,
@@ -89,11 +77,25 @@ static void nio_read(hio_t* io) {
     int   len = io->readbuf.len;
 read:
     memset(buf, 0, len);
+    switch (io->io_type) {
+    case HIO_TYPE_TCP:
 #ifdef OS_UNIX
-    nread = read(io->fd, buf, len);
+        nread = read(io->fd, buf, len);
 #else
-    nread = recv(io->fd, buf, len, 0);
+        nread = recv(io->fd, buf, len, 0);
 #endif
+        break;
+    case HIO_TYPE_UDP:
+    case HIO_TYPE_IP:
+    {
+        socklen_t addrlen = sizeof(struct sockaddr_in6);
+        nread = recvfrom(io->fd, buf, len, 0, io->peeraddr, &addrlen);
+    }
+        break;
+    default:
+        nread = read(io->fd, buf, len);
+        break;
+    }
     //printd("read retval=%d\n", nread);
     if (nread < 0) {
         if (socket_errno() == EAGAIN) {
@@ -134,11 +136,22 @@ write:
     offset_buf_t* pbuf = write_queue_front(&io->write_queue);
     char* buf = pbuf->base + pbuf->offset;
     int len = pbuf->len - pbuf->offset;
+    switch (io->io_type) {
+    case HIO_TYPE_TCP:
 #ifdef OS_UNIX
-    nwrite = write(io->fd, buf, len);
+        nwrite = write(io->fd, buf, len);
 #else
-    nwrite = send(io->fd, buf, len, 0);
+        nwrite = send(io->fd, buf, len, 0);
 #endif
+        break;
+    case HIO_TYPE_UDP:
+    case HIO_TYPE_IP:
+        nwrite = sendto(io->fd, buf, len, 0, io->peeraddr, sizeof(struct sockaddr_in6));
+        break;
+    default:
+        nwrite = write(io->fd, buf, len);
+        break;
+    }
     //printd("write retval=%d\n", nwrite);
     if (nwrite < 0) {
         if (socket_errno() == EAGAIN) {
@@ -186,77 +199,65 @@ static void hio_handle_events(hio_t* io) {
         if (io->connect) {
             // NOTE: connect just do once
             // ONESHOT
-            hio_del(io, WRITE_EVENT);
             io->connect = 0;
 
             nio_connect(io);
         }
         else {
             nio_write(io);
-            // NOTE: del WRITE_EVENT, if write_queue empty
-            if (write_queue_empty(&io->write_queue)) {
-                hio_del(io, WRITE_EVENT);
-            }
+        }
+        // NOTE: del WRITE_EVENT, if write_queue empty
+        if (write_queue_empty(&io->write_queue)) {
+            hio_del(io, WRITE_EVENT);
         }
     }
 
     io->revents = 0;
 }
 
-hio_t* haccept  (hloop_t* loop, int listenfd, haccept_cb accept_cb) {
-    hio_t* io = hio_add(loop, hio_handle_events, listenfd, READ_EVENT);
-    if (io == NULL) return NULL;
-    if (accept_cb) {
-        io->accept_cb = accept_cb;
-    }
-    io->accept = 1;
-    nonblocking(listenfd);
-    return io;
+int hio_accept(hio_t* io) {
+    hio_add(io, hio_handle_events, READ_EVENT);
+    return 0;
 }
 
-hio_t* hconnect (hloop_t* loop, const char* host, int port, hconnect_cb connect_cb) {
-    int connfd = Connect(host, port, 1);
-    if (connfd < 0) {
-        return NULL;
-    }
-    hio_t* io = hio_add(loop, hio_handle_events, connfd, WRITE_EVENT);
-    if (io == NULL) {
-        closesocket(connfd);
-        return NULL;
-    }
-    if (connect_cb) {
-        io->connect_cb = connect_cb;
+int hio_connect(hio_t* io) {
+    int ret = connect(io->fd, io->peeraddr, sizeof(struct sockaddr_in6));
+#ifdef OS_WIN
+    if (ret < 0 && socket_errno() != WSAEWOULDBLOCK) {
+#else
+    if (ret < 0 && socket_errno() != EINPROGRESS) {
+#endif
+        perror("connect");
+        hclose(io);
+        return ret;
     }
-    io->connect = 1;
-    nonblocking(connfd);
-    return io;
+    return hio_add(io, hio_handle_events, WRITE_EVENT);
 }
 
-hio_t* hread    (hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
-    hio_t* io = hio_add(loop, hio_handle_events, fd, READ_EVENT);
-    if (io == NULL) return NULL;
-    io->readbuf.base = (char*)buf;
-    io->readbuf.len = len;
-    if (read_cb) {
-        io->read_cb = read_cb;
-    }
-    return io;
+int hio_read (hio_t* io) {
+    return hio_add(io, hio_handle_events, READ_EVENT);
 }
 
-hio_t* hwrite   (hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb) {
-    hio_t* io = hio_add(loop, hio_handle_events, fd, 0);
-    if (io == NULL) return NULL;
-    if (write_cb) {
-        io->write_cb = write_cb;
-    }
+int hio_write (hio_t* io, const void* buf, size_t len) {
     int nwrite = 0;
     if (write_queue_empty(&io->write_queue)) {
 try_write:
+        switch (io->io_type) {
+        case HIO_TYPE_TCP:
 #ifdef OS_UNIX
-        nwrite = write(fd, buf, len);
+            nwrite = write(io->fd, buf, len);
 #else
-        nwrite = send(fd, buf, len, 0);
+            nwrite = send(io->fd, buf, len, 0);
 #endif
+            break;
+        case HIO_TYPE_UDP:
+        case HIO_TYPE_IP:
+            nwrite = sendto(io->fd, buf, len, 0, io->peeraddr, sizeof(struct sockaddr_in6));
+            break;
+        default:
+            nwrite = write(io->fd, buf, len);
+            break;
+        }
         //printd("write retval=%d\n", nwrite);
         if (nwrite < 0) {
             if (socket_errno() == EAGAIN) {
@@ -272,52 +273,43 @@ try_write:
         if (nwrite == 0) {
             goto disconnect;
         }
-        if (write_cb) {
+        if (io->write_cb) {
             printd("try_write_cb------\n");
-            write_cb(io, buf, nwrite);
+            io->write_cb(io, buf, nwrite);
             printd("try_write_cb======\n");
         }
         if (nwrite == len) {
             //goto write_done;
-            return io;
+            return nwrite;
         }
-        hio_add(loop, hio_handle_events, fd, WRITE_EVENT);
+        hio_add(io, hio_handle_events, WRITE_EVENT);
     }
 enqueue:
     if (nwrite < len) {
         offset_buf_t rest;
         rest.len = len;
         rest.offset = nwrite;
-        // NOTE: free in nio_write;
+        // NOTE: free in nio_write
         SAFE_ALLOC(rest.base, rest.len);
-        if (rest.base == NULL) return io;
         memcpy(rest.base, (char*)buf, rest.len);
         if (io->write_queue.maxsize == 0) {
             write_queue_init(&io->write_queue, 4);
         }
         write_queue_push_back(&io->write_queue, &rest);
     }
-    return io;
+    return nwrite;
 write_error:
 disconnect:
     hclose(io);
-    return io;
+    return nwrite;
 }
 
-void   hclose   (hio_t* io) {
-    //printd("close fd=%d\n", io->fd);
-    if (io->closed) return;
+int hio_close (hio_t* io) {
 #ifdef OS_UNIX
     close(io->fd);
 #else
     closesocket(io->fd);
 #endif
-    io->closed = 1;
-    if (io->close_cb) {
-        printd("close_cb------\n");
-        io->close_cb(io);
-        printd("close_cb======\n");
-    }
-    hio_del(io, ALL_EVENTS);
+    return 0;
 }
 #endif

+ 3 - 3
event/nlog.c

@@ -17,7 +17,7 @@ typedef struct nlog_client {
 
 static network_logger_t s_logger = {0};
 
-void on_close(hio_t* io) {
+static void on_close(hio_t* io) {
     printf("on_close fd=%d error=%d\n", io->fd, io->error);
     struct list_node* next = s_logger.clients.next;
     nlog_client* client;
@@ -33,7 +33,7 @@ void on_close(hio_t* io) {
     }
 }
 
-void on_read(hio_t* io, void* buf, int readbytes) {
+static void on_read(hio_t* io, void* buf, int readbytes) {
     printf("on_read fd=%d readbytes=%d\n", io->fd, readbytes);
     printf("< %s\n", buf);
     // nothing to do
@@ -70,7 +70,7 @@ void network_logger(int loglevel, const char* buf, int len) {
 hio_t* nlog_listen(hloop_t* loop, int port) {
     list_init(&s_logger.clients);
     s_logger.loop = loop;
-    s_logger.listenio = hlisten(loop, port, on_accept);
+    s_logger.listenio = create_tcp_server(loop, port, on_accept);
     return s_logger.listenio;
 }
 

+ 53 - 91
event/overlapio.c

@@ -50,7 +50,17 @@ int post_recv(hio_t* io, hoverlapped_t* hovlp) {
     memset(hovlp->buf.buf, 0, hovlp->buf.len);
     DWORD dwbytes = 0;
     DWORD flags = 0;
-    int ret = WSARecv(io->fd, &hovlp->buf, 1, &dwbytes, &flags, &hovlp->ovlp, NULL);
+    int ret = 0;
+    if (io->io_type == HIO_TYPE_TCP) {
+        ret = WSARecv(io->fd, &hovlp->buf, 1, &dwbytes, &flags, &hovlp->ovlp, NULL);
+    }
+    else if (io->io_type == HIO_TYPE_UDP ||
+            io->io_type == HIO_TYPE_IP) {
+        ret = WSARecvFrom(io->fd, &hovlp->buf, 1, &dwbytes, &flags, io->peeraddr, &io->peeraddrlen, &hovlp->ovlp, NULL);
+    }
+    else {
+        ret = -1;
+    }
     printd("WSARecv ret=%d bytes=%u\n", ret, dwbytes);
     if (ret != 0) {
         int err = WSAGetLastError();
@@ -82,13 +92,7 @@ static void on_acceptex_complete(hio_t* io) {
     socklen_t peeraddrlen;
     GetAcceptExSockaddrs(hovlp->buf.buf, 0, sizeof(struct sockaddr_in6), sizeof(struct sockaddr_in6),
         &plocaladdr, &localaddrlen, &ppeeraddr, &peeraddrlen);
-    if (io->localaddr == NULL) {
-        SAFE_ALLOC(io->localaddr, localaddrlen);
-    }
     memcpy(io->localaddr, plocaladdr, localaddrlen);
-    if (io->peeraddr == NULL) {
-        SAFE_ALLOC(io->peeraddr, peeraddrlen);
-    }
     memcpy(io->peeraddr, ppeeraddr, peeraddrlen);
     if (io->accept_cb) {
         char localaddrstr[INET6_ADDRSTRLEN+16] = {0};
@@ -109,12 +113,6 @@ static void on_connectex_complete(hio_t* io) {
     hoverlapped_t* hovlp = (hoverlapped_t*)io->hovlp;
     int state = hovlp->error == 0 ? 1 : 0;
     if (state == 1) {
-        if (io->localaddr == NULL) {
-            SAFE_ALLOC(io->localaddr, sizeof(struct sockaddr_in6));
-        }
-        if (io->peeraddr == NULL) {
-            SAFE_ALLOC(io->peeraddr, sizeof(struct sockaddr_in6));
-        }
         setsockopt(io->fd, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
         socklen_t addrlen = sizeof(struct sockaddr_in6);
         getsockname(io->fd, io->localaddr, &addrlen);
@@ -206,65 +204,31 @@ static void hio_handle_events(hio_t* io) {
     io->revents = 0;
 }
 
-hio_t* haccept  (hloop_t* loop, int listenfd, haccept_cb accept_cb) {
-    hio_t* io = hio_add(loop, hio_handle_events, listenfd, READ_EVENT);
-    if (io == NULL) return NULL;
-    if (accept_cb) {
-        io->accept_cb = accept_cb;
-    }
-    io->accept = 1;
-    nonblocking(listenfd);
+int hio_accept (hio_t* io) {
     for (int i = 0; i < ACCEPTEX_NUM; ++i) {
         post_acceptex(io, NULL);
     }
-    return io;
+    return hio_add(io, hio_handle_events, READ_EVENT);
 }
 
-hio_t* hconnect (hloop_t* loop, const char* host, int port, hconnect_cb connect_cb) {
-    // gethostbyname -> socket -> bind -> nonblocking -> ConnectEx
-    struct sockaddr_in peeraddr;
-    socklen_t addrlen = sizeof(struct sockaddr_in);
-    memset(&peeraddr, 0, addrlen);
-    peeraddr.sin_family = AF_INET;
-    inet_pton(peeraddr.sin_family, host, &peeraddr.sin_addr);
-    if (peeraddr.sin_addr.s_addr == 0 ||
-        peeraddr.sin_addr.s_addr == INADDR_NONE) {
-        struct hostent* phe = gethostbyname(host);
-        if (phe == NULL)    return NULL;
-        peeraddr.sin_family = phe->h_addrtype;
-        memcpy(&peeraddr.sin_addr, phe->h_addr_list[0], phe->h_length);
-    }
-    peeraddr.sin_port = htons(port);
-    int connfd = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
-    if (connfd < 0) {
-        perror("socket");
-        return NULL;
-    }
+int hio_connect (hio_t* io) {
     // NOTE: ConnectEx must call bind
     struct sockaddr_in localaddr;
+    socklen_t addrlen = sizeof(localaddr);
     memset(&localaddr, 0, addrlen);
     localaddr.sin_family = AF_INET;
     localaddr.sin_addr.s_addr = htonl(INADDR_ANY);
     localaddr.sin_port = htons(0);
-    if (bind(connfd, (struct sockaddr*)&localaddr, addrlen) < 0) {
+    if (bind(io->fd, (struct sockaddr*)&localaddr, addrlen) < 0) {
         perror("bind");
         goto error;
     }
-    nonblocking(connfd);
-    hio_t* io = hio_add(loop, hio_handle_events, connfd, WRITE_EVENT);
-    if (io == NULL) {
-        goto error;
-    }
-    if (connect_cb) {
-        io->connect_cb = connect_cb;
-    }
-    io->connect = 1;
     // ConnectEx
     io->connectex = 1;
     LPFN_CONNECTEX ConnectEx = NULL;
     GUID guidConnectEx = WSAID_CONNECTEX;
     DWORD dwbytes;
-    if (WSAIoctl(connfd, SIO_GET_EXTENSION_FUNCTION_POINTER,
+    if (WSAIoctl(io->fd, SIO_GET_EXTENSION_FUNCTION_POINTER,
         &guidConnectEx, sizeof(guidConnectEx),
         &ConnectEx, sizeof(ConnectEx),
         &dwbytes, NULL, NULL) != 0) {
@@ -273,43 +237,40 @@ hio_t* hconnect (hloop_t* loop, const char* host, int port, hconnect_cb connect_
     // NOTE: free on_connectex_complete
     hoverlapped_t* hovlp;
     SAFE_ALLOC_SIZEOF(hovlp);
-    hovlp->fd = connfd;
+    hovlp->fd = io->fd;
     hovlp->event = WRITE_EVENT;
     hovlp->io = io;
-    if (ConnectEx(connfd, (struct sockaddr*)&peeraddr, addrlen, NULL, 0, &dwbytes, &hovlp->ovlp) != TRUE) {
+    if (ConnectEx(io->fd, io->peeraddr, sizeof(struct sockaddr_in6), NULL, 0, &dwbytes, &hovlp->ovlp) != TRUE) {
         int err = WSAGetLastError();
         if (err != ERROR_IO_PENDING) {
             fprintf(stderr, "AcceptEx error: %d\n", err);
             goto error;
         }
     }
-    return io;
+    return hio_add(io, hio_handle_events, WRITE_EVENT);
 error:
-    closesocket(connfd);
-    return NULL;
+    hclose(io);
+    return 0;
 };
 
-hio_t* hread (hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
-    hio_t* io = hio_add(loop, hio_handle_events, fd, READ_EVENT);
-    if (io == NULL) return NULL;
-    io->readbuf.base = (char*)buf;
-    io->readbuf.len = len;
-    if (read_cb) {
-        io->read_cb = read_cb;
-    }
+int hio_read (hio_t* io) {
     post_recv(io, NULL);
-    return io;
+    return hio_add(io, hio_handle_events, READ_EVENT);
 }
 
-hio_t* hwrite (hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb) {
-    hio_t* io = hio_add(loop, hio_handle_events, fd, 0);
-    if (io == NULL) return NULL;
-    if (write_cb) {
-        io->write_cb = write_cb;
-    }
+int hio_write(hio_t* io, const void* buf, size_t len) {
     int nwrite = 0;
 try_send:
-    nwrite = send(fd, buf, len, 0);
+    if (io->io_type == HIO_TYPE_TCP) {
+        nwrite = send(io->fd, buf, len, 0);
+    }
+    else if (io->io_type == HIO_TYPE_UDP ||
+             io->io_type == HIO_TYPE_IP) {
+        nwrite = recvfrom(io->fd, buf, len, 0, io->peeraddr, &io->peeraddrlen);
+    }
+    else {
+        nwrite = -1;
+    }
     //printd("write retval=%d\n", nwrite);
     if (nwrite < 0) {
         if (socket_errno() == EAGAIN) {
@@ -325,20 +286,20 @@ try_send:
     if (nwrite == 0) {
         goto disconnect;
     }
-    if (write_cb) {
+    if (io->write_cb) {
         printd("try_write_cb------\n");
-        write_cb(io, buf, nwrite);
+        io->write_cb(io, buf, nwrite);
         printd("try_write_cb======\n");
     }
     if (nwrite == len) {
         //goto write_done;
-        return io;
+        return nwrite;
     }
 WSASend:
     {
         hoverlapped_t* hovlp;
         SAFE_ALLOC_SIZEOF(hovlp);
-        hovlp->fd = fd;
+        hovlp->fd = io->fd;
         hovlp->event = WRITE_EVENT;
         hovlp->buf.len = len - nwrite;
         // NOTE: free on_send_complete
@@ -347,7 +308,17 @@ WSASend:
         hovlp->io = io;
         DWORD dwbytes = 0;
         DWORD flags = 0;
-        int ret = WSASend(fd, &hovlp->buf, 1, &dwbytes, flags, &hovlp->ovlp, NULL);
+        int ret = 0;
+        if (io->io_type == HIO_TYPE_TCP) {
+            ret = WSASend(io->fd, &hovlp->buf, 1, &dwbytes, flags, &hovlp->ovlp, NULL);
+        }
+        else if (io->io_type == HIO_TYPE_UDP ||
+                 io->io_type == HIO_TYPE_IP) {
+            ret = WSASendTo(io->fd, &hovlp->buf, 1, &dwbytes, flags, io->peeraddr, io->peeraddrlen, &hovlp->ovlp, NULL);
+        }
+        else {
+            ret = -1;
+        }
         printd("WSASend ret=%d bytes=%u\n", ret, dwbytes);
         if (ret != 0) {
             int err = WSAGetLastError();
@@ -356,18 +327,16 @@ WSASend:
                 return NULL;
             }
         }
-        hio_add(loop, hio_handle_events, fd, WRITE_EVENT);
-        return io;
+        return hio_add(io, hio_handle_events, WRITE_EVENT);
     }
 write_error:
 disconnect:
     hclose(io);
-    return io;
+    return 0;
 }
 
-void   hclose   (hio_t* io) {
+void hio_close (hio_t* io) {
     //printd("close fd=%d\n", io->fd);
-    if (io->closed) return;
 #ifdef USE_DISCONNECTEX
     // DisconnectEx reuse socket
     if (io->connectex) {
@@ -386,7 +355,6 @@ void   hclose   (hio_t* io) {
 #else
     closesocket(io->fd);
 #endif
-    io->closed = 1;
     if (io->hovlp) {
         hoverlapped_t* hovlp = (hoverlapped_t*)io->hovlp;
         // NOTE: hread buf provided by caller
@@ -395,12 +363,6 @@ void   hclose   (hio_t* io) {
         }
         SAFE_FREE(io->hovlp);
     }
-    if (io->close_cb) {
-        printd("close_cb------\n");
-        io->close_cb(io);
-        printd("close_cb======\n");
-    }
-    hio_del(io, ALL_EVENTS);
 }
 
 #endif

+ 0 - 88
examples/client.cpp

@@ -1,88 +0,0 @@
-#include "hloop.h"
-#include "hbase.h"
-#include "hsocket.h"
-
-#define RECV_BUFSIZE    4096
-static char readbuf[RECV_BUFSIZE];
-
-void on_timer(htimer_t* timer) {
-    static int cnt = 0;
-    printf("on_timer timer_id=%lu time=%lus cnt=%d\n", timer->event_id, hloop_now(timer->loop), ++cnt);
-}
-
-void on_idle(hidle_t* idle) {
-    static int cnt = 0;
-    printf("on_idle idle_id=%lu cnt=%d\n", idle->event_id, ++cnt);
-}
-
-void on_write(hio_t* io, const void* buf, int writebytes) {
-    printf("on_write fd=%d writebytes=%d\n", io->fd, writebytes);
-}
-
-void on_stdin(hio_t* io, void* buf, int readbytes) {
-    printf("on_stdin fd=%d readbytes=%d\n", io->fd, readbytes);
-    printf("> %s\n", buf);
-
-    hio_t* iosock = (hio_t*)io->userdata;
-    hwrite(iosock->loop, iosock->fd, buf, readbytes, on_write);
-}
-
-void on_read(hio_t* io, void* buf, int readbytes) {
-    printf("on_read fd=%d readbytes=%d\n", io->fd, readbytes);
-    printf("< %s\n", buf);
-    printf(">>");
-    fflush(stdout);
-}
-
-void on_close(hio_t* io) {
-    printf("on_close fd=%d error=%d\n", io->fd, io->error);
-    hio_t* iostdin = (hio_t*)io->userdata;
-    hio_del(iostdin, READ_EVENT);
-}
-
-void on_connect(hio_t* io, int state) {
-    printf("on_connect fd=%d state=%d\n", io->fd, state);
-    if (state == 0) {
-        printf("error=%d:%s\n", io->error, strerror(io->error));
-        return;
-    }
-    char localaddrstr[INET6_ADDRSTRLEN+16] = {0};
-    char peeraddrstr[INET6_ADDRSTRLEN+16] = {0};
-    printf("connect connfd=%d [%s] => [%s]\n", io->fd,
-            sockaddr_snprintf(io->localaddr, localaddrstr, sizeof(localaddrstr)),
-            sockaddr_snprintf(io->peeraddr, peeraddrstr, sizeof(peeraddrstr)));
-
-    // NOTE: just on loop, readbuf can be shared.
-    hio_t* iostdin = hread(io->loop, 0, readbuf, RECV_BUFSIZE, on_stdin);
-    iostdin->userdata = io;
-    hio_t* iosock = hread(io->loop, io->fd, readbuf, RECV_BUFSIZE, on_read);
-    iosock->close_cb = on_close;
-    iosock->userdata = iostdin;
-
-    printf(">>");
-    fflush(stdout);
-}
-
-int main(int argc, char** argv) {
-    if (argc < 3) {
-        printf("Usage: cmd host port\n");
-        return -10;
-    }
-    const char* host = argv[1];
-    int port = atoi(argv[2]);
-
-    MEMCHECK;
-
-    hloop_t loop;
-    hloop_init(&loop);
-    //hidle_add(&loop, on_idle, INFINITE);
-    //htimer_add(&loop, on_timer, 1000, INFINITE);
-    hio_t* io = hconnect(&loop, host, port, on_connect);
-    if (io == NULL) {
-        return -20;
-    }
-    printf("connfd=%d\n", io->fd);
-    hloop_run(&loop);
-
-    return 0;
-}

+ 0 - 63
examples/server.cpp

@@ -1,63 +0,0 @@
-#include "hloop.h"
-#include "hsocket.h"
-
-#define RECV_BUFSIZE    4096
-static char readbuf[RECV_BUFSIZE];
-
-void on_timer(htimer_t* timer) {
-    static int cnt = 0;
-    printf("on_timer timer_id=%lu time=%lus cnt=%d\n", timer->event_id, hloop_now(timer->loop), ++cnt);
-}
-
-void on_idle(hidle_t* idle) {
-    static int cnt = 0;
-    printf("on_idle idle_id=%lu cnt=%d\n", idle->event_id, ++cnt);
-}
-
-void on_write(hio_t* io, const void* buf, int writebytes) {
-    printf("on_write fd=%d writebytes=%d\n", io->fd, writebytes);
-}
-
-void on_close(hio_t* io) {
-    printf("on_close fd=%d error=%d\n", io->fd, io->error);
-}
-
-void on_read(hio_t* io, void* buf, int readbytes) {
-    printf("on_read fd=%d readbytes=%d\n", io->fd, readbytes);
-    printf("< %s\n", buf);
-    // echo
-    printf("> %s\n", buf);
-    hwrite(io->loop, io->fd, buf, readbytes, on_write);
-}
-
-void on_accept(hio_t* io, int connfd) {
-    printf("on_accept listenfd=%d connfd=%d\n", io->fd, connfd);
-    char localaddrstr[INET6_ADDRSTRLEN+16] = {0};
-    char peeraddrstr[INET6_ADDRSTRLEN+16] = {0};
-    printf("accept listenfd=%d connfd=%d [%s] <= [%s]\n", io->fd, connfd,
-            sockaddr_snprintf(io->localaddr, localaddrstr, sizeof(localaddrstr)),
-            sockaddr_snprintf(io->peeraddr, peeraddrstr, sizeof(peeraddrstr)));
-
-    // one loop can use one readbuf
-    hio_t* connio = hread(io->loop, connfd, readbuf, RECV_BUFSIZE, on_read);
-    connio->close_cb = on_close;
-}
-
-int main(int argc, char** argv) {
-    if (argc < 2) {
-        printf("Usage: cmd port\n");
-        return -10;
-    }
-    int port = atoi(argv[1]);
-
-    hloop_t loop;
-    hloop_init(&loop);
-    //hidle_add(&loop, on_idle, INFINITE);
-    //htimer_add(&loop, on_timer, 1000, INFINITE);
-    hio_t* io = hlisten(&loop, port, on_accept);
-    if (io == NULL) {
-        return -20;
-    }
-    printf("listenfd=%d\n", io->fd);
-    hloop_run(&loop);
-}

+ 7 - 8
http/server/http_server.cpp

@@ -9,8 +9,8 @@
 #include "HttpParser.h"
 #include "HttpHandler.h"
 
-#define RECV_BUFSIZE    4096
-#define SEND_BUFSIZE    4096
+#define RECV_BUFSIZE    8192
+#define SEND_BUFSIZE    8192
 
 static HttpService  s_default_service;
 static FileCache    s_filecache;
@@ -36,8 +36,8 @@ static void worker_init(void* userdata) {
 #endif
 }
 
-static void on_read(hio_t* io, void* buf, int readbytes) {
-    //printf("on_read fd=%d readbytes=%d\n", io->fd, readbytes);
+static void on_recv(hio_t* io, void* buf, int readbytes) {
+    //printf("on_recv fd=%d readbytes=%d\n", io->fd, readbytes);
     HttpHandler* handler = (HttpHandler*)io->userdata;
     HttpParser* parser = &handler->parser;
     // recv -> HttpParser -> HttpRequest -> handle_request -> HttpResponse -> send
@@ -90,10 +90,10 @@ static void on_read(hio_t* io, void* buf, int readbytes) {
             sendbuf.len = header.size();
         }
         // send header/body
-        hwrite(io->loop, io->fd, sendbuf.base, sendbuf.len);
+        hsend(io->loop, io->fd, sendbuf.base, sendbuf.len);
         if (send_in_one_packet == false) {
             // send body
-            hwrite(io->loop, io->fd, handler->res.body.data(), handler->res.body.size());
+            hsend(io->loop, io->fd, handler->res.body.data(), handler->res.body.size());
         }
 
         hlogi("[%s:%d][%s %s]=>[%d %s]",
@@ -129,9 +129,8 @@ static void on_accept(hio_t* io, int connfd) {
             sockaddr_snprintf(io->peeraddr, peeraddrstr, sizeof(peeraddrstr)));
     */
 
-    nonblocking(connfd);
     HBuf* buf = (HBuf*)io->loop->userdata;
-    hio_t* connio = hread(io->loop, connfd, buf->base, buf->len, on_read);
+    hio_t* connio = hrecv(io->loop, connfd, buf->base, buf->len, on_recv);
     connio->close_cb = on_close;
     // new HttpHandler
     // delete on_close