Jelajahi Sumber

New feature: rudp WITH_KCP

ithewei 4 tahun lalu
induk
melakukan
57e0211c57
25 mengubah file dengan 2311 tambahan dan 42 penghapusan
  1. 12 0
      BUILD.md
  2. 8 2
      CMakeLists.txt
  3. 33 28
      Makefile
  4. 1 0
      README-CN.md
  5. 2 1
      README.md
  6. 3 0
      config.ini
  7. 2 1
      config.mk
  8. 4 0
      configure
  9. 6 1
      docs/PLAN.md
  10. 2 0
      event/README.md
  11. 138 7
      event/hevent.c
  12. 18 0
      event/hevent.h
  13. 53 0
      event/hloop.h
  14. 1299 0
      event/kcp/ikcp.c
  15. 416 0
      event/kcp/ikcp.h
  16. 13 0
      event/nio.c
  17. 151 0
      event/rudp.c
  18. 61 0
      event/rudp.h
  19. 25 0
      evpp/UdpClient.h
  20. 14 0
      evpp/UdpServer.h
  21. 21 0
      examples/nc.c
  22. 24 2
      examples/udp_echo_server.c
  23. 1 0
      hconfig.h
  24. 2 0
      hconfig.h.in
  25. 2 0
      scripts/unittest.sh

+ 12 - 0
BUILD.md

@@ -90,6 +90,12 @@ make libhv
 
 ## options
 
+### compile without c++
+```
+./configure --without-evpp
+make clean && make
+```
+
 ### compile WITH_OPENSSL
 Enable SSL in libhv is so easy, just only two apis:
 ```
@@ -126,3 +132,9 @@ make clean && make
 bin/httpd -s restart -d
 bin/curl -v http://localhost:8080 --http2
 ```
+
+### compile WITH_KCP
+```
+./configure --with-kcp
+make clean && make
+```

+ 8 - 2
CMakeLists.txt

@@ -28,6 +28,8 @@ option(WITH_OPENSSL "with openssl library" OFF)
 option(WITH_GNUTLS  "with gnutls library"  OFF)
 option(WITH_MBEDTLS "with mbedtls library" OFF)
 
+option(WITH_KCP "with kcp" OFF)
+
 set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake;${CMAKE_MODULE_PATH}")
 include(utils)
 include(vars)
@@ -148,8 +150,12 @@ if(APPLE)
 endif()
 
 # see Makefile
-set(ALL_SRCDIRS . base ssl event util cpputil evpp protocol http http/client http/server)
-set(LIBHV_SRCDIRS . base ssl event util)
+set(ALL_SRCDIRS . base ssl event event/kcp util cpputil evpp protocol http http/client http/server)
+set(CORE_SRCDIRS . base ssl event)
+if(WITH_KCP)
+    set(CORE_SRCDIRS ${CORE_SRCDIRS} event/kcp)
+endif()
+set(LIBHV_SRCDIRS ${CORE_SRCDIRS} util)
 set(LIBHV_HEADERS hv.h hconfig.h hexport.h)
 set(LIBHV_HEADERS ${LIBHV_HEADERS} ${BASE_HEADERS} ${SSL_HEADERS} ${EVENT_HEADERS} ${UTIL_HEADERS})
 

+ 33 - 28
Makefile

@@ -2,9 +2,13 @@ include config.mk
 include Makefile.vars
 
 MAKEF=$(MAKE) -f Makefile.in
-ALL_SRCDIRS=. base ssl event util cpputil evpp protocol http http/client http/server
+ALL_SRCDIRS=. base ssl event event/kcp util cpputil evpp protocol http http/client http/server
+CORE_SRCDIRS=. base ssl event
+ifeq ($(WITH_KCP), yes)
+CORE_SRCDIRS += event/kcp
+endif
 
-LIBHV_SRCDIRS = . base ssl event util
+LIBHV_SRCDIRS = $(CORE_SRCDIRS) util
 LIBHV_HEADERS = hv.h hconfig.h hexport.h
 LIBHV_HEADERS += $(BASE_HEADERS) $(SSL_HEADERS) $(EVENT_HEADERS) $(UTIL_HEADERS)
 
