Browse Source

evpp: c++ classes of event

ithewei 4 years ago
parent
commit
7272667d2e

+ 1 - 0
.gitignore

@@ -51,6 +51,7 @@ bin
 tmp
 dist
 test
+*_test
 build
 hconfig.h
 html/uploads

+ 1 - 0
Makefile.vars

@@ -32,6 +32,7 @@ BASE_HEADERS =  base/hplatform.h\
 				base/hthreadpool.h\
 				base/hobjectpool.h\
 				base/ifconfig.h\
+				base/ThreadLocalStorage.h\
 
 UTILS_HEADERS = utils/base64.h\
 				utils/md5.h\

+ 1 - 0
TREE.md

@@ -12,6 +12,7 @@
 ├── echo-servers 包含libevent、libev、libuv、libhv、asio、poco、muduo等多个网络库的tcp echo server写法,并做压力测试
 ├── etc         应用程序配置目录
 ├── event       libhv事件循环模块
+├── evpp        事件循环c++封装类
 ├── examples    示例代码
 │   └── httpd
 ├── html        网页document_root目录

+ 15 - 0
base/ThreadLocalStorage.cpp

@@ -0,0 +1,15 @@
+#include "ThreadLocalStorage.h"
+
+#include "hthread.h"
+
+ThreadLocalStorage ThreadLocalStorage::tls[ThreadLocalStorage::MAX_NUM];
+
+const char* ThreadLocalStorage::threadName() {
+    static char unnamed[32] = {0};
+    void* value = get(THREAD_NAME);
+    if (value) {
+        return (char*)value;
+    }
+    snprintf(unnamed, sizeof(unnamed)-1, "thread-%ld", hv_gettid());
+    return unnamed;
+}

+ 70 - 0
base/ThreadLocalStorage.h

@@ -0,0 +1,70 @@
+#ifndef HV_THREAD_LOCAL_STORAGE_H_
+#define HV_THREAD_LOCAL_STORAGE_H_
+
+#include "hexport.h"
+#include "hplatform.h"
+
+#ifdef OS_WIN
+
+#define hthread_key_t               DWORD
+#define INVALID_HTHREAD_KEY         0xFFFFFFFF
+#define hthread_key_create(pkey)    *pkey = TlsAlloc()
+#define hthread_key_delete          TlsFree
+#define hthread_get_value           TlsGetValue
+#define hthread_set_value           TlsSetValue
+
+#else
+
+#define hthread_key_t               pthread_key_t
+#define INVALID_HTHREAD_KEY         0xFFFFFFFF
+#define hthread_key_create(pkey)    pthread_key_create(pkey, NULL)
+#define hthread_key_delete          pthread_key_delete
+#define hthread_get_value           pthread_getspecific
+#define hthread_set_value           pthread_setspecific
+
+#endif
+
+#ifdef __cplusplus
+class HV_EXPORT ThreadLocalStorage {
+public:
+    enum {
+        THREAD_NAME = 0,
+        EVENT_LOOP  = 1,
+        MAX_NUM     = 16,
+    };
+    ThreadLocalStorage() {
+        hthread_key_create(&key);
+    }
+
+    ~ThreadLocalStorage() {
+        hthread_key_delete(key);
+    }
+
+    void set(void* val) {
+        hthread_set_value(key, val);
+    }
+
+    void* get() {
+        return hthread_get_value(key);
+    }
+
+    static void set(int idx, void* val) {
+        return tls[idx].set(val);
+    }
+
+    static void* get(int idx) {
+        return tls[idx].get();
+    }
+
+    static void setThreadName(const char* name) {
+        set(THREAD_NAME, (void*)name);
+    }
+    static const char* threadName();
+
+private:
+    hthread_key_t key;
+    static ThreadLocalStorage tls[MAX_NUM];
+};
+#endif
+
+#endif // HV_THREAD_LOCAL_STORAGE_H_

+ 0 - 20
base/hthread.c

@@ -1,20 +0,0 @@
-#include "hthread.h"
-
-static hthread_key_t tls_thread_name = INVALID_HTHREAD_KEY;
-
-void hthread_setname(const char* name) {
-    if (tls_thread_name == INVALID_HTHREAD_KEY) {
-        hthread_key_create(&tls_thread_name);
-    }
-    hthread_set_value(tls_thread_name, name);
-}
-
-const char* hthread_getname() {
-    static char unnamed[32];
-    void* value = hthread_get_value(tls_thread_name);
-    if (value) {
-        return (char*)value;
-    }
-    snprintf(unnamed, sizeof(unnamed)-1, "thread-%ld", hv_gettid());
-    return unnamed;
-}

+ 1 - 20
base/hthread.h

@@ -1,7 +1,6 @@
 #ifndef HV_THREAD_H_
 #define HV_THREAD_H_
 
-#include "hexport.h"
 #include "hplatform.h"
 
 #ifdef OS_WIN
@@ -61,14 +60,8 @@ static inline int hthread_join(hthread_t th) {
     return 0;
 }
 
-#define hthread_key_t               DWORD
-#define INVALID_HTHREAD_KEY         0xFFFFFFFF
-#define hthread_key_create(pkey)    *pkey = TlsAlloc()
-#define hthread_key_delete          TlsFree
-#define hthread_get_value           TlsGetValue
-#define hthread_set_value           TlsSetValue
-
 #else
