1
0
Эх сурвалжийг харах

Add hio_set_read_timeout, hio_set_write_timeout

ithewei 4 жил өмнө
parent
commit
fd8cab6b18
7 өөрчлөгдсөн 199 нэмэгдсэн , 87 устгасан
  1. 2 0
      docs/API.md
  2. 105 6
      event/hevent.c
  3. 9 1
      event/hevent.h
  4. 18 4
      event/hloop.c
  5. 6 0
      event/hloop.h
  6. 48 74
      event/nio.c
  7. 11 2
      evpp/Channel.h

+ 2 - 0
docs/API.md

@@ -438,6 +438,8 @@
 - hio_set_readbuf
 - hio_set_connect_timeout
 - hio_set_close_timeout
+- hio_set_read_timeout
+- hio_set_write_timeout
 - hio_set_keepalive_timeout
 - hio_set_heartbeat
 - hio_set_unpack

+ 105 - 6
event/hevent.c

@@ -42,7 +42,7 @@ static void fill_io_type(hio_t* io) {
 }
 
 static void hio_socket_init(hio_t* io) {
-    if (io->io_type & HIO_TYPE_SOCK_RAW || io->io_type & HIO_TYPE_SOCK_DGRAM) {
+    if ((io->io_type & HIO_TYPE_SOCK_DGRAM) || (io->io_type & HIO_TYPE_SOCK_RAW)) {
         // NOTE: sendto multiple peeraddr cannot use io->write_queue
         blocking(io->fd);
     } else {
@@ -97,6 +97,7 @@ void hio_ready(hio_t* io) {
     io->io_type = HIO_TYPE_UNKNOWN;
     io->error = 0;
     io->events = io->revents = 0;
+    io->last_read_hrtime = io->last_write_hrtime = io->loop->cur_hrtime;
     // readbuf
     io->alloced_readbuf = 0;
     io->readbuf.base = io->loop->readbuf.base;
@@ -118,6 +119,10 @@ void hio_ready(hio_t* io) {
     io->connect_timer = NULL;
     io->close_timeout = 0;
     io->close_timer = NULL;
+    io->read_timeout = 0;
+    io->read_timer = NULL;
+    io->write_timeout = 0;
+    io->write_timer = NULL;
     io->keepalive_timeout = 0;
     io->keepalive_timer = NULL;
     io->heartbeat_interval = 0;
@@ -146,7 +151,7 @@ void hio_ready(hio_t* io) {
     }
 
 #if WITH_RUDP
-    if (io->io_type & HIO_TYPE_SOCK_RAW || io->io_type & HIO_TYPE_SOCK_DGRAM) {
+    if ((io->io_type & HIO_TYPE_SOCK_DGRAM) || (io->io_type & HIO_TYPE_SOCK_RAW)) {
         rudp_init(&io->rudp);
     }
 #endif
@@ -173,7 +178,7 @@ void hio_done(hio_t* io) {
     hrecursive_mutex_unlock(&io->write_mutex);
 
 #if WITH_RUDP
-    if (io->io_type & HIO_TYPE_SOCK_RAW || io->io_type & HIO_TYPE_SOCK_DGRAM) {
+    if ((io->io_type & HIO_TYPE_SOCK_DGRAM) || (io->io_type & HIO_TYPE_SOCK_RAW)) {
         rudp_cleanup(&io->rudp);
     }
 #endif
@@ -486,6 +491,22 @@ void hio_del_close_timer(hio_t* io) {
     }
 }
 
+void hio_del_read_timer(hio_t* io) {
+    if (io->read_timer) {
+        htimer_del(io->read_timer);
+        io->read_timer = NULL;
+        io->read_timeout = 0;
+    }
+}
+
+void hio_del_write_timer(hio_t* io) {
+    if (io->write_timer) {
+        htimer_del(io->write_timer);
+        io->write_timer = NULL;
+        io->write_timeout = 0;
+    }
+}
+
 void hio_del_keepalive_timer(hio_t* io) {
     if (io->keepalive_timer) {
         htimer_del(io->keepalive_timer);
@@ -511,9 +532,87 @@ void hio_set_close_timeout(hio_t* io, int timeout_ms) {
     io->close_timeout = timeout_ms;
 }
 
+static void __read_timeout_cb(htimer_t* timer) {
+    hio_t* io = (hio_t*)timer->privdata;
+    uint64_t inactive_ms = (io->loop->cur_hrtime - io->last_read_hrtime) / 1000;
+    if (inactive_ms + 100 < io->read_timeout) {
+        ((struct htimeout_s*)io->read_timer)->timeout = io->read_timeout - inactive_ms;
+        htimer_reset(io->read_timer);
+    } else {
+        char localaddrstr[SOCKADDR_STRLEN] = {0};
+        char peeraddrstr[SOCKADDR_STRLEN] = {0};
+        hlogw("read timeout [%s] <=> [%s]",
+                SOCKADDR_STR(io->localaddr, localaddrstr),
+                SOCKADDR_STR(io->peeraddr, peeraddrstr));
+        io->error = ETIMEDOUT;
+        hio_close(io);
+    }
+}
+
+void hio_set_read_timeout(hio_t* io, int timeout_ms) {
+    if (timeout_ms <= 0) {
+        // del
+        hio_del_read_timer(io);
+        return;
+    }
+
+    if (io->read_timer) {
+        // reset
+        ((struct htimeout_s*)io->read_timer)->timeout = timeout_ms;
+        htimer_reset(io->read_timer);
+    } else {
+        // add
+        io->read_timer = htimer_add(io->loop, __read_timeout_cb, timeout_ms, 1);
+        io->read_timer->privdata = io;
+    }
+    io->read_timeout = timeout_ms;
+}
+
+static void __write_timeout_cb(htimer_t* timer) {
+    hio_t* io = (hio_t*)timer->privdata;
+    uint64_t inactive_ms = (io->loop->cur_hrtime - io->last_write_hrtime) / 1000;
+    if (inactive_ms + 100 < io->write_timeout) {
+        ((struct htimeout_s*)io->write_timer)->timeout = io->write_timeout - inactive_ms;
+        htimer_reset(io->write_timer);
+    } else {
+        char localaddrstr[SOCKADDR_STRLEN] = {0};
+        char peeraddrstr[SOCKADDR_STRLEN] = {0};
+        hlogw("write timeout [%s] <=> [%s]",
+                SOCKADDR_STR(io->localaddr, localaddrstr),
+                SOCKADDR_STR(io->peeraddr, peeraddrstr));
+        io->error = ETIMEDOUT;
+        hio_close(io);
+    }
+}
+
+void hio_set_write_timeout(hio_t* io, int timeout_ms) {
+    if (timeout_ms <= 0) {
+        // del
+        hio_del_write_timer(io);
+        return;
+    }
+
+    if (io->write_timer) {
+        // reset
+        ((struct htimeout_s*)io->write_timer)->timeout = timeout_ms;
+        htimer_reset(io->write_timer);
+    } else {
+        // add
+        io->write_timer = htimer_add(io->loop, __write_timeout_cb, timeout_ms, 1);
+        io->write_timer->privdata = io;
+    }
+    io->write_timeout = timeout_ms;
+}
+
 static void __keepalive_timeout_cb(htimer_t* timer) {
     hio_t* io = (hio_t*)timer->privdata;
-    if (io) {
+    uint64_t last_rw_hrtime = MAX(io->last_read_hrtime, io->last_write_hrtime);
+    uint64_t inactive_ms = (io->loop->cur_hrtime - last_rw_hrtime) / 1000;
+    printf("inactive_ms=%lu\n", inactive_ms);
+    if (inactive_ms + 100 < io->keepalive_timeout) {
+        ((struct htimeout_s*)io->keepalive_timer)->timeout = io->keepalive_timeout - inactive_ms;
+        htimer_reset(io->keepalive_timer);
+    } else {
         char localaddrstr[SOCKADDR_STRLEN] = {0};
         char peeraddrstr[SOCKADDR_STRLEN] = {0};
         hlogw("keepalive timeout [%s] <=> [%s]",
@@ -525,7 +624,7 @@ static void __keepalive_timeout_cb(htimer_t* timer) {
 }
 
 void hio_set_keepalive_timeout(hio_t* io, int timeout_ms) {
-    if (timeout_ms == 0) {
+    if (timeout_ms <= 0) {
         // del
         hio_del_keepalive_timer(io);
         return;
@@ -551,7 +650,7 @@ static void __heartbeat_timer_cb(htimer_t* timer) {
 }
 
 void hio_set_heartbeat(hio_t* io, int interval_ms, hio_send_heartbeat_fn fn) {
-    if (interval_ms == 0) {
+    if (interval_ms <= 0) {
         // del
         hio_del_heartbeat_timer(io);
         return;

+ 9 - 1
event/hevent.h

@@ -95,7 +95,7 @@ struct hperiod_s {
 };
 
 QUEUE_DECL(offset_buf_t, write_queue);
-// sizeof(struct hio_s)=360 on linux-x64
+// sizeof(struct hio_s)=400 on linux-x64
 struct hio_s {
     HEVENT_FIELDS
     // flags
@@ -119,6 +119,8 @@ struct hio_s {
     int         revents;
     struct sockaddr*    localaddr;
     struct sockaddr*    peeraddr;
+    uint64_t            last_read_hrtime;
+    uint64_t            last_write_hrtime;
     // read
     fifo_buf_t          readbuf;
     unsigned int        read_flags;
@@ -141,11 +143,15 @@ struct hio_s {
     // timers
     int         connect_timeout;    // ms
     int         close_timeout;      // ms
+    int         read_timeout;       // ms
+    int         write_timeout;      // ms
     int         keepalive_timeout;  // ms
     int         heartbeat_interval; // ms
     hio_send_heartbeat_fn heartbeat_fn;
     htimer_t*   connect_timer;
     htimer_t*   close_timer;
+    htimer_t*   read_timer;
+    htimer_t*   write_timer;
     htimer_t*   keepalive_timer;
     htimer_t*   heartbeat_timer;
     // upstream
@@ -198,6 +204,8 @@ void hio_close_cb(hio_t* io);
 
 void hio_del_connect_timer(hio_t* io);
 void hio_del_close_timer(hio_t* io);
+void hio_del_read_timer(hio_t* io);
+void hio_del_write_timer(hio_t* io);
 void hio_del_keepalive_timer(hio_t* io);
 void hio_del_heartbeat_timer(hio_t* io);
 

+ 18 - 4
event/hloop.c

@@ -212,7 +212,8 @@ static int hloop_create_sockpair(hloop_t* loop) {
         hloge("socketpair create failed!");
         return -1;
     }
-    hread(loop, loop->sockpair[SOCKPAIR_READ_INDEX], loop->readbuf.base, loop->readbuf.len, sockpair_read_cb);
+    hio_t* io = hread(loop, loop->sockpair[SOCKPAIR_READ_INDEX], loop->readbuf.base, loop->readbuf.len, sockpair_read_cb);
+    io->priority = HEVENT_HIGH_PRIORITY;
     // NOTE: Avoid duplication closesocket in hio_cleanup
     loop->sockpair[SOCKPAIR_READ_INDEX] = -1;
     ++loop->intern_nevents;
@@ -235,13 +236,14 @@ void hloop_post_event(hloop_t* loop, hevent_t* ev) {
         ev->event_id = hloop_next_event_id();
     }
 
+    int nsend = 0;
     hmutex_lock(&loop->custom_events_mutex);
     if (loop->sockpair[SOCKPAIR_WRITE_INDEX] == -1) {
         if (hloop_create_sockpair(loop) != 0) {
             goto unlock;
         }
     }
-    int nsend = send(loop->sockpair[SOCKPAIR_WRITE_INDEX], "e", 1, 0);
+    nsend = send(loop->sockpair[SOCKPAIR_WRITE_INDEX], "e", 1, 0);
     if (nsend != 1) {
         hloge("send failed!");
         goto unlock;
@@ -372,6 +374,7 @@ void hloop_free(hloop_t** pp) {
 int hloop_run(hloop_t* loop) {
     if (loop == NULL) return -1;
     if (loop->status == HLOOP_STATUS_RUNNING) return -2;
+
     loop->status = HLOOP_STATUS_RUNNING;
     loop->pid = hv_getpid();
     loop->tid = hv_gettid();
@@ -396,7 +399,8 @@ int hloop_run(hloop_t* loop) {
             continue;
         }
         ++loop->loop_cnt;
-        if (loop->nactives <= loop->intern_nevents && loop->flags & HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS) {
+        if ((loop->flags & HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS) &&
+            loop->nactives <= loop->intern_nevents) {
             break;
         }
         hloop_process_events(loop);
@@ -404,6 +408,7 @@ int hloop_run(hloop_t* loop) {
             break;
         }
     }
+
     loop->status = HLOOP_STATUS_STOP;
     loop->end_hrtime = gethrtime_us();
 
@@ -467,6 +472,16 @@ uint64_t hloop_now_hrtime(hloop_t* loop) {
     return loop->start_ms * 1000 + (loop->cur_hrtime - loop->start_hrtime);
 }
 
+uint64_t hio_last_read_time(hio_t* io) {
+    hloop_t* loop = io->loop;
+    return loop->start_ms + (io->last_read_hrtime - loop->start_hrtime) / 1000;
+}
+
+uint64_t hio_last_write_time(hio_t* io) {
+    hloop_t* loop = io->loop;
+    return loop->start_ms + (io->last_write_hrtime - loop->start_hrtime) / 1000;
+}
+
 long hloop_pid(hloop_t* loop) {
     return loop->pid;
 }
@@ -723,7 +738,6 @@ int hio_close_async(hio_t* io) {
     ev.cb = hio_close_event_cb;
     ev.userdata = io;
     ev.privdata = (void*)(uintptr_t)io->id;
-    ev.priority = HEVENT_HIGH_PRIORITY;
     hloop_post_event(io->loop, &ev);
     return 0;
 }

+ 6 - 0
event/hloop.h

@@ -259,6 +259,8 @@ HV_EXPORT bool hio_is_opened(hio_t* io);
 HV_EXPORT bool hio_is_closed(hio_t* io);
 HV_EXPORT size_t hio_read_bufsize(hio_t* io);
 HV_EXPORT size_t hio_write_bufsize(hio_t* io);
+HV_EXPORT uint64_t hio_last_read_time(hio_t* io); // ms
+HV_EXPORT uint64_t hio_last_write_time(hio_t* io); // ms
 
 // set callbacks
 HV_EXPORT void hio_setcb_accept   (hio_t* io, haccept_cb  accept_cb);
@@ -286,6 +288,10 @@ HV_EXPORT void hio_set_readbuf(hio_t* io, void* buf, size_t len);
 HV_EXPORT void hio_set_connect_timeout(hio_t* io, int timeout_ms DEFAULT(HIO_DEFAULT_CONNECT_TIMEOUT));
 // close timeout => hclose_cb
 HV_EXPORT void hio_set_close_timeout(hio_t* io, int timeout_ms DEFAULT(HIO_DEFAULT_CLOSE_TIMEOUT));
+// read timeout => hclose_cb
+HV_EXPORT void hio_set_read_timeout(hio_t* io, int timeout_ms);
+// write timeout => hclose_cb
+HV_EXPORT void hio_set_write_timeout(hio_t* io, int timeout_ms);
 // keepalive timeout => hclose_cb
 HV_EXPORT void hio_set_keepalive_timeout(hio_t* io, int timeout_ms DEFAULT(HIO_DEFAULT_KEEPALIVE_TIMEOUT));
 /*

+ 48 - 74
event/nio.c

@@ -43,17 +43,13 @@ static void __connect_cb(hio_t* io) {
 
 static void __read_cb(hio_t* io, void* buf, int readbytes) {
     // printd("> %.*s\n", readbytes, buf);
-    if (io->keepalive_timer) {
-        htimer_reset(io->keepalive_timer);
-    }
+    io->last_read_hrtime = io->loop->cur_hrtime;
     hio_handle_read(io, buf, readbytes);
 }
 
 static void __write_cb(hio_t* io, const void* buf, int writebytes) {
     // printd("< %.*s\n", writebytes, buf);
-    if (io->keepalive_timer) {
-        htimer_reset(io->keepalive_timer);
-    }
+    io->last_write_hrtime = io->loop->cur_hrtime;
     hio_write_cb(io, buf, writebytes);
 }
 
@@ -61,6 +57,8 @@ static void __close_cb(hio_t* io) {
     // printd("close fd=%d\n", io->fd);
     hio_del_connect_timer(io);
     hio_del_close_timer(io);
+    hio_del_read_timer(io);
+    hio_del_write_timer(io);
     hio_del_keepalive_timer(io);
     hio_del_heartbeat_timer(io);
     hio_close_cb(io);
@@ -112,56 +110,58 @@ static void ssl_client_handshake(hio_t* io) {
 
 static void nio_accept(hio_t* io) {
     // printd("nio_accept listenfd=%d\n", io->fd);
-    int connfd = 0, err = 0;
+    int connfd = 0, err = 0, accept_cnt = 0;
     socklen_t addrlen;
-accept:
-    addrlen = sizeof(sockaddr_u);
-    connfd = accept(io->fd, io->peeraddr, &addrlen);
     hio_t* connio = NULL;
-    if (connfd < 0) {
-        err = socket_errno();
-        if (err == EAGAIN) {
-            //goto accept_done;
-            return;
-        } else {
-            perror("accept");
-            io->error = err;
-            goto accept_error;
-        }
-    }
-    addrlen = sizeof(sockaddr_u);
-    getsockname(connfd, io->localaddr, &addrlen);
-    connio = hio_get(io->loop, connfd);
-    // NOTE: inherit from listenio
-    connio->accept_cb = io->accept_cb;
-    connio->userdata = io->userdata;
-    if (io->unpack_setting) {
-        hio_set_unpack(connio, io->unpack_setting);
-    }
-
-    if (io->io_type == HIO_TYPE_SSL) {
-        if (connio->ssl == NULL) {
-            hssl_ctx_t ssl_ctx = hssl_ctx_instance();
-            if (ssl_ctx == NULL) {
+    while (accept_cnt++ < 3) {
+        addrlen = sizeof(sockaddr_u);
+        connfd = accept(io->fd, io->peeraddr, &addrlen);
+        if (connfd < 0) {
+            err = socket_errno();
+            if (err == EAGAIN || err == EINTR) {
+                return;
+            } else {
+                perror("accept");
+                io->error = err;
                 goto accept_error;
             }
-            hssl_t ssl = hssl_new(ssl_ctx, connfd);
-            if (ssl == NULL) {
-                goto accept_error;
+        }
+        addrlen = sizeof(sockaddr_u);
+        getsockname(connfd, io->localaddr, &addrlen);
+        connio = hio_get(io->loop, connfd);
+        // NOTE: inherit from listenio
+        connio->accept_cb = io->accept_cb;
+        connio->userdata = io->userdata;
+        if (io->unpack_setting) {
+            hio_set_unpack(connio, io->unpack_setting);
+        }
+
+        if (io->io_type == HIO_TYPE_SSL) {
+            if (connio->ssl == NULL) {
+                hssl_ctx_t ssl_ctx = hssl_ctx_instance();
+                if (ssl_ctx == NULL) {
+                    io->error = HSSL_ERROR;
+                    goto accept_error;
+                }
+                hssl_t ssl = hssl_new(ssl_ctx, connfd);
+                if (ssl == NULL) {
+                    io->error = HSSL_ERROR;
+                    goto accept_error;
+                }
+                connio->ssl = ssl;
             }
-            connio->ssl = ssl;
+            hio_enable_ssl(connio);
+            ssl_server_handshake(connio);
+        }
+        else {
+            // NOTE: SSL call accept_cb after handshake finished
+            __accept_cb(connio);
         }
-        hio_enable_ssl(connio);
-        ssl_server_handshake(connio);
-    }
-    else {
-        // NOTE: SSL call accept_cb after handshake finished
-        __accept_cb(connio);
     }
-
-    goto accept;
+    return;
 
 accept_error:
+    hloge("listenfd=%d accept error: %s:%d", io->fd, socket_strerror(io->error), io->error);
     hio_close(io);
 }
 
@@ -419,16 +419,6 @@ int hio_read (hio_t* io) {
     return hio_add(io, hio_handle_events, HV_READ);
 }
 
-static void hio_write_event_cb(hevent_t* ev) {
-    hio_t* io = (hio_t*)ev->userdata;
-    if (io->closed) return;
-    uint32_t id = (uintptr_t)ev->privdata;
-    if (io->id != id) return;
-    if (io->keepalive_timer) {
-        htimer_reset(io->keepalive_timer);
-    }
-}
-
 int hio_write (hio_t* io, const void* buf, size_t len) {
     if (io->closed) {
         hloge("hio_write called but fd[%d] already closed!", io->fd);
@@ -460,23 +450,7 @@ try_write:
         if (nwrite == 0) {
             goto disconnect;
         }
-
-        // __write_cb(io, buf, nwrite);
-        if (io->keepalive_timer) {
-            if (hv_gettid() == io->loop->tid) {
-                htimer_reset(io->keepalive_timer);
-            } else {
-                hevent_t ev;
-                memset(&ev, 0, sizeof(ev));
-                ev.cb = hio_write_event_cb;
-                ev.userdata = io;
-                ev.privdata = (void*)(uintptr_t)io->id;
-                ev.priority = HEVENT_HIGH_PRIORITY;
-                hloop_post_event(io->loop, &ev);
-            }
-        }
-        hio_write_cb(io, buf, nwrite);
-
+        __write_cb(io, buf, nwrite);
         if (nwrite == len) {
             //goto write_done;
             hrecursive_mutex_unlock(&io->write_mutex);

+ 11 - 2
evpp/Channel.h

@@ -175,8 +175,7 @@ private:
 
 class SocketChannel : public Channel {
 public:
-    // for TcpClient
-    std::function<void()>   onconnect;
+    std::function<void()>   onconnect; // only for TcpClient
     std::function<void()>   heartbeat;
 
     SocketChannel(hio_t* io) : Channel(io) {
@@ -197,6 +196,16 @@ public:
         hio_set_close_timeout(io_, timeout_ms);
     }
 
+    void setReadTimeout(int timeout_ms) {
+        if (io_ == NULL) return;
+        hio_set_read_timeout(io_, timeout_ms);
+    }
+
+    void setWriteTimeout(int timeout_ms) {
+        if (io_ == NULL) return;
+        hio_set_write_timeout(io_, timeout_ms);
+    }
+
     void setKeepaliveTimeout(int timeout_ms) {
         if (io_ == NULL) return;
         hio_set_keepalive_timeout(io_, timeout_ms);