@@ -75,78 +79,78 @@ hmain_test: prepare
 	$(MAKEF) TARGET=$@ SRCDIRS=". base cpputil" SRCS="examples/hmain_test.cpp"
 
 htimer_test: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/htimer_test.c"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/htimer_test.c"
 
 hloop_test: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/hloop_test.c"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/hloop_test.c"
 
 tcp_echo_server: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/tcp_echo_server.c"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/tcp_echo_server.c"
 
 tcp_chat_server: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/tcp_chat_server.c"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/tcp_chat_server.c"
 
 tcp_proxy_server: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/tcp_proxy_server.c"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/tcp_proxy_server.c"
 
 udp_echo_server: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/udp_echo_server.c"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/udp_echo_server.c"
 
 udp_proxy_server: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/udp_proxy_server.c"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/udp_proxy_server.c"
 
 multi-acceptor-processes: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/multi-thread/multi-acceptor-processes.c"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/multi-thread/multi-acceptor-processes.c"
 
 multi-acceptor-threads: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/multi-thread/multi-acceptor-threads.c"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/multi-thread/multi-acceptor-threads.c"
 
 one-acceptor-multi-workers: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/multi-thread/one-acceptor-multi-workers.c"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/multi-thread/one-acceptor-multi-workers.c"
 
 nc: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/nc.c"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/nc.c"
 
 nmap: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event cpputil examples/nmap" DEFINES="PRINT_DEBUG"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) cpputil examples/nmap" DEFINES="PRINT_DEBUG"
 
 wrk: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http" SRCS="examples/wrk.cpp"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http" SRCS="examples/wrk.cpp"
 
 httpd: prepare
 	$(RM) examples/httpd/*.o
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/client http/server examples/httpd"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/client http/server examples/httpd"
 
 consul: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/client examples/consul" DEFINES="PRINT_DEBUG"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/client examples/consul" DEFINES="PRINT_DEBUG"
 
 curl: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/client" SRCS="examples/curl.cpp"
-	# $(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/client" SRCS="examples/curl.cpp" WITH_CURL=yes
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/client" SRCS="examples/curl.cpp"
+	# $(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/client" SRCS="examples/curl.cpp" WITH_CURL=yes
 
 wget: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/client" SRCS="examples/wget.cpp"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/client" SRCS="examples/wget.cpp"
 
 http_server_test: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/server" SRCS="examples/http_server_test.cpp"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/server" SRCS="examples/http_server_test.cpp"
 
 http_client_test: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/client" SRCS="examples/http_client_test.cpp"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/client" SRCS="examples/http_client_test.cpp"
 
 websocket_server_test: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/server" SRCS="examples/websocket_server_test.cpp"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/server" SRCS="examples/websocket_server_test.cpp"
 
 websocket_client_test: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event util cpputil evpp http http/client" SRCS="examples/websocket_client_test.cpp"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/client" SRCS="examples/websocket_client_test.cpp"
 
 jsonrpc: jsonrpc_client jsonrpc_server
 
 jsonrpc_client: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/jsonrpc/jsonrpc_client.c examples/jsonrpc/jsonrpc.c examples/jsonrpc/cJSON.c"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/jsonrpc/jsonrpc_client.c examples/jsonrpc/jsonrpc.c examples/jsonrpc/cJSON.c"
 
 jsonrpc_server: prepare
 	$(RM) examples/jsonrpc/*.o
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/jsonrpc/jsonrpc_server.c examples/jsonrpc/jsonrpc.c examples/jsonrpc/cJSON.c"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/jsonrpc/jsonrpc_server.c examples/jsonrpc/jsonrpc.c examples/jsonrpc/cJSON.c"
 
 protorpc: protorpc_client protorpc_server
 
@@ -154,17 +158,18 @@ protorpc_protoc:
 	bash examples/protorpc/proto/protoc.sh
 
 protorpc_client: prepare protorpc_protoc
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event cpputil evpp examples/protorpc/generated" \
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) cpputil evpp examples/protorpc/generated" \
 		SRCS="examples/protorpc/protorpc_client.cpp examples/protorpc/protorpc.c" \
 		LIBS="protobuf"
 
 protorpc_server: prepare protorpc_protoc
 	$(RM) examples/protorpc/*.o
-	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event cpputil evpp examples/protorpc/generated" \
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) cpputil evpp examples/protorpc/generated" \
 		SRCS="examples/protorpc/protorpc_server.cpp examples/protorpc/protorpc.c" \
 		LIBS="protobuf"
 
 unittest: prepare
+	$(CC)  -g -Wall -O0 -std=c99   -I. -Ibase            -o bin/rbtree_test       unittest/rbtree_test.c        base/rbtree.c
 	$(CC)  -g -Wall -O0 -std=c99   -I. -Ibase            -o bin/mkdir_p           unittest/mkdir_test.c         base/hbase.c
 	$(CC)  -g -Wall -O0 -std=c99   -I. -Ibase            -o bin/rmdir_p           unittest/rmdir_test.c         base/hbase.c
 	$(CC)  -g -Wall -O0 -std=c99   -I. -Ibase            -o bin/date              unittest/date_test.c          base/htime.c

+ 1 - 0
README-CN.md

@@ -25,6 +25,7 @@
 - 高性能事件循环(网络IO事件、定时器事件、空闲事件、自定义事件)
 - TCP/UDP服务端/客户端/代理
 - TCP支持心跳、转发、拆包、多线程安全write和close等特性
+- 可靠UDP支持: WITH_KCP
 - SSL/TLS加密通信(可选WITH_OPENSSL、WITH_GNUTLS、WITH_MBEDTLS)
 - HTTP服务端/客户端(支持https http1/x http2 grpc)
 - HTTP支持静态文件服务、目录服务、同步/异步API处理函数

+ 2 - 1
README.md

@@ -24,9 +24,10 @@ but simpler api and richer protocols.
 ## ✨ Features
 
 - Cross-platform (Linux, Windows, MacOS, Solaris)
-- EventLoop (IO, timer, idle, custom)
+- High-performance EventLoop (IO, timer, idle, custom)
 - TCP/UDP client/server/proxy
 - TCP supports heartbeat, upstream, unpack, MultiThread-safe write and close, etc.
+- RUDP support: WITH_KCP
 - SSL/TLS support: (via WITH_OPENSSL or WITH_GNUTLS or WITH_MBEDTLS)
 - HTTP client/server (support https http1/x http2 grpc)
 - HTTP static file service, indexof service, sync/async API handler

+ 3 - 0
config.ini

@@ -33,3 +33,6 @@ WITH_NGHTTP2=no
 WITH_OPENSSL=no
 WITH_GNUTLS=no
 WITH_MBEDTLS=no
+
+# rudp
+WITH_KCP=no

+ 2 - 1
config.mk

@@ -16,4 +16,5 @@ WITH_NGHTTP2=no
 WITH_OPENSSL=no
 WITH_GNUTLS=no
 WITH_MBEDTLS=no
-CONFIG_DATE=20210817
+WITH_KCP=no
+CONFIG_DATE=20211124

+ 4 - 0
configure

@@ -36,6 +36,9 @@ dependencies:
   --with-gnutls         compile with gnutls?            (DEFAULT: $WITH_GNUTLS)
   --with-mbedtls        compile with mbedtls?           (DEFAULT: $WITH_MBEDTLS)
 
+rudp:
+  --with-kcp            compile with kcp?               (DEFAULT: $WITH_KCP)
+
 END
 }
 
@@ -250,6 +253,7 @@ option=WITH_GNUTLS && check_option
 option=WITH_MBEDTLS && check_option
 option=ENABLE_UDS && check_option
 option=USE_MULTIMAP && check_option
+option=WITH_KCP && check_option
 
 # end confile
 cat << END >> $confile

+ 6 - 1
docs/PLAN.md

@@ -1,6 +1,8 @@
 ## Done
 
+- base: cross platfrom infrastructure
 - event: select/poll/epoll/kqueue/port
+- ssl: openssl/guntls/mbedtls
 - evpp: c++ EventLoop interface similar to muduo and evpp
 - http client/server: include https http1/x http2
 - websocket client/server
@@ -18,5 +20,8 @@
 - lua binding
 - js binding
 - hrpc = libhv + protobuf
-- reliable udp: FEC, ARQ, KCP, UDT, QUIC
+- rudp: FEC, ARQ, KCP, UDT, QUIC
 - have a taste of io_uring
+- coroutine
+- IM-libhv
+- GameServer-libhv

+ 2 - 0
event/README.md

@@ -5,6 +5,8 @@
 ├── hloop.h     事件循环模块对外头文件
 ├── hevent.h    事件结构体定义
 ├── nlog.h      网络日志
+├── unpack.h    拆包
+├── rudp.h      可靠UDP
 ├── iowatcher.h IO多路复用统一抽象接口
 ├── select.c    EVENT_SELECT实现
 ├── poll.c      EVENT_POLL实现

+ 138 - 7
event/hevent.c

@@ -40,8 +40,12 @@ static void fill_io_type(hio_t* io) {
 }
 
 static void hio_socket_init(hio_t* io) {
-    // nonblocking
-    nonblocking(io->fd);
+    if (io->io_type & HIO_TYPE_SOCK_RAW || io->io_type & HIO_TYPE_SOCK_DGRAM) {
+        // NOTE: sendto multiple peeraddr cannot use io->write_queue
+        blocking(io->fd);
+    } else {
+        nonblocking(io->fd);
+    }
     // fill io->localaddr io->peeraddr
     if (io->localaddr == NULL) {
         HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
@@ -52,12 +56,8 @@ static void hio_socket_init(hio_t* io) {
     socklen_t addrlen = sizeof(sockaddr_u);
     int ret = getsockname(io->fd, io->localaddr, &addrlen);
     printd("getsockname fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
-    // NOTE:
-    // tcp_server peeraddr set by accept
-    // udp_server peeraddr set by recvfrom
-    // tcp_client/udp_client peeraddr set by hio_setpeeraddr
+    // NOTE: udp peeraddr set by recvfrom/sendto
     if (io->io_type & HIO_TYPE_SOCK_STREAM) {
-        // tcp acceptfd
         addrlen = sizeof(sockaddr_u);
         ret = getpeername(io->fd, io->peeraddr, &addrlen);
         printd("getpeername fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
@@ -142,6 +142,12 @@ void hio_ready(hio_t* io) {
     if (io->io_type & HIO_TYPE_SOCKET) {
         hio_socket_init(io);
     }
+
+#if WITH_RUDP
+    if (io->io_type & HIO_TYPE_SOCK_RAW || io->io_type & HIO_TYPE_SOCK_DGRAM) {
+        rudp_init(&io->rudp);
+    }
+#endif
 }
 
 void hio_done(hio_t* io) {
@@ -163,6 +169,12 @@ void hio_done(hio_t* io) {
     }
     write_queue_cleanup(&io->write_queue);
     hrecursive_mutex_unlock(&io->write_mutex);
+
+#if WITH_RUDP
+    if (io->io_type & HIO_TYPE_SOCK_RAW || io->io_type & HIO_TYPE_SOCK_DGRAM) {
+        rudp_cleanup(&io->rudp);
+    }
+#endif
 }
 
 void hio_free(hio_t* io) {
@@ -610,3 +622,122 @@ hio_t* hio_setup_udp_upstream(hio_t* io, const char* host, int port) {
     hio_read_upstream(io);
     return upstream_io;
 }
+
+#if WITH_RUDP
+rudp_entry_t* hio_get_rudp(hio_t* io) {
+    rudp_entry_t* rudp = rudp_get(&io->rudp, io->peeraddr);
+    rudp->io = io;
+    return rudp;
+}
+
+static void hio_close_rudp_event_cb(hevent_t* ev) {
+    rudp_entry_t* entry = (rudp_entry_t*)ev->userdata;
+    rudp_del(&entry->io->rudp, (struct sockaddr*)&entry->addr);
+    // rudp_entry_free(entry);
+}
+
+int hio_close_rudp(hio_t* io, struct sockaddr* peeraddr) {
+    if (peeraddr == NULL) peeraddr = io->peeraddr;
+    // NOTE: do rudp_del for thread-safe
+    rudp_entry_t* entry = rudp_get(&io->rudp, peeraddr);
+    // NOTE: just rudp_remove first, do rudp_entry_free async for safe.
+    // rudp_entry_t* entry = rudp_remove(&io->rudp, peeraddr);
+    if (entry) {
+        hevent_t ev;
+        memset(&ev, 0, sizeof(ev));
+        ev.cb = hio_close_rudp_event_cb;
+        ev.userdata = entry;
+        ev.priority = HEVENT_HIGH_PRIORITY;
+        hloop_post_event(io->loop, &ev);
+    }
+    return 0;
+}
+#endif
+
+#if WITH_KCP
+static kcp_setting_t s_kcp_setting;
+static int __kcp_output(const char* buf, int len, ikcpcb* ikcp, void* userdata) {
+    // printf("ikcp_output len=%d\n", len);
+    rudp_entry_t* rudp = (rudp_entry_t*)userdata;
+    assert(rudp != NULL && rudp->io != NULL);
+    int nsend = sendto(rudp->io->fd, buf, len, 0, &rudp->addr.sa, SOCKADDR_LEN(&rudp->addr));
+    // printf("sendto nsend=%d\n", nsend);
+    return nsend;
+}
+
+static void __kcp_update_timer_cb(htimer_t* timer) {
+    rudp_entry_t* rudp = (rudp_entry_t*)timer->privdata;
+    assert(rudp != NULL && rudp->io != NULL && rudp->kcp.ikcp != NULL);
+    ikcp_update(rudp->kcp.ikcp, (IUINT32)(rudp->io->loop->cur_hrtime / 1000));
+}
+
+int hio_set_kcp(hio_t* io, kcp_setting_t* setting) {
+    io->io_type = HIO_TYPE_KCP;
+    io->kcp_setting = setting;
+    return 0;
+}
+
+kcp_t* hio_get_kcp(hio_t* io) {
+    rudp_entry_t* rudp = hio_get_rudp(io);
+    assert(rudp != NULL);
+    kcp_t* kcp = &rudp->kcp;
+    if (kcp->ikcp != NULL) return kcp;
+    if (io->kcp_setting == NULL) {
+        io->kcp_setting = &s_kcp_setting;
+    }
+    kcp_setting_t* setting = io->kcp_setting;
+    assert(io->kcp_setting != NULL);
+    kcp->ikcp = ikcp_create(setting->conv, rudp);
+    // printf("ikcp_create ikcp=%p\n", kcp->ikcp);
+    kcp->ikcp->output = __kcp_output;
+    if (setting->interval > 0) {
+        ikcp_nodelay(kcp->ikcp, setting->nodelay, setting->interval, setting->fastresend, setting->nocwnd);
+    }
+    if (setting->sndwnd > 0 && setting->rcvwnd > 0) {
+        ikcp_wndsize(kcp->ikcp, setting->sndwnd, setting->rcvwnd);
+    }
+    if (setting->mtu > 0) {
+        ikcp_setmtu(kcp->ikcp, setting->mtu);
+    }
+    if (kcp->update_timer == NULL) {
+        int update_interval = setting->update_interval;
+        if (update_interval == 0) {
+            update_interval = DEFAULT_KCP_UPDATE_INTERVAL;
+        }
+        kcp->update_timer = htimer_add(io->loop, __kcp_update_timer_cb, update_interval, INFINITE);
+        kcp->update_timer->privdata = rudp;
+    }
+    // NOTE: alloc kcp->readbuf when hio_read_kcp
+    return kcp;
+}
+
+int hio_write_kcp(hio_t* io, const void* buf, size_t len) {
+    kcp_t* kcp = hio_get_kcp(io);
+    int nsend = ikcp_send(kcp->ikcp, (const char*)buf, len);
+    // printf("ikcp_send len=%d nsend=%d\n", (int)len, nsend);
+    if (nsend < 0) {
+        hio_close(io);
+    }
+    ikcp_update(kcp->ikcp, (IUINT32)io->loop->cur_hrtime / 1000);
+    return nsend;
+}
+
+int hio_read_kcp (hio_t* io, void* buf, int readbytes) {
+    kcp_t* kcp = hio_get_kcp(io);
+    // printf("ikcp_input len=%d\n", readbytes);
+    ikcp_input(kcp->ikcp, (const char*)buf, readbytes);
+    if (kcp->readbuf.base == NULL || kcp->readbuf.len == 0) {
+        kcp->readbuf.len = DEFAULT_KCP_READ_BUFSIZE;
+        HV_ALLOC(kcp->readbuf.base, kcp->readbuf.len);
+    }
+    int ret = 0;
+    while (1) {
+        int nrecv = ikcp_recv(kcp->ikcp, kcp->readbuf.base, kcp->readbuf.len);
+        // printf("ikcp_recv nrecv=%d\n", nrecv);
+        if (nrecv < 0) break;
+        hio_read_cb(io, kcp->readbuf.base, nrecv);
+        ret += nrecv;
+    }
+    return ret;
+}
+#endif

+ 18 - 0
event/hevent.h

@@ -3,6 +3,7 @@
 
 #include "hloop.h"
 #include "iowatcher.h"
+#include "rudp.h"
 
 #include "hbuf.h"
 #include "hmutex.h"
@@ -148,9 +149,17 @@ struct hio_s {
 #if defined(EVENT_POLL) || defined(EVENT_KQUEUE)
     int         event_index[2]; // for poll,kqueue
 #endif
+
 #ifdef EVENT_IOCP
     void*       hovlp;          // for iocp/overlapio
 #endif
+
+#if WITH_RUDP
+    rudp_t          rudp;
+#if WITH_KCP
+    kcp_setting_t*  kcp_setting;
+#endif
+#endif
 };
 /*
  * hio lifeline:
@@ -189,6 +198,15 @@ static inline bool hio_is_alloced_readbuf(hio_t* io) {
 void hio_alloc_readbuf(hio_t* io, int len);
 void hio_free_readbuf(hio_t* io);
 
+#if WITH_RUDP
+rudp_entry_t* hio_get_rudp(hio_t* io);
+#if WITH_KCP
+kcp_t*  hio_get_kcp(hio_t* io);
+int     hio_write_kcp(hio_t* io, const void* buf, size_t len);
+int     hio_read_kcp (hio_t* io, void* buf, int readbytes);
+#endif
+#endif
+
 #define EVENT_ENTRY(p)          container_of(p, hevent_t, pending_node)
 #define IDLE_ENTRY(p)           container_of(p, hidle_t,  node)
 #define TIMER_ENTRY(p)          container_of(p, htimer_t, node)

+ 53 - 0
event/hloop.h

@@ -95,6 +95,7 @@ typedef enum {
     HIO_TYPE_SOCK_RAW   = 0x00000F00,
 
     HIO_TYPE_UDP        = 0x00001000,
+    HIO_TYPE_KCP        = 0x00002000,
     HIO_TYPE_DTLS       = 0x00010000,
     HIO_TYPE_SOCK_DGRAM = 0x000FF000,
 
@@ -507,6 +508,58 @@ unpack_setting_t grpc_unpack_setting = {
 };
 */
 
+//-----------------rudp---------------------------------------------
+#if WITH_KCP
+#define WITH_RUDP 1
+#endif
+
+#if WITH_RUDP
+// NOTE: hio_close_rudp is thread-safe.
+HV_EXPORT int hio_close_rudp(hio_t* io, struct sockaddr* peeraddr DEFAULT(NULL));
+#endif
+
+#if WITH_KCP
+typedef struct kcp_setting_s {
+    // ikcp_create(conv, ...)
+    int conv;
+    // ikcp_nodelay(kcp, nodelay, interval, fastresend, nocwnd)
+    int nodelay;
+    int interval;
+    int fastresend;
+    int nocwnd;
+    // ikcp_wndsize(kcp, sndwnd, rcvwnd)
+    int sndwnd;
+    int rcvwnd;
+    // ikcp_setmtu(kcp, mtu)
+    int mtu;
+    // ikcp_update
+    int update_interval;
+
+#ifdef __cplusplus
+    kcp_setting_s() {
+        conv = 0x11223344;
+        // normal mode
+        nodelay = 0;
+        interval = 40;
+        fastresend = 0;
+        nocwnd = 0;
+        // fast mode
+        // nodelay = 1;
+        // interval = 10;
+        // fastresend = 2;
+        // nocwnd = 1;
+
+        sndwnd = 0;
+        rcvwnd = 0;
+        mtu = 1400;
+        update_interval = 10; // ms
+    }
+#endif
+} kcp_setting_t;
+
+HV_EXPORT int hio_set_kcp(hio_t* io, kcp_setting_t* setting DEFAULT(NULL));
+#endif
+
 END_EXTERN_C
 
 #endif // HV_LOOP_H_

