Browse Source

Add hio_set_max_read_bufsize, hio_set_max_write_bufsize

ithewei 3 years ago
parent
commit
cb70c9baad
5 changed files with 57 additions and 33 deletions
  1. 32 20
      event/hevent.c
  2. 3 1
      event/hevent.h
  3. 9 6
      event/hloop.h
  4. 4 6
      event/nio.c
  5. 9 0
      evpp/Channel.h

+ 32 - 20
event/hevent.c

@@ -106,9 +106,11 @@ void hio_ready(hio_t* io) {
     io->readbuf.head = io->readbuf.tail = 0;
     io->read_flags = 0;
     io->read_until_length = 0;
+    io->max_read_bufsize = MAX_READ_BUFSIZE;
     io->small_readbytes_cnt = 0;
     // write_queue
     io->write_bufsize = 0;
+    io->max_write_bufsize = MAX_WRITE_BUFSIZE;
     // callbacks
     io->read_cb = NULL;
     io->write_cb = NULL;
@@ -246,14 +248,6 @@ void* hio_context(hio_t* io) {
     return io->ctx;
 }
 
-hio_readbuf_t* hio_get_readbuf(hio_t* io) {
-    return &io->readbuf;
-}
-
-size_t hio_write_bufsize(hio_t* io) {
-    return io->write_bufsize;
-}
-
 haccept_cb hio_getcb_accept(hio_t* io) {
     return io->accept_cb;
 }
@@ -485,15 +479,6 @@ int hio_new_ssl_ctx(hio_t* io, hssl_ctx_opt_t* opt) {
     return hio_set_ssl_ctx(io, ssl_ctx);
 }
 
-void hio_set_readbuf(hio_t* io, void* buf, size_t len) {
-    assert(io && buf && len != 0);
-    hio_free_readbuf(io);
-    io->readbuf.base = (char*)buf;
-    io->readbuf.len = len;
-    io->readbuf.head = io->readbuf.tail = 0;
-    io->alloced_readbuf = 0;
-}
-
 void hio_del_connect_timer(hio_t* io) {
     if (io->connect_timer) {
         htimer_del(io->connect_timer);
@@ -687,9 +672,10 @@ void hio_set_heartbeat(hio_t* io, int interval_ms, hio_send_heartbeat_fn fn) {
     io->heartbeat_fn = fn;
 }
 
+//-----------------iobuf---------------------------------------------
 void hio_alloc_readbuf(hio_t* io, int len) {
-    if (len > MAX_READ_BUFSIZE) {
-        hloge("read bufsize > %u, close it!", (unsigned int)MAX_READ_BUFSIZE);
+    if (len > io->max_read_bufsize) {
+        hloge("read bufsize > %u, close it!", io->max_read_bufsize);
         io->error = ERR_OVER_LIMIT;
         hio_close_async(io);
         return;
@@ -714,6 +700,31 @@ void hio_free_readbuf(hio_t* io) {
     }
 }
 
+void hio_set_readbuf(hio_t* io, void* buf, size_t len) {
+    assert(io && buf && len != 0);
+    hio_free_readbuf(io);
+    io->readbuf.base = (char*)buf;
+    io->readbuf.len = len;
+    io->readbuf.head = io->readbuf.tail = 0;
+    io->alloced_readbuf = 0;
+}
+
+hio_readbuf_t* hio_get_readbuf(hio_t* io) {
+    return &io->readbuf;
+}
+
+void hio_set_max_read_bufsize (hio_t* io, uint32_t size) {
+    io->max_read_bufsize = size;
+}
+
+void hio_set_max_write_bufsize(hio_t* io, uint32_t size) {
+    io->max_write_bufsize = size;
+}
+
+size_t hio_write_bufsize(hio_t* io) {
+    return io->write_bufsize;
+}
+
 int hio_read_once (hio_t* io) {
     io->read_flags |= HIO_READ_ONCE;
     return hio_read_start(io);
@@ -806,8 +817,9 @@ void hio_set_unpack(hio_t* io, unpack_setting_t* setting) {
     if (io->unpack_setting->mode == UNPACK_BY_FIXED_LENGTH) {
         io->readbuf.len = io->unpack_setting->fixed_length;
     } else {
-        io->readbuf.len = HLOOP_READ_BUFSIZE;
+        io->readbuf.len = MIN(HLOOP_READ_BUFSIZE, io->unpack_setting->package_max_length);
     }
+    io->max_read_bufsize = io->unpack_setting->package_max_length;
     hio_alloc_readbuf(io, io->readbuf.len);
 }
 

+ 3 - 1
event/hevent.h

@@ -17,7 +17,7 @@
 #define READ_BUFSIZE_HIGH_WATER     65536       // 64K
 #define WRITE_BUFSIZE_HIGH_WATER    (1U << 23)  // 8M
 #define MAX_READ_BUFSIZE            (1U << 24)  // 16M
-#define MAX_WRITE_BUFSIZE           (1U << 26)  // 64M
+#define MAX_WRITE_BUFSIZE           (1U << 24)  // 16M
 
 // hio_read_flags
 #define HIO_READ_ONCE           0x1
@@ -133,11 +133,13 @@ struct hio_s {
         unsigned int    read_until_length;
         unsigned char   read_until_delim;
     };
+    uint32_t            max_read_bufsize;
     uint32_t            small_readbytes_cnt; // for readbuf autosize
     // write
     struct write_queue  write_queue;
     hrecursive_mutex_t  write_mutex; // lock write and write_queue
     uint32_t            write_bufsize;
+    uint32_t            max_write_bufsize;
     // callbacks
     hread_cb    read_cb;
     hwrite_cb   write_cb;

+ 9 - 6
event/hloop.h

@@ -272,14 +272,21 @@ HV_EXPORT void hio_set_context(hio_t* io, void* ctx);
 HV_EXPORT void* hio_context(hio_t* io);
 HV_EXPORT bool hio_is_opened(hio_t* io);
 HV_EXPORT bool hio_is_closed(hio_t* io);
+
+// iobuf
 // #include "hbuf.h"
 typedef struct fifo_buf_s hio_readbuf_t;
+// NOTE: One loop per thread, one readbuf per loop.
+// But you can pass in your own readbuf instead of the default readbuf to avoid memcopy.
+HV_EXPORT void hio_set_readbuf(hio_t* io, void* buf, size_t len);
 HV_EXPORT hio_readbuf_t* hio_get_readbuf(hio_t* io);
+HV_EXPORT void hio_set_max_read_bufsize (hio_t* io, uint32_t size);
+HV_EXPORT void hio_set_max_write_bufsize(hio_t* io, uint32_t size);
 // NOTE: hio_write is non-blocking, so there is a write queue inside hio_t to cache unwritten data and wait for writable.
 // @return current buffer size of write queue.
 HV_EXPORT size_t   hio_write_bufsize(hio_t* io);
-#define hio_write_queue_is_empty(io) (hio_write_bufsize(io) == 0)
-#define hio_write_is_complete(io)    (hio_write_bufsize(io) == 0)
+#define hio_write_is_complete(io) (hio_write_bufsize(io) == 0)
+
 HV_EXPORT uint64_t hio_last_read_time(hio_t* io);   // ms
 HV_EXPORT uint64_t hio_last_write_time(hio_t* io);  // ms
 
@@ -296,7 +303,6 @@ HV_EXPORT hread_cb    hio_getcb_read(hio_t* io);
 HV_EXPORT hwrite_cb   hio_getcb_write(hio_t* io);
 HV_EXPORT hclose_cb   hio_getcb_close(hio_t* io);
 
-// some useful settings
 // Enable SSL/TLS is so easy :)
 HV_EXPORT int  hio_enable_ssl(hio_t* io);
 HV_EXPORT bool hio_is_ssl(hio_t* io);
@@ -307,9 +313,6 @@ HV_EXPORT int  hio_new_ssl_ctx(hio_t* io, hssl_ctx_opt_t* opt);
 HV_EXPORT hssl_t     hio_get_ssl(hio_t* io);
 HV_EXPORT hssl_ctx_t hio_get_ssl_ctx(hio_t* io);
 
-// NOTE: One loop per thread, one readbuf per loop.
-// But you can pass in your own readbuf instead of the default readbuf to avoid memcopy.
-HV_EXPORT void hio_set_readbuf(hio_t* io, void* buf, size_t len);
 // connect timeout => hclose_cb
 HV_EXPORT void hio_set_connect_timeout(hio_t* io, int timeout_ms DEFAULT(HIO_DEFAULT_CONNECT_TIMEOUT));
 // close timeout => hclose_cb

+ 4 - 6
event/nio.c

@@ -498,12 +498,10 @@ enqueue:
         hio_add(io, hio_handle_events, HV_WRITE);
     }
     if (nwrite < len) {
-        if (io->write_bufsize + len - nwrite > MAX_WRITE_BUFSIZE) {
-            if (io->write_bufsize > MAX_WRITE_BUFSIZE) {
-                hloge("write bufsize > %u, close it!", (unsigned int)MAX_WRITE_BUFSIZE);
-                io->error = ERR_OVER_LIMIT;
-                goto write_error;
-            }
+        if (io->write_bufsize + len - nwrite > io->max_write_bufsize) {
+            hloge("write bufsize > %u, close it!", io->max_write_bufsize);
+            io->error = ERR_OVER_LIMIT;
+            goto write_error;
         }
         offset_buf_t remain;
         remain.len = len - nwrite;

+ 9 - 0
evpp/Channel.h

@@ -125,6 +125,15 @@ public:
         return write(str.data(), str.size());
     }
 
+    // iobuf setting
+    void setMaxReadBufsize(uint32_t size) {
+        if (io_ == NULL) return;
+        return hio_set_max_read_bufsize(io_, size);
+    }
+    void setMaxWriteBufsize(uint32_t size) {
+        if (io_ == NULL) return;
+        return hio_set_max_write_bufsize(io_, size);
+    }
     size_t writeBufsize() {
         if (io_ == NULL) return 0;
         return hio_write_bufsize(io_);