ithewei пре 4 година
родитељ
комит
f2370eabba
14 измењених фајлова са 434 додато и 28 уклоњено
  1. 1 0
      README-CN.md
  2. 1 0
      README.md
  3. 42 0
      base/hmath.h
  4. 52 12
      event/hevent.c
  5. 3 1
      event/hevent.h
  6. 16 4
      event/hloop.c
  7. 81 1
      event/hloop.h
  8. 11 5
      event/nio.c
  9. 0 3
      event/overlapio.c
  10. 184 0
      event/unpack.c
  11. 11 0
      event/unpack.h
  12. 5 0
      evpp/Channel.h
  13. 21 2
      evpp/TcpClient.h
  14. 6 0
      evpp/TcpServer.h

+ 1 - 0
README-CN.md

@@ -24,6 +24,7 @@
 - 跨平台(Linux, Windows, MacOS, Solaris)
 - 高性能事件循环(网络IO事件、定时器事件、空闲事件、自定义事件)
 - TCP/UDP服务端/客户端/代理
+- TCP支持心跳、转发、拆包、多线程安全write和close等特性
 - SSL/TLS加密通信(可选WITH_OPENSSL、WITH_GNUTLS、WITH_MBEDTLS)
 - HTTP服务端/客户端(支持https http1/x http2 grpc)
 - HTTP支持静态文件服务、目录服务、同步/异步API处理函数

+ 1 - 0
README.md

@@ -26,6 +26,7 @@ but simpler api and richer protocols.
 - Cross-platform (Linux, Windows, MacOS, Solaris)
 - EventLoop (IO, timer, idle, custom)
 - TCP/UDP client/server/proxy
+- TCP supports heartbeat, upstream, unpack, MultiThread-safe write and close, etc.
 - SSL/TLS support: (via WITH_OPENSSL or WITH_GNUTLS or WITH_MBEDTLS)
 - HTTP client/server (support https http1/x http2 grpc)
 - HTTP static file service, indexof service, sync/async API handler

+ 42 - 0
base/hmath.h

@@ -23,4 +23,46 @@ static inline unsigned long ceil2e(unsigned long num) {
     return ret;
 }
 
+// varint little-endian
+// MSB
+static inline int varint_encode(long long value, unsigned char* buf) {
+    unsigned char ch;
+    unsigned char *p = buf;
+    int bytes = 0;
+    do {
+        ch = value & 0x7F;
+        value >>= 7;
+        *p++ = value == 0 ? ch : (ch | 0x80);
+        ++bytes;
+    } while (value);
+    return bytes;
+}
+
+// @param[IN|OUT] len: in=>buflen, out=>varint bytesize
+static inline long long varint_decode(const unsigned char* buf, int* len) {
+    long long ret = 0;
+    int bytes = 0, bits = 0;
+    const unsigned char *p = buf;
+    do {
+        if (len && *len && bytes == *len) {
+            // Not enough length
+            *len = 0;
+            return 0;
+        }
+        ret |= ((long long)(*p & 0x7F)) << bits;
+        ++bytes;
+        if ((*p & 0x80) == 0) {
+            // Found end
+            if (len) *len = bytes;
+            return ret;
+        }
+        ++p;
+        bits += 7;
+    } while(bytes < 10);
+
+    // Not found end
+    if (len) *len = -1;
+    return ret;
+}
+
 #endif // HV_MATH_H_

+ 52 - 12
event/hevent.c