+ 1299 - 0
event/kcp/ikcp.c

@@ -0,0 +1,1299 @@
+//=====================================================================
+//
+// KCP - A Better ARQ Protocol Implementation
+// skywind3000 (at) gmail.com, 2010-2011
+//  
+// Features:
+// + Average RTT reduce 30% - 40% vs traditional ARQ like tcp.
+// + Maximum RTT reduce three times vs tcp.
+// + Lightweight, distributed as a single source file.
+//
+//=====================================================================
+#include "ikcp.h"
+
+#include <stddef.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdarg.h>
+#include <stdio.h>
+
+
+
+//=====================================================================
+// KCP BASIC
+//=====================================================================
+const IUINT32 IKCP_RTO_NDL = 30;		// no delay min rto
+const IUINT32 IKCP_RTO_MIN = 100;		// normal min rto
+const IUINT32 IKCP_RTO_DEF = 200;
+const IUINT32 IKCP_RTO_MAX = 60000;
+const IUINT32 IKCP_CMD_PUSH = 81;		// cmd: push data
+const IUINT32 IKCP_CMD_ACK  = 82;		// cmd: ack
+const IUINT32 IKCP_CMD_WASK = 83;		// cmd: window probe (ask)
+const IUINT32 IKCP_CMD_WINS = 84;		// cmd: window size (tell)
+const IUINT32 IKCP_ASK_SEND = 1;		// need to send IKCP_CMD_WASK
+const IUINT32 IKCP_ASK_TELL = 2;		// need to send IKCP_CMD_WINS
+const IUINT32 IKCP_WND_SND = 32;
+const IUINT32 IKCP_WND_RCV = 128;       // must >= max fragment size
+const IUINT32 IKCP_MTU_DEF = 1400;
+const IUINT32 IKCP_ACK_FAST	= 3;
+const IUINT32 IKCP_INTERVAL	= 100;
+const IUINT32 IKCP_OVERHEAD = 24;
+const IUINT32 IKCP_DEADLINK = 20;
+const IUINT32 IKCP_THRESH_INIT = 2;
+const IUINT32 IKCP_THRESH_MIN = 2;
+const IUINT32 IKCP_PROBE_INIT = 7000;		// 7 secs to probe window size
+const IUINT32 IKCP_PROBE_LIMIT = 120000;	// up to 120 secs to probe window
+const IUINT32 IKCP_FASTACK_LIMIT = 5;		// max times to trigger fastack
+
+
+//---------------------------------------------------------------------
+// encode / decode
+//---------------------------------------------------------------------
+
+/* encode 8 bits unsigned int */
+static inline char *ikcp_encode8u(char *p, unsigned char c)
+{
+	*(unsigned char*)p++ = c;
+	return p;
+}
+
+/* decode 8 bits unsigned int */
+static inline const char *ikcp_decode8u(const char *p, unsigned char *c)
+{
+	*c = *(unsigned char*)p++;
+	return p;
+}
+
+/* encode 16 bits unsigned int (lsb) */
+static inline char *ikcp_encode16u(char *p, unsigned short w)
+{
+#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN
+	*(unsigned char*)(p + 0) = (w & 255);
+	*(unsigned char*)(p + 1) = (w >> 8);
+#else
+	memcpy(p, &w, 2);
+#endif
+	p += 2;
+	return p;
+}
+
+/* decode 16 bits unsigned int (lsb) */
+static inline const char *ikcp_decode16u(const char *p, unsigned short *w)
+{
+#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN
+	*w = *(const unsigned char*)(p + 1);
+	*w = *(const unsigned char*)(p + 0) + (*w << 8);
+#else
+	memcpy(w, p, 2);
+#endif
+	p += 2;
+	return p;
+}
+
+/* encode 32 bits unsigned int (lsb) */
+static inline char *ikcp_encode32u(char *p, IUINT32 l)
+{
+#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN
+	*(unsigned char*)(p + 0) = (unsigned char)((l >>  0) & 0xff);
+	*(unsigned char*)(p + 1) = (unsigned char)((l >>  8) & 0xff);
+	*(unsigned char*)(p + 2) = (unsigned char)((l >> 16) & 0xff);
+	*(unsigned char*)(p + 3) = (unsigned char)((l >> 24) & 0xff);
+#else
+	memcpy(p, &l, 4);
+#endif
+	p += 4;
+	return p;
+}
+
+/* decode 32 bits unsigned int (lsb) */
+static inline const char *ikcp_decode32u(const char *p, IUINT32 *l)
+{
+#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN
+	*l = *(const unsigned char*)(p + 3);
+	*l = *(const unsigned char*)(p + 2) + (*l << 8);
+	*l = *(const unsigned char*)(p + 1) + (*l << 8);
+	*l = *(const unsigned char*)(p + 0) + (*l << 8);
+#else 
+	memcpy(l, p, 4);
+#endif
+	p += 4;
+	return p;
+}
+
+static inline IUINT32 _imin_(IUINT32 a, IUINT32 b) {
+	return a <= b ? a : b;
+}
+
+static inline IUINT32 _imax_(IUINT32 a, IUINT32 b) {
+	return a >= b ? a : b;
+}
+
+static inline IUINT32 _ibound_(IUINT32 lower, IUINT32 middle, IUINT32 upper) 
+{
+	return _imin_(_imax_(lower, middle), upper);
+}
+
+static inline long _itimediff(IUINT32 later, IUINT32 earlier) 
+{
+	return ((IINT32)(later - earlier));
+}
+
+//---------------------------------------------------------------------
+// manage segment
+//---------------------------------------------------------------------
+typedef struct IKCPSEG IKCPSEG;
+
+static void* (*ikcp_malloc_hook)(size_t) = NULL;
+static void (*ikcp_free_hook)(void *) = NULL;
+
+// internal malloc
+static void* ikcp_malloc(size_t size) {
+	if (ikcp_malloc_hook) 
+		return ikcp_malloc_hook(size);
+	return malloc(size);
+}
+
+// internal free
+static void ikcp_free(void *ptr) {
+	if (ikcp_free_hook) {
+		ikcp_free_hook(ptr);
+	}	else {
+		free(ptr);
+	}
+}
+
+// redefine allocator
+void ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*))
+{
+	ikcp_malloc_hook = new_malloc;
+	ikcp_free_hook = new_free;
+}
+
+// allocate a new kcp segment
+static IKCPSEG* ikcp_segment_new(ikcpcb *kcp, int size)
+{
+	return (IKCPSEG*)ikcp_malloc(sizeof(IKCPSEG) + size);
+}
+
+// delete a segment
+static void ikcp_segment_delete(ikcpcb *kcp, IKCPSEG *seg)
+{
+	ikcp_free(seg);
+}
+
+// write log
+void ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...)
+{
+	char buffer[1024];
+	va_list argptr;
+	if ((mask & kcp->logmask) == 0 || kcp->writelog == 0) return;
+	va_start(argptr, fmt);
+	vsprintf(buffer, fmt, argptr);
+	va_end(argptr);
+	kcp->writelog(buffer, kcp, kcp->user);
+}
+
+// check log mask
+static int ikcp_canlog(const ikcpcb *kcp, int mask)
+{
+	if ((mask & kcp->logmask) == 0 || kcp->writelog == NULL) return 0;
+	return 1;
+}
+
+// output segment
+static int ikcp_output(ikcpcb *kcp, const void *data, int size)
+{
+	assert(kcp);
+	assert(kcp->output);
+	if (ikcp_canlog(kcp, IKCP_LOG_OUTPUT)) {
+		ikcp_log(kcp, IKCP_LOG_OUTPUT, "[RO] %ld bytes", (long)size);
+	}
+	if (size == 0) return 0;
+	return kcp->output((const char*)data, size, kcp, kcp->user);
+}
+
+// output queue
+void ikcp_qprint(const char *name, const struct IQUEUEHEAD *head)
+{
+#if 0
+	const struct IQUEUEHEAD *p;
+	printf("<%s>: [", name);
+	for (p = head->next; p != head; p = p->next) {
+		const IKCPSEG *seg = iqueue_entry(p, const IKCPSEG, node);
+		printf("(%lu %d)", (unsigned long)seg->sn, (int)(seg->ts % 10000));
+		if (p->next != head) printf(",");
+	}
+	printf("]\n");
+#endif
+}
+
+
+//---------------------------------------------------------------------
+// create a new kcpcb
+//---------------------------------------------------------------------
+ikcpcb* ikcp_create(IUINT32 conv, void *user)
+{
+	ikcpcb *kcp = (ikcpcb*)ikcp_malloc(sizeof(struct IKCPCB));
+	if (kcp == NULL) return NULL;
+	kcp->conv = conv;
+	kcp->user = user;
+	kcp->snd_una = 0;
+	kcp->snd_nxt = 0;
+	kcp->rcv_nxt = 0;
+	kcp->ts_recent = 0;
+	kcp->ts_lastack = 0;
+	kcp->ts_probe = 0;
+	kcp->probe_wait = 0;
+	kcp->snd_wnd = IKCP_WND_SND;
+	kcp->rcv_wnd = IKCP_WND_RCV;
+	kcp->rmt_wnd = IKCP_WND_RCV;
+	kcp->cwnd = 0;
+	kcp->incr = 0;
+	kcp->probe = 0;
+	kcp->mtu = IKCP_MTU_DEF;
+	kcp->mss = kcp->mtu - IKCP_OVERHEAD;
+	kcp->stream = 0;
+
+	kcp->buffer = (char*)ikcp_malloc((kcp->mtu + IKCP_OVERHEAD) * 3);
+	if (kcp->buffer == NULL) {
+		ikcp_free(kcp);
+		return NULL;
+	}
+
+	iqueue_init(&kcp->snd_queue);
+	iqueue_init(&kcp->rcv_queue);
+	iqueue_init(&kcp->snd_buf);
+	iqueue_init(&kcp->rcv_buf);
+	kcp->nrcv_buf = 0;
+	kcp->nsnd_buf = 0;
+	kcp->nrcv_que = 0;
+	kcp->nsnd_que = 0;
+	kcp->state = 0;
+	kcp->acklist = NULL;
+	kcp->ackblock = 0;
+	kcp->ackcount = 0;
+	kcp->rx_srtt = 0;
+	kcp->rx_rttval = 0;
+	kcp->rx_rto = IKCP_RTO_DEF;
+	kcp->rx_minrto = IKCP_RTO_MIN;
+	kcp->current = 0;
+	kcp->interval = IKCP_INTERVAL;
+	kcp->ts_flush = IKCP_INTERVAL;
+	kcp->nodelay = 0;
+	kcp->updated = 0;
+	kcp->logmask = 0;
+	kcp->ssthresh = IKCP_THRESH_INIT;
+	kcp->fastresend = 0;
+	kcp->fastlimit = IKCP_FASTACK_LIMIT;
+	kcp->nocwnd = 0;
+	kcp->xmit = 0;
+	kcp->dead_link = IKCP_DEADLINK;
+	kcp->output = NULL;
+	kcp->writelog = NULL;
+
+	return kcp;
+}
+
+
+//---------------------------------------------------------------------
+// release a new kcpcb
+//---------------------------------------------------------------------
+void ikcp_release(ikcpcb *kcp)
+{
+	assert(kcp);
+	if (kcp) {
+		IKCPSEG *seg;
+		while (!iqueue_is_empty(&kcp->snd_buf)) {
+			seg = iqueue_entry(kcp->snd_buf.next, IKCPSEG, node);
+			iqueue_del(&seg->node);
+			ikcp_segment_delete(kcp, seg);
+		}
+		while (!iqueue_is_empty(&kcp->rcv_buf)) {
+			seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
+			iqueue_del(&seg->node);
+			ikcp_segment_delete(kcp, seg);
+		}
+		while (!iqueue_is_empty(&kcp->snd_queue)) {
+			seg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);
+			iqueue_del(&seg->node);
+			ikcp_segment_delete(kcp, seg);
+		}
+		while (!iqueue_is_empty(&kcp->rcv_queue)) {
+			seg = iqueue_entry(kcp->rcv_queue.next, IKCPSEG, node);
+			iqueue_del(&seg->node);
+			ikcp_segment_delete(kcp, seg);
+		}
+		if (kcp->buffer) {
+			ikcp_free(kcp->buffer);
+		}
+		if (kcp->acklist) {
+			ikcp_free(kcp->acklist);
+		}
+
+		kcp->nrcv_buf = 0;
+		kcp->nsnd_buf = 0;
+		kcp->nrcv_que = 0;
+		kcp->nsnd_que = 0;
+		kcp->ackcount = 0;
+		kcp->buffer = NULL;
+		kcp->acklist = NULL;
+		ikcp_free(kcp);
+	}
+}
+
+
+//---------------------------------------------------------------------
+// set output callback, which will be invoked by kcp
+//---------------------------------------------------------------------
+void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len,
+	ikcpcb *kcp, void *user))
+{
+	kcp->output = output;
+}
+
+
+//---------------------------------------------------------------------
+// user/upper level recv: returns size, returns below zero for EAGAIN
+//---------------------------------------------------------------------
+int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
+{
+	struct IQUEUEHEAD *p;
+	int ispeek = (len < 0)? 1 : 0;
+	int peeksize;
+	int recover = 0;
+	IKCPSEG *seg;
+	assert(kcp);
+
+	if (iqueue_is_empty(&kcp->rcv_queue))
+		return -1;
+
+	if (len < 0) len = -len;
+
+	peeksize = ikcp_peeksize(kcp);
+
+	if (peeksize < 0) 
+		return -2;
+
+	if (peeksize > len) 
+		return -3;
+
+	if (kcp->nrcv_que >= kcp->rcv_wnd)
+		recover = 1;
+
+	// merge fragment
+	for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) {
+		int fragment;
+		seg = iqueue_entry(p, IKCPSEG, node);
+		p = p->next;
+
+		if (buffer) {
+			memcpy(buffer, seg->data, seg->len);
+			buffer += seg->len;
+		}
+
+		len += seg->len;
+		fragment = seg->frg;
+
+		if (ikcp_canlog(kcp, IKCP_LOG_RECV)) {
+			ikcp_log(kcp, IKCP_LOG_RECV, "recv sn=%lu", (unsigned long)seg->sn);
+		}
+
+		if (ispeek == 0) {
+			iqueue_del(&seg->node);
+			ikcp_segment_delete(kcp, seg);
+			kcp->nrcv_que--;
+		}
+
+		if (fragment == 0) 
+			break;
+	}
+
+	assert(len == peeksize);
+
+	// move available data from rcv_buf -> rcv_queue
+	while (! iqueue_is_empty(&kcp->rcv_buf)) {
+		seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
+		if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
+			iqueue_del(&seg->node);
+			kcp->nrcv_buf--;
+			iqueue_add_tail(&seg->node, &kcp->rcv_queue);
+			kcp->nrcv_que++;
+			kcp->rcv_nxt++;
+		}	else {
+			break;
+		}
+	}
+
+	// fast recover
+	if (kcp->nrcv_que < kcp->rcv_wnd && recover) {
+		// ready to send back IKCP_CMD_WINS in ikcp_flush
+		// tell remote my window size
+		kcp->probe |= IKCP_ASK_TELL;
+	}
+
+	return len;
+}
+
+
+//---------------------------------------------------------------------
+// peek data size
+//---------------------------------------------------------------------
+int ikcp_peeksize(const ikcpcb *kcp)
+{
+	struct IQUEUEHEAD *p;
+	IKCPSEG *seg;
+	int length = 0;
+
+	assert(kcp);
+
+	if (iqueue_is_empty(&kcp->rcv_queue)) return -1;
+
+	seg = iqueue_entry(kcp->rcv_queue.next, IKCPSEG, node);
+	if (seg->frg == 0) return seg->len;
+
+	if (kcp->nrcv_que < seg->frg + 1) return -1;
+
+	for (p = kcp->rcv_queue.next; p != &kcp->rcv_queue; p = p->next) {
+		seg = iqueue_entry(p, IKCPSEG, node);
+		length += seg->len;
+		if (seg->frg == 0) break;
+	}
+
+	return length;
+}
+
+
+//---------------------------------------------------------------------
+// user/upper level send, returns below zero for error
+//---------------------------------------------------------------------
+int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
+{
+	IKCPSEG *seg;
+	int count, i;
+
+	assert(kcp->mss > 0);
+	if (len < 0) return -1;
+
+	// append to previous segment in streaming mode (if possible)
+	if (kcp->stream != 0) {
+		if (!iqueue_is_empty(&kcp->snd_queue)) {
+			IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node);
+			if (old->len < kcp->mss) {
+				int capacity = kcp->mss - old->len;
+				int extend = (len < capacity)? len : capacity;
+				seg = ikcp_segment_new(kcp, old->len + extend);
+				assert(seg);
+				if (seg == NULL) {
+					return -2;
+				}
+				iqueue_add_tail(&seg->node, &kcp->snd_queue);
+				memcpy(seg->data, old->data, old->len);
+				if (buffer) {
+					memcpy(seg->data + old->len, buffer, extend);
+					buffer += extend;
+				}
+				seg->len = old->len + extend;
+				seg->frg = 0;
+				len -= extend;
+				iqueue_del_init(&old->node);
+				ikcp_segment_delete(kcp, old);
+			}
+		}
+		if (len <= 0) {
+			return 0;
+		}
+	}
+
+	if (len <= (int)kcp->mss) count = 1;
+	else count = (len + kcp->mss - 1) / kcp->mss;
+
+	if (count >= (int)IKCP_WND_RCV) return -2;
+
+	if (count == 0) count = 1;
+
+	// fragment
+	for (i = 0; i < count; i++) {
+		int size = len > (int)kcp->mss ? (int)kcp->mss : len;
+		seg = ikcp_segment_new(kcp, size);
+		assert(seg);
+		if (seg == NULL) {
+			return -2;
+		}
+		if (buffer && len > 0) {
+			memcpy(seg->data, buffer, size);
+		}
+		seg->len = size;
+		seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;
+		iqueue_init(&seg->node);
+		iqueue_add_tail(&seg->node, &kcp->snd_queue);
+		kcp->nsnd_que++;
+		if (buffer) {
+			buffer += size;
+		}
+		len -= size;
+	}
+
+	return 0;
+}
+
+
+//---------------------------------------------------------------------
+// parse ack
+//---------------------------------------------------------------------
+static void ikcp_update_ack(ikcpcb *kcp, IINT32 rtt)
+{
+	IINT32 rto = 0;
+	if (kcp->rx_srtt == 0) {
+		kcp->rx_srtt = rtt;
+		kcp->rx_rttval = rtt / 2;
+	}	else {
+		long delta = rtt - kcp->rx_srtt;
+		if (delta < 0) delta = -delta;
+		kcp->rx_rttval = (3 * kcp->rx_rttval + delta) / 4;
+		kcp->rx_srtt = (7 * kcp->rx_srtt + rtt) / 8;
+		if (kcp->rx_srtt < 1) kcp->rx_srtt = 1;
+	}
+	rto = kcp->rx_srtt + _imax_(kcp->interval, 4 * kcp->rx_rttval);
+	kcp->rx_rto = _ibound_(kcp->rx_minrto, rto, IKCP_RTO_MAX);
+}
+
+static void ikcp_shrink_buf(ikcpcb *kcp)
+{
+	struct IQUEUEHEAD *p = kcp->snd_buf.next;
+	if (p != &kcp->snd_buf) {
+		IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
+		kcp->snd_una = seg->sn;
+	}	else {
+		kcp->snd_una = kcp->snd_nxt;
+	}
+}
+
+static void ikcp_parse_ack(ikcpcb *kcp, IUINT32 sn)
+{
+	struct IQUEUEHEAD *p, *next;
+
+	if (_itimediff(sn, kcp->snd_una) < 0 || _itimediff(sn, kcp->snd_nxt) >= 0)
+		return;
+
+	for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) {
+		IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
+		next = p->next;
+		if (sn == seg->sn) {
+			iqueue_del(p);
+			ikcp_segment_delete(kcp, seg);
+			kcp->nsnd_buf--;
+			break;
+		}
+		if (_itimediff(sn, seg->sn) < 0) {
+			break;
+		}
+	}
+}
+
+static void ikcp_parse_una(ikcpcb *kcp, IUINT32 una)
+{
+	struct IQUEUEHEAD *p, *next;
+	for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) {
+		IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
+		next = p->next;
+		if (_itimediff(una, seg->sn) > 0) {
+			iqueue_del(p);
+			ikcp_segment_delete(kcp, seg);
+			kcp->nsnd_buf--;
+		}	else {
+			break;
+		}
+	}
+}
+
+static void ikcp_parse_fastack(ikcpcb *kcp, IUINT32 sn, IUINT32 ts)
+{
+	struct IQUEUEHEAD *p, *next;
+
+	if (_itimediff(sn, kcp->snd_una) < 0 || _itimediff(sn, kcp->snd_nxt) >= 0)
+		return;
+
+	for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) {
+		IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
+		next = p->next;
+		if (_itimediff(sn, seg->sn) < 0) {
+			break;
+		}
+		else if (sn != seg->sn) {
+		#ifndef IKCP_FASTACK_CONSERVE
+			seg->fastack++;
+		#else
+			if (_itimediff(ts, seg->ts) >= 0)
+				seg->fastack++;
+		#endif
+		}
+	}
+}
+
+
+//---------------------------------------------------------------------
+// ack append
+//---------------------------------------------------------------------
+static void ikcp_ack_push(ikcpcb *kcp, IUINT32 sn, IUINT32 ts)
+{
+	IUINT32 newsize = kcp->ackcount + 1;
+	IUINT32 *ptr;
+
+	if (newsize > kcp->ackblock) {
+		IUINT32 *acklist;
+		IUINT32 newblock;
+
+		for (newblock = 8; newblock < newsize; newblock <<= 1);
+		acklist = (IUINT32*)ikcp_malloc(newblock * sizeof(IUINT32) * 2);
+
+		if (acklist == NULL) {
+			assert(acklist != NULL);
+			abort();
+		}
+
+		if (kcp->acklist != NULL) {
+			IUINT32 x;
+			for (x = 0; x < kcp->ackcount; x++) {
+				acklist[x * 2 + 0] = kcp->acklist[x * 2 + 0];
+				acklist[x * 2 + 1] = kcp->acklist[x * 2 + 1];
+			}
+			ikcp_free(kcp->acklist);
+		}
+
+		kcp->acklist = acklist;
+		kcp->ackblock = newblock;
+	}
+
+	ptr = &kcp->acklist[kcp->ackcount * 2];
+	ptr[0] = sn;
+	ptr[1] = ts;
+	kcp->ackcount++;
+}
+
+static void ikcp_ack_get(const ikcpcb *kcp, int p, IUINT32 *sn, IUINT32 *ts)
+{
+	if (sn) sn[0] = kcp->acklist[p * 2 + 0];
+	if (ts) ts[0] = kcp->acklist[p * 2 + 1];
+}
+
+
+//---------------------------------------------------------------------
+// parse data
+//---------------------------------------------------------------------
+void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg)
+{
+	struct IQUEUEHEAD *p, *prev;
+	IUINT32 sn = newseg->sn;
+	int repeat = 0;
+	
+	if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) >= 0 ||
+		_itimediff(sn, kcp->rcv_nxt) < 0) {
+		ikcp_segment_delete(kcp, newseg);
+		return;
+	}
+
+	for (p = kcp->rcv_buf.prev; p != &kcp->rcv_buf; p = prev) {
+		IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
+		prev = p->prev;
+		if (seg->sn == sn) {
+			repeat = 1;
+			break;
+		}
+		if (_itimediff(sn, seg->sn) > 0) {
+			break;
+		}
+	}
+
+	if (repeat == 0) {
+		iqueue_init(&newseg->node);
+		iqueue_add(&newseg->node, p);
+		kcp->nrcv_buf++;
+	}	else {
+		ikcp_segment_delete(kcp, newseg);
+	}
+
+#if 0
+	ikcp_qprint("rcvbuf", &kcp->rcv_buf);
+	printf("rcv_nxt=%lu\n", kcp->rcv_nxt);
+#endif
+
+	// move available data from rcv_buf -> rcv_queue
+	while (! iqueue_is_empty(&kcp->rcv_buf)) {
+		IKCPSEG *seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
+		if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
+			iqueue_del(&seg->node);
+			kcp->nrcv_buf--;
+			iqueue_add_tail(&seg->node, &kcp->rcv_queue);
+			kcp->nrcv_que++;
+			kcp->rcv_nxt++;
+		}	else {
+			break;
+		}
+	}
+
+#if 0
+	ikcp_qprint("queue", &kcp->rcv_queue);
+	printf("rcv_nxt=%lu\n", kcp->rcv_nxt);
+#endif
+
+#if 1
+//	printf("snd(buf=%d, queue=%d)\n", kcp->nsnd_buf, kcp->nsnd_que);
+//	printf("rcv(buf=%d, queue=%d)\n", kcp->nrcv_buf, kcp->nrcv_que);
+#endif
+}
+
+
+//---------------------------------------------------------------------
+// input data
+//---------------------------------------------------------------------
+int ikcp_input(ikcpcb *kcp, const char *data, long size)
+{
+	IUINT32 prev_una = kcp->snd_una;
+	IUINT32 maxack = 0, latest_ts = 0;
+	int flag = 0;
+
+	if (ikcp_canlog(kcp, IKCP_LOG_INPUT)) {
+		ikcp_log(kcp, IKCP_LOG_INPUT, "[RI] %d bytes", (int)size);
+	}
+
+	if (data == NULL || (int)size < (int)IKCP_OVERHEAD) return -1;
+
+	while (1) {
+		IUINT32 ts, sn, len, una, conv;
+		IUINT16 wnd;
+		IUINT8 cmd, frg;
+		IKCPSEG *seg;
+
+		if (size < (int)IKCP_OVERHEAD) break;
+
+		data = ikcp_decode32u(data, &conv);
+		if (conv != kcp->conv) return -1;
+
+		data = ikcp_decode8u(data, &cmd);
+		data = ikcp_decode8u(data, &frg);
+		data = ikcp_decode16u(data, &wnd);
+		data = ikcp_decode32u(data, &ts);
+		data = ikcp_decode32u(data, &sn);
+		data = ikcp_decode32u(data, &una);
+		data = ikcp_decode32u(data, &len);
+
+		size -= IKCP_OVERHEAD;
+
+		if ((long)size < (long)len || (int)len < 0) return -2;
+
+		if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
+			cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS) 
+			return -3;
+
+		kcp->rmt_wnd = wnd;
+		ikcp_parse_una(kcp, una);
+		ikcp_shrink_buf(kcp);
+
+		if (cmd == IKCP_CMD_ACK) {
+			if (_itimediff(kcp->current, ts) >= 0) {
+				ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
+			}
+			ikcp_parse_ack(kcp, sn);
+			ikcp_shrink_buf(kcp);
+			if (flag == 0) {
+				flag = 1;
+				maxack = sn;
+				latest_ts = ts;
+			}	else {
+				if (_itimediff(sn, maxack) > 0) {
+				#ifndef IKCP_FASTACK_CONSERVE
+					maxack = sn;
+					latest_ts = ts;
+				#else
+					if (_itimediff(ts, latest_ts) > 0) {
+						maxack = sn;
+						latest_ts = ts;
+					}
+				#endif
+				}
+			}
+			if (ikcp_canlog(kcp, IKCP_LOG_IN_ACK)) {
+				ikcp_log(kcp, IKCP_LOG_IN_ACK, 
+					"input ack: sn=%lu rtt=%ld rto=%ld", (unsigned long)sn, 
+					(long)_itimediff(kcp->current, ts),
+					(long)kcp->rx_rto);
+			}
+		}
+		else if (cmd == IKCP_CMD_PUSH) {
+			if (ikcp_canlog(kcp, IKCP_LOG_IN_DATA)) {
+				ikcp_log(kcp, IKCP_LOG_IN_DATA, 
+					"input psh: sn=%lu ts=%lu", (unsigned long)sn, (unsigned long)ts);
+			}
+			if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {
+				ikcp_ack_push(kcp, sn, ts);
+				if (_itimediff(sn, kcp->rcv_nxt) >= 0) {
+					seg = ikcp_segment_new(kcp, len);
+					seg->conv = conv;
+					seg->cmd = cmd;
+					seg->frg = frg;
+					seg->wnd = wnd;
+					seg->ts = ts;
+					seg->sn = sn;
+					seg->una = una;
+					seg->len = len;
+
+					if (len > 0) {
+						memcpy(seg->data, data, len);
+					}
+
+					ikcp_parse_data(kcp, seg);
+				}
+			}
+		}
+		else if (cmd == IKCP_CMD_WASK) {
+			// ready to send back IKCP_CMD_WINS in ikcp_flush
+			// tell remote my window size
+			kcp->probe |= IKCP_ASK_TELL;
+			if (ikcp_canlog(kcp, IKCP_LOG_IN_PROBE)) {
+				ikcp_log(kcp, IKCP_LOG_IN_PROBE, "input probe");
+			}
+		}
+		else if (cmd == IKCP_CMD_WINS) {
+			// do nothing
+			if (ikcp_canlog(kcp, IKCP_LOG_IN_WINS)) {
+				ikcp_log(kcp, IKCP_LOG_IN_WINS,
+					"input wins: %lu", (unsigned long)(wnd));
+			}
+		}
+		else {
+			return -3;
+		}
+
+		data += len;
+		size -= len;
+	}
+
+	if (flag != 0) {
+		ikcp_parse_fastack(kcp, maxack, latest_ts);
+	}
+
+	if (_itimediff(kcp->snd_una, prev_una) > 0) {
+		if (kcp->cwnd < kcp->rmt_wnd) {
+			IUINT32 mss = kcp->mss;
+			if (kcp->cwnd < kcp->ssthresh) {
+				kcp->cwnd++;
+				kcp->incr += mss;
+			}	else {
+				if (kcp->incr < mss) kcp->incr = mss;
+				kcp->incr += (mss * mss) / kcp->incr + (mss / 16);
+				if ((kcp->cwnd + 1) * mss <= kcp->incr) {
+				#if 1
+					kcp->cwnd = (kcp->incr + mss - 1) / ((mss > 0)? mss : 1);
+				#else
+					kcp->cwnd++;
+				#endif
+				}
+			}
+			if (kcp->cwnd > kcp->rmt_wnd) {
+				kcp->cwnd = kcp->rmt_wnd;
+				kcp->incr = kcp->rmt_wnd * mss;
+			}
+		}
+	}
+
+	return 0;
+}
+
+
+//---------------------------------------------------------------------
+// ikcp_encode_seg
+//---------------------------------------------------------------------
+static char *ikcp_encode_seg(char *ptr, const IKCPSEG *seg)
+{
+	ptr = ikcp_encode32u(ptr, seg->conv);
+	ptr = ikcp_encode8u(ptr, (IUINT8)seg->cmd);
+	ptr = ikcp_encode8u(ptr, (IUINT8)seg->frg);
+	ptr = ikcp_encode16u(ptr, (IUINT16)seg->wnd);
+	ptr = ikcp_encode32u(ptr, seg->ts);
+	ptr = ikcp_encode32u(ptr, seg->sn);
+	ptr = ikcp_encode32u(ptr, seg->una);
+	ptr = ikcp_encode32u(ptr, seg->len);
+	return ptr;
+}
+
+static int ikcp_wnd_unused(const ikcpcb *kcp)
+{
+	if (kcp->nrcv_que < kcp->rcv_wnd) {
+		return kcp->rcv_wnd - kcp->nrcv_que;
+	}
+	return 0;
+}
+
+
+//---------------------------------------------------------------------
+// ikcp_flush
+//---------------------------------------------------------------------
+void ikcp_flush(ikcpcb *kcp)
+{
+	IUINT32 current = kcp->current;
+	char *buffer = kcp->buffer;
+	char *ptr = buffer;
+	int count, size, i;
+	IUINT32 resent, cwnd;
+	IUINT32 rtomin;
+	struct IQUEUEHEAD *p;
+	int change = 0;
+	int lost = 0;
+	IKCPSEG seg;
+
+	// 'ikcp_update' haven't been called. 
+	if (kcp->updated == 0) return;
+
+	seg.conv = kcp->conv;
+	seg.cmd = IKCP_CMD_ACK;
+	seg.frg = 0;
+	seg.wnd = ikcp_wnd_unused(kcp);
+	seg.una = kcp->rcv_nxt;
+	seg.len = 0;
+	seg.sn = 0;
+	seg.ts = 0;
+
+	// flush acknowledges
+	count = kcp->ackcount;
+	for (i = 0; i < count; i++) {
+		size = (int)(ptr - buffer);
+		if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
+			ikcp_output(kcp, buffer, size);
+			ptr = buffer;
+		}
+		ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
+		ptr = ikcp_encode_seg(ptr, &seg);
+	}
+
+	kcp->ackcount = 0;
+
+	// probe window size (if remote window size equals zero)
+	if (kcp->rmt_wnd == 0) {
+		if (kcp->probe_wait == 0) {
+			kcp->probe_wait = IKCP_PROBE_INIT;
+			kcp->ts_probe = kcp->current + kcp->probe_wait;
+		}	
+		else {
+			if (_itimediff(kcp->current, kcp->ts_probe) >= 0) {
+				if (kcp->probe_wait < IKCP_PROBE_INIT) 
+					kcp->probe_wait = IKCP_PROBE_INIT;
+				kcp->probe_wait += kcp->probe_wait / 2;
+				if (kcp->probe_wait > IKCP_PROBE_LIMIT)
+					kcp->probe_wait = IKCP_PROBE_LIMIT;
+				kcp->ts_probe = kcp->current + kcp->probe_wait;
+				kcp->probe |= IKCP_ASK_SEND;
+			}
+		}
+	}	else {
+		kcp->ts_probe = 0;
+		kcp->probe_wait = 0;
+	}
+
+	// flush window probing commands
+	if (kcp->probe & IKCP_ASK_SEND) {
+		seg.cmd = IKCP_CMD_WASK;
+		size = (int)(ptr - buffer);
+		if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
+			ikcp_output(kcp, buffer, size);
+			ptr = buffer;
+		}
+		ptr = ikcp_encode_seg(ptr, &seg);
+	}
+
+	// flush window probing commands
+	if (kcp->probe & IKCP_ASK_TELL) {
+		seg.cmd = IKCP_CMD_WINS;
+		size = (int)(ptr - buffer);
+		if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
+			ikcp_output(kcp, buffer, size);
+			ptr = buffer;
+		}
+		ptr = ikcp_encode_seg(ptr, &seg);
+	}
+
+	kcp->probe = 0;
+
+	// calculate window size
+	cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
+	if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);
+
+	// move data from snd_queue to snd_buf
+	while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
+		IKCPSEG *newseg;
+		if (iqueue_is_empty(&kcp->snd_queue)) break;
+
+		newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);
+
+		iqueue_del(&newseg->node);
+		iqueue_add_tail(&newseg->node, &kcp->snd_buf);
+		kcp->nsnd_que--;
+		kcp->nsnd_buf++;
+
+		newseg->conv = kcp->conv;
+		newseg->cmd = IKCP_CMD_PUSH;
+		newseg->wnd = seg.wnd;
+		newseg->ts = current;
+		newseg->sn = kcp->snd_nxt++;
+		newseg->una = kcp->rcv_nxt;
+		newseg->resendts = current;
+		newseg->rto = kcp->rx_rto;
+		newseg->fastack = 0;
+		newseg->xmit = 0;
+	}
+
+	// calculate resent
+	resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
+	rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;
+
+	// flush data segments
+	for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
+		IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
+		int needsend = 0;
+		if (segment->xmit == 0) {
+			needsend = 1;
+			segment->xmit++;
+			segment->rto = kcp->rx_rto;
+			segment->resendts = current + segment->rto + rtomin;
+		}
+		else if (_itimediff(current, segment->resendts) >= 0) {
+			needsend = 1;
+			segment->xmit++;
+			kcp->xmit++;
+			if (kcp->nodelay == 0) {
+				segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
+			}	else {
+				IINT32 step = (kcp->nodelay < 2)? 
+					((IINT32)(segment->rto)) : kcp->rx_rto;
+				segment->rto += step / 2;
+			}
+			segment->resendts = current + segment->rto;
+			lost = 1;
+		}
+		else if (segment->fastack >= resent) {
+			if ((int)segment->xmit <= kcp->fastlimit || 
+				kcp->fastlimit <= 0) {
+				needsend = 1;
+				segment->xmit++;
+				segment->fastack = 0;
+				segment->resendts = current + segment->rto;
+				change++;
+			}
+		}
+
+		if (needsend) {
+			int need;
+			segment->ts = current;
+			segment->wnd = seg.wnd;
+			segment->una = kcp->rcv_nxt;
+
+			size = (int)(ptr - buffer);
+			need = IKCP_OVERHEAD + segment->len;
+
+			if (size + need > (int)kcp->mtu) {
+				ikcp_output(kcp, buffer, size);
+				ptr = buffer;
+			}
+
+			ptr = ikcp_encode_seg(ptr, segment);
+
+			if (segment->len > 0) {
+				memcpy(ptr, segment->data, segment->len);
+				ptr += segment->len;
+			}
+
+			if (segment->xmit >= kcp->dead_link) {
+				kcp->state = (IUINT32)-1;
+			}
+		}
+	}
+
+	// flash remain segments
+	size = (int)(ptr - buffer);
+	if (size > 0) {
+		ikcp_output(kcp, buffer, size);
+	}
+
+	// update ssthresh
+	if (change) {
+		IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
+		kcp->ssthresh = inflight / 2;
+		if (kcp->ssthresh < IKCP_THRESH_MIN)
+			kcp->ssthresh = IKCP_THRESH_MIN;
+		kcp->cwnd = kcp->ssthresh + resent;
+		kcp->incr = kcp->cwnd * kcp->mss;
+	}
+
+	if (lost) {
+		kcp->ssthresh = cwnd / 2;
+		if (kcp->ssthresh < IKCP_THRESH_MIN)
+			kcp->ssthresh = IKCP_THRESH_MIN;
+		kcp->cwnd = 1;
+		kcp->incr = kcp->mss;
+	}
+
+	if (kcp->cwnd < 1) {
+		kcp->cwnd = 1;
+		kcp->incr = kcp->mss;
+	}
+}
+
+
+//---------------------------------------------------------------------
+// update state (call it repeatedly, every 10ms-100ms), or you can ask 
+// ikcp_check when to call it again (without ikcp_input/_send calling).
+// 'current' - current timestamp in millisec. 
+//---------------------------------------------------------------------
+void ikcp_update(ikcpcb *kcp, IUINT32 current)
+{
+	IINT32 slap;
+
+	kcp->current = current;
+
+	if (kcp->updated == 0) {
+		kcp->updated = 1;
+		kcp->ts_flush = kcp->current;
+	}
+
+	slap = _itimediff(kcp->current, kcp->ts_flush);
+
+	if (slap >= 10000 || slap < -10000) {
+		kcp->ts_flush = kcp->current;
+		slap = 0;
+	}
+
+	if (slap >= 0) {
+		kcp->ts_flush += kcp->interval;
+		if (_itimediff(kcp->current, kcp->ts_flush) >= 0) {
+			kcp->ts_flush = kcp->current + kcp->interval;
+		}
+		ikcp_flush(kcp);
+	}
+}
+
+
+//---------------------------------------------------------------------
+// Determine when should you invoke ikcp_update:
+// returns when you should invoke ikcp_update in millisec, if there 
+// is no ikcp_input/_send calling. you can call ikcp_update in that
+// time, instead of call update repeatly.
+// Important to reduce unnacessary ikcp_update invoking. use it to 
+// schedule ikcp_update (eg. implementing an epoll-like mechanism, 
+// or optimize ikcp_update when handling massive kcp connections)
+//---------------------------------------------------------------------
+IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current)
+{
+	IUINT32 ts_flush = kcp->ts_flush;
+	IINT32 tm_flush = 0x7fffffff;
+	IINT32 tm_packet = 0x7fffffff;
+	IUINT32 minimal = 0;
+	struct IQUEUEHEAD *p;
+
+	if (kcp->updated == 0) {
+		return current;
+	}
+
+	if (_itimediff(current, ts_flush) >= 10000 ||
+		_itimediff(current, ts_flush) < -10000) {
+		ts_flush = current;
+	}
+
+	if (_itimediff(current, ts_flush) >= 0) {
+		return current;
+	}
+
+	tm_flush = _itimediff(ts_flush, current);
+
+	for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
+		const IKCPSEG *seg = iqueue_entry(p, const IKCPSEG, node);
+		IINT32 diff = _itimediff(seg->resendts, current);
+		if (diff <= 0) {
+			return current;
+		}
+		if (diff < tm_packet) tm_packet = diff;
+	}
+
+	minimal = (IUINT32)(tm_packet < tm_flush ? tm_packet : tm_flush);
+	if (minimal >= kcp->interval) minimal = kcp->interval;
+
+	return current + minimal;
+}
+
+
+
+int ikcp_setmtu(ikcpcb *kcp, int mtu)
+{
+	char *buffer;
+	if (mtu < 50 || mtu < (int)IKCP_OVERHEAD) 
+		return -1;
+	buffer = (char*)ikcp_malloc((mtu + IKCP_OVERHEAD) * 3);
+	if (buffer == NULL) 
+		return -2;
+	kcp->mtu = mtu;
+	kcp->mss = kcp->mtu - IKCP_OVERHEAD;
+	ikcp_free(kcp->buffer);
+	kcp->buffer = buffer;
+	return 0;
+}
+
+int ikcp_interval(ikcpcb *kcp, int interval)
+{
+	if (interval > 5000) interval = 5000;
+	else if (interval < 10) interval = 10;
+	kcp->interval = interval;
+	return 0;
+}
+
+int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc)
+{
+	if (nodelay >= 0) {
+		kcp->nodelay = nodelay;
+		if (nodelay) {
+			kcp->rx_minrto = IKCP_RTO_NDL;	
+		}	
+		else {
+			kcp->rx_minrto = IKCP_RTO_MIN;
+		}
+	}
+	if (interval >= 0) {
+		if (interval > 5000) interval = 5000;
+		else if (interval < 10) interval = 10;
+		kcp->interval = interval;
+	}
+	if (resend >= 0) {
+		kcp->fastresend = resend;
+	}
+	if (nc >= 0) {
+		kcp->nocwnd = nc;
+	}
+	return 0;
+}
+
+
+int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd)
+{
+	if (kcp) {
+		if (sndwnd > 0) {
+			kcp->snd_wnd = sndwnd;
+		}
+		if (rcvwnd > 0) {   // must >= max fragment size
+			kcp->rcv_wnd = _imax_(rcvwnd, IKCP_WND_RCV);
+		}
+	}
+	return 0;
+}
+
+int ikcp_waitsnd(const ikcpcb *kcp)
+{
+	return kcp->nsnd_buf + kcp->nsnd_que;
+}
+
+
+// read conv
+IUINT32 ikcp_getconv(const void *ptr)
+{
+	IUINT32 conv;
+	ikcp_decode32u((const char*)ptr, &conv);
+	return conv;
+}
+
+

