Explorar o código

hloop_s add readbuf

hewei %!s(int64=6) %!d(string=hai) anos
pai
achega
9219fdbd8c
Modificáronse 15 ficheiros con 60 adicións e 44 borrados
  1. 3 1
      base/array.h
  2. 3 1
      base/queue.h
  3. 12 2
      event/hevent.c
  4. 3 1
      event/hevent.h
  5. 7 1
      event/hloop.c
  6. 0 1
      event/hloop.h
  7. 2 1
      event/iowatcher.h
  8. 4 2
      event/nio.c
  9. 0 2
      event/nlog.c
  10. 6 1
      event/overlapio.c
  11. 1 1
      examples/nc.c
  12. 2 6
      examples/tcp.c
  13. 2 6
      examples/udp.c
  14. 0 8
      http/server/HttpServer.cpp
  15. 15 10
      unittest/webbench.c

+ 3 - 1
base/array.h

@@ -16,6 +16,8 @@
 
 #include "hbase.h"
 
+#define ARRAY_INIT_SIZE     16
+
 // #include <vector>
 // typedef std::vector<type> atype;
 #define ARRAY_DECL(type, atype) \
@@ -75,12 +77,12 @@ static inline void atype##_cleanup(atype* p) {\
 }\
 \
 static inline void atype##_resize(atype* p, int maxsize) {\
+    if (maxsize == 0) maxsize = ARRAY_INIT_SIZE;\
     p->ptr = (type*)safe_realloc(p->ptr, sizeof(type) * maxsize, sizeof(type) * p->maxsize);\
     p->maxsize = maxsize;\
 }\
 \
 static inline void atype##_double_resize(atype* p) {\
-    assert(p->maxsize != 0);\
     atype##_resize(p, p->maxsize*2);\
 }\
 \

+ 3 - 1
base/queue.h

@@ -14,6 +14,8 @@
 
 #include "hbase.h"
 
+#define QUEUE_INIT_SIZE     16
+
 // #include <deque>
 // typedef std::deque<type> qtype;
 #define QUEUE_DECL(type, qtype) \
@@ -68,12 +70,12 @@ static inline void qtype##_cleanup(qtype* p) {\
 }\
 \
 static inline void qtype##_resize(qtype* p, int maxsize) {\
+    if (maxsize == 0) maxsize = QUEUE_INIT_SIZE;\
     p->ptr = (type*)safe_realloc(p->ptr, sizeof(type)*maxsize, sizeof(type)*p->maxsize);\
     p->maxsize = maxsize;\
 }\
 \
 static inline void qtype##_double_resize(qtype* p) {\
-    assert(p->maxsize != 0);\
     return qtype##_resize(p, p->maxsize*2);\
 }\
 \

+ 12 - 2
event/hevent.c

