소스 검색

Add kcptun (#528)

* Add examples/kcptun

* Add README for kcptun
ithewei 1 년 전
부모
커밋
8e7339a099
10개의 변경된 파일1075개의 추가작업 그리고 0개의 파일을 삭제
  1. 8 0
      Makefile
  2. 1 0
      README-CN.md
  3. 1 0
      README.md
  4. 16 0
      examples/CMakeLists.txt
  5. 68 0
      examples/kcptun/README.md
  6. 402 0
      examples/kcptun/client/main.cpp
  7. BIN
      examples/kcptun/kcptun.png
  8. 362 0
      examples/kcptun/server/main.cpp
  9. 79 0
      examples/kcptun/smux/smux.cpp
  10. 138 0
      examples/kcptun/smux/smux.h

+ 8 - 0
Makefile

@@ -191,6 +191,14 @@ mqtt_pub: prepare
 mqtt_client_test: prepare
 	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) mqtt" SRCS="examples/mqtt/mqtt_client_test.cpp"
 
+kcptun: kcptun_client kcptun_server
+
+kcptun_client: prepare
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) examples/kcptun/smux examples/kcptun/client"
+
+kcptun_server: prepare
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) examples/kcptun/smux examples/kcptun/server"
+
 jsonrpc: jsonrpc_client jsonrpc_server
 
 jsonrpc_client: prepare

+ 1 - 0
README-CN.md