+ 416 - 0
event/kcp/ikcp.h

@@ -0,0 +1,416 @@
+//=====================================================================
+//
+// KCP - A Better ARQ Protocol Implementation
+// skywind3000 (at) gmail.com, 2010-2011
+//  
+// Features:
+// + Average RTT reduce 30% - 40% vs traditional ARQ like tcp.
+// + Maximum RTT reduce three times vs tcp.
+// + Lightweight, distributed as a single source file.
+//
+//=====================================================================
+#ifndef __IKCP_H__
+#define __IKCP_H__
+
+#include <stddef.h>
+#include <stdlib.h>
+#include <assert.h>
+
+
+//=====================================================================
+// 32BIT INTEGER DEFINITION 
+//=====================================================================
+#ifndef __INTEGER_32_BITS__
+#define __INTEGER_32_BITS__
+#if defined(_WIN64) || defined(WIN64) || defined(__amd64__) || \
+	defined(__x86_64) || defined(__x86_64__) || defined(_M_IA64) || \
+	defined(_M_AMD64)
+	typedef unsigned int ISTDUINT32;
+	typedef int ISTDINT32;
+#elif defined(_WIN32) || defined(WIN32) || defined(__i386__) || \
+	defined(__i386) || defined(_M_X86)
+	typedef unsigned long ISTDUINT32;
+	typedef long ISTDINT32;
+#elif defined(__MACOS__)
+	typedef UInt32 ISTDUINT32;
+	typedef SInt32 ISTDINT32;
+#elif defined(__APPLE__) && defined(__MACH__)
+	#include <sys/types.h>
+	typedef u_int32_t ISTDUINT32;
+	typedef int32_t ISTDINT32;
+#elif defined(__BEOS__)
+	#include <sys/inttypes.h>
+	typedef u_int32_t ISTDUINT32;
+	typedef int32_t ISTDINT32;
+#elif (defined(_MSC_VER) || defined(__BORLANDC__)) && (!defined(__MSDOS__))
+	typedef unsigned __int32 ISTDUINT32;
+	typedef __int32 ISTDINT32;
+#elif defined(__GNUC__)
+	#include <stdint.h>
+	typedef uint32_t ISTDUINT32;
+	typedef int32_t ISTDINT32;
+#else 
+	typedef unsigned long ISTDUINT32; 
+	typedef long ISTDINT32;
+#endif
+#endif
+
+
+//=====================================================================
+// Integer Definition
+//=====================================================================
+#ifndef __IINT8_DEFINED
+#define __IINT8_DEFINED
+typedef char IINT8;
+#endif
+
+#ifndef __IUINT8_DEFINED
+#define __IUINT8_DEFINED
+typedef unsigned char IUINT8;
+#endif
+
+#ifndef __IUINT16_DEFINED
+#define __IUINT16_DEFINED
+typedef unsigned short IUINT16;
+#endif
+
+#ifndef __IINT16_DEFINED
+#define __IINT16_DEFINED
+typedef short IINT16;
+#endif
+
+#ifndef __IINT32_DEFINED
+#define __IINT32_DEFINED
+typedef ISTDINT32 IINT32;
+#endif
+
+#ifndef __IUINT32_DEFINED
+#define __IUINT32_DEFINED
+typedef ISTDUINT32 IUINT32;
+#endif
+
+#ifndef __IINT64_DEFINED
+#define __IINT64_DEFINED
+#if defined(_MSC_VER) || defined(__BORLANDC__)
+typedef __int64 IINT64;
+#else
+typedef long long IINT64;
+#endif
+#endif
+
+#ifndef __IUINT64_DEFINED
+#define __IUINT64_DEFINED
+#if defined(_MSC_VER) || defined(__BORLANDC__)
+typedef unsigned __int64 IUINT64;
+#else
+typedef unsigned long long IUINT64;
+#endif
+#endif
+
+#ifndef INLINE
+#if defined(__GNUC__)
+
+#if (__GNUC__ > 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ >= 1))
+#define INLINE         __inline__ __attribute__((always_inline))
+#else
+#define INLINE         __inline__
+#endif
+
+#elif (defined(_MSC_VER) || defined(__BORLANDC__) || defined(__WATCOMC__))
+#define INLINE __inline
+#else
+#define INLINE 
+#endif
+#endif
+
+#if (!defined(__cplusplus)) && (!defined(inline))
+#define inline INLINE
+#endif
+
+
+//=====================================================================
+// QUEUE DEFINITION                                                  
+//=====================================================================
+#ifndef __IQUEUE_DEF__
+#define __IQUEUE_DEF__
+
+struct IQUEUEHEAD {
+	struct IQUEUEHEAD *next, *prev;
+};
+
+typedef struct IQUEUEHEAD iqueue_head;
+
+
+//---------------------------------------------------------------------
+// queue init                                                         
+//---------------------------------------------------------------------
+#define IQUEUE_HEAD_INIT(name) { &(name), &(name) }
+#define IQUEUE_HEAD(name) \
+	struct IQUEUEHEAD name = IQUEUE_HEAD_INIT(name)
+
+#define IQUEUE_INIT(ptr) ( \
+	(ptr)->next = (ptr), (ptr)->prev = (ptr))
+
+#define IOFFSETOF(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)
+
+#define ICONTAINEROF(ptr, type, member) ( \
+		(type*)( ((char*)((type*)ptr)) - IOFFSETOF(type, member)) )
+
+#define IQUEUE_ENTRY(ptr, type, member) ICONTAINEROF(ptr, type, member)
+
+
+//---------------------------------------------------------------------
+// queue operation                     
+//---------------------------------------------------------------------
+#define IQUEUE_ADD(node, head) ( \
+	(node)->prev = (head), (node)->next = (head)->next, \
+	(head)->next->prev = (node), (head)->next = (node))
+
+#define IQUEUE_ADD_TAIL(node, head) ( \
+	(node)->prev = (head)->prev, (node)->next = (head), \
+	(head)->prev->next = (node), (head)->prev = (node))
+
+#define IQUEUE_DEL_BETWEEN(p, n) ((n)->prev = (p), (p)->next = (n))
+
+#define IQUEUE_DEL(entry) (\
+	(entry)->next->prev = (entry)->prev, \
+	(entry)->prev->next = (entry)->next, \
+	(entry)->next = 0, (entry)->prev = 0)
+
+#define IQUEUE_DEL_INIT(entry) do { \
+	IQUEUE_DEL(entry); IQUEUE_INIT(entry); } while (0)
+
+#define IQUEUE_IS_EMPTY(entry) ((entry) == (entry)->next)
+
+#define iqueue_init		IQUEUE_INIT
+#define iqueue_entry	IQUEUE_ENTRY
+#define iqueue_add		IQUEUE_ADD
+#define iqueue_add_tail	IQUEUE_ADD_TAIL
+#define iqueue_del		IQUEUE_DEL
+#define iqueue_del_init	IQUEUE_DEL_INIT
+#define iqueue_is_empty IQUEUE_IS_EMPTY
+
+#define IQUEUE_FOREACH(iterator, head, TYPE, MEMBER) \
+	for ((iterator) = iqueue_entry((head)->next, TYPE, MEMBER); \
+		&((iterator)->MEMBER) != (head); \
+		(iterator) = iqueue_entry((iterator)->MEMBER.next, TYPE, MEMBER))
+
+#define iqueue_foreach(iterator, head, TYPE, MEMBER) \
+	IQUEUE_FOREACH(iterator, head, TYPE, MEMBER)
+
+#define iqueue_foreach_entry(pos, head) \
+	for( (pos) = (head)->next; (pos) != (head) ; (pos) = (pos)->next )
+	
+
+#define __iqueue_splice(list, head) do {	\
+		iqueue_head *first = (list)->next, *last = (list)->prev; \
+		iqueue_head *at = (head)->next; \
+		(first)->prev = (head), (head)->next = (first);		\
+		(last)->next = (at), (at)->prev = (last); }	while (0)
+
+#define iqueue_splice(list, head) do { \
+	if (!iqueue_is_empty(list)) __iqueue_splice(list, head); } while (0)
+
+#define iqueue_splice_init(list, head) do {	\
+	iqueue_splice(list, head);	iqueue_init(list); } while (0)
+
+
+#ifdef _MSC_VER
+#pragma warning(disable:4311)
+#pragma warning(disable:4312)
+#pragma warning(disable:4996)
+#endif
+
+#endif
+
+
+//---------------------------------------------------------------------
+// BYTE ORDER & ALIGNMENT
+//---------------------------------------------------------------------
+#ifndef IWORDS_BIG_ENDIAN
+    #ifdef _BIG_ENDIAN_
+        #if _BIG_ENDIAN_
+            #define IWORDS_BIG_ENDIAN 1
+        #endif
+    #endif
+    #ifndef IWORDS_BIG_ENDIAN
+        #if defined(__hppa__) || \
+            defined(__m68k__) || defined(mc68000) || defined(_M_M68K) || \
+            (defined(__MIPS__) && defined(__MIPSEB__)) || \
+            defined(__ppc__) || defined(__POWERPC__) || defined(_M_PPC) || \
+            defined(__sparc__) || defined(__powerpc__) || \
+            defined(__mc68000__) || defined(__s390x__) || defined(__s390__)
+            #define IWORDS_BIG_ENDIAN 1
+        #endif
+    #endif
+    #ifndef IWORDS_BIG_ENDIAN
+        #define IWORDS_BIG_ENDIAN  0
+    #endif
+#endif
+
+#ifndef IWORDS_MUST_ALIGN
+	#if defined(__i386__) || defined(__i386) || defined(_i386_)
+		#define IWORDS_MUST_ALIGN 0
+	#elif defined(_M_IX86) || defined(_X86_) || defined(__x86_64__)
+		#define IWORDS_MUST_ALIGN 0
+	#elif defined(__amd64) || defined(__amd64__)
+		#define IWORDS_MUST_ALIGN 0
+	#else
+		#define IWORDS_MUST_ALIGN 1
+	#endif
+#endif
+
+
+//=====================================================================
+// SEGMENT
+//=====================================================================
+struct IKCPSEG
+{
+	struct IQUEUEHEAD node;
+	IUINT32 conv;
+	IUINT32 cmd;
+	IUINT32 frg;
+	IUINT32 wnd;
+	IUINT32 ts;
+	IUINT32 sn;
+	IUINT32 una;
+	IUINT32 len;
+	IUINT32 resendts;
+	IUINT32 rto;
+	IUINT32 fastack;
+	IUINT32 xmit;
+	char data[1];
+};
+
+
+//---------------------------------------------------------------------
+// IKCPCB
+//---------------------------------------------------------------------
+struct IKCPCB
+{
+	IUINT32 conv, mtu, mss, state;
+	IUINT32 snd_una, snd_nxt, rcv_nxt;
+	IUINT32 ts_recent, ts_lastack, ssthresh;
+	IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;
+	IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
+	IUINT32 current, interval, ts_flush, xmit;
+	IUINT32 nrcv_buf, nsnd_buf;
+	IUINT32 nrcv_que, nsnd_que;
+	IUINT32 nodelay, updated;
+	IUINT32 ts_probe, probe_wait;
+	IUINT32 dead_link, incr;
+	struct IQUEUEHEAD snd_queue;
+	struct IQUEUEHEAD rcv_queue;
+	struct IQUEUEHEAD snd_buf;
+	struct IQUEUEHEAD rcv_buf;
+	IUINT32 *acklist;
+	IUINT32 ackcount;
+	IUINT32 ackblock;
+	void *user;
+	char *buffer;
+	int fastresend;
+	int fastlimit;
+	int nocwnd, stream;
+	int logmask;
+	int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);
+	void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
+};
+
+
+typedef struct IKCPCB ikcpcb;
+
+#define IKCP_LOG_OUTPUT			1
+#define IKCP_LOG_INPUT			2
+#define IKCP_LOG_SEND			4
+#define IKCP_LOG_RECV			8
+#define IKCP_LOG_IN_DATA		16
+#define IKCP_LOG_IN_ACK			32
+#define IKCP_LOG_IN_PROBE		64
+#define IKCP_LOG_IN_WINS		128
+#define IKCP_LOG_OUT_DATA		256
+#define IKCP_LOG_OUT_ACK		512
+#define IKCP_LOG_OUT_PROBE		1024
+#define IKCP_LOG_OUT_WINS		2048
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+//---------------------------------------------------------------------
+// interface
+//---------------------------------------------------------------------
+
+// create a new kcp control object, 'conv' must equal in two endpoint
+// from the same connection. 'user' will be passed to the output callback
+// output callback can be setup like this: 'kcp->output = my_udp_output'
+ikcpcb* ikcp_create(IUINT32 conv, void *user);
+
+// release kcp control object
+void ikcp_release(ikcpcb *kcp);
+
+// set output callback, which will be invoked by kcp
+void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len, 
+	ikcpcb *kcp, void *user));
+
+// user/upper level recv: returns size, returns below zero for EAGAIN
+int ikcp_recv(ikcpcb *kcp, char *buffer, int len);
+
+// user/upper level send, returns below zero for error
+int ikcp_send(ikcpcb *kcp, const char *buffer, int len);
+
+// update state (call it repeatedly, every 10ms-100ms), or you can ask 
+// ikcp_check when to call it again (without ikcp_input/_send calling).
+// 'current' - current timestamp in millisec. 
+void ikcp_update(ikcpcb *kcp, IUINT32 current);
+
+// Determine when should you invoke ikcp_update:
+// returns when you should invoke ikcp_update in millisec, if there 
+// is no ikcp_input/_send calling. you can call ikcp_update in that
+// time, instead of call update repeatly.
+// Important to reduce unnacessary ikcp_update invoking. use it to 
+// schedule ikcp_update (eg. implementing an epoll-like mechanism, 
+// or optimize ikcp_update when handling massive kcp connections)
+IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current);
+
+// when you received a low level packet (eg. UDP packet), call it
+int ikcp_input(ikcpcb *kcp, const char *data, long size);
+
+// flush pending data
+void ikcp_flush(ikcpcb *kcp);
+
+// check the size of next message in the recv queue
+int ikcp_peeksize(const ikcpcb *kcp);
+
+// change MTU size, default is 1400
+int ikcp_setmtu(ikcpcb *kcp, int mtu);
+
+// set maximum window size: sndwnd=32, rcvwnd=32 by default
+int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd);
+
+// get how many packet is waiting to be sent
+int ikcp_waitsnd(const ikcpcb *kcp);
+
+// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
+// nodelay: 0:disable(default), 1:enable
+// interval: internal update timer interval in millisec, default is 100ms 
+// resend: 0:disable fast resend(default), 1:enable fast resend
+// nc: 0:normal congestion control(default), 1:disable congestion control
+int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc);
+
+
+void ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...);
+
+// setup allocator
+void ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*));
+
+// read conv
+IUINT32 ikcp_getconv(const void *ptr);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+
+