+
 typedef pthread_t   hthread_t;
 typedef void* (*hthread_routine)(void*);
 #define HTHREAD_RETTYPE void*
@@ -83,20 +76,8 @@ static inline int hthread_join(hthread_t th) {
     return pthread_join(th, NULL);
 }
 
-#define hthread_key_t               pthread_key_t
-#define INVALID_HTHREAD_KEY         0xFFFFFFFF
-#define hthread_key_create(pkey)    pthread_key_create(pkey, NULL)
-#define hthread_key_delete          pthread_key_delete
-#define hthread_get_value           pthread_getspecific
-#define hthread_set_value           pthread_setspecific
-
 #endif
 
-BEGIN_EXTERN_C
-HV_EXPORT void hthread_setname(const char* name);
-HV_EXPORT const char* hthread_getname();
-END_EXTERN_C
-
 #ifdef __cplusplus
 /************************************************
  * HThread

+ 1 - 0
cmake/vars.cmake

@@ -26,6 +26,7 @@ set(BASE_HEADERS
     base/hthreadpool.h
     base/hobjectpool.h
     base/ifconfig.h
+    base/ThreadLocalStorage.h
 )
 
 set(UTILS_HEADERS

+ 8 - 0
event/hevent.c

@@ -44,6 +44,14 @@ struct sockaddr* hio_peeraddr(hio_t* io) {
     return io->peeraddr;
 }
 
+void hio_set_context(hio_t* io, void* ctx) {
+    io->ctx = ctx;
+}
+
+void* hio_context(hio_t* io) {
+    return io->ctx;
+}
+
 void hio_setcb_accept   (hio_t* io, haccept_cb  accept_cb) {
     io->accept_cb = accept_cb;
 }

+ 1 - 0
event/hevent.h

@@ -129,6 +129,7 @@ struct hio_s {
     int         event_index[2]; // for poll,kqueue
     void*       hovlp;          // for iocp/overlapio
     void*       ssl;            // for SSL
+    void*       ctx;
 };
 /*
  * hio lifeline:

+ 4 - 2
event/hloop.c

@@ -774,8 +774,10 @@ int hio_del(hio_t* io, int events) {
 #endif
     if (!io->active) return -1;
 
-    iowatcher_del_event(io->loop, io->fd, events);
-    io->events &= ~events;
+    if (io->events) {
+        iowatcher_del_event(io->loop, io->fd, events);
+        io->events &= ~events;
+    }
     if (io->events == 0) {
         io->loop->nios--;
         // NOTE: not EVENT_DEL, avoid free

+ 2 - 0
event/hloop.h

@@ -212,6 +212,8 @@ HV_EXPORT int hio_revents (hio_t* io);
 HV_EXPORT hio_type_e       hio_type     (hio_t* io);
 HV_EXPORT struct sockaddr* hio_localaddr(hio_t* io);
 HV_EXPORT struct sockaddr* hio_peeraddr (hio_t* io);
+HV_EXPORT void hio_set_context(hio_t* io, void* ctx);
+HV_EXPORT void* hio_context(hio_t* io);
 HV_EXPORT bool hio_is_opened(hio_t* io);
 HV_EXPORT bool hio_is_closed(hio_t* io);
 

+ 15 - 0
evpp/Buffer.h

@@ -0,0 +1,15 @@
+#ifndef HV_BUFFER_HPP_
+#define HV_BUFFER_HPP_
+
+#include <memory>
+
+#include "hbuf.h"
+
+namespace hv {
+
+typedef HBuf Buffer;
+typedef std::shared_ptr<Buffer>     BufferPtr;
+
+}
+
+#endif // HV_BUFFER_HPP_

+ 17 - 0
evpp/Callback.h

@@ -0,0 +1,17 @@
+#ifndef HV_CALLBACK_HPP_
+#define HV_CALLBACK_HPP_
+
+#include <functional>
+
+#include "Buffer.h"
+#include "Channel.h"
+
+namespace hv {
+
+typedef std::function<void(const SocketChannelPtr&)>            ConnectionCallback;
+typedef std::function<void(const SocketChannelPtr&, Buffer*)>   MessageCallback;
+typedef std::function<void(const SocketChannelPtr&, Buffer*)>   WriteCompleteCallback;
+
+}
+
+#endif // HV_CALLBACK_HPP_

+ 155 - 0
evpp/Channel.h

@@ -0,0 +1,155 @@
+#ifndef HV_CHANNEL_HPP_
+#define HV_CHANNEL_HPP_
+
+#include <functional>
+#include <memory>
+
+#include "hloop.h"
+#include "hsocket.h"
+
+#include "Buffer.h"
+
+namespace hv {
+
+class Channel {
+public:
+    Channel(hio_t* io = NULL) {
+        io_ = io;
+        fd_ = -1;
+        id_ = 0;
+        ctx_ = NULL;
+        if (io) {
+            fd_ = hio_fd(io);
+            id_ = hio_id(io);
+            hio_set_context(io, this);
+            hio_setcb_read(io_, on_read);
+            hio_setcb_write(io_, on_write);
+            hio_setcb_close(io_, on_close);
+        }
+    }
+
+    virtual ~Channel() {
+        close();
+    }
+
+    hio_t* io() { return io_; }
+    int fd() { return fd_; }
+    int id() { return id_; }
+    int error() { return hio_error(io_); }
+
+    void setContext(void* ctx) {
+        ctx_ = ctx;
+    }
+    void* context() {
+        return ctx_;
+    }
+
+    bool isOpened() {
+        if (io_ == NULL) return false;
+        return id_ == hio_id(io_) && hio_is_opened(io_);
+    }
+    bool isClosed() {
+        return !isOpened();
+    }
+
+    int startRead() {
+        if (!isOpened()) return 0;
+        return hio_read_start(io_);
+    }
+
+    int stopRead() {
+        if (!isOpened()) return 0;
+        return hio_read_stop(io_);
+    }
+
+    int write(Buffer* buf) {
+        if (!isOpened()) return 0;
+        return hio_write(io_, buf->data(), buf->size());
+    }
+
+    int write(const std::string& str) {
+        if (!isOpened()) return 0;
+        return hio_write(io_, str.data(), str.size());
+    }
+
+    int close() {
+        if (!isOpened()) return 0;
+        return hio_close(io_);
+    }
+
+public:
+    hio_t*      io_;
+    int         fd_;
+    uint32_t    id_;
+    void*       ctx_;
+    std::function<void(Buffer*)> onread;
+    std::function<void(Buffer*)> onwrite;
+    std::function<void()>        onclose;
+
+private:
+    static void on_read(hio_t* io, void* data, int readbytes) {
+        Channel* channel = (Channel*)hio_context(io);
+        if (channel && channel->onread) {
+            Buffer buf(data, readbytes);
+            channel->onread(&buf);
+        }
+    }
+
+    static void on_write(hio_t* io, const void* data, int writebytes) {
+        Channel* channel = (Channel*)hio_context(io);
+        if (channel && channel->onwrite) {
+            Buffer buf((void*)data, writebytes);
+            channel->onwrite(&buf);
+        }
+    }
+
+    static void on_close(hio_t* io) {
+        Channel* channel = (Channel*)hio_context(io);
+        if (channel && channel->onclose) {
+            channel->onclose();
+        }
+    }
+};
+
+class SocketChannel : public Channel {
+public:
+    enum Status {
+        OPENED,
+        CONNECTING,
+        CONNECTED,
+        DISCONNECTED,
+        CLOSED,
+    } status;
+
+    SocketChannel(hio_t* io) : Channel(io) {
+        status = isOpened() ? OPENED : CLOSED;
+    }
+    virtual ~SocketChannel() {}
+
+    bool isConnected() {
+        return isOpened() && status == CONNECTED;
+    }
+
+    std::string localaddr() {
+        struct sockaddr* addr = hio_localaddr(io_);
+        char buf[SOCKADDR_STRLEN] = {0};
+        return SOCKADDR_STR(addr, buf);
+    }
+
+    std::string peeraddr() {
+        struct sockaddr* addr = hio_peeraddr(io_);
+        char buf[SOCKADDR_STRLEN] = {0};
+        return SOCKADDR_STR(addr, buf);
+    }
+
+    int send(const std::string& str) {
+        return write(str);
+    }
+};
+
+typedef std::shared_ptr<Channel>        ChannelPtr;
+typedef std::shared_ptr<SocketChannel>  SocketChannelPtr;
+
+}
+
+#endif // HV_CHANNEL_HPP_

+ 33 - 0
evpp/EventLoop.h

@@ -11,6 +11,7 @@
 
 #include "Status.h"
 #include "Event.h"
+#include "ThreadLocalStorage.h"
 
 namespace hv {
 
@@ -43,6 +44,7 @@ public:
     // @brief Run loop forever
     void run() {
         if (loop_ == NULL) return;
+        ThreadLocalStorage::set(ThreadLocalStorage::EVENT_LOOP, this);
         setStatus(kRunning);
         hloop_run(loop_);
         setStatus(kStopped);
@@ -209,6 +211,37 @@ private:
 
 typedef std::shared_ptr<EventLoop> EventLoopPtr;
 
+// ThreadLocalStorage
+static inline EventLoop* tlsEventLoop() {
+    return (EventLoop*)ThreadLocalStorage::get(ThreadLocalStorage::EVENT_LOOP);
+}
+
+static inline TimerID setTimer(int timeout_ms, TimerCallback cb, int repeat = INFINITE) {
+    EventLoop* loop = tlsEventLoop();
+    if (loop == NULL) return (TimerID)-1;
+    return loop->setTimer(timeout_ms, cb, repeat);
+}
+
+static inline void killTimer(TimerID timerID) {
+    EventLoop* loop = tlsEventLoop();
+    if (loop == NULL) return;
+    loop->killTimer(timerID);
+}
+
+static inline void resetTimer(TimerID timerID) {
+    EventLoop* loop = tlsEventLoop();
+    if (loop == NULL) return;
+    loop->resetTimer(timerID);
+}
+
+static inline TimerID setTimeout(int timeout_ms, TimerCallback cb) {
+    return setTimer(timeout_ms, cb, 1);
+}
+
+static inline TimerID setInterval(int interval_ms, TimerCallback cb) {
+    return setTimer(interval_ms, cb, INFINITE);
+}
+
 }
 
 #endif // HV_EVENT_LOOP_HPP_

+ 4 - 0
evpp/EventLoopThread.h

@@ -3,6 +3,8 @@
 
 #include <thread>
 
+#include "hlog.h"
+
 #include "EventLoop.h"
 
 namespace hv {
@@ -82,6 +84,7 @@ public:
 
 private:
     void loop_thread(const Functor& pre, const Functor& post) {
+        hlogi("EventLoopThread started, tid=%ld", hv_gettid());
         setStatus(kStarted);
 
         if (pre) {
@@ -100,6 +103,7 @@ private:
         }
 
         setStatus(kStopped);
+        hlogi("EventLoopThread stopped, tid=%ld", hv_gettid());
     }
 
 private:

+ 2 - 1
evpp/README.md

@@ -10,7 +10,8 @@ hloop.h中的c接口被封装成了c++的类,参考了muduo和evpp。
 
 ```
 .
-├── Channel.h               IO管道类,封装了hio_t
+├── Buffer.h                缓存类
+├── Channel.h               通道类,封装了hio_t
 ├── Event.h                 事件类,封装了hevent_t、htimer_t
 ├── EventLoop.h             事件循环类,封装了hloop_t
 ├── EventLoopThread.h       事件循环线程类,组合了EventLoop和thread

+ 195 - 0
evpp/TcpClient.h

@@ -0,0 +1,195 @@
+#ifndef HV_TCP_CLIENT_HPP_
+#define HV_TCP_CLIENT_HPP_
+
+#include "hsocket.h"
+#include "hssl.h"
+#include "hlog.h"
+
+#include "EventLoopThread.h"
+#include "Callback.h"
+#include "Channel.h"
+
+namespace hv {
+
+struct ReconnectInfo {
+    uint32_t min_delay;  // ms
+    uint32_t max_delay;  // ms
+    uint32_t cur_delay;  // ms
+    /*
+     * @delay_policy
+     * 0: fixed
+     * min_delay=3s => 3,3,3...
+     * 1: linear
+     * min_delay=3s max_delay=10s => 3,6,9,10,10...
+     * other: exponential
+     * min_delay=3s max_delay=60s delay_policy=2 => 3,6,12,24,48,60,60...
+     */
+    uint32_t delay_policy;
+    uint32_t max_retry_cnt;
+    uint32_t cur_retry_cnt;
+
+    ReconnectInfo() {
+        min_delay = 1000;
+        max_delay = 60000;
+        cur_delay = 0;
+        // 1,2,4,8,16,32,60,60...
+        delay_policy = 2;
+        max_retry_cnt = INFINITE;
+        cur_retry_cnt = 0;
+    }
+};
+
+class TcpClient {
+public:
+    TcpClient() {
+        tls = false;
+        connect_timeout = 5000;
+        enable_reconnect = false;
+    }
+
+    ~TcpClient() {
+    }
+
+    EventLoopPtr loop() {
+        return loop_thread.loop();
+    }
+
+    //@retval >=0 connfd, <0 error
+    int createsocket(int port, const char* host = "127.0.0.1") {
+        memset(&peeraddr, 0, sizeof(peeraddr));
+        int ret = sockaddr_set_ipport(&peeraddr, host, port);
+        if (ret != 0) {
+            return -1;
+        }
+        return createsocket(&peeraddr.sa);
+    }
+
+    int createsocket(struct sockaddr* peeraddr) {
+        int connfd = socket(peeraddr->sa_family, SOCK_STREAM, 0);
+        // SOCKADDR_PRINT(peeraddr);
+        if (connfd < 0) {
+            perror("socket");
+            return -2;
+        }
+
+        hio_t* io = hio_get(loop_thread.hloop(), connfd);
+        assert(io != NULL);
+        hio_set_peeraddr(io, peeraddr, SOCKADDR_LEN(peeraddr));
+        channel.reset(new SocketChannel(io));
+        return connfd;
+    }
+
+    int startConnect() {
+        assert(channel != NULL);
+        channel->onread = [this](Buffer* buf) {
+            if (onMessage) {
+                onMessage(channel, buf);
+            }
+        };
+        channel->onwrite = [this](Buffer* buf) {
+            if (onWriteComplete) {
+                onWriteComplete(channel, buf);
+            }
+        };
+        channel->onclose = [this]() {
+            channel->status = SocketChannel::CLOSED;
+            if (onConnection) {
+                onConnection(channel);
+            }
+            channel = NULL;
+            // reconnect
+            if (enable_reconnect) {
+                startReconnect();
+            }
+        };
+
+        hio_t* connio = channel->io();
+        hevent_set_userdata(connio, this);
+        if (tls) {
+            hio_enable_ssl(connio);
+        }
+        hio_set_connect_timeout(connio, connect_timeout);
+        hio_setcb_connect(connio, onConnect);
+        hio_connect(connio);
+        return 0;
+    }
+
+    int startReconnect() {
+        if (++reconnect_info.cur_retry_cnt > reconnect_info.max_retry_cnt) return 0;
+        if (reconnect_info.delay_policy == 0) {
+            // fixed
+            reconnect_info.cur_delay = reconnect_info.min_delay;
+        } else if (reconnect_info.delay_policy == 1) {
+            // linear
+            reconnect_info.cur_delay += reconnect_info.min_delay;
+        } else {
+            // exponential
+            reconnect_info.cur_delay *= reconnect_info.delay_policy;
+        }
+        reconnect_info.cur_delay = MAX(reconnect_info.cur_delay, reconnect_info.min_delay);
+        reconnect_info.cur_delay = MIN(reconnect_info.cur_delay, reconnect_info.max_delay);
+        loop_thread.loop()->setTimeout(reconnect_info.cur_delay, [this](TimerID timerID){
+            hlogi("reconnect... cnt=%d, delay=%d", reconnect_info.cur_retry_cnt, reconnect_info.cur_delay);
+            // printf("reconnect... cnt=%d, delay=%d\n", reconnect_info.cur_retry_cnt, reconnect_info.cur_delay);
+            createsocket(&peeraddr.sa);
+            startConnect();
+        });
+        return 0;
+    }
+
+    void start(bool wait_threads_started = true) {
+        loop_thread.start(wait_threads_started, std::bind(&TcpClient::startConnect, this));
+    }
+    void stop(bool wait_threads_stopped = true) {
+        loop_thread.stop(wait_threads_stopped);
+    }
+
+    int withTLS(const char* cert_file = NULL, const char* key_file = NULL) {
+        tls = true;
+        hssl_ctx_init_param_t param;
+        memset(&param, 0, sizeof(param));
+        param.crt_file = cert_file;
+        param.key_file = key_file;
+        return hssl_ctx_init(&param) == NULL ? -1 : 0;
+    }
+
+    void setConnectTimeout(int ms) {
+        connect_timeout = ms;
+    }
+
+    void setReconnect(ReconnectInfo* info) {
+        enable_reconnect = true;
+        reconnect_info = *info;
+    }
+
+private:
+    static void onConnect(hio_t* io) {
+        TcpClient* client = (TcpClient*)hevent_userdata(io);
+        SocketChannelPtr channel = client->channel;
+        channel->status = SocketChannel::CONNECTED;
+        channel->startRead();
+        if (client->onConnection) {
+            client->onConnection(channel);
+        }
+    }
+
+public:
+    SocketChannelPtr        channel;
+
+    sockaddr_u              peeraddr;
+    bool                    tls;
+    int                     connect_timeout;
+    bool                    enable_reconnect;
+    ReconnectInfo           reconnect_info;
+
+    // Callback
+    ConnectionCallback      onConnection;
+    MessageCallback         onMessage;
+    WriteCompleteCallback   onWriteComplete;
+private:
+    EventLoopThread         loop_thread;
+};
+
+}
+
+#endif // HV_TCP_CLIENT_HPP_

