Browse Source

add keepalive

ithewei 6 năm trước cách đây
mục cha
commit
456c88ddcd
6 tập tin đã thay đổi với 142 bổ sung56 xóa
  1. 11 0
      event/hloop.c
  2. 1 0
      event/hloop.h
  3. 41 22
      examples/webbench.c
  4. 19 2
      http/server/FileCache.h
  5. 29 0
      http/server/HttpHandler.h
  6. 41 32
      http/server/http_server.cpp

+ 11 - 0
event/hloop.c

@@ -278,6 +278,17 @@ htimer_t* htimer_add(hloop_t* loop, htimer_cb cb, uint64_t timeout, uint32_t rep
     return (htimer_t*)timer;
 }
 
+void htimer_reset(htimer_t* timer) {
+    if (timer->event_type != HEVENT_TYPE_TIMEOUT || timer->pending) {
+        return;
+    }
+    hloop_t* loop = timer->loop;
+    htimeout_t* timeout = (htimeout_t*)timer;
+    heap_remove(&loop->timers, &timer->node);
+    timer->next_timeout = hloop_now_hrtime(loop) + timeout->timeout*1000;
+    heap_insert(&loop->timers, &timer->node);
+}
+
 htimer_t* htimer_add_period(hloop_t* loop, htimer_cb cb,
                 int8_t minute,  int8_t hour, int8_t day,
                 int8_t week, int8_t month, uint32_t repeat) {

+ 1 - 0
event/hloop.h

@@ -202,6 +202,7 @@ htimer_t*   htimer_add_period(hloop_t* loop, htimer_cb cb,
                 int8_t minute DEFAULT(0),  int8_t hour  DEFAULT(-1), int8_t day DEFAULT(-1),
                 int8_t week   DEFAULT(-1), int8_t month DEFAULT(-1), uint32_t repeat DEFAULT(INFINITE));
 void        htimer_del(htimer_t* timer);
+void        htimer_reset(htimer_t* timer);
 
 // io
 // frist level apis

+ 41 - 22
examples/webbench.c

@@ -57,6 +57,7 @@ char* proxy_host = NULL;
 int proxy_port = 80;
 int method  = METHOD_GET;
 int http    = 1; // 1=HTTP/1.1 0=HTTP/1.0
+int keepalive = 0;
 const char* url = NULL;
 
 #define REQUEST_SIZE    2048
@@ -65,7 +66,7 @@ char buf[1460] = {0};
 
 int mypipe[2]; // IPC
 
-static const char options[] = "?hV01t:p:c:";
+static const char options[] = "?hV01kt:p:c:";
 
 static const struct option long_options[] = {
     {"help", no_argument, NULL, 'h'},
@@ -75,6 +76,7 @@ static const struct option long_options[] = {
     {"clients", required_argument, NULL, 'c'},
     {"http10", no_argument, NULL, '0'},
     {"http11", no_argument, NULL, '1'},
+    {"keepalive", no_argument, NULL, 'k'},
     {"get", no_argument, &method, METHOD_GET},
     {"head", no_argument, &method, METHOD_HEAD},
     {"options", no_argument, &method, METHOD_OPTIONS},
@@ -90,6 +92,7 @@ Options:\n\
   -V|--version              Print version.\n\
   -0|--http10               Use HTTP/1.0 protocol.\n\
   -1|--http11               Use HTTP/1.1 protocol.\n\
+  -k|--keepalive            Connection: keep-alive.\n\
   -t|--time <sec>           Run benchmark for <sec> seconds. Default 30.\n\
   -p|--proxy <server:port>  Use proxy server for request.\n\
   -c|--clients <n>          Run <n> HTTP clients. Default one.\n\
@@ -110,6 +113,7 @@ int parse_cmdline(int argc, char** argv) {
         case 'V': puts(VERSION); exit(1);
         case '0': http = 0; break;
         case '1': http = 1; break;
+        case 'k': keepalive = 1; break;
         case 't': time = atoi(optarg); break;
         case 'c': clients = atoi(optarg); break;
         case 'p':
@@ -190,7 +194,9 @@ int main(int argc, char** argv) {
         strncpy(host, server, sizeof(host));
         free(server);
     }
-    printf("server %s:%d\n", host, port);
+    char Host[256];
+    snprintf(Host, sizeof(Host), "Host: %s:%d\r\n", host, port);
+    printf("%s", Host);
 
     // test connect
     int sock = Connect(host, port);
@@ -229,8 +235,14 @@ int main(int argc, char** argv) {
     }
     strcat(request, "\r\n");
     strcat(request, "User-Agent: webbench/1.18.3.15\r\n");
+    strcat(request, Host);
     strcat(request, "Cache-Control: no-cache\r\n");
-    strcat(request, "Connection: close\r\n");
+    if (keepalive) {
+        strcat(request, "Connection: keep-alive\r\n");
+    }
+    else {
+        strcat(request, "Connection: close\r\n");
+    }
     strcat(request, "\r\n");
     printf("%s", request);
 
@@ -258,36 +270,43 @@ int main(int argc, char** argv) {
             signal(SIGALRM, alarm_handler);
             alarm(time);
             int sock = -1;
-loop:
+            int len = strlen(request);
+            int wrbytes, rdbytes;
             while (1) {
+connect:
                 if (timerexpired) break;
-                sock = Connect(host, port);
-                if (sock <= 0) {
+                if (sock == -1) {
+                    sock = Connect(host, port);
+                }
+                if (sock < 0) {
                     ++failed;
                     continue;
                 }
-                int len = strlen(request);
-                int wrbytes = write(sock, request, len);
+write:
+                if (timerexpired) break;
+                wrbytes = write(sock, request, len);
                 //printf("write %d bytes\n", wrbytes);
                 if (wrbytes != len) {
                     ++failed;
-                    sock = -1;
-                    continue;
+                    goto close;
                 }
-                while (1) {
-                    if (timerexpired) break;
-                    int rdbytes = read(sock, buf, sizeof(buf));
-                    //printf("read %d bytes\n", rdbytes);
-                    if (rdbytes < 0) {
-                        ++failed;
-                        sock = -1;
-                        goto loop;
-                    }
-                    if (rdbytes == 0) break;
-                    bytes += rdbytes;
+                //printf("%s\n", request);
+read:
+                if (timerexpired) break;
+                rdbytes = read(sock, buf, sizeof(buf));
+                //printf("read %d bytes\n", rdbytes);
+                if (rdbytes <= 0) {
+                    ++failed;
+                    goto close;
                 }
-                close(sock);
+                //printf("%s\n", buf);
+                bytes += rdbytes;
                 ++succeed;
+close:
+                if (!keepalive) {
+                    close(sock);
+                    sock = -1;
+                }
             }
 
             fp = fdopen(mypipe[1], "w");

+ 19 - 2
http/server/FileCache.h

@@ -13,13 +13,17 @@
 #define INVALID_FD  -1
 #endif
 
+#define HTTP_HEADER_MAX_LENGTH      1024 // 1k
+
 typedef struct file_cache_s {
     //std::string filepath;
     struct stat st;
     time_t      open_time;
     time_t      stat_time;
     uint32_t    stat_cnt;
-    HBuf        filebuf;
+    HBuf        buf; // http_header + file_content
+    hbuf_t      filebuf;
+    hbuf_t      httpbuf;
     char        last_modified[64];
     char        etag[64];
     const char* content_type;
@@ -28,6 +32,19 @@ typedef struct file_cache_s {
         stat_cnt = 0;
         content_type = NULL;
     }
+
+    void resize_buf(int filesize) {
+        buf.resize(HTTP_HEADER_MAX_LENGTH + filesize);
+        filebuf.base = buf.base + HTTP_HEADER_MAX_LENGTH;
+        filebuf.len = filesize;
+    }
+
+    void prepend_header(const char* header, int len) {
+        if (len > HTTP_HEADER_MAX_LENGTH) return;
+        httpbuf.base = filebuf.base - len;
+        httpbuf.len = len + filebuf.len;
+        memcpy(httpbuf.base, header, len);
+    }
 } file_cache_t;
 
 // filepath => file_cache_t
@@ -84,7 +101,7 @@ public:
                 fc->stat_cnt = 1;
                 cached_files[filepath] = fc;
             }
-            fc->filebuf.resize(fc->st.st_size);
+            fc->resize_buf(fc->st.st_size);
             read(fd, fc->filebuf.base, fc->filebuf.len);
             close(fd);
             time_t tt = fc->st.st_mtime;

+ 29 - 0
http/server/HttpHandler.h

@@ -4,6 +4,9 @@
 #include "HttpService.h"
 #include "HttpParser.h"
 #include "FileCache.h"
+#include "hloop.h"
+
+#define HTTP_KEEPALIVE_TIMEOUT  75 // s
 
 /*
 <!DOCTYPE html>
@@ -37,6 +40,11 @@ static inline void make_http_status_page(http_status status_code, std::string& p
 </html>)";
 }
 
+static inline void on_keepalive_timeout(htimer_t* timer) {
+    hio_t* io = (hio_t*)timer->userdata;
+    hclose(io);
+}
+
 class HttpHandler {
 public:
     HttpService*            service;
@@ -47,13 +55,24 @@ public:
     HttpRequest             req;
     HttpResponse            res;
     file_cache_t*           fc;
+    hio_t*                  io;
+    htimer_t*               keepalive_timer;
 
     HttpHandler() {
         service = NULL;
         files = NULL;
+        io = NULL;
+        keepalive_timer = NULL;
         init();
     }
 
+    ~HttpHandler() {
+        if (keepalive_timer) {
+            htimer_del(keepalive_timer);
+            keepalive_timer = NULL;
+        }
+    }
+
     void init() {
         parser.parser_request_init(&req);
         req.init();
@@ -61,6 +80,16 @@ public:
         fc = NULL;
     }
 
+    void keepalive() {
+        if (keepalive_timer == NULL) {
+            keepalive_timer = htimer_add(io->loop, on_keepalive_timeout, HTTP_KEEPALIVE_TIMEOUT*1000, 1);
+            keepalive_timer->userdata = io;
+        }
+        else {
+            htimer_reset(keepalive_timer);
+        }
+    }
+
     int handle_request() {
         // preprocessor -> api -> web -> postprocessor
         // preprocessor

+ 41 - 32
http/server/http_server.cpp

@@ -50,52 +50,60 @@ static void on_read(hio_t* io, void* buf, int readbytes) {
     if (parser->get_state() == HP_MESSAGE_COMPLETE) {
         handler->handle_request();
         // prepare header body
+        // Connection:
+        bool keepalive = true;
+        auto iter = handler->req.headers.find("connection");
+        if (iter != handler->req.headers.end()) {
+            if (stricmp(iter->second.c_str(), "keep-alive") == 0) {
+                keepalive = true;
+            }
+            else if (stricmp(iter->second.c_str(), "close") == 0) {
+                keepalive = false;
+            }
+        }
+        if (keepalive) {
+            handler->res.headers["Connection"] = "keep-alive";
+        }
+        else {
+            handler->res.headers["Connection"] = "close";
+        }
+        // Date:
         time_t tt;
         time(&tt);
         char c_str[256] = {0};
         strftime(c_str, sizeof(c_str), "%a, %d %b %Y %H:%M:%S GMT", gmtime(&tt));
         handler->res.headers["Date"] = c_str;
         std::string header = handler->res.dump(true, false);
-        const char* body = NULL;
-        int content_length = 0;
+        hbuf_t sendbuf;
+        bool send_in_one_packet = true;
         if (handler->fc) {
-            body = (const char*)handler->fc->filebuf.base;
-            content_length = handler->fc->filebuf.len;
-        }
-        else {
-            body = handler->res.body.c_str();
-            content_length = handler->res.body.size();
-        }
-        bool send_in_one_packet;
-        if (content_length > (1<<20)) {
-            send_in_one_packet = false;
+            handler->fc->prepend_header(header.c_str(), header.size());
+            sendbuf = handler->fc->httpbuf;
         }
         else {
-            send_in_one_packet = true;
-            if (content_length > 0) {
-                header.insert(header.size(), body, content_length);
+            if (handler->res.body.size() > (1<<20)) {
+                send_in_one_packet = false;
+            } else if (handler->res.body.size() != 0) {
+                header += handler->res.body;
             }
+            sendbuf.base = (char*)header.c_str();
+            sendbuf.len = header.size();
         }
         // send header/body
-        hwrite(io->loop, io->fd, header.c_str(), header.size());
-        if (!send_in_one_packet) {
+        hwrite(io->loop, io->fd, sendbuf.base, sendbuf.len);
+        if (send_in_one_packet == false) {
             // send body
-            hwrite(io->loop, io->fd, body, content_length);
+            hwrite(io->loop, io->fd, handler->res.body.data(), handler->res.body.size());
         }
+
         hlogi("[%s:%d][%s %s]=>[%d %s]",
             handler->srcip, handler->srcport,
             http_method_str(handler->req.method), handler->req.url.c_str(),
             handler->res.status_code, http_status_str(handler->res.status_code));
-        // Connection: Keep-Alive
-        bool keep_alive = false;
-        auto iter = handler->req.headers.find("connection");
-        if (iter != handler->req.headers.end()) {
-            if (stricmp(iter->second.c_str(), "keep-alive") == 0) {
-                keep_alive = true;
-            }
-        }
-        if (keep_alive) {
+
+        if (keepalive) {
             handler->init();
+            handler->keepalive();
         }
         else {
             hclose(io);
@@ -122,6 +130,11 @@ static void on_accept(hio_t* io, int connfd) {
     //printd("accept listenfd=%d connfd=%d [%s:%d] <= [%s:%d]\n", io->fd, connfd,
             //localip, ntohs(localaddr->sin_port),
             //peerip, ntohs(peeraddr->sin_port));
+
+    nonblocking(connfd);
+    HBuf* buf = (HBuf*)io->loop->userdata;
+    hio_t* connio = hread(io->loop, connfd, buf->base, buf->len, on_read);
+    connio->close_cb = on_close;
     // new HttpHandler
     // delete on_close
     HttpHandler* handler = new HttpHandler;
@@ -129,11 +142,7 @@ static void on_accept(hio_t* io, int connfd) {
     handler->files = &s_filecache;
     inet_ntop(peeraddr->sin_family, &peeraddr->sin_addr, handler->srcip, sizeof(handler->srcip));
     handler->srcport = ntohs(peeraddr->sin_port);
-
-    nonblocking(connfd);
-    HBuf* buf = (HBuf*)io->loop->userdata;
-    hio_t* connio = hread(io->loop, connfd, buf->base, buf->len, on_read);
-    connio->close_cb = on_close;
+    handler->io = connio;
     connio->userdata = handler;
 }