+ 13 - 0
event/nio.c

@@ -55,6 +55,12 @@ static void __read_cb(hio_t* io, void* buf, int readbytes) {
             hio_read_stop(io);
         }
 
+#if WITH_KCP
+        if (io->io_type == HIO_TYPE_KCP) {
+            hio_read_kcp(io, buf, readbytes);
+            return;
+        }
+#endif
         hio_read_cb(io, buf, readbytes);
     }
 
@@ -236,6 +242,7 @@ static int __nio_read(hio_t* io, void* buf, int len) {
 #endif
         break;
     case HIO_TYPE_UDP:
+    case HIO_TYPE_KCP:
     case HIO_TYPE_IP:
     {
         socklen_t addrlen = sizeof(sockaddr_u);
@@ -264,6 +271,7 @@ static int __nio_write(hio_t* io, const void* buf, int len) {
 #endif
         break;
     case HIO_TYPE_UDP:
+    case HIO_TYPE_KCP:
     case HIO_TYPE_IP:
         nwrite = sendto(io->fd, buf, len, 0, io->peeraddr, SOCKADDR_LEN(io->peeraddr));
         break;
@@ -459,6 +467,11 @@ int hio_write (hio_t* io, const void* buf, size_t len) {
         hloge("hio_write called but fd[%d] already closed!", io->fd);
         return -1;
     }
+#if WITH_KCP
+    if (io->io_type == HIO_TYPE_KCP) {
+        return hio_write_kcp(io, buf, len);
+    }
+#endif
     int nwrite = 0, err = 0;
     hrecursive_mutex_lock(&io->write_mutex);
     if (write_queue_empty(&io->write_queue)) {

+ 151 - 0
event/rudp.c

@@ -0,0 +1,151 @@
+#include "rudp.h"
+
+#if WITH_RUDP
+
+#if WITH_KCP
+void kcp_release(kcp_t* kcp) {
+    if (kcp->ikcp == NULL) return;
+    if (kcp->update_timer) {
+        htimer_del(kcp->update_timer);
+        kcp->update_timer = NULL;
+    }
+    HV_FREE(kcp->readbuf.base);
+    kcp->readbuf.len = 0;
+    // printf("ikcp_release ikcp=%p\n", kcp->ikcp);
+    ikcp_release(kcp->ikcp);
+    kcp->ikcp = NULL;
+}
+#endif
+
+void rudp_entry_free(rudp_entry_t* entry) {
+#if WITH_KCP
+    kcp_release(&entry->kcp);
+#endif
+    HV_FREE(entry);
+}
+
+void rudp_init(rudp_t* rudp) {
+    // printf("rudp init\n");
+    rudp->rb_root.rb_node = NULL;
+    hmutex_init(&rudp->mutex);
+}
+
+void rudp_cleanup(rudp_t* rudp) {
+    // printf("rudp cleaup\n");
+    struct rb_node* n = NULL;
+    rudp_entry_t* e = NULL;
+    while ((n = rudp->rb_root.rb_node)) {
+        e = rb_entry(n, rudp_entry_t, rb_node);
+        rb_erase(n, &rudp->rb_root);
+        rudp_entry_free(e);
+    }
+    hmutex_destroy(&rudp->mutex);
+}
+
+bool rudp_insert(rudp_t* rudp, rudp_entry_t* entry) {
+    struct rb_node** n = &rudp->rb_root.rb_node;
+    struct rb_node* parent = NULL;
+    rudp_entry_t* e = NULL;
+    int cmp = 0;
+    bool exists = false;
+    while (*n) {
+        parent = *n;
+        e = rb_entry(*n, rudp_entry_t, rb_node);
+        cmp = memcmp(&entry->addr, &e->addr, sizeof(sockaddr_u));
+        if (cmp < 0) {
+            n = &(*n)->rb_left;
+        } else if (cmp > 0) {
+            n = &(*n)->rb_right;
+        } else {
+            exists = true;
+            break;
+        }
+    }
+
+    if (!exists) {
+        rb_link_node(&entry->rb_node, parent, n);
+        rb_insert_color(&entry->rb_node, &rudp->rb_root);
+    }
+    return !exists;
+}
+
+rudp_entry_t* rudp_search(rudp_t* rudp, struct sockaddr* addr) {
+    struct rb_node* n = rudp->rb_root.rb_node;
+    rudp_entry_t* e = NULL;
+    int cmp = 0;
+    bool exists = false;
+    while (n) {
+        e = rb_entry(n, rudp_entry_t, rb_node);
+        cmp = memcmp(addr, &e->addr, sizeof(sockaddr_u));
+        if (cmp < 0) {
+            n = n->rb_left;
+        } else if (cmp > 0) {
+            n = n->rb_right;
+        } else {
+            exists = true;
+            break;
+        }
+    }
+    return exists ? e : NULL;
+}
+
+rudp_entry_t* rudp_remove(rudp_t* rudp, struct sockaddr* addr) {
+    hmutex_lock(&rudp->mutex);
+    rudp_entry_t* e = rudp_search(rudp, addr);
+    if (e) {
+        // printf("rudp_remove ");
+        // SOCKADDR_PRINT(addr);
+        rb_erase(&e->rb_node, &rudp->rb_root);
+    }
+    hmutex_unlock(&rudp->mutex);
+    return e;
+}
+
+rudp_entry_t* rudp_get(rudp_t* rudp, struct sockaddr* addr) {
+    hmutex_lock(&rudp->mutex);
+    struct rb_node** n = &rudp->rb_root.rb_node;
+    struct rb_node* parent = NULL;
+    rudp_entry_t* e = NULL;
+    int cmp = 0;
+    bool exists = false;
+    // search
+    while (*n) {
+        parent = *n;
+        e = rb_entry(*n, rudp_entry_t, rb_node);
+        cmp = memcmp(addr, &e->addr, sizeof(sockaddr_u));
+        if (cmp < 0) {
+            n = &(*n)->rb_left;
+        } else if (cmp > 0) {
+            n = &(*n)->rb_right;
+        } else {
+            exists = true;
+            break;
+        }
+    }
+
+    if (!exists) {
+        // insert
+        // printf("rudp_insert ");
+        // SOCKADDR_PRINT(addr);
+        HV_ALLOC_SIZEOF(e);
+        memcpy(&e->addr, addr, SOCKADDR_LEN(addr));
+        rb_link_node(&e->rb_node, parent, n);
+        rb_insert_color(&e->rb_node, &rudp->rb_root);
+    }
+    hmutex_unlock(&rudp->mutex);
+    return e;
+}
+
+void rudp_del(rudp_t* rudp, struct sockaddr* addr) {
+    hmutex_lock(&rudp->mutex);
+    rudp_entry_t* e = rudp_search(rudp, addr);
+    if (e) {
+        // printf("rudp_remove ");
+        // SOCKADDR_PRINT(addr);
+        rb_erase(&e->rb_node, &rudp->rb_root);
+        rudp_entry_free(e);
+    }
+    hmutex_unlock(&rudp->mutex);
+}
+
+#endif

+ 61 - 0
event/rudp.h

@@ -0,0 +1,61 @@
+#ifndef HV_RUDP_H_
+#define HV_RUDP_H_
+
+#include "hloop.h"
+
+#if WITH_RUDP
+
+#include "rbtree.h"
+#include "hsocket.h"
+#include "hmutex.h"
+
+#if WITH_KCP
+#include "kcp/ikcp.h"
+#include "hbuf.h"
+#define DEFAULT_KCP_UPDATE_INTERVAL 10 // ms
+#define DEFAULT_KCP_READ_BUFSIZE    1400
+
+typedef struct kcp_s {
+    ikcpcb*         ikcp;
+    htimer_t*       update_timer;
+    hbuf_t          readbuf;
+} kcp_t;
+
+// NOTE: kcp_create in hio_get_kcp
+void kcp_release(kcp_t* kcp);
+#endif
+
+typedef struct rudp_s {
+    struct rb_root  rb_root;
+    hmutex_t        mutex;
+} rudp_t;
+
+typedef struct rudp_entry_s {
+    struct rb_node  rb_node;
+    sockaddr_u      addr; // key
+    // val
+    hio_t*          io;
+#if WITH_KCP
+    kcp_t           kcp;
+#endif
+} rudp_entry_t;
+// NOTE: rudp_entry_t alloc when rudp_get
+void rudp_entry_free(rudp_entry_t* entry);
+
+void rudp_init(rudp_t* rudp);
+void rudp_cleanup(rudp_t* rudp);
+
+bool rudp_insert(rudp_t* rudp, rudp_entry_t* entry);
+// NOTE: just rb_erase, not free
+rudp_entry_t* rudp_remove(rudp_t* rudp, struct sockaddr* addr);
+rudp_entry_t* rudp_search(rudp_t* rudp, struct sockaddr* addr);
+#define rudp_has(rudp, addr) (rudp_search(rudp, addr) != NULL)
+
+// rudp_search + malloc + rudp_insert
+rudp_entry_t* rudp_get(rudp_t* rudp, struct sockaddr* addr);
+// rudp_remove + free
+void          rudp_del(rudp_t* rudp, struct sockaddr* addr);
+
+#endif // WITH_RUDP
+
+#endif // HV_RUDP_H_

+ 25 - 0
evpp/UdpClient.h

@@ -12,6 +12,9 @@ namespace hv {
 class UdpClient {
 public:
     UdpClient() {
+#if WITH_KCP
+        enable_kcp = false;
+#endif
     }
 
     virtual ~UdpClient() {
@@ -47,6 +50,11 @@ public:
                 onWriteComplete(channel, buf);
             }
         };
+#if WITH_KCP
+        if (enable_kcp) {
+            hio_set_kcp(channel->io(), &kcp_setting);
+        }
+#endif
         return channel->startRead();
     }
 
@@ -59,6 +67,7 @@ public:
 
     int sendto(const void* data, int size, struct sockaddr* peeraddr = NULL) {
         if (channel == NULL) return -1;
+        std::lock_guard<std::mutex> locker(sendto_mutex);
         if (peeraddr) hio_set_peeraddr(channel->io(), peeraddr, SOCKADDR_LEN(peeraddr));
         return channel->write(data, size);
     }
@@ -69,13 +78,29 @@ public:
         return sendto(str.data(), str.size(), peeraddr);
     }
 
+#if WITH_KCP
+    void setKcp(kcp_setting_t* setting) {
+        if (setting) {
+            enable_kcp = true;
+            kcp_setting = *setting;
+        } else {
+            enable_kcp = false;
+        }
+    }
+#endif
+
 public:
     SocketChannelPtr        channel;
+#if WITH_KCP
+    bool                    enable_kcp;
+    kcp_setting_t           kcp_setting;
+#endif
     // Callback
     MessageCallback         onMessage;
     WriteCompleteCallback   onWriteComplete;
 
 private:
+    std::mutex              sendto_mutex;
     EventLoopThread         loop_thread;
 };
 

+ 14 - 0
evpp/UdpServer.h

@@ -12,6 +12,9 @@ namespace hv {
 class UdpServer {
 public:
     UdpServer() {
+#if WITH_KCP
+        enable_kcp = false;
+#endif
     }
 
     virtual ~UdpServer() {
@@ -47,6 +50,11 @@ public:
                 onWriteComplete(channel, buf);
             }
         };
+#if WITH_KCP
+        if (enable_kcp) {
+            hio_set_kcp(channel->io(), &kcp_setting);
+        }
+#endif
         return channel->startRead();
     }
 
@@ -59,6 +67,7 @@ public:
 
     int sendto(const void* data, int size, struct sockaddr* peeraddr = NULL) {
         if (channel == NULL) return -1;
+        std::lock_guard<std::mutex> locker(sendto_mutex);
         if (peeraddr) hio_set_peeraddr(channel->io(), peeraddr, SOCKADDR_LEN(peeraddr));
         return channel->write(data, size);
     }
@@ -71,11 +80,16 @@ public:
 
 public:
     SocketChannelPtr        channel;
+#if WITH_KCP
+    bool                    enable_kcp;
+    kcp_setting_t           kcp_setting;
+#endif
     // Callback
     MessageCallback         onMessage;
     WriteCompleteCallback   onWriteComplete;
 
 private:
+    std::mutex              sendto_mutex;
     EventLoopThread         loop_thread;
 };
 

+ 21 - 0
examples/nc.c

@@ -26,6 +26,17 @@
  */
 #define TEST_SSL 0
 
+/*
+ * @test    kcp_client
+ * #define  TEST_KCP 1
+ *
+ * @build   ./configure --with-kcp && make clean && make
+ * @server  bin/udp_echo_server 1234
+ * @client  bin/nc -u 127.0.0.1 1234
+ *
+ */
+#define TEST_KCP 0
+
 #define RECV_BUFSIZE    8192
 static char recvbuf[RECV_BUFSIZE];
 
@@ -113,6 +124,13 @@ static void on_stdin(hio_t* io, void* buf, int readbytes) {
     }
 
     hio_write(sockio, buf, readbytes);
+
+#if TEST_KCP
+    if (strncmp(str, "CLOSE", 5) == 0) {
+        printf("call hio_close\n");
+        hio_close(sockio);
+    }
+#endif
 }
 
 static void on_close(hio_t* io) {
@@ -189,6 +207,9 @@ Examples: nc 127.0.0.1 80\n\
     else if (protocol == 2) {
         // udp
         sockio = hloop_create_udp_client(loop, host, port);
+#if TEST_KCP
+        hio_set_kcp(sockio, NULL);
+#endif
         hio_read(sockio);
     }
     if (sockio == NULL) {

+ 24 - 2
examples/udp_echo_server.c

@@ -11,6 +11,17 @@
 #include "hloop.h"
 #include "hsocket.h"
 
+/*
+ * @test    kcp_server
+ * #define  TEST_KCP 1
+ *
+ * @build   ./configure --with-kcp && make clean && make
+ * @server  bin/udp_echo_server 1234
+ * @client  bin/nc -u 127.0.0.1 1234
+ *
+ */
+#define TEST_KCP 0
+
 static void on_recvfrom(hio_t* io, void* buf, int readbytes) {
     printf("on_recvfrom fd=%d readbytes=%d\n", hio_fd(io), readbytes);
     char localaddrstr[SOCKADDR_STRLEN] = {0};
@@ -18,10 +29,18 @@ static void on_recvfrom(hio_t* io, void* buf, int readbytes) {
     printf("[%s] <=> [%s]\n",
             SOCKADDR_STR(hio_localaddr(io), localaddrstr),
             SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
-    printf("< %.*s", readbytes, (char*)buf);
+
+    char* str = (char*)buf;
+    printf("< %.*s", readbytes, str);
     // echo
-    printf("> %.*s", readbytes, (char*)buf);
+    printf("> %.*s", readbytes, str);
     hio_write(io, buf, readbytes);
+
+#if TEST_KCP
+    if (strncmp(str, "CLOSE", 5) == 0) {
+        hio_close_rudp(io, hio_peeraddr(io));
+    }
+#endif
 }
 
 int main(int argc, char** argv) {
@@ -42,6 +61,9 @@ int main(int argc, char** argv) {
     if (io == NULL) {
         return -20;
     }
+#if TEST_KCP
+    hio_set_kcp(io, NULL);
+#endif
     hio_setcb_read(io, on_recvfrom);
     hio_read(io);
     hloop_run(loop);

+ 1 - 0
hconfig.h

@@ -70,5 +70,6 @@
 /* #undef WITH_MBEDTLS */
 /* #undef ENABLE_UDS */
 /* #undef USE_MULTIMAP */
+/* #undef WITH_KCP */
 
 #endif // HV_CONFIG_H_

+ 2 - 0
hconfig.h.in

@@ -72,4 +72,6 @@
 #cmakedefine ENABLE_UDS     1
 #cmakedefine USE_MULTIMAP   1
 
+#cmakedefine WITH_KCP       1
+
 #endif // HV_CONFIG_H_

+ 2 - 0
scripts/unittest.sh

@@ -4,6 +4,8 @@ SCRIPT_DIR=$(cd `dirname $0`; pwd)
 ROOT_DIR=${SCRIPT_DIR}/..
 cd ${ROOT_DIR}
 
+bin/rbtree_test
+
 bin/date
 bin/ifconfig
 bin/mkdir_p 123/456