+ 63 - 0
evpp/TcpClient_test.cpp

@@ -0,0 +1,63 @@
+/*
+ * TcpClient_test.cpp
+ *
+ * @build
+ * make libhv && sudo make install
+ * g++ -std=c++11 TcpClient_test.cpp -o TcpClient_test -I/usr/local/include/hv -lhv -lpthread
+ *
+ */
+
+#include "TcpClient.h"
+#include "htime.h"
+
+using namespace hv;
+
+int main(int argc, char* argv[]) {
+    if (argc < 2) {
+        printf("Usage: %s port\n", argv[0]);
+        return -10;
+    }
+    int port = atoi(argv[1]);
+
+    TcpClient cli;
+    int connfd = cli.createsocket(port);
+    if (connfd < 0) {
+        return -20;
+    }
+    printf("client connect to port %d, connfd=%d ...\n", port, connfd);
+    cli.onConnection = [](const SocketChannelPtr& channel) {
+        std::string peeraddr = channel->peeraddr();
+        if (channel->isConnected()) {
+            printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
+            // send(time) every 3s
+            setInterval(3000, [channel](TimerID timerID){
+                if (channel->isConnected()) {
+                    char str[DATETIME_FMT_BUFLEN] = {0};
+                    datetime_t dt = datetime_now();
+                    datetime_fmt(&dt, str);
+                    channel->send(str);
+                } else {
+                    killTimer(timerID);
+                }
+            });
+        } else {
+            printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
+        }
+    };
+    cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
+        printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
+    };
+    cli.onWriteComplete = [](const SocketChannelPtr& channel, Buffer* buf) {
+        printf("> %.*s\n", (int)buf->size(), (char*)buf->data());
+    };
+    // reconnect: 1,2,4,8,10,10,10...
+    ReconnectInfo reconn;
+    reconn.min_delay = 1000;
+    reconn.max_delay = 10000;
+    reconn.delay_policy = 2;
+    cli.setReconnect(&reconn);
+    cli.start();
+
+    while (1) sleep(1);
+    return 0;
+}