@@ -131,18 +131,10 @@ int hio_set_ssl(hio_t* io, hssl_t ssl) {
 }
 
 void hio_set_readbuf(hio_t* io, void* buf, size_t len) {
-    if (buf == NULL || len == 0) {
-        hloop_t* loop = io->loop;
-        if (loop && (loop->readbuf.base == NULL || loop->readbuf.len == 0)) {
-            loop->readbuf.len = HLOOP_READ_BUFSIZE;
-            HV_ALLOC(loop->readbuf.base, loop->readbuf.len);
-            io->readbuf = loop->readbuf;
-        }
-    }
-    else {
-        io->readbuf.base = (char*)buf;
-        io->readbuf.len = len;
-    }
+    assert(io && buf && len != 0);
+    io->readbuf.base = (char*)buf;
+    io->readbuf.len = len;
+    io->readbuf.offset = 0;
 }
 
 void hio_del_connect_timer(hio_t* io) {
@@ -244,3 +236,51 @@ void hio_set_heartbeat(hio_t* io, int interval_ms, hio_send_heartbeat_fn fn) {
     io->heartbeat_interval = interval_ms;
     io->heartbeat_fn = fn;
 }
+
+void hio_unset_unpack(hio_t* io) {
+    if (io->unpack_setting) {
+        // NOTE: unpack has own readbuf
+        if (io->readbuf.base && io->readbuf.len &&
+            io->readbuf.base != io->loop->readbuf.base) {
+            HV_FREE(io->readbuf.base);
+            // reset to loop->readbuf
+            io->readbuf.base = io->loop->readbuf.base;
+            io->readbuf.len = io->loop->readbuf.len;
+        }
+        io->unpack_setting = NULL;
+    }
+}
+
+void hio_set_unpack(hio_t* io, unpack_setting_t* setting) {
+    hio_unset_unpack(io);
+    if (setting == NULL) return;
+
+    io->unpack_setting = setting;
+    if (io->unpack_setting->package_max_length == 0) {
+        io->unpack_setting->package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
+    }
+    if (io->unpack_setting->mode == UNPACK_BY_FIXED_LENGTH) {
+        assert(io->unpack_setting->fixed_length != 0 &&
+               io->unpack_setting->fixed_length <= io->unpack_setting->package_max_length);
+    }
+    else if (io->unpack_setting->mode == UNPACK_BY_DELIMITER) {
+        if (io->unpack_setting->delimiter_bytes == 0) {
+            io->unpack_setting->delimiter_bytes = strlen((char*)io->unpack_setting->delimiter);
+        }
+    }
+    else if (io->unpack_setting->mode == UNPACK_BY_LENGTH_FIELD) {
+        assert(io->unpack_setting->body_offset >=
+               io->unpack_setting->length_field_offset +
+               io->unpack_setting->length_field_bytes);
+    }
+
+    // NOTE: unpack must have own readbuf
+    if (io->unpack_setting->mode == UNPACK_BY_FIXED_LENGTH) {
+        io->readbuf.len = io->unpack_setting->fixed_length;
+    } else {
+        io->readbuf.len = 2;
+    }
+    assert(io->readbuf.len > 0);
+    // NOTE: free in hio_unset_unpack
+    HV_ALLOC(io->readbuf.base, io->readbuf.len);
+}

+ 3 - 1
event/hevent.h

@@ -107,7 +107,7 @@ struct hio_s {
     int         revents;
     struct sockaddr*    localaddr;
     struct sockaddr*    peeraddr;
-    hbuf_t              readbuf;        // for hread
+    offset_buf_t        readbuf;        // for hread
     struct write_queue  write_queue;    // for hwrite
     hrecursive_mutex_t  write_mutex;    // lock write and write_queue
     // callbacks
@@ -128,6 +128,8 @@ struct hio_s {
     htimer_t*   heartbeat_timer;
     // upstream
     struct hio_s*   upstream_io;
+    // unpack
+    unpack_setting_t*   unpack_setting;
 // private:
     int         event_index[2]; // for poll,kqueue
     void*       hovlp;          // for iocp/overlapio

+ 16 - 4
event/hloop.c

@@ -509,7 +509,7 @@ htimer_t* htimer_add(hloop_t* loop, htimer_cb cb, uint32_t timeout, uint32_t rep
     timer->repeat = repeat;
     timer->timeout = timeout;
     hloop_update_time(loop);
-    timer->next_timeout = hloop_now_hrtime(loop) + timeout*1000;
+    timer->next_timeout = hloop_now_hrtime(loop) + (uint64_t)timeout*1000;
     heap_insert(&loop->timers, &timer->node);
     EVENT_ADD(loop, timer, cb);
     loop->ntimers++;
@@ -530,7 +530,7 @@ void htimer_reset(htimer_t* timer) {
     if (timer->repeat == 0) {
         timer->repeat = 1;
     }
-    timer->next_timeout = hloop_now_hrtime(loop) + timeout->timeout*1000;
+    timer->next_timeout = hloop_now_hrtime(loop) + (uint64_t)timeout->timeout*1000;
     heap_insert(&loop->timers, &timer->node);
     EVENT_RESET(timer);
 }
@@ -671,6 +671,10 @@ void hio_ready(hio_t* io) {
     io->io_type = HIO_TYPE_UNKNOWN;
     io->error = 0;
     io->events = io->revents = 0;
+    // readbuf
+    io->readbuf.base = io->loop->readbuf.base;
+    io->readbuf.len = io->loop->readbuf.len;
+    io->readbuf.offset = 0;
     // callbacks
     io->read_cb = NULL;
     io->write_cb = NULL;
@@ -689,6 +693,8 @@ void hio_ready(hio_t* io) {
     io->heartbeat_timer = NULL;
     // upstream
     io->upstream_io = NULL;
+    // unpack
+    io->unpack_setting = NULL;
     // private:
     io->event_index[0] = io->event_index[1] = -1;
     io->hovlp = NULL;
@@ -707,6 +713,10 @@ void hio_done(hio_t* io) {
 
     hio_del(io, HV_RDWR);
 
+    // readbuf
+    hio_unset_unpack(io);
+
+    // write_queue
     offset_buf_t* pbuf = NULL;
     hrecursive_mutex_lock(&io->write_mutex);
     while (!write_queue_empty(&io->write_queue)) {
@@ -831,8 +841,10 @@ int hio_close_async(hio_t* io) {
 hio_t* hread(hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
     hio_t* io = hio_get(loop, fd);
     assert(io != NULL);
-    io->readbuf.base = (char*)buf;
-    io->readbuf.len = len;
+    if (buf && len) {
+        io->readbuf.base = (char*)buf;
+        io->readbuf.len = len;
+    }
     if (read_cb) {
         io->read_cb = read_cb;
     }

+ 81 - 1
event/hloop.h

@@ -237,7 +237,7 @@ HV_EXPORT int  hio_enable_ssl(hio_t* io);
 HV_EXPORT bool hio_is_ssl(hio_t* io);
 HV_EXPORT hssl_t hio_get_ssl(hio_t* io);
 HV_EXPORT int  hio_set_ssl(hio_t* io, hssl_t ssl);
-// TODO: One loop per thread, one readbuf per loop.
+// NOTE: One loop per thread, one readbuf per loop.
 // But you can pass in your own readbuf instead of the default readbuf to avoid memcopy.
 HV_EXPORT void hio_set_readbuf(hio_t* io, void* buf, size_t len);
 // connect timeout => hclose_cb
@@ -357,6 +357,86 @@ HV_EXPORT hio_t* hio_setup_tcp_upstream(hio_t* io, const char* host, int port, i
 // @see examples/udp_proxy_server
 HV_EXPORT hio_t* hio_setup_udp_upstream(hio_t* io, const char* host, int port);
 
+//-----------------unpack---------------------------------------------
+typedef enum {
+    UNPACK_BY_FIXED_LENGTH  = 1,    // Not recommended
+    UNPACK_BY_DELIMITER     = 2,
+    UNPACK_BY_LENGTH_FIELD  = 3,    // Recommended
+} unpack_mode_e;
+
+#define DEFAULT_PACKAGE_MAX_LENGTH  (1 << 21)   // 2M
+
+// UNPACK_BY_DELIMITER
+#define PACKAGE_MAX_DELIMITER_BYTES 8
+
+// UNPACK_BY_LENGTH_FIELD
+typedef enum {
+    ENCODE_BY_VARINT        = 1,
+    ENCODE_BY_LITTEL_ENDIAN = LITTLE_ENDIAN,    // 1234
+    ENCODE_BY_BIG_ENDIAN    = BIG_ENDIAN,       // 4321
+} unpack_coding_e;
+
+typedef struct unpack_setting_s {
+    unpack_mode_e   mode;
+    unsigned int    package_max_length;
+    // UNPACK_BY_FIXED_LENGTH
+    unsigned int    fixed_length;
+    // UNPACK_BY_DELIMITER
+    unsigned char   delimiter[PACKAGE_MAX_DELIMITER_BYTES];
+    unsigned short  delimiter_bytes;
+    // UNPACK_BY_LENGTH_FIELD
+    unsigned short  body_offset; // real_body_offset = body_offset + varint_bytes - length_field_bytes
+    unsigned short  length_field_offset;
+    unsigned short  length_field_bytes;
+    unpack_coding_e length_field_coding;
+#ifdef __cplusplus
+    unpack_setting_s() {
+        // Recommended setting:
+        // head = flags:1byte + length:4bytes = 5bytes
+        mode = UNPACK_BY_LENGTH_FIELD;
+        package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
+        fixed_length = 0;
+        delimiter_bytes = 0;
+        body_offset = 5;
+        length_field_offset = 1;
+        length_field_bytes = 4;
+        length_field_coding = ENCODE_BY_BIG_ENDIAN;
+    }
+#endif
+} unpack_setting_t;
+
+HV_EXPORT void hio_set_unpack(hio_t* io, unpack_setting_t* setting);
+HV_EXPORT void hio_unset_unpack(hio_t* io);
+
+// unpack examples
+/*
+unpack_setting_t ftp_unpack_setting;
+memset(&ftp_unpack_setting, 0, sizeof(unpack_setting_t));
+ftp_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
+ftp_unpack_setting.mode = UNPACK_BY_DELIMITER;
+ftp_unpack_setting.delimiter[0] = '\r';
+ftp_unpack_setting.delimiter[1] = '\n';
+ftp_unpack_setting.delimiter_bytes = 2;
+
+unpack_setting_t mqtt_unpack_setting = {
+    .mode = UNPACK_BY_LENGTH_FIELD,
+    .package_max_length = DEFAULT_PACKAGE_MAX_LENGTH,
+    .body_offset = 2,
+    .length_field_offset = 1,
+    .length_field_bytes = 1,
+    .length_field_coding = ENCODE_BY_VARINT,
+};
+
+unpack_setting_t grpc_unpack_setting = {
+    .mode = UNPACK_BY_LENGTH_FIELD,
+    .package_max_length = DEFAULT_PACKAGE_MAX_LENGTH,
+    .body_offset = 5,
+    .length_field_offset = 1,
+    .length_field_bytes = 4,
+    .length_field_coding = ENCODE_BY_BIG_ENDIAN,
+};
+*/
+
 END_EXTERN_C
 
 #endif // HV_LOOP_H_

+ 11 - 5
event/nio.c

@@ -5,6 +5,7 @@
 #include "hssl.h"
 #include "hlog.h"
 #include "hthread.h"
+#include "unpack.h"
 
 static void __connect_timeout_cb(htimer_t* timer) {
     hio_t* io = (hio_t*)timer->privdata;
@@ -71,6 +72,11 @@ static void __read_cb(hio_t* io, void* buf, int readbytes) {
         htimer_reset(io->keepalive_timer);
     }
 
+    if (io->unpack_setting) {
+        hio_unpack(io, buf, readbytes);
+        return;
+    }
+
     if (io->read_cb) {
         // printd("read_cb------\n");
         io->read_cb(io, buf, readbytes);
@@ -174,6 +180,9 @@ accept:
     // NOTE: inherit from listenio
     connio->accept_cb = io->accept_cb;
     connio->userdata = io->userdata;
+    if (io->unpack_setting) {
+        hio_set_unpack(connio, io->unpack_setting);
+    }
 
     if (io->io_type == HIO_TYPE_SSL) {
         if (connio->ssl == NULL) {
@@ -298,11 +307,8 @@ static void nio_read(hio_t* io) {
     void* buf;
     int len, nread;
 read:
-    if (io->readbuf.base == NULL || io->readbuf.len == 0) {
-        hio_set_readbuf(io, io->loop->readbuf.base, io->loop->readbuf.len);
-    }
-    buf = io->readbuf.base;
-    len = io->readbuf.len;
+    buf = io->readbuf.base + io->readbuf.offset;
+    len = io->readbuf.len - io->readbuf.offset;
     nread = __nio_read(io, buf, len);
     // printd("read retval=%d\n", nread);
     if (nread < 0) {

+ 0 - 3
event/overlapio.c

@@ -47,9 +47,6 @@ int post_recv(hio_t* io, hoverlapped_t* hovlp) {
     hovlp->fd = io->fd;
     hovlp->event = HV_READ;
     hovlp->io = io;
-    if (io->readbuf.base == NULL || io->readbuf.len == 0) {
-        hio_set_readbuf(io, io->loop->readbuf.base, io->loop->readbuf.len);
-    }
     hovlp->buf.len = io->readbuf.len;
     if (io->io_type == HIO_TYPE_UDP || io->io_type == HIO_TYPE_IP) {
         HV_ALLOC(hovlp->buf.buf, hovlp->buf.len);

+ 184 - 0
event/unpack.c

@@ -0,0 +1,184 @@
+#include "unpack.h"
+#include "hevent.h"
+#include "herr.h"
+#include "hlog.h"
+#include "hmath.h"
+
+int hio_unpack(hio_t* io, void* buf, int readbytes) {
+    unpack_setting_t* setting = io->unpack_setting;
+    switch(setting->mode) {
+    case UNPACK_BY_FIXED_LENGTH:
+        return hio_unpack_by_fixed_length(io, buf, readbytes);
+    case UNPACK_BY_DELIMITER:
+        return hio_unpack_by_delimiter(io, buf, readbytes);
+    case UNPACK_BY_LENGTH_FIELD:
+        return hio_unpack_by_length_field(io, buf, readbytes);
+    default:
+        if (io->read_cb) {
+            io->read_cb(io, buf, readbytes);
+        }
+        return readbytes;
+    }
+}
+
+int hio_unpack_by_fixed_length(hio_t* io, void* buf, int readbytes) {
+    const unsigned char* sp = (const unsigned char*)io->readbuf.base;
+    assert(buf == sp + io->readbuf.offset);
+    const unsigned char* ep = sp + io->readbuf.offset + readbytes;
+    unpack_setting_t* setting = io->unpack_setting;
+
+    int fixed_length = setting->fixed_length;
+    assert(io->readbuf.len >= fixed_length);
+
+    const unsigned char* p = sp;
+    int remain = ep - p;
+    int handled = 0;
+    while (remain >= fixed_length) {
+        if (io->read_cb) {
+            io->read_cb(io, (void*)p, fixed_length);
+        }
+        handled += fixed_length;
+        p += fixed_length;
+        remain -= fixed_length;
+    }
+
+    io->readbuf.offset = remain;
+    if (remain) {
+        // [p, p+remain] => [base, base+remain]
+        if (p != (unsigned char*)io->readbuf.base) {
+            memmove(io->readbuf.base, p, remain);
+        }
+    }
+
+    return handled;
+}
+
+int hio_unpack_by_delimiter(hio_t* io, void* buf, int readbytes) {
+    const unsigned char* sp = (const unsigned char*)io->readbuf.base;
+    assert(buf == sp + io->readbuf.offset);
+    const unsigned char* ep = sp + io->readbuf.offset + readbytes;
+    unpack_setting_t* setting = io->unpack_setting;
+
+    unsigned char* delimiter = setting->delimiter;
+    int delimiter_bytes = setting->delimiter_bytes;
+
+    // [offset - package_eof_bytes + 1, offset + readbytes]
+    const unsigned char* p = sp + io->readbuf.offset - delimiter_bytes + 1;
+    if (p < sp) p = sp;
+    int remain = ep - p;
+    int handled = 0;
+    int i = 0;
+    while (remain >= delimiter_bytes) {
+        for (i = 0; i < delimiter_bytes; ++i) {
+            if (p[i] != delimiter[i]) {
+                goto not_match;
+            }
+        }
+match:
+        p += delimiter_bytes;
+        remain -= delimiter_bytes;
+        if (io->read_cb) {
+            io->read_cb(io, (void*)sp, p - sp);
+        }
+        handled += p - sp;
+        sp = p;
+        continue;
+not_match:
+        ++p;
+        --remain;
+    }
+
+    remain = ep - sp;
+    io->readbuf.offset = remain;
+    if (remain) {
+        // [sp, sp+remain] => [base, base+remain]
+        if (sp != (unsigned char*)io->readbuf.base) {
+            memmove(io->readbuf.base, sp, remain);
+        }
+        if (io->readbuf.offset == io->readbuf.len) {
+            if (io->readbuf.len >= setting->package_max_length) {
+                hloge("recv package over %d bytes!", (int)setting->package_max_length);
+                io->error = ERR_OVER_LIMIT;
+                hio_close(io);
+                return -1;
+            }
+            io->readbuf.len = MIN(io->readbuf.len * 2, setting->package_max_length);
+            io->readbuf.base = (char*)safe_realloc(io->readbuf.base, io->readbuf.len, io->readbuf.offset);
+        }
+    }
+
+    return handled;
+}
+
+int hio_unpack_by_length_field(hio_t* io, void* buf, int readbytes) {
+    const unsigned char* sp = (const unsigned char*)io->readbuf.base;
+    assert(buf == sp + io->readbuf.offset);
+    const unsigned char* ep = sp + io->readbuf.offset + readbytes;
+    unpack_setting_t* setting = io->unpack_setting;
+
+    const unsigned char* p = sp;
+    int remain = ep - p;
+    int handled = 0;
+    unsigned int head_len = setting->body_offset;
+    unsigned int body_len = 0;
+    unsigned int package_len = head_len + body_len;
+    const unsigned char* lp = NULL;
+    while (remain >= setting->body_offset) {
+        body_len = 0;
+        lp = p + setting->length_field_offset;
+        if (setting->length_field_coding == BIG_ENDIAN) {
+            for (int i = 0; i < setting->length_field_bytes; ++i) {
+                body_len = (body_len << 8) | (unsigned int)*lp++;
+            }
+        }
+        else if (setting->length_field_coding == LITTLE_ENDIAN) {
+            for (int i = 0; i < setting->length_field_bytes; ++i) {
+                body_len |= ((unsigned int)*lp++) << (i * 8);
+            }
+        }
+        else if (setting->length_field_coding == ENCODE_BY_VARINT) {
+            int varint_bytes = ep - lp;
+            body_len = varint_decode(lp, &varint_bytes);
+            if (varint_bytes == 0) break;
+            if (varint_bytes == -1) {
+                hloge("varint is too big!");
+                io->error = ERR_OVER_LIMIT;
+                hio_close(io);
+                return -1;
+            }
+            head_len = setting->body_offset + varint_bytes - setting->length_field_bytes;
+        }
+        package_len = head_len + body_len;
+        if (remain >= package_len) {
+            if (io->read_cb) {
+                io->read_cb(io, (void*)p, package_len);
+            }
+            handled += package_len;
+            p += package_len;
+            remain -= package_len;
+        } else {
+            break;
+        }
+    }
+
+    io->readbuf.offset = remain;
+    if (remain) {
+        // [p, p+remain] => [base, base+remain]
+        if (p != (unsigned char*)io->readbuf.base) {
+            memmove(io->readbuf.base, p, remain);
+        }
+        if (package_len > io->readbuf.len) {
+            if (package_len > setting->package_max_length) {
+                hloge("recv package over %d bytes!", (int)setting->package_max_length);
+                io->error = ERR_OVER_LIMIT;
+                hio_close(io);
+                return -1;
+            }
+            io->readbuf.len *= 2;
+            io->readbuf.len = LIMIT(package_len, io->readbuf.len, setting->package_max_length);
+            io->readbuf.base = (char*)safe_realloc(io->readbuf.base, io->readbuf.len, io->readbuf.offset);
+        }
+    }
+
+    return handled;
+}

+ 11 - 0
event/unpack.h

@@ -0,0 +1,11 @@
+#ifndef HV_UNPACK_H_
+#define HV_UNPACK_H_
+
+#include "hloop.h"
+
+int hio_unpack(hio_t* io, void* buf, int readbytes);
+int hio_unpack_by_fixed_length(hio_t* io, void* buf, int readbytes);
+int hio_unpack_by_delimiter(hio_t* io, void* buf, int readbytes);
+int hio_unpack_by_length_field(hio_t* io, void* buf, int readbytes);
+
+#endif // HV_UNPACK_H_

+ 5 - 0
evpp/Channel.h

@@ -190,6 +190,11 @@ public:
         hio_set_heartbeat(io_, interval_ms, send_heartbeat);
     }
 
+    void setUnpack(unpack_setting_t* setting) {
+        if (io_ == NULL) return;
+        hio_set_unpack(io_, setting);
+    }
+
     int startConnect(int port, const char* host = "127.0.0.1") {
         sockaddr_u peeraddr;
         memset(&peeraddr, 0, sizeof(peeraddr));

+ 21 - 2
evpp/TcpClient.h

@@ -48,6 +48,7 @@ public:
         tls = false;
         connect_timeout = 5000;
         enable_reconnect = false;
+        enable_unpack = false;
     }
 
     virtual ~TcpClientTmpl() {
@@ -91,6 +92,9 @@ public:
             channel->setConnectTimeout(connect_timeout);
         }
         channel->onconnect = [this]() {
+            if (enable_unpack) {
+                channel->setUnpack(&unpack_setting);
+            }
             channel->startRead();
             if (onConnection) {
                 onConnection(channel);
@@ -171,8 +175,21 @@ public:
     }
 
     void setReconnect(ReconnectInfo* info) {
-        enable_reconnect = true;
-        reconnect_info = *info;
+        if (info) {
+            enable_reconnect = true;
+            reconnect_info = *info;
+        } else {
+            enable_reconnect = false;
+        }
+    }
+
+    void setUnpack(unpack_setting_t* setting) {
+        if (setting) {
+            enable_unpack = true;
+            unpack_setting = *setting;
+        } else {
+            enable_unpack = false;
+        }
     }
 
 public:
@@ -183,6 +200,8 @@ public:
     int                     connect_timeout;
     bool                    enable_reconnect;
     ReconnectInfo           reconnect_info;
+    bool                    enable_unpack;
+    unpack_setting_t        unpack_setting;
 
     // Callback
     std::function<void(const TSocketChannelPtr&)>           onConnection;

+ 6 - 0
evpp/TcpServer.h

@@ -16,6 +16,7 @@ public:
     TcpServer() {
         listenfd = -1;
         tls = false;
+        enable_unpack = false;
         max_connections = 0xFFFFFFFF;
     }
 
@@ -118,6 +119,9 @@ private:
             // so in this lambda function, no code should be added below.
         };
 
+        if (server->enable_unpack) {
+            channel->setUnpack(&server->unpack_setting);
+        }
         channel->startRead();
         if (server->onConnection) {
             server->onConnection(channel);
@@ -127,6 +131,8 @@ private:
 public:
     int                     listenfd;
     bool                    tls;
+    bool                    enable_unpack;
+    unpack_setting_t        unpack_setting;
     // Callback
     ConnectionCallback      onConnection;
     MessageCallback         onMessage;