浏览代码

hio_read_until

ithewei 4 年之前
父节点
当前提交
e6c99dd501
共有 7 个文件被更改,包括 211 次插入100 次删除
  1. 92 16
      event/hevent.c
  2. 22 5
      event/hevent.h
  3. 6 1
      event/hloop.c
  4. 3 0
      event/hloop.h
  5. 38 54
      event/nio.c
  6. 4 12
      event/unpack.c
  7. 46 12
      examples/tcp_echo_server.c

+ 92 - 16
event/hevent.c

@@ -53,7 +53,7 @@ void* hio_context(hio_t* io) {
     return io->ctx;
 }
 
-haccept_cb  hio_getcb_accept(hio_t* io) {
+haccept_cb hio_getcb_accept(hio_t* io) {
     return io->accept_cb;
 }
 
@@ -61,38 +61,104 @@ hconnect_cb hio_getcb_connect(hio_t* io) {
     return io->connect_cb;
 }
 
-hread_cb    hio_getcb_read(hio_t* io) {
+hread_cb hio_getcb_read(hio_t* io) {
     return io->read_cb;
 }
 
-hwrite_cb   hio_getcb_write(hio_t* io) {
+hwrite_cb hio_getcb_write(hio_t* io) {
     return io->write_cb;
 }
 
-hclose_cb   hio_getcb_close(hio_t* io) {
+hclose_cb hio_getcb_close(hio_t* io) {
     return io->close_cb;
 }
 
-void hio_setcb_accept   (hio_t* io, haccept_cb  accept_cb) {
+void hio_setcb_accept(hio_t* io, haccept_cb accept_cb) {
     io->accept_cb = accept_cb;
 }
 
-void hio_setcb_connect  (hio_t* io, hconnect_cb connect_cb) {
+void hio_setcb_connect(hio_t* io, hconnect_cb connect_cb) {
     io->connect_cb = connect_cb;
 }
 
-void hio_setcb_read     (hio_t* io, hread_cb    read_cb) {
+void hio_setcb_read(hio_t* io, hread_cb read_cb) {
     io->read_cb = read_cb;
 }
 
-void hio_setcb_write    (hio_t* io, hwrite_cb   write_cb) {
+void hio_setcb_write(hio_t* io, hwrite_cb write_cb) {
     io->write_cb = write_cb;
 }
 
-void hio_setcb_close    (hio_t* io, hclose_cb   close_cb) {
+void hio_setcb_close(hio_t* io, hclose_cb close_cb) {
     io->close_cb = close_cb;
 }
 
+void hio_accept_cb(hio_t* io) {
+    /*
+    char localaddrstr[SOCKADDR_STRLEN] = {0};
+    char peeraddrstr[SOCKADDR_STRLEN] = {0};
+    printd("accept connfd=%d [%s] <= [%s]\n", io->fd,
+            SOCKADDR_STR(io->localaddr, localaddrstr),
+            SOCKADDR_STR(io->peeraddr, peeraddrstr));
+    */
+    if (io->accept_cb) {
+        // printd("accept_cb------\n");
+        io->accept_cb(io);
+        // printd("accept_cb======\n");
+    }
+}
+
+void hio_connect_cb(hio_t* io) {
+    /*
+    char localaddrstr[SOCKADDR_STRLEN] = {0};
+    char peeraddrstr[SOCKADDR_STRLEN] = {0};
+    printd("connect connfd=%d [%s] => [%s]\n", io->fd,
+            SOCKADDR_STR(io->localaddr, localaddrstr),
+            SOCKADDR_STR(io->peeraddr, peeraddrstr));
+    */
+    if (io->connect_cb) {
+        // printd("connect_cb------\n");
+        io->connect_cb(io);
+        // printd("connect_cb======\n");
+    }
+}
+
+void hio_read_cb(hio_t* io, void* buf, int len) {
+    if (io->read_cb) {
+        // printd("read_cb------\n");
+        io->read_cb(io, buf, len);
+        // printd("read_cb======\n");
+    }
+
+    if (hio_is_alloced_readbuf(io) && io->readbuf.len > READ_BUFSIZE_HIGH_WATER) {
+        // readbuf autosize
+        size_t small_size = io->readbuf.len / 2;
+        if (len < small_size) {
+            if (++io->small_readbytes_cnt == 3) {
+                io->small_readbytes_cnt = 0;
+                io->readbuf.base = (char*)safe_realloc(io->readbuf.base, small_size, io->readbuf.len);
+                io->readbuf.len = small_size;
+            }
+        }
+    }
+}
+
+void hio_write_cb(hio_t* io, const void* buf, int len) {
+    if (io->write_cb) {
+        // printd("write_cb------\n");
+        io->write_cb(io, buf, len);
+        // printd("write_cb======\n");
+    }
+}
+
+void hio_close_cb(hio_t* io) {
+    if (io->close_cb) {
+        // printd("close_cb------\n");
+        io->close_cb(io);
+        // printd("close_cb======\n");
+    }
+}
+
 void hio_set_type(hio_t* io, hio_type_e type) {
     io->io_type = type;
 }
@@ -132,9 +198,11 @@ int hio_set_ssl(hio_t* io, hssl_t ssl) {
 
 void hio_set_readbuf(hio_t* io, void* buf, size_t len) {
     assert(io && buf && len != 0);
+    hio_free_readbuf(io);
     io->readbuf.base = (char*)buf;
     io->readbuf.len = len;
     io->readbuf.offset = 0;
+    io->alloced_readbuf = 0;
 }
 
 void hio_del_connect_timer(hio_t* io) {
@@ -237,13 +305,6 @@ void hio_set_heartbeat(hio_t* io, int interval_ms, hio_send_heartbeat_fn fn) {
     io->heartbeat_fn = fn;
 }
 
-bool hio_is_alloced_readbuf(hio_t* io) {
-    return  io->alloced_readbuf &&
-            io->readbuf.base &&
-            io->readbuf.len &&
-            io->readbuf.base != io->loop->readbuf.base;
-}
-
 void hio_alloc_readbuf(hio_t* io, int len) {
     if (hio_is_alloced_readbuf(io)) {
         io->readbuf.base = (char*)safe_realloc(io->readbuf.base, len, io->readbuf.len);
@@ -264,6 +325,21 @@ void hio_free_readbuf(hio_t* io) {
     }
 }
 
+int hio_read_once (hio_t* io) {
+    io->read_once = 1;
+    return hio_read_start(io);
+}
+
+int hio_read_until(hio_t* io, int len) {
+    io->read_until = len;
+    // NOTE: prepare readbuf
+    if (hio_is_loop_readbuf(io) ||
+        io->readbuf.len < len) {
+        hio_alloc_readbuf(io, len);
+    }
+    return hio_read_once(io);
+}
+
 void hio_unset_unpack(hio_t* io) {
     if (io->unpack_setting) {
         io->unpack_setting = NULL;

+ 22 - 5
event/hevent.h

@@ -10,7 +10,9 @@
 #include "heap.h"
 #include "queue.h"
 
-#define HLOOP_READ_BUFSIZE  8192
+#define HLOOP_READ_BUFSIZE          8192        // 8K
+#define READ_BUFSIZE_HIGH_WATER     65536       // 64K
+#define WRITE_QUEUE_HIGH_WATER      (1U << 23)  // 8M
 
 ARRAY_DECL(hio_t*, io_array);
 QUEUE_DECL(hevent_t, event_queue);
@@ -98,7 +100,8 @@ struct hio_s {
     unsigned    recvfrom    :1;
     unsigned    sendto      :1;
     unsigned    close       :1;
-    unsigned    alloced_readbuf :1;
+    unsigned    read_once   :1;     // for hio_read_once
+    unsigned    alloced_readbuf :1; // for hio_read_until, hio_set_unpack
 // public:
     uint32_t    id; // fd cannot be used as unique identifier, so we provide an id
     int         fd;
@@ -108,9 +111,12 @@ struct hio_s {
     int         revents;
     struct sockaddr*    localaddr;
     struct sockaddr*    peeraddr;
-    offset_buf_t        readbuf;        // for hread
-    struct write_queue  write_queue;    // for hwrite
+    offset_buf_t        readbuf;        // for read
+    int                 read_until;     // for hio_read_until
+    uint32_t            small_readbytes_cnt;
+    struct write_queue  write_queue;    // for write
     hrecursive_mutex_t  write_mutex;    // lock write and write_queue
+    uint32_t            write_queue_bytes;
     // callbacks
     hread_cb    read_cb;
     hwrite_cb   write_cb;
@@ -151,12 +157,23 @@ void hio_done(hio_t* io);
 void hio_free(hio_t* io);
 uint32_t hio_next_id();
 
+void hio_accept_cb(hio_t* io);
+void hio_connect_cb(hio_t* io);
+void hio_read_cb(hio_t* io, void* buf, int len);
+void hio_write_cb(hio_t* io, const void* buf, int len);
+void hio_close_cb(hio_t* io);
+
 void hio_del_connect_timer(hio_t* io);
 void hio_del_close_timer(hio_t* io);
 void hio_del_keepalive_timer(hio_t* io);
 void hio_del_heartbeat_timer(hio_t* io);
 
-bool hio_is_alloced_readbuf(hio_t* io);
+static inline bool hio_is_loop_readbuf(hio_t* io) {
+    return io->readbuf.base == io->loop->readbuf.base;
+}
+static inline bool hio_is_alloced_readbuf(hio_t* io) {
+    return io->alloced_readbuf;
+}
 void hio_alloc_readbuf(hio_t* io, int len);
 void hio_free_readbuf(hio_t* io);
 

+ 6 - 1
event/hloop.c

@@ -176,7 +176,7 @@ process_timers:
 static void hloop_stat_timer_cb(htimer_t* timer) {
     hloop_t* loop = timer->loop;
     // hlog_set_level(LOG_LEVEL_DEBUG);
-    hlogd("[loop] pid=%ld tid=%ld uptime=%lluus cnt=%llu nactives=%u nios=%d ntimers=%d nidles=%u",
+    hlogd("[loop] pid=%ld tid=%ld uptime=%lluus cnt=%llu nactives=%u nios=%u ntimers=%u nidles=%u",
         loop->pid, loop->tid, loop->cur_hrtime - loop->start_hrtime, loop->loop_cnt,
         loop->nactives, loop->nios, loop->ntimers, loop->nidles);
 }
@@ -684,6 +684,11 @@ void hio_ready(hio_t* io) {
     io->readbuf.base = io->loop->readbuf.base;
     io->readbuf.len = io->loop->readbuf.len;
     io->readbuf.offset = 0;
+    io->read_once = 0;
+    io->read_until = 0;
+    io->small_readbytes_cnt = 0;
+    // write_queue
+    io->write_queue_bytes = 0;
     // callbacks
     io->read_cb = NULL;
     io->write_cb = NULL;

+ 3 - 0
event/hloop.h

@@ -289,6 +289,9 @@ HV_EXPORT int hio_connect(hio_t* io);
 HV_EXPORT int hio_read   (hio_t* io);
 #define hio_read_start(io) hio_read(io)
 #define hio_read_stop(io)  hio_del(io, HV_READ)
+// hio_read_start => hread_cb => hio_read_stop
+HV_EXPORT int hio_read_once (hio_t* io);
+HV_EXPORT int hio_read_until(hio_t* io, int len);
 // NOTE: hio_write is thread-safe, locked by recursive_mutex, allow to be called by other threads.
 // hio_try_write => hio_add(io, HV_WRITE) => write => hwrite_cb
 HV_EXPORT int hio_write  (hio_t* io, const void* buf, size_t len);

+ 38 - 54
event/nio.c

@@ -34,36 +34,12 @@ static void __close_timeout_cb(htimer_t* timer) {
 }
 
 static void __accept_cb(hio_t* io) {
-    /*
-    char localaddrstr[SOCKADDR_STRLEN] = {0};
-    char peeraddrstr[SOCKADDR_STRLEN] = {0};
-    printd("accept connfd=%d [%s] <= [%s]\n", io->fd,
-            SOCKADDR_STR(io->localaddr, localaddrstr),
-            SOCKADDR_STR(io->peeraddr, peeraddrstr));
-    */
-
-    if (io->accept_cb) {
-        // printd("accept_cb------\n");
-        io->accept_cb(io);
-        // printd("accept_cb======\n");
-    }
+    hio_accept_cb(io);
 }
 
 static void __connect_cb(hio_t* io) {
-    /*
-    char localaddrstr[SOCKADDR_STRLEN] = {0};
-    char peeraddrstr[SOCKADDR_STRLEN] = {0};
-    printd("connect connfd=%d [%s] => [%s]\n", io->fd,
-            SOCKADDR_STR(io->localaddr, localaddrstr),
-            SOCKADDR_STR(io->peeraddr, peeraddrstr));
-    */
     hio_del_connect_timer(io);
-
-    if (io->connect_cb) {
-        // printd("connect_cb------\n");
-        io->connect_cb(io);
-        // printd("connect_cb======\n");
-    }
+    hio_connect_cb(io);
 }
 
 static void __read_cb(hio_t* io, void* buf, int readbytes) {
@@ -77,11 +53,11 @@ static void __read_cb(hio_t* io, void* buf, int readbytes) {
         return;
     }
 
-    if (io->read_cb) {
-        // printd("read_cb------\n");
-        io->read_cb(io, buf, readbytes);
-        // printd("read_cb======\n");
+    if (io->read_once) {
+        hio_read_stop(io);
     }
+
+    hio_read_cb(io, buf, readbytes);
 }
 
 static void __write_cb(hio_t* io, const void* buf, int writebytes) {
@@ -89,27 +65,16 @@ static void __write_cb(hio_t* io, const void* buf, int writebytes) {
     if (io->keepalive_timer) {
         htimer_reset(io->keepalive_timer);
     }
-
-    if (io->write_cb) {
-        // printd("write_cb------\n");
-        io->write_cb(io, buf, writebytes);
-        // printd("write_cb======\n");
-    }
+    hio_write_cb(io, buf, writebytes);
 }
 
 static void __close_cb(hio_t* io) {
     // printd("close fd=%d\n", io->fd);
-
     hio_del_connect_timer(io);
     hio_del_close_timer(io);
     hio_del_keepalive_timer(io);
     hio_del_heartbeat_timer(io);
-
-    if (io->close_cb) {
-        // printd("close_cb------\n");
-        io->close_cb(io);
-        // printd("close_cb======\n");
-    }
+    hio_close_cb(io);
 }
 
 static void ssl_server_handshake(hio_t* io) {
@@ -309,7 +274,11 @@ static void nio_read(hio_t* io) {
     int len = 0, nread = 0, err = 0;
 read:
     buf = io->readbuf.base + io->readbuf.offset;
-    len = io->readbuf.len - io->readbuf.offset;
+    if (io->read_until) {
+        len = io->read_until;
+    } else {
+        len = io->readbuf.len - io->readbuf.offset;
+    }
     nread = __nio_read(io, buf, len);
     // printd("read retval=%d\n", nread);
     if (nread < 0) {
@@ -329,9 +298,18 @@ read:
     if (nread == 0) {
         goto disconnect;
     }
-    __read_cb(io, buf, nread);
-    if (nread == len) {
-        goto read;
+    if (io->read_until) {
+        io->readbuf.offset += nread;
+        io->read_until -= nread;
+        if (io->read_until == 0) {
+            __read_cb(io, io->readbuf.base, io->readbuf.offset);
+            io->readbuf.offset = 0;
+        }
+    } else {
+        __read_cb(io, buf, nread);
+        if (nread == len) {
+            goto read;
+        }
     }
     return;
 read_error:
@@ -374,6 +352,7 @@ write:
     }
     __write_cb(io, buf, nwrite);
     pbuf->offset += nwrite;
+    io->write_queue_bytes -= nwrite;
     if (nwrite == len) {
         HV_FREE(pbuf->base);
         write_queue_pop_front(&io->write_queue);
@@ -524,17 +503,22 @@ enqueue:
         hio_add(io, hio_handle_events, HV_WRITE);
     }
     if (nwrite < len) {
-        offset_buf_t rest;
-        rest.len = len;
-        rest.offset = nwrite;
+        offset_buf_t remain;
+        remain.len = len;
+        remain.offset = nwrite;
         // NOTE: free in nio_write
-        HV_ALLOC(rest.base, rest.len);
-        memcpy(rest.base, buf, rest.len);
+        HV_ALLOC(remain.base, remain.len);
+        memcpy(remain.base, buf, remain.len);
         if (io->write_queue.maxsize == 0) {
             write_queue_init(&io->write_queue, 4);
         }
-        write_queue_push_back(&io->write_queue, &rest);
-        // hlogd("write queue %d", rest.len);
+        write_queue_push_back(&io->write_queue, &remain);
+        io->write_queue_bytes += remain.len - remain.offset;
+        // hlogd("write queue %d, total %u", remain.len - remain.offset, io->write_queue_bytes);
+        if (io->write_queue_bytes > WRITE_QUEUE_HIGH_WATER) {
+            hlogw("write queue %d, total %u, over high water %u",
+                    remain.len - remain.offset, io->write_queue_bytes, WRITE_QUEUE_HIGH_WATER);
+        }
     }
     hrecursive_mutex_unlock(&io->write_mutex);
     return nwrite;

+ 4 - 12
event/unpack.c

@@ -14,9 +14,7 @@ int hio_unpack(hio_t* io, void* buf, int readbytes) {
     case UNPACK_BY_LENGTH_FIELD:
         return hio_unpack_by_length_field(io, buf, readbytes);
     default:
-        if (io->read_cb) {
-            io->read_cb(io, buf, readbytes);
-        }
+        hio_read_cb(io, buf, readbytes);
         return readbytes;
     }
 }
@@ -34,9 +32,7 @@ int hio_unpack_by_fixed_length(hio_t* io, void* buf, int readbytes) {
     int remain = ep - p;
     int handled = 0;
     while (remain >= fixed_length) {
-        if (io->read_cb) {
-            io->read_cb(io, (void*)p, fixed_length);
-        }
+        hio_read_cb(io, (void*)p, fixed_length);
         handled += fixed_length;
         p += fixed_length;
         remain -= fixed_length;
@@ -77,9 +73,7 @@ int hio_unpack_by_delimiter(hio_t* io, void* buf, int readbytes) {
 match:
         p += delimiter_bytes;
         remain -= delimiter_bytes;
-        if (io->read_cb) {
-            io->read_cb(io, (void*)sp, p - sp);
-        }
+        hio_read_cb(io, (void*)sp, p - sp);
         handled += p - sp;
         sp = p;
         continue;
@@ -150,9 +144,7 @@ int hio_unpack_by_length_field(hio_t* io, void* buf, int readbytes) {
         }
         package_len = head_len + body_len;
         if (remain >= package_len) {
-            if (io->read_cb) {
-                io->read_cb(io, (void*)p, package_len);
-            }
+            hio_read_cb(io, (void*)p, package_len);
             handled += package_len;
             p += package_len;
             remain -= package_len;

+ 46 - 12
examples/tcp_echo_server.c

@@ -19,7 +19,15 @@
  * @build   ./configure --with-openssl && make clean && make
  *
  */
-#define TEST_SSL 0
+#define TEST_SSL        0
+#define TEST_READ_ONCE  0
+#define TEST_READ_UNTIL 0
+#define TEST_READ_STOP  0
+#define TEST_UNPACK     0
+
+#if TEST_UNPACK
+unpack_setting_t unpack_setting;
+#endif
 
 // hloop_create_tcp_server -> on_accept -> hio_read -> on_recv -> hio_write
 
@@ -38,6 +46,14 @@ static void on_recv(hio_t* io, void* buf, int readbytes) {
     // echo
     printf("> %.*s", readbytes, (char*)buf);
     hio_write(io, buf, readbytes);
+
+#if TEST_READ_STOP
+    hio_read_stop(io);
+#elif TEST_READ_ONCE
+    hio_read_once(io);
+#elif TEST_READ_UNTIL
+    hio_read_until(io, TEST_READ_UNTIL);
+#endif
 }
 
 static void on_accept(hio_t* io) {
@@ -50,7 +66,18 @@ static void on_accept(hio_t* io) {
 
     hio_setcb_close(io, on_close);
     hio_setcb_read(io, on_recv);
-    hio_read(io);
+
+#if TEST_UNPACK
+    hio_set_unpack(io, &unpack_setting);
+#endif
+
+#if TEST_READ_ONCE
+    hio_read_once(io);
+#elif TEST_READ_UNTIL
+    hio_read_until(io, TEST_READ_UNTIL);
+#else
+    hio_read_start(io);
+#endif
 }
 
 int main(int argc, char** argv) {
@@ -61,19 +88,26 @@ int main(int argc, char** argv) {
     int port = atoi(argv[1]);
 
 #if TEST_SSL
-    {
-        hssl_ctx_init_param_t param;
-        memset(&param, 0, sizeof(param));
-        param.crt_file = "cert/server.crt";
-        param.key_file = "cert/server.key";
-        param.endpoint = HSSL_SERVER;
-        if (hssl_ctx_init(&param) == NULL) {
-            fprintf(stderr, "hssl_ctx_init failed!\n");
-            return -30;
-        }
+    hssl_ctx_init_param_t ssl_param;
+    memset(&ssl_param, 0, sizeof(ssl_param));
+    ssl_param.crt_file = "cert/server.crt";
+    ssl_param.key_file = "cert/server.key";
+    ssl_param.endpoint = HSSL_SERVER;
+    if (hssl_ctx_init(&ssl_param) == NULL) {
+        fprintf(stderr, "hssl_ctx_init failed!\n");
+        return -30;
     }
 #endif
 
+#if TEST_UNPACK
+    memset(&unpack_setting, 0, sizeof(unpack_setting_t));
+    unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
+    unpack_setting.mode = UNPACK_BY_DELIMITER;
+    unpack_setting.delimiter[0] = '\r';
+    unpack_setting.delimiter[1] = '\n';
+    unpack_setting.delimiter_bytes = 2;
+#endif
+
     hloop_t* loop = hloop_new(0);
 #if TEST_SSL
     hio_t* listenio = hloop_create_ssl_server(loop, "0.0.0.0", port, on_accept);