+ 142 - 0
evpp/TcpServer.h

@@ -0,0 +1,142 @@
+#ifndef HV_TCP_SERVER_HPP_
+#define HV_TCP_SERVER_HPP_
+
+#include "hsocket.h"
+#include "hssl.h"
+#include "hlog.h"
+
+#include "EventLoopThreadPool.h"
+#include "Callback.h"
+#include "Channel.h"
+
+namespace hv {
+
+class TcpServer {
+public:
+    TcpServer() {
+        listenfd = -1;
+        tls = false;
+        max_connections = 0xFFFFFFFF;
+        connection_num = 0;
+    }
+
+    ~TcpServer() {
+    }
+
+    //@retval >=0 listenfd, <0 error
+    int createsocket(int port, const char* host = "0.0.0.0") {
+        listenfd = Listen(port, host);
+        return listenfd;
+    }
+
+    void setThreadNum(int num) {
+        loop_threads.setThreadNum(num);
+    }
+    void start(bool wait_threads_started = true) {
+        loop_threads.start(wait_threads_started, [this](const EventLoopPtr& loop){
+            assert(listenfd >= 0);
+            hio_t* listenio = haccept(loop->loop(), listenfd, onAccept);
+            hevent_set_userdata(listenio, this);
+            if (tls) {
+                hio_enable_ssl(listenio);
+            }
+        });
+    }
+    void stop(bool wait_threads_stopped = true) {
+        loop_threads.stop(wait_threads_stopped);
+    }
+
+    EventLoopPtr loop(int idx = -1) {
+        return loop_threads.loop(idx);
+    }
+    hloop_t* hloop(int idx = -1) {
+        return loop_threads.hloop(idx);
+    }
+
+    int withTLS(const char* cert_file, const char* key_file) {
+        tls = true;
+        hssl_ctx_init_param_t param;
+        memset(&param, 0, sizeof(param));
+        param.crt_file = cert_file;
+        param.key_file = key_file;
+        return hssl_ctx_init(&param) == NULL ? -1 : 0;
+    }
+
+    // channel
+    SocketChannelPtr addChannel(hio_t* io) {
+        std::lock_guard<std::mutex> locker(mutex_);
+        SocketChannelPtr channel(new SocketChannel(io));
+        int fd = channel->fd();
+        if (channels.size() < fd) {
+            channels.resize(2 * fd);
+        }
+        channels[fd] = channel;
+        return channel;
+    }
+
+    void removeChannel(ChannelPtr channel) {
+        std::lock_guard<std::mutex> locker(mutex_);
+        int fd = channel->fd();
+        if (fd < channels.size()) {
+            channels[fd] = NULL;
+        }
+    }
+
+private:
+    static void onAccept(hio_t* connio) {
+        TcpServer* server = (TcpServer*)hevent_userdata(connio);
+        if (server->connection_num >= server->max_connections) {
+            hlogw("over max_connections");
+            hio_close(connio);
+            return;
+        }
+        SocketChannelPtr channel = server->addChannel(connio);
+        channel->status = SocketChannel::CONNECTED;
+        ++server->connection_num;
+
+        channel->onread = [server, channel](Buffer* buf) {
+            if (server->onMessage) {
+                server->onMessage(channel, buf);
+            }
+        };
+        channel->onwrite = [server, channel](Buffer* buf) {
+            if (server->onWriteComplete) {
+                server->onWriteComplete(channel, buf);
+            }
+        };
+        channel->onclose = [server, channel]() {
+            channel->status = SocketChannel::CLOSED;
+            if (server->onConnection) {
+                server->onConnection(channel);
+            }
+            server->removeChannel(channel);
+            --server->connection_num;
+        };
+
+        channel->startRead();
+        if (server->onConnection) {
+            server->onConnection(channel);
+        }
+    }
+
+public:
+    int                     listenfd;
+    bool                    tls;
+    // Callback
+    ConnectionCallback      onConnection;
+    MessageCallback         onMessage;
+    WriteCompleteCallback   onWriteComplete;
+
+    uint32_t                max_connections;
+    std::atomic<uint32_t>   connection_num;
+
+private:
+    EventLoopThreadPool     loop_threads;
+    // with fd as index
+    std::vector<SocketChannelPtr>   channels; // GUAREDE_BY(mutex_)
+    std::mutex                      mutex_;
+};
+
+}
+
+#endif // HV_TCP_SERVER_HPP_