@@ -429,6 +429,7 @@ int main(int argc, char** argv) {
 - HTTP客户端: [examples/http_client_test.cpp](examples/http_client_test.cpp)
 - WebSocket服务端: [examples/websocket_server_test.cpp](examples/websocket_server_test.cpp)
 - WebSocket客户端: [examples/websocket_client_test.cpp](examples/websocket_client_test.cpp)
+- kcptun隧道: [examples/kcptun](examples/kcptun)
 - protobufRPC示例: [examples/protorpc](examples/protorpc)
 - Qt中使用libhv示例: [hv-projects/QtDemo](https://github.com/hv-projects/QtDemo)
 

+ 1 - 0
README.md

@@ -369,6 +369,7 @@ int main(int argc, char** argv) {
 - [examples/http_client_test.cpp](examples/http_client_test.cpp)
 - [examples/websocket_server_test.cpp](examples/websocket_server_test.cpp)
 - [examples/websocket_client_test.cpp](examples/websocket_client_test.cpp)
+- [examples/kcptun](examples/kcptun)
 - [examples/protorpc](examples/protorpc)
 - [hv-projects/QtDemo](https://github.com/hv-projects/QtDemo)
 

+ 16 - 0
examples/CMakeLists.txt

@@ -61,6 +61,22 @@ add_executable(jsonrpc_server jsonrpc/jsonrpc_server.c jsonrpc/cJSON.c)
 target_compile_definitions(jsonrpc_server PRIVATE CJSON_HIDE_SYMBOLS)
 target_link_libraries(jsonrpc_server ${HV_LIBRARIES})
 
+if(WITH_KCP)
+    glob_headers_and_sources(KCPTUN_SMUX_FILES   kcptun/smux)
+    glob_headers_and_sources(KCPTUN_CLIENT_FILES kcptun/client)
+    glob_headers_and_sources(KCPTUN_SERVER_FILES kcptun/server)
+
+    # kcptun_client
+    add_executable(kcptun_client ${KCPTUN_SMUX_FILES} ${KCPTUN_CLIENT_FILES})
+    target_link_libraries(kcptun_client ${HV_LIBRARIES})
+
+    # kcptun_server
+    add_executable(kcptun_server ${KCPTUN_SMUX_FILES} ${KCPTUN_SERVER_FILES})
+    target_link_libraries(kcptun_server ${HV_LIBRARIES})
+
+    list(APPEND EXAMPLES kcptun_client kcptun_server)
+endif()
+
 if(WITH_EVPP)
     include_directories(../cpputil ../evpp)
 

+ 68 - 0
examples/kcptun/README.md

@@ -0,0 +1,68 @@
+# Intro
+
+<img src="kcptun.png" alt="kcptun" height="300px"/>
+
+> *Disclaimer: The picture comes from [github.com/xtaci/kcptun](https://github.com/xtaci/kcptun). Thanks so much.*
+
+# Build
+
+```shell
+./configure --with-kcp
+make clean
+make examples
+make kcptun
+```
+
+# Usage
+
+```shell
+$ bin/kcptun_server -h
+
+Usage: kcptun_server [hvdl:t:m:]
+Options:
+
+  -h|--help                 Print this information
+  -v|--version              Print version
+  -d|--daemon               Daemonize
+  -l|--listen value         kcp server listen address (default: ":4000")
+  -t|--target value         target server address (default: "127.0.0.1:8080")
+  -m|--mode value           profiles: fast3, fast2, fast, normal (default: "fast")
+     --mtu value            set maximum transmission unit for UDP packets (default: 1350)
+     --sndwnd value         set send window size(num of packets) (default: 1024)
+     --rcvwnd value         set receive window size(num of packets) (default: 1024)
+```
+
+```shell
+$ bin/kcptun_client -h
+
+Usage: kcptun_client [hvdl:r:m:]
+Options:
+
+  -h|--help                 Print this information
+  -v|--version              Print version
+  -d|--daemon               Daemonize
+  -l|--localaddr value      local listen address (default: ":8388")
+  -r|--remoteaddr value     kcp server address (default: "127.0.0.1:4000")
+  -m|--mode value           profiles: fast3, fast2, fast, normal (default: "fast")
+     --mtu value            set maximum transmission unit for UDP packets (default: 1350)
+     --sndwnd value         set send window size(num of packets) (default: 128)
+     --rcvwnd value         set receive window size(num of packets) (default: 512)
+```
+
+# Test
+`tcp_client -> kcptun_client -> kcptun_server -> tcp_server`
+```shell
+tcp_server:     bin/tcp_echo_server 1234
+kcptun_server:  bin/kcptun_server -l :4000 -t 127.0.0.1:1234 --mode fast3
+kcptun_client:  bin/kcptun_client -l :8388 -r 127.0.0.1:4000 --mode fast3
+tcp_client:     bin/nc 127.0.0.1 8388
+                > hello
+                < hello
+```
+
+This kcptun examples does not implement encryption, compression, and fec.<br>
+if you want to use [github.com/xtaci/kcptun](https://github.com/xtaci/kcptun), please add `--crypt null --nocomp --ds 0 --ps 0`.<br>
+For example:
+```shell
+golang_kcptun_server -l :4000 -t 127.0.0.1:1234 --mode fast3 --crypt null --nocomp --ds 0 --ps 0
+```

+ 402 - 0
examples/kcptun/client/main.cpp

@@ -0,0 +1,402 @@
+/*
+ * kcptun client
+ *
+ * @build:          ./configure --with-kcp && make clean && make kcptun examples
+ * @tcp_server:     bin/tcp_echo_server 1234
+ * @kcptun_server:  bin/kcptun_server -l :4000 -t 127.0.0.1:1234
+ * @kcptun_client:  bin/kcptun_client -l :8388 -r 127.0.0.1:4000
+ * @tcp_client:     bin/nc 127.0.0.1 8388
+ *                  > hello
+ *                  < hello
+ */
+
+#define WITH_KCP 1
+#include "hversion.h"
+#include "hmain.h"
+#include "hsocket.h"
+#include "hloop.h"
+#include "hthread.h"
+
+#include "../smux/smux.h"
+
+// config
+static const char* localaddr = ":8388";
+static const char* remoteaddr = "127.0.0.1:4000";
+static const char* mode = "fast";
+static int mtu = 1350;
+static int sndwnd = 128;
+static int rcvwnd = 512;
+
+// short options
+static const char options[] = "hvdl:r:m:";
+// long options
+static const option_t long_options[] = {
+    {'h', "help",       NO_ARGUMENT},
+    {'v', "version",    NO_ARGUMENT},
+    {'d', "daemon",     NO_ARGUMENT},
+    {'l', "localaddr",  REQUIRED_ARGUMENT},
+    {'r', "remoteaddr", REQUIRED_ARGUMENT},
+    {'m', "mode",       REQUIRED_ARGUMENT},
+    { 0,  "mtu",        REQUIRED_ARGUMENT},
+    { 0,  "sndwnd",     REQUIRED_ARGUMENT},
+    { 0,  "rcvwnd",     REQUIRED_ARGUMENT},
+};
+
+static const char detail_options[] = R"(
+  -h|--help                 Print this information
+  -v|--version              Print version
+  -d|--daemon               Daemonize
+  -l|--localaddr value      local listen address (default: ":8388")
+  -r|--remoteaddr value     kcp server address (default: "127.0.0.1:4000")
+  -m|--mode value           profiles: fast3, fast2, fast, normal (default: "fast")
+     --mtu value            set maximum transmission unit for UDP packets (default: 1350)
+     --sndwnd value         set send window size(num of packets) (default: 128)
+     --rcvwnd value         set receive window size(num of packets) (default: 512)
+)";
+
+static void print_version() {
+    printf("%s version %s\n", g_main_ctx.program_name, hv_compile_version());
+}
+
+static void print_help() {
+    printf("Usage: %s [%s]\n", g_main_ctx.program_name, options);
+    printf("Options:\n%s\n", detail_options);
+}
+
+static char listen_host[64] = "0.0.0.0";
+static int  listen_port = 8388;
+static hio_t* listen_io = NULL;
+
+static kcp_setting_t s_kcp_setting;
+static char kcp_host[64] = "127.0.0.1";
+static int  kcp_port = 4000;
+static hio_t* kcp_io = NULL;
+
+static smux_config_t  smux_config;
+static smux_session_t smux_session;
+static smux_stream_t  smux_stream0;
+
+static int verbose = 1;
+
+typedef struct kcp_ctx_s {
+    smux_frame_t    frame;
+    uint16_t        want_readbytes;
+} kcp_ctx_t;
+
+/* workflow:
+ *
+ * hloop_create_tcp_server ->
+ * on_accept -> smux_session_open_stream ->
+ * on_recv -> hio_write(kcp_io) ->
+ * on_close -> smux_session_close_stream
+ *
+ * hloop_create_udp_client -> hio_set_kcp ->
+ * on_recvfrom -> smux_session_get_stream -> hio_write(stream_io)
+ *
+ */
+
+static void send_hearbeat(htimer_t* timer) {
+    smux_stream_t* smux_stream = (smux_stream_t*)hevent_userdata(timer);
+    if (smux_stream == NULL) return;
+    // NOP
+    int packlen = smux_stream_output(smux_stream, SMUX_CMD_NOP);
+    if (packlen > 0) {
+        // printf("NOP %d\n", packlen);
+        hio_write(kcp_io, smux_stream->wbuf.base, packlen);
+    }
+}
+
+static void on_close(hio_t* io) {
+    // printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
+    if (verbose) {
+        char localaddrstr[SOCKADDR_STRLEN] = {0};
+        char peeraddrstr[SOCKADDR_STRLEN] = {0};
+        printf("disconnected connfd=%d [%s] <= [%s]\n", hio_fd(io),
+            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
+            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
+    }
+
+    smux_stream_t* smux_stream = (smux_stream_t*)hevent_userdata(io);
+    if (smux_stream == NULL) return;
+
+    // FIN
+    int packlen = smux_stream_output(smux_stream, SMUX_CMD_FIN);
+    if (packlen > 0) {
+        // printf("FIN %d\n", packlen);
+        hio_write(kcp_io, smux_stream->wbuf.base, packlen);
+    }
+
+    // kill timer
+    if (smux_stream->timer) {
+        htimer_del(smux_stream->timer);
+        smux_stream->timer = NULL;
+    }
+
+    // free buffer
+    HV_FREE(smux_stream->rbuf.base);
+    HV_FREE(smux_stream->wbuf.base);
+
+    smux_session_close_stream(&smux_session, smux_stream->stream_id);
+    hevent_set_userdata(io, NULL);
+}
+
+static void on_recv(hio_t* io, void* buf, int readbytes) {
+    // printf("on_recv %.*s \n", readbytes, (char*)buf);
+    smux_stream_t* smux_stream = (smux_stream_t*)hevent_userdata(io);
+    if (smux_stream == NULL) return;
+
+    // PSH
+    smux_frame_t frame;
+    smux_frame_init(&frame);
+    frame.head.sid = smux_stream->stream_id;
+    frame.head.cmd = SMUX_CMD_PSH;
+    frame.head.length = readbytes;
+    frame.data = (const char*)buf;
+    int packlen = smux_frame_pack(&frame, smux_stream->wbuf.base, smux_stream->wbuf.len);
+    if (packlen > 0) {
+        // printf("PSH %d\n", packlen);
+        int nwrite = hio_write(kcp_io, smux_stream->wbuf.base, packlen);
+        // printf("PSH ret=%d\n", nwrite);
+    }
+}
+
+static void on_accept(hio_t* io) {
+    // printf("on_accept connfd=%d\n", hio_fd(io));
+    if (verbose) {
+        char localaddrstr[SOCKADDR_STRLEN] = {0};
+        char peeraddrstr[SOCKADDR_STRLEN] = {0};
+        printf("accept connfd=%d [%s] <= [%s]\n", hio_fd(io),
+                SOCKADDR_STR(hio_localaddr(io), localaddrstr),
+                SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
+    }
+
+    hio_setcb_close(io, on_close);
+
+    smux_stream_t* smux_stream = smux_session_open_stream(&smux_session, 0, io);
+    // alloc buffer
+    smux_stream->rbuf.len = mtu;
+    smux_stream->wbuf.len = mtu;
+    HV_ALLOC(smux_stream->rbuf.base, smux_stream->rbuf.len);
+    HV_ALLOC(smux_stream->wbuf.base, smux_stream->wbuf.len);
+    hio_set_readbuf(io, smux_stream->rbuf.base, smux_config.max_frame_size);
+    /*
+    // set heartbeat timer
+    if (smux_config.keepalive_interval > 0) {
+        smux_stream->timer = htimer_add(hevent_loop(io), send_hearbeat, smux_config.keepalive_interval, INFINITE);
+        hevent_set_userdata(smux_stream->timer, smux_stream);
+    }
+    */
+    hevent_set_userdata(io, smux_stream);
+
+    // SYN
+    int packlen = smux_stream_output(smux_stream, SMUX_CMD_SYN);
+    if (packlen > 0) {
+        // printf("SYN %d\n", packlen);
+        hio_write(kcp_io, smux_stream->wbuf.base, packlen);
+    }
+
+    hio_setcb_read(io, on_recv);
+    hio_read(io);
+}
+
+static void on_kcp_recvfrom(hio_t* io, void* buf, int readbytes) {
+    // printf("on_kcp_recvfrom %d\n", readbytes);
+
+    kcp_ctx_t* kcp_ctx = (kcp_ctx_t*)hevent_userdata(io);
+    smux_frame_t* frame = &kcp_ctx->frame;
+    if (kcp_ctx->want_readbytes > 0) {
+        frame->data = (const char*)buf;
+        frame->head.length = readbytes;
+        kcp_ctx->want_readbytes -= readbytes;
+    }
+    else {
+        smux_frame_init(frame);
+        int packlen = smux_frame_unpack(frame, buf, readbytes);
+        if (packlen < 0 ||
+            frame->head.version > 2 ||
+            frame->head.cmd > SMUX_CMD_UPD) {
+            fprintf(stderr, "smux_frame_unpack error: %d\n", packlen);
+            return;
+        }
+        int datalen = packlen - SMUX_HEAD_LENGTH;
+        if (datalen < frame->head.length) {
+            kcp_ctx->want_readbytes = frame->head.length - datalen;
+            frame->head.length = datalen;
+        }
+    }
+    // printf("smux sid=%u cmd=%d length=%d\n", frame->head.sid, (int)frame->head.cmd, (int)frame->head.length);
+
+    smux_stream_t* smux_stream = smux_session_get_stream(&smux_session, kcp_ctx->frame.head.sid);
+    if (smux_stream == NULL) {
+        if (frame->head.sid != 0 && frame->head.cmd != SMUX_CMD_FIN) {
+            fprintf(stderr, "recvfrom invalid smux package!\n");
+        }
+        return;
+    }
+
+    switch (frame->head.cmd) {
+    case SMUX_CMD_SYN:
+        hio_setcb_read(smux_stream->io, on_recv);
+        hio_read(smux_stream->io);
+        break;
+    case SMUX_CMD_FIN:
+        hio_close(smux_stream->io);
+        break;
+    case SMUX_CMD_PSH:
+        hio_write(smux_stream->io, frame->data, frame->head.length);
+        break;
+    case SMUX_CMD_NOP:
+        break;
+    default:
+        break;
+    }
+}
+
+int main(int argc, char** argv) {
+    if (argc < 2) {
+        print_help();
+        exit(0);
+    }
+
+    // g_main_ctx
+    main_ctx_init(argc, argv);
+    //int ret = parse_opt(argc, argv, options);
+    int ret = parse_opt_long(argc, argv, long_options, ARRAY_SIZE(long_options));
+    if (ret != 0) {
+        print_help();
+        exit(ret);
+    }
+
+    // help
+    if (get_arg("h")) {
+        print_help();
+        exit(0);
+    }
+
+    // version
+    if (get_arg("v")) {
+        print_version();
+        exit(0);
+    }
+
+#ifdef OS_UNIX
+    // daemon
+    if (get_arg("d")) {
+        // nochdir, noclose
+        int ret = daemon(1, 1);
+        if (ret != 0) {
+            printf("daemon error: %d\n", ret);
+            exit(-10);
+        }
+    }
+#endif
+
+    const char* arg = get_arg("l");
+    if (arg) {
+        localaddr = arg;
+    }
+
+    arg = get_arg("r");
+    if (arg) {
+        remoteaddr = arg;
+    }
+
+    arg = get_arg("m");
+    if (arg) {
+        mode = arg;
+    }
+
+    arg = get_arg("mtu");
+    if (arg) {
+        mtu = atoi(arg);
+    }
+
+    arg = get_arg("sndwnd");
+    if (arg) {
+        sndwnd = atoi(arg);
+    }
+
+    arg = get_arg("rcvwnd");
+    if (arg) {
+        rcvwnd = atoi(arg);
+    }
+
+    const char* pos = strchr(localaddr, ':');
+    int len = 0;
+    if (pos) {
+        len = pos - localaddr;
+        if (len > 0) {
+            memcpy(listen_host, localaddr, len);
+            listen_host[len] = '\0';
+        }
+        listen_port = atoi(pos + 1);
+    }
+
+    pos = strchr(remoteaddr, ':');
+    if (pos) {
+        len = pos - remoteaddr;
+        if (len > 0) {
+            memcpy(kcp_host, remoteaddr, len);
+            kcp_host[len] = '\0';
+        }
+        kcp_port = atoi(pos + 1);
+    }
+
+    if (strcmp(mode, "normal") == 0) {
+        kcp_setting_init_with_normal_mode(&s_kcp_setting);
+    } else if (strcmp(mode, "fast") == 0) {
+        kcp_setting_init_with_fast_mode(&s_kcp_setting);
+    } else if (strcmp(mode, "fast2") == 0) {
+        kcp_setting_init_with_fast2_mode(&s_kcp_setting);
+    } else if (strcmp(mode, "fast3") == 0) {
+        kcp_setting_init_with_fast3_mode(&s_kcp_setting);
+    } else {
+        fprintf(stderr, "Unknown mode '%s'\n", mode);
+        exit(-20);
+    }
+    s_kcp_setting.conv = hv_getpid();
+    s_kcp_setting.mtu = mtu;
+    s_kcp_setting.sndwnd = sndwnd;
+    s_kcp_setting.rcvwnd = rcvwnd;
+
+    printf("smux version: 1\n");
+    printf("%s:%d => %s:%d\n", listen_host, listen_port, kcp_host, kcp_port);
+    printf("mode: %s\n", mode);
+    printf("mtu: %d\n", mtu);
+    printf("sndwnd: %d rcvwnd: %d\n", sndwnd, rcvwnd);
+
+    hloop_t* loop = hloop_new(0);
+
+    listen_io = hloop_create_tcp_server(loop, listen_host, listen_port, on_accept);
+    if (listen_io == NULL) {
+        fprintf(stderr, "create tcp server error!\n");
+        return -30;
+    }
+
+    kcp_io = hloop_create_udp_client(loop, kcp_host, kcp_port);
+    if (kcp_io == NULL) {
+        fprintf(stderr, "create udp client error!\n");
+        return -40;
+    }
+    hio_set_kcp(kcp_io, &s_kcp_setting);
+    kcp_ctx_t* kcp_ctx = NULL;
+    HV_ALLOC_SIZEOF(kcp_ctx);
+    hevent_set_userdata(kcp_io, kcp_ctx);
+    hio_setcb_read(kcp_io, on_kcp_recvfrom);
+    hio_read(kcp_io);
+
+    // smux
+    smux_config.max_frame_size = 1024;
+    smux_session.next_stream_id = 1;
+    smux_stream0.stream_id = 0;
+
+    // set heartbeat timer
+    if (smux_config.keepalive_interval > 0) {
+        htimer_t* timer = htimer_add(loop, send_hearbeat, smux_config.keepalive_interval, INFINITE);
+        hevent_set_userdata(timer, &smux_stream0);
+    }
+
+    hloop_run(loop);
+    hloop_free(&loop);
+    return 0;
+}

BIN
examples/kcptun/kcptun.png


+ 362 - 0
examples/kcptun/server/main.cpp

@@ -0,0 +1,362 @@
+/*
+ * kcptun server
+ *
+ * @build:          ./configure --with-kcp && make clean && make kcptun examples
+ * @tcp_server:     bin/tcp_echo_server 1234
+ * @kcptun_server:  bin/kcptun_server -l :4000 -t 127.0.0.1:1234
+ * @kcptun_client:  bin/kcptun_client -l :8388 -r 127.0.0.1:4000
+ * @tcp_client:     bin/nc 127.0.0.1 8388
+ *                  > hello
+ *                  < hello
+ */
+
+#define WITH_KCP 1
+#include "hversion.h"
+#include "hmain.h"
+#include "hsocket.h"
+#include "hloop.h"
+
+#include "../smux/smux.h"
+
+// config
+static const char* localaddr = ":4000";
+static const char* targetaddr = "127.0.0.1:8080";
+static const char* mode = "fast";
+static int mtu = 1350;
+static int sndwnd = 1024;
+static int rcvwnd = 1024;
+
+// short options
+static const char options[] = "hvdl:t:m:";
+// long options
+static const option_t long_options[] = {
+    {'h', "help",       NO_ARGUMENT},
+    {'v', "version",    NO_ARGUMENT},
+    {'d', "daemon",     NO_ARGUMENT},
+    {'l', "listen",     REQUIRED_ARGUMENT},
+    {'t', "target",     REQUIRED_ARGUMENT},
+    {'m', "mode",       REQUIRED_ARGUMENT},
+    { 0,  "mtu",        REQUIRED_ARGUMENT},
+    { 0,  "sndwnd",     REQUIRED_ARGUMENT},
+    { 0,  "rcvwnd",     REQUIRED_ARGUMENT},
+};
+
+static const char detail_options[] = R"(
+  -h|--help                 Print this information
+  -v|--version              Print version
+  -d|--daemon               Daemonize
+  -l|--listen value         kcp server listen address (default: ":4000")
+  -t|--target value         target server address (default: "127.0.0.1:8080")
+  -m|--mode value           profiles: fast3, fast2, fast, normal (default: "fast")
+     --mtu value            set maximum transmission unit for UDP packets (default: 1350)
+     --sndwnd value         set send window size(num of packets) (default: 1024)
+     --rcvwnd value         set receive window size(num of packets) (default: 1024)
+)";
+
+static void print_version() {
+    printf("%s version %s\n", g_main_ctx.program_name, hv_compile_version());
+}
+
+static void print_help() {
+    printf("Usage: %s [%s]\n", g_main_ctx.program_name, options);
+    printf("Options:\n%s\n", detail_options);
+}
+
+static kcp_setting_t s_kcp_setting;
+static char kcp_host[64] = "0.0.0.0";
+static int  kcp_port = 4000;
+static hio_t* kcp_io = NULL;
+
+static char target_host[64] = "127.0.0.1";
+static int  target_port = 8080;
+
+static smux_config_t  smux_config;
+static smux_session_t smux_session;
+
+static int verbose = 1;
+
+/* workflow:
+ *
+ * hloop_create_udp_server -> on_recvfrom ->
+ *
+ * SYN -> hloop_create_tcp_client ->
+ * on_connect -> hio_write(kcp_io, SYN) ->
+ * on_read -> hio_write(kcp_io) ->
+ * on_close -> hio_write(kcp_io, FIN) -> smux_session_close_stream
+ *
+ * PSH -> smux_session_get_stream -> hio_write(stream_io)
+ *
+ * FIN -> smux_session_get_stream -> hio_close(stream_io)
+ *
+ */
+
+// hloop_create_udp_server -> hio_set_kcp -> on_recvfrom ->
+// SYN -> hloop_create_tcp_client -> smux_session_open_stream -> 
+// PSH -> hio_write(io)
+
+static void on_close(hio_t* io) {
+    // printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
+    if (verbose) {
+        char localaddrstr[SOCKADDR_STRLEN] = {0};
+        char peeraddrstr[SOCKADDR_STRLEN] = {0};
+        printf("disconnected connfd=%d [%s] => [%s]\n", hio_fd(io),
+            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
+            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
+    }
+
+    smux_stream_t* smux_stream = (smux_stream_t*)hevent_userdata(io);
+    if (smux_stream == NULL) return;
+
+    // FIN
+    int packlen = smux_stream_output(smux_stream, SMUX_CMD_FIN);
+    if (packlen > 0) {
+        // printf("FIN %d\n", packlen);
+        hio_write(kcp_io, smux_stream->wbuf.base, packlen);
+    }
+
+    // kill timer
+    if (smux_stream->timer) {
+        htimer_del(smux_stream->timer);
+        smux_stream->timer = NULL;
+    }
+
+    // free buffer
+    HV_FREE(smux_stream->rbuf.base);
+    HV_FREE(smux_stream->wbuf.base);
+
+    smux_session_close_stream(&smux_session, smux_stream->stream_id);
+    hevent_set_userdata(io, NULL);
+}
+
+static void on_recv(hio_t* io, void* buf, int readbytes) {
+    // printf("on_recv %.*s \n", readbytes, (char*)buf);
+    smux_stream_t* smux_stream = (smux_stream_t*)hevent_userdata(io);
+    if (smux_stream == NULL) return;
+
+    // PSH
+    smux_frame_t frame;
+    smux_frame_init(&frame);
+    frame.head.sid = smux_stream->stream_id;
+    frame.head.cmd = SMUX_CMD_PSH;
+    frame.head.length = readbytes;
+    frame.data = (const char*)buf;
+    int packlen = smux_frame_pack(&frame, smux_stream->wbuf.base, smux_stream->wbuf.len);
+    if (packlen > 0) {
+        // printf("PSH %d\n", packlen);
+        int nwrite = hio_write(kcp_io, smux_stream->wbuf.base, packlen);
+        // printf("PSH ret=%d\n", nwrite);
+    }
+}
+
+static void on_connect(hio_t* io) {
+    // printf("on_connect fd=%d\n", hio_fd(io));
+    if (verbose) {
+        char localaddrstr[SOCKADDR_STRLEN] = {0};
+        char peeraddrstr[SOCKADDR_STRLEN] = {0};
+        printf("connected connfd=%d [%s] => [%s]\n", hio_fd(io),
+            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
+            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
+    }
+
+    smux_stream_t* smux_stream = (smux_stream_t*)hevent_userdata(io);
+    if (smux_stream == NULL) return;
+
+    // SYN
+    int packlen = smux_stream_output(smux_stream, SMUX_CMD_SYN);
+    if (packlen > 0) {
+        // printf("SYN %d\n", packlen);
+        hio_write(kcp_io, smux_stream->wbuf.base, packlen);
+    }
+
+    hio_setcb_read(io, on_recv);
+    hio_read(io);
+}
+
+static void on_kcp_recvfrom(hio_t* io, void* buf, int readbytes) {
+    // printf("on_kcp_recvfrom %d\n", readbytes);
+    smux_frame_t frame;
+    smux_frame_init(&frame);
+    int packlen = smux_frame_unpack(&frame, buf, readbytes);
+    assert(packlen == readbytes);
+    if (packlen < 0 ||
+        frame.head.version > 2 ||
+        frame.head.cmd > SMUX_CMD_UPD) {
+        fprintf(stderr, "smux_frame_unpack error: %d\n", packlen);
+        return;
+    }
+    // printf("smux sid=%u cmd=%d length=%d\n", frame.head.sid, (int)frame.head.cmd, (int)frame.head.length);
+
+    smux_stream_t* smux_stream = NULL;
+    if (frame.head.cmd == SMUX_CMD_SYN) {
+        hio_t* target_io = hio_create_socket(hevent_loop(kcp_io), target_host, target_port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
+        if (target_io == NULL) {
+            fprintf(stderr, "create tcp client error!\n");
+            return;
+        }
+        smux_stream = smux_session_open_stream(&smux_session, frame.head.sid, target_io);
+        // alloc buffer
+        smux_stream->rbuf.len = mtu;
+        smux_stream->wbuf.len = mtu;
+        HV_ALLOC(smux_stream->rbuf.base, smux_stream->rbuf.len);
+        HV_ALLOC(smux_stream->wbuf.base, smux_stream->wbuf.len);
+        hio_set_readbuf(target_io, smux_stream->rbuf.base, smux_config.max_frame_size);
+        hevent_set_userdata(target_io, smux_stream);
+
+        hio_setcb_connect(target_io, on_connect);
+        hio_setcb_close(target_io, on_close);
+        hio_connect(target_io);
+    } else {
+        smux_stream = smux_session_get_stream(&smux_session, frame.head.sid);
+    }
+    if (smux_stream == NULL) {
+        if (frame.head.sid != 0 && frame.head.cmd != SMUX_CMD_FIN) {
+            fprintf(stderr, "recvfrom invalid smux package!\n");
+        }
+        return;
+    }
+
+    switch (frame.head.cmd) {
+    case SMUX_CMD_FIN:
+        hio_close(smux_stream->io);
+        break;
+    case SMUX_CMD_PSH:
+        hio_write(smux_stream->io, frame.data, frame.head.length);
+        break;
+    case SMUX_CMD_NOP:
+        break;
+    default:
+        break;
+    }
+}
+
+int main(int argc, char** argv) {
+    if (argc < 2) {
+        print_help();
+        exit(0);
+    }
+
+    // g_main_ctx
+    main_ctx_init(argc, argv);
+    //int ret = parse_opt(argc, argv, options);
+    int ret = parse_opt_long(argc, argv, long_options, ARRAY_SIZE(long_options));
+    if (ret != 0) {
+        print_help();
+        exit(ret);
+    }
+
+    // help
+    if (get_arg("h")) {
+        print_help();
+        exit(0);
+    }
+
+    // version
+    if (get_arg("v")) {
+        print_version();
+        exit(0);
+    }
+
+#ifdef OS_UNIX
+    // daemon
+    if (get_arg("d")) {
+        // nochdir, noclose
+        int ret = daemon(1, 1);
+        if (ret != 0) {
+            printf("daemon error: %d\n", ret);
+            exit(-10);
+        }
+    }
+#endif
+
+    const char* arg = get_arg("l");
+    if (arg) {
+        localaddr = arg;
+    }
+
+    arg = get_arg("t");
+    if (arg) {
+        targetaddr = arg;
+    }
+
+    arg = get_arg("m");
+    if (arg) {
+        mode = arg;
+    }
+
+    arg = get_arg("mtu");
+    if (arg) {
+        mtu = atoi(arg);
+    }
+
+    arg = get_arg("sndwnd");
+    if (arg) {
+        sndwnd = atoi(arg);
+    }
+
+    arg = get_arg("rcvwnd");
+    if (arg) {
+        rcvwnd = atoi(arg);
+    }
+
+    const char* pos = strchr(localaddr, ':');
+    int len = 0;
+    if (pos) {
+        len = pos - localaddr;
+        if (len > 0) {
+            memcpy(kcp_host, localaddr, len);
+            kcp_host[len] = '\0';
+        }
+        kcp_port = atoi(pos + 1);
+    }
+
+    pos = strchr(targetaddr, ':');
+    if (pos) {
+        len = pos - targetaddr;
+        if (len > 0) {
+            memcpy(target_host, targetaddr, len);
+            target_host[len] = '\0';
+        }
+        target_port = atoi(pos + 1);
+    }
+
+    if (strcmp(mode, "normal") == 0) {
+        kcp_setting_init_with_normal_mode(&s_kcp_setting);
+    } else if (strcmp(mode, "fast") == 0) {
+        kcp_setting_init_with_fast_mode(&s_kcp_setting);
+    } else if (strcmp(mode, "fast2") == 0) {
+        kcp_setting_init_with_fast2_mode(&s_kcp_setting);
+    } else if (strcmp(mode, "fast3") == 0) {
+        kcp_setting_init_with_fast3_mode(&s_kcp_setting);
+    } else {
+        fprintf(stderr, "Unknown mode '%s'\n", mode);
+        exit(-20);
+    }
+    s_kcp_setting.mtu = mtu;
+    s_kcp_setting.sndwnd = sndwnd;
+    s_kcp_setting.rcvwnd = rcvwnd;
+
+    printf("smux version: 1\n");
+    printf("%s:%d => %s:%d\n", kcp_host, kcp_port, target_host, target_port);
+    printf("mode: %s\n", mode);
+    printf("mtu: %d\n", mtu);
+    printf("sndwnd: %d rcvwnd: %d\n", sndwnd, rcvwnd);
+
+    hloop_t* loop = hloop_new(0);
+
+    kcp_io = hloop_create_udp_server(loop, kcp_host, kcp_port);
+    if (kcp_io == NULL) {
+        fprintf(stderr, "create udp server error!\n");
+        return -20;
+    }
+    hio_set_kcp(kcp_io, &s_kcp_setting);
+    hio_setcb_read(kcp_io, on_kcp_recvfrom);
+    hio_read(kcp_io);
+
+    // smux
+    smux_config.max_frame_size = 1024;
+    smux_session.next_stream_id = 0;
+
+    hloop_run(loop);
+    hloop_free(&loop);
+    return 0;
+}

+ 79 - 0
examples/kcptun/smux/smux.cpp

@@ -0,0 +1,79 @@
+#include "smux.h"
+
+#define SMUX_USE_LITTLE_ENDIAN 1
+
+int smux_frame_pack(const smux_frame_t* frame, void* buf, int len) {
+    if (!frame || !buf || !len) return -1;
+    const smux_head_t* head = &(frame->head);
+    unsigned int packlen = smux_package_length(head);
+    // Check is buffer enough
+    if (len < packlen) {
+        return -2;
+    }
+    unsigned char* p = (unsigned char*)buf;
+    *p++ = head->version;
+    *p++ = head->cmd;
+#if SMUX_USE_LITTLE_ENDIAN
+    *p++ =  head->length;
+    *p++ = (head->length >> 8) & 0xFF;
+#else
+    // hton length
+    *p++ = (head->length >> 8) & 0xFF;
+    *p++ =  head->length;
+#endif
+
+    uint32_t sid = head->sid;
+#if SMUX_USE_LITTLE_ENDIAN
+    *p++ =  sid        & 0xFF;
+    *p++ = (sid >>  8) & 0xFF;
+    *p++ = (sid >> 16) & 0xFF;
+    *p++ = (sid >> 24) & 0xFF;
+#else
+    // hton sid
+    *p++ = (sid >> 24) & 0xFF;
+    *p++ = (sid >> 16) & 0xFF;
+    *p++ = (sid >>  8) & 0xFF;
+    *p++ =  sid        & 0xFF;
+#endif
+    // memcpy data
+    if (frame->data && head->length) {
+        memcpy(p, frame->data, frame->head.length);
+    }
+    return packlen;
+}
+
+int smux_frame_unpack(smux_frame_t* frame, const void* buf, int len) {
+    if (!frame || !buf || !len) return -1;
+    if (len < SMUX_HEAD_LENGTH) return -2;
+    smux_head_t* head = &(frame->head);
+    unsigned char* p = (unsigned char*)buf;
+    head->version = *p++;
+    head->cmd = *p++;
+#if SMUX_USE_LITTLE_ENDIAN
+    head->length  = *p++;
+    head->length |= ((uint16_t)*p++) << 8;
+#else
+    // ntoh length
+    head->length  = ((uint16_t)*p++) << 8;
+    head->length |= *p++;
+#endif
+
+#if SMUX_USE_LITTLE_ENDIAN
+    head->sid  = *p++;
+    head->sid |= ((uint32_t)*p++) << 8;
+    head->sid |= ((uint32_t)*p++) << 16;
+    head->sid |= ((uint32_t)*p++) << 24;
+#else
+    // ntoh sid
+    head->sid  = ((uint32_t)*p++) << 24;
+    head->sid |= ((uint32_t)*p++) << 16;
+    head->sid |= ((uint32_t)*p++) << 8;
+    head->sid |= *p++;
+#endif
+    // NOTE: just shadow copy
+    if (len > SMUX_HEAD_LENGTH) {
+        frame->data = (const char*)buf + SMUX_HEAD_LENGTH;
+    }
+    unsigned int packlen = smux_package_length(head);
+    return MIN(len, packlen);
+}

+ 138 - 0
examples/kcptun/smux/smux.h

@@ -0,0 +1,138 @@
+#ifndef HV_SMUX_H_
+#define HV_SMUX_H_
+
+/*
+ * smux: Simple MUltipleXing used by kcptun
+ * @see: https://github.com/xtaci/smux
+ *
+ */
+
+#include <map>
+
+#include "hplatform.h"
+#include "hbase.h"
+#include "hbuf.h"
+#include "hloop.h"
+
+typedef enum {
+    // v1
+    SMUX_CMD_SYN = 0, // stream open
+    SMUX_CMD_FIN = 1, // stream close
+    SMUX_CMD_PSH = 2, // data push
+    SMUX_CMD_NOP = 3, // no operation
+    // v2
+    SMUX_CMD_UPD = 4, // update
+} smux_cmd_e;
+
+typedef struct {
+    uint8_t     version;
+    uint8_t     cmd;
+    uint16_t    length;
+    uint32_t    sid;
+} smux_head_t;
+
+#define SMUX_HEAD_LENGTH    8
+
+typedef struct {
+    smux_head_t head;
+    const char* data;
+} smux_frame_t;
+
+static inline unsigned int smux_package_length(const smux_head_t* head) {
+    return SMUX_HEAD_LENGTH + head->length;
+}
+
+static inline void smux_head_init(smux_head_t* head) {
+    head->version = 1;
+    head->cmd = (uint8_t)SMUX_CMD_PSH;
+    head->length = 0;
+    head->sid = 0;
+}
+
+static inline void smux_frame_init(smux_frame_t* frame) {
+    smux_head_init(&frame->head);
+    frame->data = NULL;
+}
+
+// @retval >0 package_length, <0 error
+int smux_frame_pack(const smux_frame_t* frame, void* buf, int len);
+// @retval >0 package_length, <0 error
+int smux_frame_unpack(smux_frame_t* frame, const void* buf, int len);
+
+typedef struct smux_config_s {
+    int version;
+    int keepalive_interval;
+    int keepalive_timeout;
+    int max_frame_size;
+
+    smux_config_s() {
+        version = 1;
+        keepalive_interval = 10000;
+        keepalive_timeout  = 30000;
+        max_frame_size = 1024;
+    }
+} smux_config_t;
+
+typedef struct {
+    uint32_t        stream_id;
+    smux_frame_t    frame;
+    hbuf_t          rbuf;
+    hbuf_t          wbuf;
+    hio_t*          io;
+    htimer_t*       timer;
+} smux_stream_t;
+
+// @retval >0 package_length, <0 error, data => wbuf
+static inline int smux_stream_output(smux_stream_t* stream, smux_frame_t* frame) {
+    return smux_frame_pack(frame, stream->wbuf.base, stream->wbuf.len);
+}
+
+static inline int smux_stream_output(smux_stream_t* stream, smux_cmd_e cmd) {
+    smux_frame_t frame;
+    smux_frame_init(&frame);
+    frame.head.sid = stream->stream_id;
+    frame.head.cmd = (uint8_t)cmd;
+    return smux_frame_pack(&frame, stream->wbuf.base, stream->wbuf.len);
+}
+
+// @retval >0 package_length, <0 error, data => frame
+static inline int smux_stream_input(smux_stream_t* stream, const void* buf, int len) {
+    return smux_frame_unpack(&stream->frame, buf, len);
+}
+
+typedef struct {
+    uint32_t                            next_stream_id;
+    // stream_id => smux_stream_t
+    std::map<uint32_t, smux_stream_t*>  streams;
+} smux_session_t;
+
+static inline smux_stream_t* smux_session_open_stream(smux_session_t* session, uint32_t stream_id = 0, hio_t* io = NULL) {
+    smux_stream_t* stream = NULL;
+    HV_ALLOC_SIZEOF(stream);
+    if (stream_id == 0) {
+        session->next_stream_id += 2;
+        stream_id = session->next_stream_id;
+    }
+    stream->stream_id = stream_id;
+    session->streams[stream_id] = stream;
+    stream->io = io;
+    return stream;
+}
+
+static inline smux_stream_t* smux_session_get_stream(smux_session_t* session, uint32_t stream_id) {
+    auto iter = session->streams.find(stream_id);
+    if (iter != session->streams.end()) {
+        return iter->second;
+    }
+    return NULL;
+}
+
+static inline void smux_session_close_stream(smux_session_t* session, uint32_t stream_id) {
+    auto iter = session->streams.find(stream_id);
+    if (iter != session->streams.end()) {
+        HV_FREE(iter->second);
+        session->streams.erase(iter);
+    }
+}
+
+#endif // HV_SMUX_H_