1
0
Эх сурвалжийг харах

hio_set_connect_timeout hio_set_keepalive_timeout hio_set_heartbeat

ithewei 5 жил өмнө
parent
commit
91905376cd

+ 1 - 1
base/hlog.h

@@ -46,7 +46,7 @@ typedef enum {
     LOG_LEVEL_SILENT
 } log_level_e;
 
-#define DEFAULT_LOG_FILE            "default"
+#define DEFAULT_LOG_FILE            "libhv"
 #define DEFAULT_LOG_LEVEL           LOG_LEVEL_INFO
 #define DEFAULT_LOG_REMAIN_DAYS     1
 #define DEFAULT_LOG_MAX_BUFSIZE     (1<<14)  // 16k

+ 33 - 21
event/hevent.c

@@ -21,27 +21,6 @@ struct sockaddr* hio_peeraddr(hio_t* io) {
     return io->peeraddr;
 }
 
-void hio_set_readbuf(hio_t* io, void* buf, size_t len) {
-    if (buf == NULL || len == 0) {
-        hloop_t* loop = io->loop;
-        if (loop && (loop->readbuf.base == NULL || loop->readbuf.len == 0)) {
-            loop->readbuf.len = HLOOP_READ_BUFSIZE;
-            loop->readbuf.base = (char*)malloc(loop->readbuf.len);
-            io->readbuf = loop->readbuf;
-        }
-    }
-    else {
-        io->readbuf.base = (char*)buf;
-        io->readbuf.len = len;
-    }
-}
-
-int hio_enable_ssl(hio_t* io) {
-    printd("ssl fd=%d\n", io->fd);
-    io->io_type = HIO_TYPE_SSL;
-    return 0;
-}
-
 void hio_setcb_accept   (hio_t* io, haccept_cb  accept_cb) {
     io->accept_cb = accept_cb;
 }
@@ -79,3 +58,36 @@ void hio_set_peeraddr (hio_t* io, struct sockaddr* addr, int addrlen) {
     }
     memcpy(io->peeraddr, addr, addrlen);
 }
+
+int hio_enable_ssl(hio_t* io) {
+    io->io_type = HIO_TYPE_SSL;
+    return 0;
+}
+
+void hio_set_readbuf(hio_t* io, void* buf, size_t len) {
+    if (buf == NULL || len == 0) {
+        hloop_t* loop = io->loop;
+        if (loop && (loop->readbuf.base == NULL || loop->readbuf.len == 0)) {
+            loop->readbuf.len = HLOOP_READ_BUFSIZE;
+            HV_ALLOC(loop->readbuf.base, loop->readbuf.len);
+            io->readbuf = loop->readbuf;
+        }
+    }
+    else {
+        io->readbuf.base = (char*)buf;
+        io->readbuf.len = len;
+    }
+}
+
+void hio_set_connect_timeout(hio_t* io, int timeout_ms) {
+    io->connect_timeout = timeout_ms;
+}
+
+void hio_set_keepalive_timeout(hio_t* io, int timeout_ms) {
+    io->keepalive_timeout = timeout_ms;
+}
+
+void hio_set_heartbeat(hio_t* io, int interval_ms, hio_send_heartbeat_fn fn) {
+    io->heartbeat_interval = interval_ms;
+    io->heartbeat_fn = fn;
+}

+ 11 - 2
event/hevent.h