+ 48 - 0
evpp/TcpServer_test.cpp

@@ -0,0 +1,48 @@
+/*
+ * TcpServer_test.cpp
+ *
+ * @build
+ * make libhv && sudo make install
+ * g++ -std=c++11 TcpServer_test.cpp -o TcpServer_test -I/usr/local/include/hv -lhv -lpthread
+ *
+ */
+
+#include "TcpServer.h"
+
+using namespace hv;
+
+int main(int argc, char* argv[]) {
+    if (argc < 2) {
+        printf("Usage: %s port\n", argv[0]);
+        return -10;
+    }
+    int port = atoi(argv[1]);
+
+    TcpServer srv;
+    int listenfd = srv.createsocket(port);
+    if (listenfd < 0) {
+        return -20;
+    }
+    printf("server listen on port %d, listenfd=%d ...\n", port, listenfd);
+    srv.onConnection = [](const SocketChannelPtr& channel) {
+        std::string peeraddr = channel->peeraddr();
+        if (channel->isConnected()) {
+            printf("%s connected! connfd=%d\n", peeraddr.c_str(), channel->fd());
+        } else {
+            printf("%s disconnected! connfd=%d\n", peeraddr.c_str(), channel->fd());
+        }
+    };
+    srv.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
+        // echo
+        printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
+        channel->write(buf);
+    };
+    srv.onWriteComplete = [](const SocketChannelPtr& channel, Buffer* buf) {
+        printf("> %.*s\n", (int)buf->size(), (char*)buf->data());
+    };
+    srv.setThreadNum(4);
+    srv.start();
+
+    while (1) sleep(1);
+    return 0;
+}