@@ -22,8 +22,18 @@ struct sockaddr* hio_peeraddr(hio_t* io) {
 }
 
 void hio_set_readbuf(hio_t* io, void* buf, size_t len) {
-    io->readbuf.base = (char*)buf;
-    io->readbuf.len = len;
+    if (buf == NULL || len == 0) {
+        hloop_t* loop = io->loop;
+        if (loop && (loop->readbuf.base == NULL || loop->readbuf.len == 0)) {
+            loop->readbuf.len = HLOOP_READ_BUFSIZE;
+            loop->readbuf.base = (char*)malloc(loop->readbuf.len);
+            io->readbuf = loop->readbuf;
+        }
+    }
+    else {
+        io->readbuf.base = (char*)buf;
+        io->readbuf.len = len;
+    }
 }
 
 int hio_enable_ssl(hio_t* io) {

+ 3 - 1
event/hevent.h

@@ -10,6 +10,8 @@
 #include "hbuf.h"
 #include "hmutex.h"
 
+#define HLOOP_READ_BUFSIZE  8192
+
 typedef enum {
     HLOOP_STATUS_STOP,
     HLOOP_STATUS_RUNNING,
@@ -44,10 +46,10 @@ struct hloop_s {
     // ios: with fd as array.index
     struct io_array             ios;
     uint32_t                    nios;
+    hbuf_t                      readbuf;        // for hread
     void*                       iowatcher;
     // custom_events
     int                         sockpair[2];
-    char                        readbuf[4];
     event_queue                 custom_events;
     hmutex_t                    custom_events_mutex;
 };

+ 7 - 1
event/hloop.c

@@ -215,6 +215,12 @@ static void hloop_cleanup(hloop_t* loop) {
         }
     }
     io_array_cleanup(&loop->ios);
+    // readbuf
+    if (loop->readbuf.base && loop->readbuf.len) {
+        free(loop->readbuf.base);
+        loop->readbuf.base = NULL;
+        loop->readbuf.len = 0;
+    }
     // iowatcher
     iowatcher_cleanup(loop);
     // custom_events
@@ -752,7 +758,7 @@ void hloop_post_event(hloop_t* loop, hevent_t* ev) {
             hloge("socketpair error");
             goto unlock;
         }
-        hread(loop, loop->sockpair[1], loop->readbuf, sizeof(loop->readbuf), sockpair_read_cb);
+        hread(loop, loop->sockpair[1], loop->readbuf.base, loop->readbuf.len, sockpair_read_cb);
     }
     if (loop->custom_events.maxsize == 0) {
         event_queue_init(&loop->custom_events, CUSTOM_EVENT_QUEUE_INIT_SIZE);

+ 0 - 1
event/hloop.h

@@ -167,7 +167,6 @@ void hio_setcb_read     (hio_t* io, hread_cb    read_cb);
 void hio_setcb_write    (hio_t* io, hwrite_cb   write_cb);
 void hio_setcb_close    (hio_t* io, hclose_cb   close_cb);
 
-// NOTE: don't forget to call hio_set_readbuf
 int hio_read   (hio_t* io);
 int hio_write  (hio_t* io, const void* buf, size_t len);
 int hio_close  (hio_t* io);

+ 2 - 1
event/iowatcher.h

@@ -12,7 +12,8 @@
     !defined(EVENT_PORT) &&     \
     !defined(EVENT_NOEVENT)
 #ifdef OS_WIN
-#define EVENT_IOCP
+//#define EVENT_IOCP // IOCP improving
+#define EVENT_POLL
 #elif defined(OS_LINUX)
 #define EVENT_EPOLL
 #elif defined(OS_MAC)

+ 4 - 2
event/nio.c

@@ -154,10 +154,12 @@ connect_failed:
 static void nio_read(hio_t* io) {
     //printd("nio_read fd=%d\n", io->fd);
     int nread;
+    if (io->readbuf.base == NULL || io->readbuf.len == 0) {
+        hio_set_readbuf(io, io->loop->readbuf.base, io->loop->readbuf.len);
+    }
     void* buf = io->readbuf.base;
     int   len = io->readbuf.len;
 read:
-    memset(buf, 0, len);
     switch (io->io_type) {
 #ifdef WITH_OPENSSL
     case HIO_TYPE_SSL:
@@ -197,7 +199,7 @@ read:
     if (nread == 0) {
         goto disconnect;
     }
-    //printd("> %s\n", buf);
+    //printd("> %.*s\n", nread, buf);
     if (io->read_cb) {
         //printd("read_cb------\n");
         io->read_cb(io, buf, nread);

+ 0 - 2
event/nlog.c

@@ -49,8 +49,6 @@ static void on_accept(hio_t* io) {
             SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
     */
 
-    static char s_readbuf[256] = {0};
-    hio_set_readbuf(io, s_readbuf, sizeof(s_readbuf));
     hio_setcb_read(io, on_read);
     hio_setcb_close(io, on_close);
     hio_read(io);

+ 6 - 1
event/overlapio.c

@@ -47,6 +47,9 @@ int post_recv(hio_t* io, hoverlapped_t* hovlp) {
     hovlp->fd = io->fd;
     hovlp->event = READ_EVENT;
     hovlp->io = io;
+    if (io->readbuf.base == NULL || io->readbuf.len == 0) {
+        hio_set_readbuf(io, io->loop->readbuf.base, io->loop->readbuf.len);
+    }
     hovlp->buf.len = io->readbuf.len;
     if (io->io_type == HIO_TYPE_UDP || io->io_type == HIO_TYPE_IP) {
         SAFE_ALLOC(hovlp->buf.buf, hovlp->buf.len);
@@ -54,7 +57,7 @@ int post_recv(hio_t* io, hoverlapped_t* hovlp) {
     else {
         hovlp->buf.buf = io->readbuf.base;
     }
-    memset(hovlp->buf.buf, 0, hovlp->buf.len);
+    //memset(hovlp->buf.buf, 0, hovlp->buf.len);
     DWORD dwbytes = 0;
     DWORD flags = 0;
     int ret = 0;
@@ -140,11 +143,13 @@ static void on_connectex_complete(hio_t* io) {
         getsockname(io->fd, io->localaddr, &addrlen);
         addrlen = sizeof(struct sockaddr_in6);
         getpeername(io->fd, io->peeraddr, &addrlen);
+        /*
         char localaddrstr[SOCKADDR_STRLEN] = {0};
         char peeraddrstr[SOCKADDR_STRLEN] = {0};
         printd("connect connfd=%d [%s] => [%s]\n", io->fd,
                 SOCKADDR_STR(io->localaddr, localaddrstr),
                 SOCKADDR_STR(io->peeraddr, peeraddrstr));
+        */
         //printd("connect_cb------\n");
         io->connect_cb(io);
         //printd("connect_cb======\n");

+ 1 - 1
examples/nc.c

@@ -23,7 +23,7 @@ void on_recv(hio_t* io, void* buf, int readbytes) {
             SOCKADDR_STR(hio_localaddr(io), localaddrstr),
             SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
     }
-    printf("%s", (char*)buf);
+    printf("%.*s", readbytes, (char*)buf);
     fflush(stdout);
 }
 

+ 2 - 6
examples/tcp.c

@@ -1,9 +1,6 @@
 #include "hloop.h"
 #include "hsocket.h"
 
-#define RECV_BUFSIZE    8192
-static char recvbuf[RECV_BUFSIZE];
-
 void on_close(hio_t* io) {
     printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
 }
@@ -15,9 +12,9 @@ void on_recv(hio_t* io, void* buf, int readbytes) {
     printf("[%s] <=> [%s]\n",
             SOCKADDR_STR(hio_localaddr(io), localaddrstr),
             SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
-    printf("< %s\n", buf);
+    printf("< %.*s", readbytes, (char*)buf);
     // echo
-    printf("> %s\n", buf);
+    printf("> %.*s", readbytes, (char*)buf);
     hio_write(io, buf, readbytes);
 }
 
@@ -31,7 +28,6 @@ void on_accept(hio_t* io) {
 
     hio_setcb_close(io, on_close);
     hio_setcb_read(io, on_recv);
-    hio_set_readbuf(io, recvbuf, RECV_BUFSIZE);
     hio_read(io);
 }
 

+ 2 - 6
examples/udp.c

@@ -1,9 +1,6 @@
 #include "hloop.h"
 #include "hsocket.h"
 
-#define RECV_BUFSIZE    8192
-static char recvbuf[RECV_BUFSIZE];
-
 void on_close(hio_t* io) {
     printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
 }
@@ -15,9 +12,9 @@ void on_recvfrom(hio_t* io, void* buf, int readbytes) {
     printf("[%s] <=> [%s]\n",
             SOCKADDR_STR(hio_localaddr(io), localaddrstr),
             SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
-    printf("< %s\n", buf);
+    printf("< %.*s", readbytes, (char*)buf);
     // echo
-    printf("> %s\n", buf);
+    printf("> %.*s", readbytes, (char*)buf);
     hio_write(io, buf, readbytes);
 }
 
@@ -35,7 +32,6 @@ int main(int argc, char** argv) {
     }
     hio_setcb_close(io, on_close);
     hio_setcb_read(io, on_recvfrom);
-    hio_set_readbuf(io, recvbuf, RECV_BUFSIZE);
     hio_read(io);
     hloop_run(loop);
     hloop_free(&loop);

+ 0 - 8
http/server/HttpServer.cpp

@@ -9,8 +9,6 @@
 #include "HttpHandler.h"
 #include "Http2Session.h"
 
-#define RECV_BUFSIZE    8192
-#define SEND_BUFSIZE    8192
 #define MIN_HTTP_REQUEST        "GET / HTTP/1.1\r\n\r\n"
 #define MIN_HTTP_REQUEST_LEN    14 // exclude CRLF
 
@@ -241,10 +239,8 @@ static void on_accept(hio_t* io) {
             SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
     */
 
-    HBuf* buf = (HBuf*)hloop_userdata(hevent_loop(io));
     hio_setcb_close(io, on_close);
     hio_setcb_read(io, on_recv);
-    hio_set_readbuf(io, buf->base, buf->len);
     hio_read(io);
     // new HttpHandler
     // delete on_close
@@ -291,10 +287,6 @@ static HTHREAD_ROUTINE(worker_thread) {
     if (server->worker_processes == 0 && server->worker_threads <= 1) {
         server->privdata = (void*)loop;
     }
-    // one loop one readbuf.
-    HBuf readbuf;
-    readbuf.resize(RECV_BUFSIZE);
-    hloop_set_userdata(loop, &readbuf);
     hio_t* listenio = haccept(loop, listenfd, on_accept);
     hevent_set_userdata(listenio, server->service);
     if (server->ssl) {

+ 15 - 10
unittest/webbench.c

@@ -49,6 +49,7 @@ int Connect(const char* host, int port) {
 #define VERSION         "webbench/1.19.3.15"
 
 int verbose = 0;
+int quiet = 0;
 volatile int timerexpired = 0; // for timer
 int time    = 30;
 int clients = 1;
@@ -67,12 +68,13 @@ char buf[1460] = {0};
 
 int mypipe[2]; // IPC
 
-static const char options[] = "?hvV01kt:p:c:";
+static const char options[] = "?hVvq01kt:p:c:";
 
 static const struct option long_options[] = {
     {"help", no_argument, NULL, 'h'},
     {"version", no_argument, NULL, 'V'},
     {"verbose", no_argument, NULL, 'v'},
+    {"quiet", no_argument, NULL, 'q'},
     {"time", required_argument, NULL, 't'},
     {"proxy", required_argument, NULL, 'p'},
     {"clients", required_argument, NULL, 'c'},
@@ -93,6 +95,7 @@ Options:\n\
   -?|-h|--help              Print this information.\n\
   -V|--version              Print version.\n\
   -v|--verbose              Print verbose.\n\
+  -q|--quiet                Print quiet.\n\
   -0|--http10               Use HTTP/1.0 protocol.\n\
   -1|--http11               Use HTTP/1.1 protocol.\n\
   -k|--keepalive            Connection: keep-alive.\n\
@@ -115,6 +118,7 @@ int parse_cmdline(int argc, char** argv) {
         case 'h': print_usage(); exit(1);
         case 'V': puts(VERSION); exit(1);
         case 'v': verbose = 1; break;
+        case 'q': quiet = 1; break;
         case '0': http = 0; break;
         case '1': http = 1; break;
         case 'k': keepalive = 1; break;
@@ -248,7 +252,9 @@ int main(int argc, char** argv) {
         strcat(request, "Connection: close\r\n");
     }
     strcat(request, "\r\n");
-    printf("%s", request);
+    if (!quiet) {
+        printf("%s", request);
+    }
 
     // IPC
     if (pipe(mypipe) < 0) {
@@ -259,7 +265,7 @@ int main(int argc, char** argv) {
     // fork childs
     pid_t pid = 0;
     FILE* fp = NULL;
-    int succeed = 0, failed = 0, bytes = 0;
+    long long succeed = 0, failed = 0, bytes = 0;
     int childs = clients;
     int i;
     for (i = 0; i < childs; ++i) {
@@ -302,7 +308,6 @@ write:
                 }
 read:
                 if (timerexpired) break;
-                memset(buf, 0, sizeof(buf));
                 rdbytes = read(sock, buf, sizeof(buf));
                 if (verbose) {
                     printf("read %d bytes\n", rdbytes);
@@ -312,7 +317,7 @@ read:
                     goto close;
                 }
                 if (verbose) {
-                    printf("%s\n", buf);
+                    printf("%.*s\n", rdbytes, buf);
                 }
                 bytes += rdbytes;
                 ++succeed;
@@ -328,7 +333,7 @@ close:
                 perror("fdopen");
                 return 30;
             }
-            fprintf(fp, "%d %d %d\n", succeed, failed, bytes);
+            fprintf(fp, "%lld %lld %lld\n", succeed, failed, bytes);
             fclose(fp);
             //printf("child[%d] end\n", getpid());
             return 0;
@@ -341,16 +346,16 @@ close:
         return 30;
     }
     while (1) {
-        int i,j,k;
-        fscanf(fp, "%d %d %d", &i, &j, &k);
+        long long i,j,k;
+        fscanf(fp, "%lld %lld %lld", &i, &j, &k);
         succeed += i;
         failed += j;
         bytes += k;
         if (--childs==0) break;
     }
     fclose(fp);
-    printf("recv %d bytes/sec, %d succeed, %d failed\n",
-            (int)(bytes)/time,
+    printf("recv %lld bytes/sec, %lld succeed, %lld failed\n",
+            bytes/time,
             succeed,
             failed);