@@ -89,6 +89,7 @@ struct hperiod_s {
 QUEUE_DECL(offset_buf_t, write_queue);
 struct hio_s {
     HEVENT_FIELDS
+    // flags
     unsigned    ready       :1;
     unsigned    closed      :1;
     unsigned    accept      :1;
@@ -98,6 +99,7 @@ struct hio_s {
     unsigned    send        :1;
     unsigned    recvfrom    :1;
     unsigned    sendto      :1;
+// public:
     int         fd;
     hio_type_e  io_type;
     int         error;
@@ -113,11 +115,18 @@ struct hio_s {
     hclose_cb   close_cb;
     haccept_cb  accept_cb;
     hconnect_cb connect_cb;
-//private:
+    // timers
+    int         connect_timeout;    // ms
+    htimer_t*   connect_timer;
+    int         keepalive_timeout;  // ms
+    htimer_t*   keepalive_timer;
+    int         heartbeat_interval; // ms
+    hio_send_heartbeat_fn heartbeat_fn;
+    htimer_t*   heartbeat_timer;
+// private:
     int         event_index[2]; // for poll,kqueue
     void*       hovlp;          // for iocp/overlapio
     void*       ssl;            // for SSL
-    htimer_t*   timer;          // for io timeout
 };
 
 #define EVENT_ENTRY(p)          container_of(p, hevent_t, pending_node)

+ 35 - 15
event/hloop.c

@@ -161,18 +161,28 @@ process_timers:
 
 static void hloop_init(hloop_t* loop) {
     loop->status = HLOOP_STATUS_STOP;
+
     // idles
     list_init(&loop->idles);
+
     // timers
     heap_init(&loop->timers, timers_compare);
-    // ios: init when hio_add
-    //io_array_init(&loop->ios, IO_ARRAY_INIT_SIZE);
+
+    // ios: init when hio_get
+    // io_array_init(&loop->ios, IO_ARRAY_INIT_SIZE);
+
+    // readbuf: alloc when hio_set_readbuf
+    // loop->readbuf.len = HLOOP_READ_BUFSIZE;
+    // HV_ALLOC(loop->readbuf.base, loop->readbuf.len);
+
     // iowatcher: init when iowatcher_add_event
-    //iowatcher_init(loop);
+    // iowatcher_init(loop);
+
     // custom_events: init when hloop_post_event
-    //event_queue_init(&loop->custom_events, 4);
+    // event_queue_init(&loop->custom_events, 4);
     loop->sockpair[0] = loop->sockpair[1] = -1;
     hmutex_init(&loop->custom_events_mutex);
+
     // NOTE: init start_time here, because htimer_add use it.
     loop->start_ms = gettimeofday_ms();
     loop->start_hrtime = loop->cur_hrtime = gethrtime_us();
@@ -184,6 +194,7 @@ static void hloop_cleanup(hloop_t* loop) {
     for (int i = 0; i < HEVENT_PRIORITY_SIZE; ++i) {
         loop->pendings[i] = NULL;
     }
+
     // idles
     printd("cleanup idles...\n");
     struct list_node* node = loop->idles.next;
@@ -194,6 +205,7 @@ static void hloop_cleanup(hloop_t* loop) {
         HV_FREE(idle);
     }
     list_init(&loop->idles);
+
     // timers
     printd("cleanup timers...\n");
     htimer_t* timer;
@@ -203,6 +215,7 @@ static void hloop_cleanup(hloop_t* loop) {
         HV_FREE(timer);
     }
     heap_init(&loop->timers, NULL);
+
     // ios
     printd("cleanup ios...\n");
     for (int i = 0; i < loop->ios.maxsize; ++i) {
@@ -215,14 +228,17 @@ static void hloop_cleanup(hloop_t* loop) {
         }
     }
     io_array_cleanup(&loop->ios);
+
     // readbuf
     if (loop->readbuf.base && loop->readbuf.len) {
-        free(loop->readbuf.base);
+        HV_FREE(loop->readbuf.base);
         loop->readbuf.base = NULL;
         loop->readbuf.len = 0;
     }
+
     // iowatcher
     iowatcher_cleanup(loop);
+
     // custom_events
     hmutex_lock(&loop->custom_events_mutex);
     if (loop->sockpair[0] != -1 && loop->sockpair[1] != -1) {
@@ -238,7 +254,6 @@ static void hloop_cleanup(hloop_t* loop) {
 hloop_t* hloop_new(int flags) {
     hloop_t* loop;
     HV_ALLOC_SIZEOF(loop);
-    memset(loop, 0, sizeof(hloop_t));
     hloop_init(loop);
     loop->flags |= flags;
     return loop;
@@ -439,14 +454,6 @@ const char* hio_engine() {
 #endif
 }
 
-void hio_init(hio_t* io) {
-    memset(io, 0, sizeof(hio_t));
-    io->event_type = HEVENT_TYPE_IO;
-    io->event_index[0] = io->event_index[1] = -1;
-    // write_queue init when hwrite try_write failed
-    //write_queue_init(&io->write_queue, 4);;
-}
-
 static void fill_io_type(hio_t* io) {
     int type = 0;
     socklen_t optlen = sizeof(int);
@@ -495,25 +502,37 @@ static void hio_socket_init(hio_t* io) {
     }
 }
 
+void hio_init(hio_t* io) {
+    // write_queue init when hwrite try_write failed
+    // write_queue_init(&io->write_queue, 4);
+}
+
 void hio_ready(hio_t* io) {
     if (io->ready) return;
+    // flags
     io->ready = 1;
     io->closed = 0;
     io->accept = io->connect = io->connectex = 0;
     io->recv = io->send = 0;
     io->recvfrom = io->sendto = 0;
+    // public:
     io->io_type = HIO_TYPE_UNKNOWN;
     io->error = 0;
     io->events = io->revents = 0;
+    // callbacks
     io->read_cb = NULL;
     io->write_cb = NULL;
     io->close_cb = 0;
     io->accept_cb = 0;
     io->connect_cb = 0;
+    // timers
+    io->connect_timer = NULL;
+    // private:
     io->event_index[0] = io->event_index[1] = -1;
     io->hovlp = NULL;
     io->ssl = NULL;
-    io->timer = NULL;
+
+    // io_type
     fill_io_type(io);
     if (io->io_type & HIO_TYPE_SOCKET) {
         hio_socket_init(io);
@@ -553,6 +572,7 @@ hio_t* hio_get(hloop_t* loop, int fd) {
     if (io == NULL) {
         HV_ALLOC_SIZEOF(io);
         hio_init(io);
+        io->event_type = HEVENT_TYPE_IO;
         io->loop = loop;
         io->fd = fd;
         loop->ios.ptr[fd] = io;

+ 29 - 8
event/hloop.h

@@ -5,8 +5,8 @@
 #include "hplatform.h"
 #include "hdef.h"
 
-typedef struct hloop_s  hloop_t;
-typedef struct hevent_s hevent_t;
+typedef struct hloop_s      hloop_t;
+typedef struct hevent_s     hevent_t;
 
 typedef struct hidle_s      hidle_t;
 typedef struct htimer_s     htimer_t;
@@ -54,6 +54,7 @@ typedef enum {
     uint64_t            event_id;       \
     hevent_cb           cb;             \
     void*               userdata;       \
+    void*               privdata;       \
     int                 priority;       \
     struct hevent_s*    pending_next;   \
     HEVENT_FLAGS
@@ -87,6 +88,10 @@ typedef enum {
     HIO_TYPE_SOCKET  = 0x00FFFF00,
 } hio_type_e;
 
+#define HIO_DEFAULT_CONNECT_TIMEOUT     5000    // ms
+#define HIO_DEFAULT_KEEPALIVE_TIMEOUT   75000   // ms
+#define HIO_DEFAULT_HEARTBEAT_INTERVAL  3000    // ms
+
 BEGIN_EXTERN_C
 
 // loop
@@ -108,6 +113,7 @@ HV_EXPORT void     hloop_update_time(hloop_t* loop);
 HV_EXPORT uint64_t hloop_now(hloop_t* loop);          // s
 HV_EXPORT uint64_t hloop_now_ms(hloop_t* loop);       // ms
 HV_EXPORT uint64_t hloop_now_hrtime(hloop_t* loop);   // us
+#define hloop_now_us hloop_now_hrtime
 
 // userdata
 HV_EXPORT void  hloop_set_userdata(hloop_t* loop, void* userdata);
@@ -185,12 +191,6 @@ HV_EXPORT hio_type_e hio_type(hio_t* io);
 HV_EXPORT struct sockaddr* hio_localaddr(hio_t* io);
 HV_EXPORT struct sockaddr* hio_peeraddr (hio_t* io);
 
-// TODO: One loop per thread, one readbuf per loop.
-// But you can pass in your own readbuf instead of the default readbuf to avoid memcopy.
-HV_EXPORT void hio_set_readbuf(hio_t* io, void* buf, size_t len);
-// Enable SSL/TLS is so easy :)
-HV_EXPORT int  hio_enable_ssl(hio_t* io);
-
 // set callbacks
 HV_EXPORT void hio_setcb_accept   (hio_t* io, haccept_cb  accept_cb);
 HV_EXPORT void hio_setcb_connect  (hio_t* io, hconnect_cb connect_cb);
@@ -198,6 +198,27 @@ HV_EXPORT void hio_setcb_read     (hio_t* io, hread_cb    read_cb);
 HV_EXPORT void hio_setcb_write    (hio_t* io, hwrite_cb   write_cb);
 HV_EXPORT void hio_setcb_close    (hio_t* io, hclose_cb   close_cb);
 
+// some useful settings
+// Enable SSL/TLS is so easy :)
+HV_EXPORT int  hio_enable_ssl(hio_t* io);
+// TODO: One loop per thread, one readbuf per loop.
+// But you can pass in your own readbuf instead of the default readbuf to avoid memcopy.
+HV_EXPORT void hio_set_readbuf(hio_t* io, void* buf, size_t len);
+// connect timeout => hclose_cb
+HV_EXPORT void hio_set_connect_timeout(hio_t* io, int timeout_ms DEFAULT(HIO_DEFAULT_CONNECT_TIMEOUT));
+// keepalive timeout => hclose_cb
+HV_EXPORT void hio_set_keepalive_timeout(hio_t* io, int timeout_ms DEFAULT(HIO_DEFAULT_KEEPALIVE_TIMEOUT));
+/*
+void send_heartbeat(hio_t* io) {
+    static char buf[] = "PING\r\n";
+    hio_write(io, buf, 6);
+}
+hio_set_heartbeat(io, 3000, send_heartbeat);
+*/
+typedef void (*hio_send_heartbeat_fn)(hio_t* io);
+// heartbeat interval => hio_send_heartbeat_fn
+HV_EXPORT void hio_set_heartbeat(hio_t* io, int interval_ms, hio_send_heartbeat_fn fn);
+
 // Nonblocking, poll IO events in the loop to call corresponding callback.
 HV_EXPORT int hio_accept (hio_t* io);
 HV_EXPORT int hio_connect(hio_t* io);

+ 158 - 65
event/nio.c

@@ -4,6 +4,148 @@
 #include "hsocket.h"
 #include "hlog.h"
 
+static void __connect_timeout_cb(htimer_t* timer) {
+    hio_t* io = (hio_t*)timer->privdata;
+    if (io) {
+        char localaddrstr[SOCKADDR_STRLEN] = {0};
+        char peeraddrstr[SOCKADDR_STRLEN] = {0};
+        hlogw("connect timeout [%s] <=> [%s]",
+                SOCKADDR_STR(io->localaddr, localaddrstr),
+                SOCKADDR_STR(io->peeraddr, peeraddrstr));
+        hio_close(io);
+    }
+}
+
+static void __keepalive_timeout_cb(htimer_t* timer) {
+    hio_t* io = (hio_t*)timer->privdata;
+    if (io) {
+        char localaddrstr[SOCKADDR_STRLEN] = {0};
+        char peeraddrstr[SOCKADDR_STRLEN] = {0};
+        hlogw("keepalive timeout [%s] <=> [%s]",
+                SOCKADDR_STR(io->localaddr, localaddrstr),
+                SOCKADDR_STR(io->peeraddr, peeraddrstr));
+        hio_close(io);
+    }
+}
+
+static void __heartbeat_timer_cb(htimer_t* timer) {
+    hio_t* io = (hio_t*)timer->privdata;
+    if (io && io->heartbeat_fn) {
+        io->heartbeat_fn(io);
+    }
+}
+
+static void __accept_cb(hio_t* io) {
+    /*
+    char localaddrstr[SOCKADDR_STRLEN] = {0};
+    char peeraddrstr[SOCKADDR_STRLEN] = {0};
+    printd("accept connfd=%d [%s] <= [%s]\n", io->fd,
+            SOCKADDR_STR(io->localaddr, localaddrstr),
+            SOCKADDR_STR(io->peeraddr, peeraddrstr));
+    */
+
+    if (io->accept_cb) {
+        // printd("accept_cb------\n");
+        io->accept_cb(io);
+        // printd("accept_cb======\n");
+    }
+
+    if (io->keepalive_timeout > 0) {
+        io->keepalive_timer = htimer_add(io->loop, __keepalive_timeout_cb, io->keepalive_timeout, 1);
+        io->keepalive_timer->privdata = io;
+    }
+
+    if (io->heartbeat_interval > 0) {
+        io->heartbeat_timer = htimer_add(io->loop, __heartbeat_timer_cb, io->heartbeat_interval, INFINITE);
+        io->heartbeat_timer->privdata = io;
+    }
+}
+
+static void __connect_cb(hio_t* io) {
+    /*
+    char localaddrstr[SOCKADDR_STRLEN] = {0};
+    char peeraddrstr[SOCKADDR_STRLEN] = {0};
+    printd("connect connfd=%d [%s] => [%s]\n", io->fd,
+            SOCKADDR_STR(io->localaddr, localaddrstr),
+            SOCKADDR_STR(io->peeraddr, peeraddrstr));
+    */
+    if (io->connect_timer) {
+        htimer_del(io->connect_timer);
+        io->connect_timer = NULL;
+        io->connect_timeout = 0;
+    }
+
+    if (io->connect_cb) {
+        // printd("connect_cb------\n");
+        io->connect_cb(io);
+        // printd("connect_cb======\n");
+    }
+
+    if (io->keepalive_timeout > 0) {
+        io->keepalive_timer = htimer_add(io->loop, __keepalive_timeout_cb, io->keepalive_timeout, 1);
+        io->keepalive_timer->privdata = io;
+    }
+
+    if (io->heartbeat_interval > 0) {
+        io->heartbeat_timer = htimer_add(io->loop, __heartbeat_timer_cb, io->heartbeat_interval, INFINITE);
+        io->heartbeat_timer->privdata = io;
+    }
+}
+
+static void __read_cb(hio_t* io, void* buf, int readbytes) {
+    // printd("> %.*s\n", readbytes, buf);
+    if (io->keepalive_timer) {
+        htimer_reset(io->keepalive_timer);
+    }
+
+    if (io->read_cb) {
+        // printd("read_cb------\n");
+        io->read_cb(io, buf, readbytes);
+        // printd("read_cb======\n");
+    }
+}
+
+static void __write_cb(hio_t* io, const void* buf, int writebytes) {
+    // printd("< %.*s\n", writebytes, buf);
+    if (io->keepalive_timer) {
+        htimer_reset(io->keepalive_timer);
+    }
+
+    if (io->write_cb) {
+        // printd("write_cb------\n");
+        io->write_cb(io, buf, writebytes);
+        // printd("write_cb======\n");
+    }
+}
+
+static void __close_cb(hio_t* io) {
+    // printd("close fd=%d\n", io->fd);
+    if (io->connect_timer) {
+        htimer_del(io->connect_timer);
+        io->connect_timer = NULL;
+        io->connect_timeout = 0;
+    }
+
+    if (io->keepalive_timer) {
+        htimer_del(io->keepalive_timer);
+        io->keepalive_timer = NULL;
+        io->keepalive_timeout = 0;
+    }
+
+    if (io->heartbeat_timer) {
+        htimer_del(io->heartbeat_timer);
+        io->heartbeat_timer = NULL;
+        io->heartbeat_interval = 0;
+        io->heartbeat_fn = NULL;
+    }
+
+    if (io->close_cb) {
+        // printd("close_cb------\n");
+        io->close_cb(io);
+        // printd("close_cb======\n");
+    }
+}
+
 #ifdef WITH_OPENSSL
 #include "openssl/ssl.h"
 #include "openssl/err.h"
@@ -20,10 +162,10 @@ static void ssl_do_handshark(hio_t* io) {
         io->cb = NULL;
         printd("ssl handshark finished.\n");
         if (io->accept_cb) {
-            io->accept_cb(io);
+            __accept_cb(io);
         }
         else if (io->connect_cb) {
-            io->connect_cb(io);
+            __connect_cb(io);
         }
     }
     else {
@@ -63,6 +205,7 @@ accept:
     getsockname(connfd, io->localaddr, &addrlen);
     connio = hio_get(io->loop, connfd);
     // NOTE: inherit from listenio
+    connio->accept_cb = io->accept_cb;
     connio->userdata = io->userdata;
 
 #ifdef WITH_OPENSSL
@@ -74,7 +217,6 @@ accept:
         SSL* ssl = SSL_new(ssl_ctx);
         SSL_set_fd(ssl, connfd);
         connio->ssl = ssl;
-        connio->accept_cb = io->accept_cb;
         hio_enable_ssl(connio);
         //int ret = SSL_accept(ssl);
         SSL_set_accept_state(ssl);
@@ -82,20 +224,9 @@ accept:
     }
 #endif
 
-    if (io->io_type != HIO_TYPE_SSL) {
+    if (connio->io_type != HIO_TYPE_SSL) {
         // NOTE: SSL call accept_cb after handshark finished
-        if (io->accept_cb) {
-            /*
-            char localaddrstr[SOCKADDR_STRLEN] = {0};
-            char peeraddrstr[SOCKADDR_STRLEN] = {0};
-            printd("accept listenfd=%d connfd=%d [%s] <= [%s]\n", io->fd, connfd,
-                    SOCKADDR_STR(io->localaddr, localaddrstr),
-                    SOCKADDR_STR(io->peeraddr, peeraddrstr));
-            */
-            //printd("accept_cb------\n");
-            io->accept_cb(connio);
-            //printd("accept_cb======\n");
-        }
+        __accept_cb(connio);
     }
     goto accept;
 
@@ -115,13 +246,7 @@ static void nio_connect(hio_t* io) {
     else {
         addrlen = sizeof(sockaddr_u);
         getsockname(io->fd, io->localaddr, &addrlen);
-        /*
-        char localaddrstr[SOCKADDR_STRLEN] = {0};
-        char peeraddrstr[SOCKADDR_STRLEN] = {0};
-        printd("connect connfd=%d [%s] => [%s]\n", io->fd,
-                SOCKADDR_STR(io->localaddr, localaddrstr),
-                SOCKADDR_STR(io->peeraddr, peeraddrstr));
-        */
+
 #ifdef WITH_OPENSSL
         if (io->io_type == HIO_TYPE_SSL) {
             SSL_CTX* ssl_ctx = (SSL_CTX*)ssl_ctx_instance();
@@ -136,13 +261,10 @@ static void nio_connect(hio_t* io) {
             ssl_do_handshark(io);
         }
 #endif
+
         if (io->io_type != HIO_TYPE_SSL) {
             // NOTE: SSL call connect_cb after handshark finished
-            if (io->connect_cb) {
-                //printd("connect_cb------\n");
-                io->connect_cb(io);
-                //printd("connect_cb======\n");
-            }
+            __connect_cb(io);
         }
         return;
     }
@@ -199,12 +321,7 @@ read:
     if (nread == 0) {
         goto disconnect;
     }
-    //printd("> %.*s\n", nread, buf);
-    if (io->read_cb) {
-        //printd("read_cb------\n");
-        io->read_cb(io, buf, nread);
-        //printd("read_cb======\n");
-    }
+    __read_cb(io, buf, nread);
     if (nread == len) {
         goto read;
     }
@@ -260,11 +377,7 @@ write:
     if (nwrite == 0) {
         goto disconnect;
     }
-    if (io->write_cb) {
-        //printd("write_cb------\n");
-        io->write_cb(io, buf, nwrite);
-        //printd("write_cb======\n");
-    }
+    __write_cb(io, buf, nwrite);
     pbuf->offset += nwrite;
     if (nwrite == len) {
         HV_FREE(pbuf->base);
@@ -298,10 +411,6 @@ static void hio_handle_events(hio_t* io) {
             // NOTE: connect just do once
             // ONESHOT
             io->connect = 0;
-            if (io->timer) {
-                htimer_del(io->timer);
-                io->timer = NULL;
-            }
 
             nio_connect(io);
         }
@@ -318,11 +427,6 @@ int hio_accept(hio_t* io) {
     return 0;
 }
 
-#define CONNECT_TIMEOUT     5000 // ms
-static void connect_timeout_cb(htimer_t* timer) {
-    hio_close((hio_t*)timer->userdata);
-}
-
 int hio_connect(hio_t* io) {
     int ret = connect(io->fd, io->peeraddr, SOCKADDR_LEN(io->peeraddr));
 #ifdef OS_WIN
@@ -336,14 +440,12 @@ int hio_connect(hio_t* io) {
     }
     if (ret == 0) {
         // connect ok
-        if (io->connect_cb) {
-            io->connect_cb(io);
-        }
+        __connect_cb(io);
         return 0;
     }
-    htimer_t* timer = htimer_add(io->loop, connect_timeout_cb, CONNECT_TIMEOUT, 1);
-    timer->userdata = io;
-    io->timer = timer;
+    int timeout = io->connect_timeout ? io->connect_timeout : HIO_DEFAULT_CONNECT_TIMEOUT;
+    io->connect_timer = htimer_add(io->loop, __connect_timeout_cb, timeout, 1);
+    io->connect_timer->privdata = io;
     return hio_add(io, hio_handle_events, HV_WRITE);
 }
 
@@ -392,11 +494,7 @@ try_write:
         if (nwrite == 0) {
             goto disconnect;
         }
-        if (io->write_cb) {
-            //printd("try_write_cb------\n");
-            io->write_cb(io, buf, nwrite);
-            //printd("try_write_cb======\n");
-        }
+        __write_cb(io, buf, nwrite);
         if (nwrite == len) {
             //goto write_done;
             return nwrite;
@@ -424,7 +522,6 @@ disconnect:
 }
 
 int hio_close (hio_t* io) {
-    printd("close fd=%d\n", io->fd);
     if (io->closed) return 0;
     io->closed = 1;
     hio_del(io, HV_RDWR);
@@ -438,11 +535,7 @@ int hio_close (hio_t* io) {
         SSL_free((SSL*)io->ssl);
     }
 #endif
-    if (io->close_cb) {
-        //printd("close_cb------\n");
-        io->close_cb(io);
-        //printd("close_cb======\n");
-    }
+    __close_cb(io);
     return 0;
 }
 #endif

+ 7 - 0
examples/nc.c

@@ -14,6 +14,12 @@ hio_t*      sockio = NULL;
 
 int verbose = 0;
 
+void send_heartbeat(hio_t* io) {
+    static char buf[] = "PING\r\n";
+    // printf("send_heartbeat %s", buf);
+    hio_write(io, buf, 6);
+}
+
 void on_recv(hio_t* io, void* buf, int readbytes) {
     //printf("on_recv fd=%d readbytes=%d\n", hio_fd(io), readbytes);
     if (verbose) {
@@ -123,6 +129,7 @@ Examples: nc 127.0.0.1 80\n\
     hio_setcb_close(sockio, on_close);
     hio_setcb_read(sockio, on_recv);
     hio_set_readbuf(sockio, recvbuf, RECV_BUFSIZE);
+    // hio_set_heartbeat(sockio, 1000, send_heartbeat);
 
     hloop_run(loop);
     hloop_free(&loop);

+ 4 - 31
http/server/HttpHandler.h

@@ -1,17 +1,9 @@
 #ifndef HTTP_HANDLER_H_
 #define HTTP_HANDLER_H_
 
-#include "HttpParser.h"
 #include "HttpService.h"
+#include "HttpParser.h"
 #include "FileCache.h"
-#include "hloop.h"
-
-#define HTTP_KEEPALIVE_TIMEOUT  75 // s
-
-static inline void on_keepalive_timeout(htimer_t* timer) {
-    hio_t* io = (hio_t*)hevent_userdata(timer);
-    hio_close(io);
-}
 
 class HttpHandler {
 public:
@@ -20,29 +12,20 @@ public:
     int                     port;
     // for handle_request
     HttpService*            service;
-    FileCache*              files;
     HttpParser*             parser;
+    FileCache*              files;
     HttpRequest             req;
     HttpResponse            res;
     file_cache_t*           fc;
-    // for keepalive
-    hio_t*                  io;
-    htimer_t*               keepalive_timer;
 
     HttpHandler() {
         service = NULL;
-        files = NULL;
         parser = NULL;
+        files = NULL;
         fc = NULL;
-        io = NULL;
-        keepalive_timer = NULL;
     }
 
     ~HttpHandler() {
-        if (keepalive_timer) {
-            htimer_del(keepalive_timer);
-            keepalive_timer = NULL;
-        }
     }
 
     // @workflow: preprocessor -> api -> web -> postprocessor
@@ -50,19 +33,9 @@ public:
     int HandleRequest();
 
     void Reset() {
-        fc = NULL;
         req.Reset();
         res.Reset();
-    }
-
-    void KeepAlive() {
-        if (keepalive_timer == NULL) {
-            keepalive_timer = htimer_add(hevent_loop(io), on_keepalive_timeout, HTTP_KEEPALIVE_TIMEOUT*1000, 1);
-            hevent_set_userdata(keepalive_timer, io);
-        }
-        else {
-            htimer_reset(keepalive_timer);
-        }
+        fc = NULL;
     }
 };
 

+ 1 - 5
http/server/HttpServer.cpp

@@ -68,9 +68,6 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
     }
 
     if (parser->WantRecv()) {
-        // NOTE: KeepAlive will reset keepalive_timer,
-        // if no data recv within keepalive timeout, closesocket actively.
-        handler->KeepAlive();
         return;
     }
 
@@ -198,7 +195,6 @@ handle_request:
         res->status_code, http_status_str(res->status_code));
 
     if (keepalive) {
-        handler->KeepAlive();
         handler->Reset();
         parser->InitRequest(req);
     }
@@ -229,6 +225,7 @@ static void on_accept(hio_t* io) {
     hio_setcb_close(io, on_close);
     hio_setcb_read(io, on_recv);
     hio_read(io);
+    hio_set_keepalive_timeout(io, HIO_DEFAULT_KEEPALIVE_TIMEOUT);
     // new HttpHandler
     // delete on_close
     HttpHandler* handler = new HttpHandler;
@@ -236,7 +233,6 @@ static void on_accept(hio_t* io) {
     handler->files = &s_filecache;
     sockaddr_ip((sockaddr_u*)hio_peeraddr(io), handler->ip, sizeof(handler->ip));
     handler->port = sockaddr_port((sockaddr_u*)hio_peeraddr(io));
-    handler->io = io;
     hevent_set_userdata(io, handler);
 }