+ 76 - 0
evpp/UdpClient.h

@@ -0,0 +1,76 @@
+#ifndef HV_UDP_CLIENT_HPP_
+#define HV_UDP_CLIENT_HPP_
+
+#include "hsocket.h"
+
+#include "EventLoopThread.h"
+#include "Callback.h"
+#include "Channel.h"
+
+namespace hv {
+
+class UdpClient {
+public:
+    UdpClient() {
+    }
+
+    ~UdpClient() {
+    }
+
+    EventLoopPtr loop() {
+        return loop_thread.loop();
+    }
+
+    //@retval >=0 sockfd, <0 error
+    int createsocket(int port, const char* host = "127.0.0.1") {
+        hio_t* io = hloop_create_udp_client(loop_thread.hloop(), host, port);
+        if (io == NULL) return -1;
+        channel.reset(new SocketChannel(io));
+        return channel->fd();
+    }
+
+    void start(bool wait_threads_started = true) {
+        loop_thread.start(wait_threads_started,
+            [this]() {
+                assert(channel != NULL);
+                channel->onread = [this](Buffer* buf) {
+                    if (onMessage) {
+                        onMessage(channel, buf);
+                    }
+                };
+                channel->onwrite = [this](Buffer* buf) {
+                    if (onWriteComplete) {
+                        onWriteComplete(channel, buf);
+                    }
+                };
+                channel->startRead();
+                return 0;
+            }
+        );
+    }
+    void stop(bool wait_threads_stopped = true) {
+        loop_thread.stop(wait_threads_stopped);
+    }
+
+    int sendto(Buffer* buf) {
+        if (channel == NULL) return 0;
+        return channel->write(buf);
+    }
+
+    int sendto(const std::string& str) {
+        if (channel == NULL) return 0;
+        return channel->write(str);
+    }
+
+public:
+    SocketChannelPtr        channel;
+    // Callback
+    MessageCallback         onMessage;
+    WriteCompleteCallback   onWriteComplete;
+private:
+    EventLoopThread         loop_thread;
+};
+
+}
+
+#endif // HV_UDP_CLIENT_HPP_

+ 46 - 0
evpp/UdpClient_test.cpp

@@ -0,0 +1,46 @@
+/*
+ * UdpClient_test.cpp
+ *
+ * @build
+ * make libhv && sudo make install
+ * g++ -std=c++11 UdpClient_test.cpp -o UdpClient_test -I/usr/local/include/hv -lhv -lpthread
+ *
+ */
+
+#include "UdpClient.h"
+#include "htime.h"
+
+using namespace hv;
+
+int main(int argc, char* argv[]) {
+    if (argc < 2) {
+        printf("Usage: %s port\n", argv[0]);
+        return -10;
+    }
+    int port = atoi(argv[1]);
+
+    UdpClient cli;
+    int sockfd = cli.createsocket(port);
+    if (sockfd < 0) {
+        return -20;
+    }
+    printf("client sendto port %d, sockfd=%d ...\n", port, sockfd);
+    cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
+        printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
+    };
+    cli.onWriteComplete = [](const SocketChannelPtr& channel, Buffer* buf) {
+        printf("> %.*s\n", (int)buf->size(), (char*)buf->data());
+    };
+    cli.start();
+
+    // sendto(time) every 3s
+    cli.loop()->setInterval(3000, [&cli](TimerID timerID) {
+        char str[DATETIME_FMT_BUFLEN] = {0};
+        datetime_t dt = datetime_now();
+        datetime_fmt(&dt, str);
+        cli.sendto(str);
+    });
+
+    while (1) sleep(1);
+    return 0;
+}

+ 79 - 0
evpp/UdpServer.h

@@ -0,0 +1,79 @@
+#ifndef HV_UDP_SERVER_HPP_
+#define HV_UDP_SERVER_HPP_
+
+#include "hsocket.h"
+
+#include "EventLoopThreadPool.h"
+#include "Callback.h"
+#include "Channel.h"
+
+namespace hv {
+
+class UdpServer {
+public:
+    UdpServer() {
+    }
+
+    ~UdpServer() {
+    }
+
+    EventLoopPtr loop() {
+        return loop_thread.loop();
+    }
+
+    //@retval >=0 bindfd, <0 error
+    int createsocket(int port, const char* host = "0.0.0.0") {
+        hio_t* io = hloop_create_udp_server(loop_thread.hloop(), host, port);
+        if (io == NULL) return -1;
+        channel.reset(new SocketChannel(io));
+        return channel->fd();
+    }
+
+    void start(bool wait_threads_started = true) {
+        loop_thread.start(wait_threads_started,
+            [this]() {
+                assert(channel != NULL);
+                channel->onread = [this](Buffer* buf) {
+                    if (onMessage) {
+                        onMessage(channel, buf);
+                    }
+                };
+                channel->onwrite = [this](Buffer* buf) {
+                    if (onWriteComplete) {
+                        onWriteComplete(channel, buf);
+                    }
+                };
+                channel->startRead();
+                return 0;
+            }
+        );
+    }
+    void stop(bool wait_threads_stopped = true) {
+        loop_thread.stop(wait_threads_stopped);
+    }
+
+    int sendto(Buffer* buf, struct sockaddr* peeraddr = NULL) {
+        if (channel == NULL) return 0;
+        if (peeraddr) hio_set_peeraddr(channel->io(), peeraddr, SOCKADDR_LEN(peeraddr));
+        return channel->write(buf);
+    }
+
+    int sendto(const std::string& str, struct sockaddr* peeraddr = NULL) {
+        if (channel == NULL) return 0;
+        if (peeraddr) hio_set_peeraddr(channel->io(), peeraddr, SOCKADDR_LEN(peeraddr));
+        return channel->write(str);
+    }
+
+public:
+    SocketChannelPtr        channel;
+    // Callback
+    MessageCallback         onMessage;
+    WriteCompleteCallback   onWriteComplete;
+
+private:
+    EventLoopThread         loop_thread;
+};
+
+}
+
+#endif // HV_UDP_SERVER_HPP_

+ 39 - 0
evpp/UdpServer_test.cpp

@@ -0,0 +1,39 @@
+/*
+ * UdpServer_test.cpp
+ *
+ * @build
+ * make libhv && sudo make install
+ * g++ -std=c++11 UdpServer_test.cpp -o UdpServer_test -I/usr/local/include/hv -lhv -lpthread
+ *
+ */
+
+#include "UdpServer.h"
+
+using namespace hv;
+
+int main(int argc, char* argv[]) {
+    if (argc < 2) {
+        printf("Usage: %s port\n", argv[0]);
+        return -10;
+    }
+    int port = atoi(argv[1]);
+
+    UdpServer srv;
+    int bindfd = srv.createsocket(port);
+    if (bindfd < 0) {
+        return -20;
+    }
+    printf("server bind on port %d, bindfd=%d ...\n", port, bindfd);
+    srv.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
+        // echo
+        printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
+        channel->write(buf);
+    };
+    srv.onWriteComplete = [](const SocketChannelPtr& channel, Buffer* buf) {
+        printf("> %.*s\n", (int)buf->size(), (char*)buf->data());
+    };
+    srv.start();
+
+    while (1) sleep(1);
+    return 0;
+}

+ 19 - 3
evpp/build_test.sh

@@ -1,5 +1,21 @@
 #!/bin/bash
 
-g++ -std=c++11 EventLoop_test.cpp -o EventLoop_test -I/usr/local/include/hv -lhv
-g++ -std=c++11 EventLoopThread_test.cpp -o EventLoopThread_test -I/usr/local/include/hv -lhv -lpthread
-g++ -std=c++11 EventLoopThreadPool_test.cpp -o EventLoopThreadPool_test -I/usr/local/include/hv -lhv -lpthread
+CC=gcc
+CXX=g++
+CFLAGS="-g -O0 -Wall"
+CXXFLAGS="-std=c++11"
+INCFLAGS="-I/usr/local/include/hv"
+LDFLAGS="-lhv -lpthread"
+
+# EventLoop
+$CXX $CFLAGS $CXXFLAGS EventLoop_test.cpp -o EventLoop_test $INCFLAGS $LDFLAGS
+$CXX $CFLAGS $CXXFLAGS EventLoopThread_test.cpp -o EventLoopThread_test $INCFLAGS $LDFLAGS
+$CXX $CFLAGS $CXXFLAGS EventLoopThreadPool_test.cpp -o EventLoopThreadPool_test $INCFLAGS $LDFLAGS
+
+# TCP
+$CXX $CFLAGS $CXXFLAGS TcpServer_test.cpp -o TcpServer_test $INCFLAGS $LDFLAGS
+$CXX $CFLAGS $CXXFLAGS TcpClient_test.cpp -o TcpClient_test $INCFLAGS $LDFLAGS
+
+# UDP
+$CXX $CFLAGS $CXXFLAGS UdpServer_test.cpp -o UdpServer_test $INCFLAGS $LDFLAGS
+$CXX $CFLAGS $CXXFLAGS UdpClient_test.cpp -o UdpClient_test $INCFLAGS $LDFLAGS

+ 5 - 1
examples/http_client_test.cpp

@@ -45,7 +45,11 @@ static void test_http_sync_client() {
         printf("%s\n", resp->body.c_str());
     }
 
-    resp = requests::post("127.0.0.1:8080/echo", "hello,world!");
+    hv::Json jroot;
+    jroot["user"] = "admin";
+    http_headers headers;
+    headers["Content-Type"] = "application/json";
+    resp = requests::post("127.0.0.1:8080/echo", jroot.dump(), headers);
     if (resp == NULL) {
         printf("request failed!\n");
     } else {