Quellcode durchsuchen

http_async_handler(HttpRequestPtr, HttpResponseWriterPtr)

hewei.it vor 4 Jahren
Ursprung
Commit
b457fef203

+ 3 - 3
http/Http1Parser.h

@@ -1,5 +1,5 @@
-#ifndef HTTP1_PARSER_H_
-#define HTTP1_PARSER_H_
+#ifndef HV_HTTP1_PARSER_H_
+#define HV_HTTP1_PARSER_H_
 
 #include "HttpParser.h"
 #include "http_parser.h"
@@ -135,4 +135,4 @@ public:
     }
 };
 
-#endif // HTTP1_PARSER_H_
+#endif // HV_HTTP1_PARSER_H_

+ 3 - 3
http/Http2Parser.h

@@ -1,5 +1,5 @@
-#ifndef HTTP2_PARSER_H_
-#define HTTP2_PARSER_H_
+#ifndef HV_HTTP2_PARSER_H_
+#define HV_HTTP2_PARSER_H_
 
 #ifdef WITH_NGHTTP2
 #include "HttpParser.h"
@@ -86,4 +86,4 @@ public:
 
 #endif
 
-#endif // HTTP2_PARSER_H_
+#endif // HV_HTTP2_PARSER_H_

+ 21 - 0
http/HttpMessage.cpp

@@ -287,6 +287,27 @@ void HttpMessage::FillContentLength() {
     }
 }
 
+bool HttpMessage::IsKeepAlive() {
+    bool keepalive = true;
+    auto iter = headers.find("connection");
+    if (iter != headers.end()) {
+        const char* keepalive_value = iter->second.c_str();
+        if (stricmp(keepalive_value, "keep-alive") == 0) {
+            keepalive = true;
+        }
+        else if (stricmp(keepalive_value, "close") == 0) {
+            keepalive = false;
+        }
+        else if (stricmp(keepalive_value, "upgrade") == 0) {
+            keepalive = true;
+        }
+    }
+    else if (http_major == 1 && http_minor == 0) {
+        keepalive = false;
+    }
+    return keepalive;
+}
+
 void HttpMessage::DumpHeaders(std::string& str) {
     FillContentType();
     FillContentLength();

+ 5 - 3
http/HttpMessage.h

@@ -1,5 +1,5 @@
-#ifndef HTTP_MESSAGE_H_
-#define HTTP_MESSAGE_H_
+#ifndef HV_HTTP_MESSAGE_H_
+#define HV_HTTP_MESSAGE_H_
 
 /*
  * @class HttpMessage
@@ -191,6 +191,8 @@ public:
     // body.size -> content_length <-> headers Content-Length
     void FillContentLength();
 
+    bool IsKeepAlive();
+
     std::string GetHeader(const char* key, const std::string& defvalue = "") {
         auto iter = headers.find(key);
         if (iter != headers.end()) {
@@ -391,4 +393,4 @@ typedef std::shared_ptr<HttpRequest>    HttpRequestPtr;
 typedef std::shared_ptr<HttpResponse>   HttpResponsePtr;
 typedef std::function<void(const HttpResponsePtr&)> HttpResponseCallback;
 
-#endif // HTTP_MESSAGE_H_
+#endif // HV_HTTP_MESSAGE_H_

+ 3 - 3
http/HttpParser.h

@@ -1,5 +1,5 @@
-#ifndef HTTP_PARSER_H_
-#define HTTP_PARSER_H_
+#ifndef HV_HTTP_PARSER_H_
+#define HV_HTTP_PARSER_H_
 
 #include "hexport.h"
 #include "HttpMessage.h"
@@ -48,4 +48,4 @@ public:
 
 typedef std::shared_ptr<HttpParser> HttpParserPtr;
 
-#endif // HTTP_PARSER_H_
+#endif // HV_HTTP_PARSER_H_

+ 8 - 5
http/WebSocketChannel.h

@@ -19,20 +19,23 @@ public:
 
     // isConnected, send, close
 
-    int send(const std::string& msg, enum ws_opcode opcode DEFAULT(WS_OPCODE_TEXT)) {
+    int send(const std::string& msg, enum ws_opcode opcode = WS_OPCODE_TEXT) {
+        return send(msg.c_str(), msg.size(), opcode);
+    }
+
+    int send(const char* buf, int len, enum ws_opcode opcode = WS_OPCODE_BINARY) {
         bool has_mask = false;
         char mask[4] = {0};
         if (type == WS_CLIENT) {
             has_mask = true;
             *(int*)mask = rand();
         }
-        int frame_size = ws_calc_frame_size(msg.size(), has_mask);
+        int frame_size = ws_calc_frame_size(len, has_mask);
         if (sendbuf.len < frame_size) {
             sendbuf.resize(ceil2e(frame_size));
         }
-        ws_build_frame(sendbuf.base, msg.c_str(), msg.size(), mask, has_mask, opcode);
-        Buffer buf(sendbuf.base, frame_size);
-        return write(&buf);
+        ws_build_frame(sendbuf.base, buf, len, mask, has_mask, opcode);
+        return write(sendbuf.base, frame_size);
     }
 
 private:

+ 2 - 4
http/client/AsyncHttpClient.cpp

@@ -71,11 +71,9 @@ int AsyncHttpClient::doTask(const HttpClientTaskPtr& task) {
             return;
         }
         if (ctx->parser->IsComplete()) {
-            std::string req_connection = ctx->task->req->GetHeader("Connection");
-            std::string resp_connection = ctx->resp->GetHeader("Connection");
+            bool keepalive = ctx->task->req->IsKeepAlive() && ctx->resp->IsKeepAlive();
             ctx->successCallback();
-            if (stricmp(req_connection.c_str(), "keep-alive") == 0 &&
-                stricmp(resp_connection.c_str(), "keep-alive") == 0) {
+            if (keepalive) {
                 // NOTE: add into conn_pools to reuse
                 // hlogd("add into conn_pools");
                 conn_pools[channel->peeraddr()].add(channel->fd());

+ 5 - 1
http/client/WebSocketClient.cpp

@@ -155,8 +155,12 @@ int WebSocketClient::close() {
 }
 
 int WebSocketClient::send(const std::string& msg) {
+    return send(msg.c_str(), msg.size(), WS_OPCODE_TEXT);
+}
+
+int WebSocketClient::send(const char* buf, int len, enum ws_opcode opcode) {
     if (channel == NULL) return -1;
-    return channel->send(msg);
+    return channel->send(buf, len, opcode);
 }
 
 }

+ 1 - 0
http/client/WebSocketClient.h

@@ -29,6 +29,7 @@ public:
     int open(const char* url);
     int close();
     int send(const std::string& msg);
+    int send(const char* buf, int len, enum ws_opcode opcode = WS_OPCODE_BINARY);
 
 private:
     enum State {

+ 3 - 3
http/client/http_client.h

@@ -1,5 +1,5 @@
-#ifndef HTTP_CLIENT_H_
-#define HTTP_CLIENT_H_
+#ifndef HV_HTTP_CLIENT_H_
+#define HV_HTTP_CLIENT_H_
 
 #include "hexport.h"
 #include "HttpMessage.h"
@@ -55,4 +55,4 @@ HV_EXPORT int http_client_send(HttpRequest* req, HttpResponse* resp);
 // http_client_send_async(&default_async_client, ...)
 HV_EXPORT int http_client_send_async(HttpRequestPtr req, HttpResponseCallback resp_cb = NULL);
 
-#endif  // HTTP_CLIENT_H_
+#endif // HV_HTTP_CLIENT_H_

+ 3 - 3
http/grpcdef.h

@@ -1,5 +1,5 @@
-#ifndef GRPC_DEF_H_
-#define GRPC_DEF_H_
+#ifndef HV_GRPC_DEF_H_
+#define HV_GRPC_DEF_H_
 
 #ifdef __cplusplus
 extern "C" {
@@ -121,4 +121,4 @@ static inline long long varint_decode(const unsigned char* buf, int* len) {
 }
 #endif
 
-#endif // GRPC_DEF_H_
+#endif // HV_GRPC_DEF_H_

+ 3 - 3
http/http2def.h

@@ -1,5 +1,5 @@
-#ifndef HTTP2_DEF_H_
-#define HTTP2_DEF_H_
+#ifndef HV_HTTP2_DEF_H_
+#define HV_HTTP2_DEF_H_
 
 #ifdef __cplusplus
 extern "C" {
@@ -83,4 +83,4 @@ static inline void http2_frame_hd_unpack(const unsigned char* buf, http2_frame_h
 }
 #endif
 
-#endif
+#endif // HV_HTTP2_DEF_H_

+ 3 - 3
http/http_content.h

@@ -1,5 +1,5 @@
-#ifndef HTTP_CONTENT_H_
-#define HTTP_CONTENT_H_
+#ifndef HV_HTTP_CONTENT_H_
+#define HV_HTTP_CONTENT_H_
 
 #include "hexport.h"
 #include "hstring.h"
@@ -63,4 +63,4 @@ HV_EXPORT std::string dump_json(hv::Json& json);
 HV_EXPORT int         parse_json(const char* str, hv::Json& json, std::string& errmsg);
 #endif
 
-#endif // HTTP_CONTENT_H_
+#endif // HV_HTTP_CONTENT_H_

+ 3 - 3
http/httpdef.h

@@ -1,5 +1,5 @@
-#ifndef HTTP_DEF_H_
-#define HTTP_DEF_H_
+#ifndef HV_HTTP_DEF_H_
+#define HV_HTTP_DEF_H_
 
 #include "hexport.h"
 
@@ -181,4 +181,4 @@ HV_EXPORT enum http_content_type http_content_type_enum_by_suffix(const char* su
 
 END_EXTERN_C
 
-#endif // HTTP_DEF_H_
+#endif // HV_HTTP_DEF_H_

+ 103 - 59
http/server/HttpHandler.cpp

@@ -6,43 +6,54 @@
 int HttpHandler::HandleHttpRequest() {
     // preprocessor -> api -> web -> postprocessor
 
-    int ret = 0;
-    http_api_handler api = NULL;
+    int status_code = 200;
+    http_sync_handler sync_handler = NULL;
+    http_async_handler async_handler = NULL;
 
-    req.ParseUrl();
-    req.client_addr.ip = ip;
-    req.client_addr.port = port;
+    HttpRequest* pReq = req.get();
+    HttpResponse* pResp = resp.get();
+
+    pReq->ParseUrl();
+    pReq->client_addr.ip = ip;
+    pReq->client_addr.port = port;
 
 preprocessor:
+    state = HANDLE_BEGIN;
     if (service->preprocessor) {
-        ret = service->preprocessor(&req, &res);
-        if (ret != 0) {
+        status_code = service->preprocessor(pReq, pResp);
+        if (status_code != 0) {
             goto make_http_status_page;
         }
     }
 
     if (service->api_handlers.size() != 0) {
-        service->GetApi(&req, &api);
+        service->GetApi(pReq, &sync_handler, &async_handler);
     }
 
-    if (api) {
-        // api service
-        ret = api(&req, &res);
-        if (ret != 0) {
+    if (sync_handler) {
+        // sync api service
+        status_code = sync_handler(pReq, pResp);
+        if (status_code != 0) {
             goto make_http_status_page;
         }
     }
+    else if (async_handler) {
+        // async api service
+        async_handler(req, writer);
+        status_code = 0;
+    }
     else if (service->document_root.size() != 0 &&
-            (req.method == HTTP_GET || req.method == HTTP_HEAD)) {
-        // web service
-        // path safe check
-        const char* s = req.path.c_str();
+            (pReq->method == HTTP_GET || pReq->method == HTTP_HEAD)) {
+        // file service
+        status_code = 200;
+        const char* s = pReq->path.c_str();
         const char* e = s;
         while (*e && *e != '?' && *e != '#') ++e;
         std::string path = std::string(s, e);
         const char* req_path = path.c_str();
+        // path safe check
         if (*req_path != '/' || strstr(req_path, "/../")) {
-            res.status_code = HTTP_STATUS_BAD_REQUEST;
+            pResp->status_code = HTTP_STATUS_BAD_REQUEST;
             goto make_http_status_page;
         }
         std::string filepath = service->document_root;
@@ -56,26 +67,26 @@ preprocessor:
             is_index_of = true;
         }
         if (!is_dir || is_index_of) {
-            bool need_read = req.method == HTTP_HEAD ? false : true;
+            bool need_read = pReq->method == HTTP_HEAD ? false : true;
             fc = files->Open(filepath.c_str(), need_read, (void*)req_path);
         }
         if (fc == NULL) {
             // Not Found
-            ret = HTTP_STATUS_NOT_FOUND;
+            status_code = HTTP_STATUS_NOT_FOUND;
         }
         else {
             // Not Modified
-            auto iter = req.headers.find("if-not-match");
-            if (iter != req.headers.end() &&
+            auto iter = pReq->headers.find("if-not-match");
+            if (iter != pReq->headers.end() &&
                 strcmp(iter->second.c_str(), fc->etag) == 0) {
-                ret = HTTP_STATUS_NOT_MODIFIED;
+                status_code = HTTP_STATUS_NOT_MODIFIED;
                 fc = NULL;
             }
             else {
-                iter = req.headers.find("if-modified-since");
-                if (iter != req.headers.end() &&
+                iter = pReq->headers.find("if-modified-since");
+                if (iter != pReq->headers.end() &&
                     strcmp(iter->second.c_str(), fc->last_modified) == 0) {
-                    ret = HTTP_STATUS_NOT_MODIFIED;
+                    status_code = HTTP_STATUS_NOT_MODIFIED;
                     fc = NULL;
                 }
             }
@@ -83,14 +94,14 @@ preprocessor:
     }
     else {
         // Not Implemented
-        ret = HTTP_STATUS_NOT_IMPLEMENTED;
+        status_code = HTTP_STATUS_NOT_IMPLEMENTED;
     }
 
 make_http_status_page:
-    if (ret >= 100 && ret < 600) {
-        res.status_code = (http_status)ret;
+    if (status_code >= 100 && status_code < 600) {
+        pResp->status_code = (http_status)status_code;
     }
-    if (res.status_code >= 400 && res.body.size() == 0 && req.method != HTTP_HEAD) {
+    if (pResp->status_code >= 400 && pResp->body.size() == 0 && pReq->method != HTTP_HEAD) {
         // error page
         if (service->error_page.size() != 0) {
             std::string filepath = service->document_root;
@@ -99,37 +110,70 @@ make_http_status_page:
             fc = files->Open(filepath.c_str(), true, NULL);
         }
         // status page
-        if (fc == NULL && res.body.size() == 0) {
-            res.content_type = TEXT_HTML;
-            make_http_status_page(res.status_code, res.body);
+        if (fc == NULL && pResp->body.size() == 0) {
+            pResp->content_type = TEXT_HTML;
+            make_http_status_page(pResp->status_code, pResp->body);
         }
     }
 
     if (fc) {
-        res.content = fc->filebuf.base;
-        res.content_length = fc->filebuf.len;
+        pResp->content = fc->filebuf.base;
+        pResp->content_length = fc->filebuf.len;
         if (fc->content_type && *fc->content_type != '\0') {
-            res.headers["Content-Type"] = fc->content_type;
+            pResp->headers["Content-Type"] = fc->content_type;
         }
-        res.headers["Last-Modified"] = fc->last_modified;
-        res.headers["Etag"] = fc->etag;
+        pResp->headers["Last-Modified"] = fc->last_modified;
+        pResp->headers["Etag"] = fc->etag;
     }
 
 postprocessor:
     if (service->postprocessor) {
-        ret = service->postprocessor(&req, &res);
+        service->postprocessor(pReq, pResp);
     }
 
-    state = WANT_SEND;
-    return ret;
+    if (status_code == 0) {
+        state = HANDLE_CONTINUE;
+    } else {
+        state = HANDLE_END;
+        parser->SubmitResponse(pResp);
+    }
+    return status_code;
+}
+
+int HttpHandler::FeedRecvData(const char* data, size_t len) {
+    int nfeed = 0;
+    if (protocol == HttpHandler::WEBSOCKET) {
+        nfeed = ws->parser->FeedRecvData(data, len);
+        if (nfeed != len) {
+            hloge("[%s:%d] websocket parse error!", ip, port);
+        }
+    } else {
+        if (state != WANT_RECV) {
+            Reset();
+        }
+        nfeed = parser->FeedRecvData(data, len);
+        if (nfeed != len) {
+            hloge("[%s:%d] http parse error: %s", ip, port, parser->StrError(parser->GetError()));
+        }
+    }
+    return nfeed;
 }
 
 int HttpHandler::GetSendData(char** data, size_t* len) {
+    if (state == HANDLE_CONTINUE) {
+        return 0;
+    }
+
+    HttpRequest* pReq = req.get();
+    HttpResponse* pResp = resp.get();
+
     if (protocol == HTTP_V1) {
         switch(state) {
         case WANT_RECV:
             if (parser->IsComplete()) state = WANT_SEND;
             else return 0;
+        case HANDLE_END:
+             state = WANT_SEND;
         case WANT_SEND:
             state = SEND_HEADER;
         case SEND_HEADER:
@@ -137,13 +181,13 @@ int HttpHandler::GetSendData(char** data, size_t* len) {
             int content_length = 0;
             const char* content = NULL;
             // HEAD
-            if (req.method == HTTP_HEAD) {
+            if (pReq->method == HTTP_HEAD) {
                 if (fc) {
-                    res.headers["Accept-Ranges"] = "bytes";
-                    res.headers["Content-Length"] = hv::to_string(fc->st.st_size);
+                    pResp->headers["Accept-Ranges"] = "bytes";
+                    pResp->headers["Content-Length"] = hv::to_string(fc->st.st_size);
                 } else {
-                    res.headers["Content-Type"] = "text/html";
-                    res.headers["Content-Length"] = "0";
+                    pResp->headers["Content-Type"] = "text/html";
+                    pResp->headers["Content-Length"] = "0";
                 }
                 state = SEND_DONE;
                 goto return_nobody;
@@ -153,29 +197,29 @@ int HttpHandler::GetSendData(char** data, size_t* len) {
                 long from, to, total;
                 int nread;
                 // Range:
-                if (req.GetRange(from, to)) {
+                if (pReq->GetRange(from, to)) {
                     HFile file;
                     if (file.open(fc->filepath.c_str(), "rb") != 0) {
-                        res.status_code = HTTP_STATUS_NOT_FOUND;
+                        pResp->status_code = HTTP_STATUS_NOT_FOUND;
                         state = SEND_DONE;
                         goto return_nobody;
                     }
                     total = file.size();
                     if (to == 0 || to >= total) to = total - 1;
-                    res.content_length = to - from + 1;
+                    pResp->content_length = to - from + 1;
                     nread = file.readrange(body, from, to);
-                    if (nread != res.content_length) {
-                        res.status_code = HTTP_STATUS_INTERNAL_SERVER_ERROR;
+                    if (nread != pResp->content_length) {
+                        pResp->status_code = HTTP_STATUS_INTERNAL_SERVER_ERROR;
                         state = SEND_DONE;
                         goto return_nobody;
                     }
-                    res.SetRange(from, to, total);
+                    pResp->SetRange(from, to, total);
                     state = SEND_BODY;
                     goto return_header;
                 }
                 // FileCache
                 // NOTE: no copy filebuf, more efficient
-                header = res.Dump(true, false);
+                header = pResp->Dump(true, false);
                 fc->prepend_header(header.c_str(), header.size());
                 *data = fc->httpbuf.base;
                 *len = fc->httpbuf.len;
@@ -183,15 +227,15 @@ int HttpHandler::GetSendData(char** data, size_t* len) {
                 return *len;
             }
             // API service
-            content_length = res.ContentLength();
-            content = (const char*)res.Content();
+            content_length = pResp->ContentLength();
+            content = (const char*)pResp->Content();
             if (content) {
                 if (content_length > (1 << 20)) {
                     state = SEND_BODY;
                     goto return_header;
                 } else {
                     // NOTE: header+body in one package if <= 1M
-                    header = res.Dump(true, false);
+                    header = pResp->Dump(true, false);
                     header.append(content, content_length);
                     state = SEND_DONE;
                     goto return_header;
@@ -201,9 +245,9 @@ int HttpHandler::GetSendData(char** data, size_t* len) {
                 goto return_header;
             }
 return_nobody:
-            res.content_length = 0;
+            pResp->content_length = 0;
 return_header:
-            if (header.empty()) header = res.Dump(true, false);
+            if (header.empty()) header = pResp->Dump(true, false);
             *data = (char*)header.c_str();
             *len = header.size();
             return *len;
@@ -211,8 +255,8 @@ return_header:
         case SEND_BODY:
         {
             if (body.empty()) {
-                *data = (char*)res.Content();
-                *len = res.ContentLength();
+                *data = (char*)pResp->Content();
+                *len = pResp->ContentLength();
             } else {
                 *data = (char*)body.c_str();
                 *len = body.size();

+ 47 - 8
http/server/HttpHandler.h

@@ -1,5 +1,5 @@
-#ifndef HTTP_HANDLER_H_
-#define HTTP_HANDLER_H_
+#ifndef HV_HTTP_HANDLER_H_
+#define HV_HTTP_HANDLER_H_
 
 #include "HttpService.h"
 #include "HttpParser.h"
@@ -55,6 +55,9 @@ public:
     } protocol;
     enum State {
         WANT_RECV,
+        HANDLE_BEGIN,
+        HANDLE_CONTINUE,
+        HANDLE_END,
         WANT_SEND,
         SEND_HEADER,
         SEND_BODY,
@@ -69,8 +72,9 @@ public:
     HttpService             *service;
     FileCache               *files;
 
-    HttpRequest             req;
-    HttpResponse            res;
+    HttpRequestPtr          req;
+    HttpResponsePtr         resp;
+    HttpResponseWriterPtr   writer;
     HttpParserPtr           parser;
 
     // for GetSendData
@@ -90,12 +94,46 @@ public:
         ws_cbs = NULL;
     }
 
+    bool Init(int http_version = 1) {
+        parser.reset(HttpParser::New(HTTP_SERVER, (enum http_version)http_version));
+        if (parser == NULL) {
+            return false;
+        }
+        protocol = http_version == 1 ? HTTP_V1 : HTTP_V2;
+        req.reset(new HttpRequest);
+        resp.reset(new HttpResponse);
+        if (http_version == 2) {
+            req->http_major = 2;
+            req->http_minor = 0;
+            resp->http_major = 2;
+            resp->http_minor = 0;
+        }
+        parser->InitRequest(req.get());
+        return true;
+    }
+
+    bool SwitchHTTP2() {
+        parser.reset(HttpParser::New(HTTP_SERVER, ::HTTP_V2));
+        if (parser == NULL) {
+            return false;
+        }
+        protocol = HTTP_V2;
+        req->http_major = 2;
+        req->http_minor = 0;
+        resp->http_major = 2;
+        resp->http_minor = 0;
+        parser->InitRequest(req.get());
+        return true;
+    }
+
     void Reset() {
         state = WANT_RECV;
-        req.Reset();
-        res.Reset();
+        req->Reset();
+        resp->Reset();
+        parser->InitRequest(req.get());
     }
 
+    int FeedRecvData(const char* data, size_t len);
     // @workflow: preprocessor -> api -> web -> postprocessor
     // @result: HttpRequest -> HttpResponse/file_cache_t
     int HandleHttpRequest();
@@ -104,12 +142,13 @@ public:
     // websocket
     WebSocketHandler* SwitchWebSocket() {
         ws.reset(new WebSocketHandler);
+        protocol = WEBSOCKET;
         return ws.get();
     }
     void WebSocketOnOpen() {
         ws->onopen();
         if (ws_cbs && ws_cbs->onopen) {
-            ws_cbs->onopen(ws->channel, req.url);
+            ws_cbs->onopen(ws->channel, req->url);
         }
     }
     void WebSocketOnClose() {
@@ -125,4 +164,4 @@ public:
     }
 };
 
-#endif // HTTP_HANDLER_H_
+#endif // HV_HTTP_HANDLER_H_

+ 99 - 0
http/server/HttpResponseWriter.h

@@ -0,0 +1,99 @@
+#ifndef HV_HTTP_RESPONSE_WRITER_H_
+#define HV_HTTP_RESPONSE_WRITER_H_
+
+#include "Channel.h"
+#include "HttpMessage.h"
+
+namespace hv {
+
+class HttpResponseWriter : public SocketChannel {
+public:
+    HttpResponsePtr resp;
+    enum State {
+        SEND_BEGIN,
+        SEND_HEADER,
+        SEND_BODY,
+        SEND_END,
+    } state;
+    HttpResponseWriter(hio_t* io, const HttpResponsePtr& _resp)
+        : SocketChannel(io)
+        , resp(_resp)
+        , state(SEND_BEGIN)
+    {}
+    ~HttpResponseWriter() {}
+
+    int Begin() {
+        state = SEND_BEGIN;
+        return 0;
+    }
+
+    int WriteStatus(http_status status_codes) {
+        resp->status_code = status_codes;
+        return 0;
+    }
+
+    int WriteHeader(const char* key, const char* value) {
+        resp->headers[key] = value;
+        return 0;
+    }
+
+    int EndHeaders(const char* key = NULL, const char* value = NULL) {
+        if (state != SEND_BEGIN) return -1;
+        if (key && value) {
+            resp->headers[key] = value;
+        }
+        std::string headers = resp->Dump(true, false);
+        state = SEND_HEADER;
+        return write(headers);
+    }
+
+    int WriteBody(const char* buf, int len = -1) {
+        if (len == -1) len = strlen(buf);
+        if (state == SEND_BEGIN) {
+            resp->body.append(buf, len);
+            return len;
+        } else {
+            state = SEND_BODY;
+            return write(buf, len);
+        }
+    }
+
+    int WriteBody(const std::string& str) {
+        return WriteBody(str.c_str(), str.size());
+    }
+
+    int End(const char* buf = NULL, int len = -1) {
+        if (state == SEND_END) return 0;
+        int ret = 0;
+        if (buf) {
+            ret = WriteBody(buf, len);
+        }
+        bool is_dump_headers = true;
+        bool is_dump_body = true;
+        if (state == SEND_HEADER) {
+            is_dump_headers = false;
+        } else if (state == SEND_BODY) {
+            is_dump_headers = false;
+            is_dump_body = false;
+        }
+        if (is_dump_body) {
+            std::string msg = resp->Dump(is_dump_headers, is_dump_body);
+            ret = write(msg);
+        }
+        state = SEND_END;
+        if (!resp->IsKeepAlive()) {
+            close();
+        }
+        return ret;
+    }
+
+    int End(const std::string& str) {
+        return End(str.c_str(), str.size());
+    }
+};
+
+}
+
+typedef std::shared_ptr<hv::HttpResponseWriter> HttpResponseWriterPtr;
+
+#endif // HV_HTTP_RESPONSE_WRITER_H_

+ 32 - 70
http/server/HttpServer.cpp

@@ -79,21 +79,11 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
     const char* buf = (const char*)_buf;
     HttpHandler* handler = (HttpHandler*)hevent_userdata(io);
 
-    if (handler->protocol == HttpHandler::WEBSOCKET) {
-        WebSocketParser* parser = handler->ws->parser.get();
-        int nfeed = parser->FeedRecvData(buf, readbytes);
-        if (nfeed != readbytes) {
-            hloge("[%s:%d] websocket parse error!", handler->ip, handler->port);
-            hio_close(io);
-        }
-        return;
-    }
-
-    // HTTP1 / HTTP2 -> HttpParser -> InitRequest
-    // recv -> FeedRecvData -> !WantRecv -> HttpRequest ->
-    // HandleRequest -> HttpResponse -> SubmitResponse -> while (GetSendData) -> send
+    // HttpHandler::Init(http_version) -> upgrade ? SwitchHTTP2 / SwitchWebSocket
+    // on_recv -> FeedRecvData -> HttpRequest
+    // onComplete -> HandleRequest -> HttpResponse -> while (GetSendData) -> send
 
-    if (handler->parser == NULL) {
+    if (handler->protocol == HttpHandler::UNKNOWN) {
         // check request-line
         if (readbytes < MIN_HTTP_REQUEST_LEN) {
             hloge("[%s:%d] http request-line too small", handler->ip, handler->port);
@@ -107,67 +97,49 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
                 return;
             }
         }
-        http_version version = HTTP_V1;
-        handler->protocol = HttpHandler::HTTP_V1;
+        int http_version = 1;
         if (strncmp((char*)buf, HTTP2_MAGIC, MIN(readbytes, HTTP2_MAGIC_LEN)) == 0) {
-            version = HTTP_V2;
-            handler->protocol = HttpHandler::HTTP_V2;
-            handler->req.http_major = 2;
-            handler->req.http_minor = 0;
+            http_version = 2;
         }
-        handler->parser = HttpParserPtr(HttpParser::New(HTTP_SERVER, version));
-        if (handler->parser == NULL) {
-            hloge("[%s:%d] unsupported HTTP%d", handler->ip, handler->port, (int)version);
+        if (!handler->Init(http_version)) {
+            hloge("[%s:%d] unsupported HTTP%d", handler->ip, handler->port, http_version);
             hio_close(io);
             return;
         }
-        handler->parser->InitRequest(&handler->req);
+        handler->writer.reset(new HttpResponseWriter(io, handler->resp));
     }
 
-    HttpParser* parser = handler->parser.get();
-    HttpRequest* req = &handler->req;
-    HttpResponse* res = &handler->res;
-
-    int nfeed = parser->FeedRecvData(buf, readbytes);
+    int nfeed = handler->FeedRecvData(buf, readbytes);
     if (nfeed != readbytes) {
-        hloge("[%s:%d] http parse error: %s", handler->ip, handler->port, parser->StrError(parser->GetError()));
         hio_close(io);
         return;
     }
 
+    if (handler->protocol == HttpHandler::WEBSOCKET) {
+        return;
+    }
+
+    HttpParser* parser = handler->parser.get();
     if (parser->WantRecv()) {
         return;
     }
 
+    HttpRequest* req = handler->req.get();
+    HttpResponse* resp = handler->resp.get();
+
     // Server:
     static char s_Server[64] = {'\0'};
     if (s_Server[0] == '\0') {
         snprintf(s_Server, sizeof(s_Server), "httpd/%s", hv_compile_version());
     }
-    res->headers["Server"] = s_Server;
+    resp->headers["Server"] = s_Server;
 
     // Connection:
-    bool keepalive = true;
-    auto iter_keepalive = req->headers.find("connection");
-    if (iter_keepalive != req->headers.end()) {
-        const char* keepalive_value = iter_keepalive->second.c_str();
-        if (stricmp(keepalive_value, "keep-alive") == 0) {
-            keepalive = true;
-        }
-        else if (stricmp(keepalive_value, "close") == 0) {
-            keepalive = false;
-        }
-        else if (stricmp(keepalive_value, "upgrade") == 0) {
-            keepalive = true;
-        }
-    }
-    else if (req->http_major == 1 && req->http_minor == 0) {
-        keepalive = false;
-    }
+    bool keepalive = req->IsKeepAlive();
     if (keepalive) {
-        res->headers["Connection"] = "keep-alive";
+        resp->headers["Connection"] = "keep-alive";
     } else {
-        res->headers["Connection"] = "close";
+        resp->headers["Connection"] = "close";
     }
 
     // Upgrade:
@@ -186,14 +158,14 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
             Upgrade: websocket
             Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
             */
-            res->status_code = HTTP_STATUS_SWITCHING_PROTOCOLS;
-            res->headers["Connection"] = "Upgrade";
-            res->headers["Upgrade"] = "websocket";
+            resp->status_code = HTTP_STATUS_SWITCHING_PROTOCOLS;
+            resp->headers["Connection"] = "Upgrade";
+            resp->headers["Upgrade"] = "websocket";
             auto iter_key = req->headers.find(SEC_WEBSOCKET_KEY);
             if (iter_key != req->headers.end()) {
                 char ws_accept[32] = {0};
                 ws_encode_key(iter_key->second.c_str(), ws_accept);
-                res->headers[SEC_WEBSOCKET_ACCEPT] = ws_accept;
+                resp->headers[SEC_WEBSOCKET_ACCEPT] = ws_accept;
             }
             upgrade_protocol = HttpHandler::WEBSOCKET;
         }
@@ -205,14 +177,12 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
             Upgrade: h2c
             */
             hio_write(io, HTTP2_UPGRADE_RESPONSE, strlen(HTTP2_UPGRADE_RESPONSE));
-            parser = HttpParser::New(HTTP_SERVER, HTTP_V2);
-            if (parser == NULL) {
+            if (!handler->SwitchHTTP2()) {
                 hloge("[%s:%d] unsupported HTTP2", handler->ip, handler->port);
                 hio_close(io);
                 return;
             }
-            handler->parser.reset(parser);
-            parser->InitRequest(req);
+            parser = handler->parser.get();
         }
         else {
             hio_close(io);
@@ -220,9 +190,9 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
         }
     }
 
+    int status_code = 200;
     if (parser->IsComplete() && !upgrade) {
-        handler->HandleHttpRequest();
-        parser->SubmitResponse(res);
+        status_code = handler->HandleHttpRequest();
     }
 
     char* data = NULL;
@@ -240,17 +210,13 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
         hloop_pid(loop), hloop_tid(loop),
         handler->ip, handler->port,
         http_method_str(req->method), req->path.c_str(),
-        res->status_code, res->status_message());
+        resp->status_code, resp->status_message());
 
     // switch protocol to websocket
     if (upgrade && upgrade_protocol == HttpHandler::WEBSOCKET) {
         WebSocketHandler* ws = handler->SwitchWebSocket();
-        handler->protocol = HttpHandler::WEBSOCKET;
         ws->channel.reset(new WebSocketChannel(io, WS_SERVER));
         ws->parser->onMessage = std::bind(websocket_onmessage, std::placeholders::_1, std::placeholders::_2, io);
-        // NOTE: need to reset callbacks
-        hio_setcb_read(io, on_recv);
-        hio_setcb_close(io, on_close);
         // NOTE: cancel keepalive timer, judge alive by heartbeat.
         hio_set_keepalive_timeout(io, 0);
         hio_set_heartbeat(io, HIO_DEFAULT_HEARTBEAT_INTERVAL, websocket_heartbeat);
@@ -259,11 +225,7 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
         return;
     }
 
-    // keep-alive
-    if (keepalive) {
-        handler->Reset();
-        parser->InitRequest(req);
-    } else {
+    if (status_code && !keepalive) {
         hio_close(io);
     }
 }

+ 3 - 3
http/server/HttpServer.h

@@ -1,5 +1,5 @@
-#ifndef HTTP_SERVER_H_
-#define HTTP_SERVER_H_
+#ifndef HV_HTTP_SERVER_H_
+#define HV_HTTP_SERVER_H_
 
 #include "hexport.h"
 #include "HttpService.h"
@@ -63,4 +63,4 @@ HV_EXPORT int http_server_run(http_server_t* server, int wait = 1);
 // NOTE: stop all loops and join all threads
 HV_EXPORT int http_server_stop(http_server_t* server);
 
-#endif
+#endif // HV_HTTP_SERVER_H_

+ 18 - 11
http/server/HttpService.cpp

@@ -2,7 +2,7 @@
 
 #include "hbase.h" // import strendswith
 
-void HttpService::AddApi(const char* path, http_method method, http_api_handler handler) {
+void HttpService::AddApi(const char* path, http_method method, http_sync_handler handler, http_async_handler async_handler) {
     std::shared_ptr<http_method_handlers> method_handlers = NULL;
     auto iter = api_handlers.find(path);
     if (iter == api_handlers.end()) {
@@ -16,15 +16,16 @@ void HttpService::AddApi(const char* path, http_method method, http_api_handler
     for (auto iter = method_handlers->begin(); iter != method_handlers->end(); ++iter) {
         if (iter->method == method) {
             // update
-            iter->handler = handler;
+            iter->sync_handler = handler;
+            iter->async_handler = async_handler;
             return;
         }
     }
     // add
-    method_handlers->push_back(http_method_handler(method, handler));
+    method_handlers->push_back(http_method_handler(method, handler, async_handler));
 }
 
-int HttpService::GetApi(const char* url, http_method method, http_api_handler* handler) {
+int HttpService::GetApi(const char* url, http_method method, http_sync_handler* handler, http_async_handler* async_handler) {
     // {base_url}/path?query
     const char* s = url;
     const char* b = base_url.c_str();
@@ -38,21 +39,24 @@ int HttpService::GetApi(const char* url, http_method method, http_api_handler* h
     std::string path = std::string(s, e);
     auto iter = api_handlers.find(path);
     if (iter == api_handlers.end()) {
-        *handler = NULL;
+        if (handler) *handler = NULL;
+        if (async_handler) *async_handler = NULL;
         return HTTP_STATUS_NOT_FOUND;
     }
     auto method_handlers = iter->second;
     for (auto iter = method_handlers->begin(); iter != method_handlers->end(); ++iter) {
         if (iter->method == method) {
-            *handler = iter->handler;
+            if (handler) *handler = iter->sync_handler;
+            if (async_handler) *async_handler = iter->async_handler;
             return 0;
         }
     }
-    *handler = NULL;
+    if (handler) *handler = NULL;
+    if (async_handler) *async_handler = NULL;
     return HTTP_STATUS_METHOD_NOT_ALLOWED;
 }
 
-int HttpService::GetApi(HttpRequest* req, http_api_handler* handler) {
+int HttpService::GetApi(HttpRequest* req, http_sync_handler* handler, http_async_handler* async_handler) {
     // {base_url}/path?query
     const char* s = req->path.c_str();
     const char* b = base_url.c_str();
@@ -109,17 +113,20 @@ int HttpService::GetApi(HttpRequest* req, http_api_handler* handler) {
                         // RESTful /:field/ => req->query_params[field]
                         req->query_params[param.first] = param.second;
                     }
-                    *handler = iter->handler;
+                    if (handler) *handler = iter->sync_handler;
+                    if (async_handler) *async_handler = iter->async_handler;
                     return 0;
                 }
             }
 
             if (params.size() == 0) {
-                *handler = NULL;
+                if (handler) *handler = NULL;
+                if (async_handler) *async_handler = NULL;
                 return HTTP_STATUS_METHOD_NOT_ALLOWED;
             }
         }
     }
-    *handler = NULL;
+    if (handler) *handler = NULL;
+    if (async_handler) *async_handler = NULL;
     return HTTP_STATUS_NOT_FOUND;
 }

+ 75 - 32
http/server/HttpService.h

@@ -1,5 +1,5 @@
-#ifndef HTTP_SERVICE_H_
-#define HTTP_SERVICE_H_
+#ifndef HV_HTTP_SERVICE_H_
+#define HV_HTTP_SERVICE_H_
 
 #include <string>
 #include <map>
@@ -9,42 +9,48 @@
 
 #include "hexport.h"
 #include "HttpMessage.h"
+#include "HttpResponseWriter.h"
 
 #define DEFAULT_BASE_URL        "/v1/api"
 #define DEFAULT_DOCUMENT_ROOT   "/var/www/html"
 #define DEFAULT_HOME_PAGE       "index.html"
+#define DEFAULT_ERROR_PAGE      "error.html"
 
 /*
- * @param[in] req: parsed structured http request
- * @param[out] res: structured http response
- * @return  0: handle continue
- *          http_status_code: handle done
+ * @param[in]  req:  parsed structured http request
+ * @param[out] resp: structured http response
+ * @return  0:                  handle continue
+ *          http_status_code:   handle done
  */
-// typedef int (*http_api_handler)(HttpRequest* req, HttpResponse* res);
-// NOTE: use std::function/std::bind is more convenient and more flexible.
-typedef std::function<int(HttpRequest* req, HttpResponse* resp)> http_api_handler;
+typedef std::function<int(HttpRequest* req, HttpResponse* resp)>                            http_sync_handler;
+typedef std::function<void(const HttpRequestPtr& req, const HttpResponseWriterPtr& writer)> http_async_handler;
 
 struct http_method_handler {
     http_method         method;
-    http_api_handler    handler;
-    http_method_handler(http_method m = HTTP_POST, http_api_handler h = NULL) {
+    http_sync_handler   sync_handler;
+    http_async_handler  async_handler;
+    http_method_handler(http_method m = HTTP_POST,
+                        http_sync_handler s = NULL,
+                        http_async_handler a = NULL)
+    {
         method = m;
-        handler = h;
+        sync_handler = s;
+        async_handler = a;
     }
 };
-// method => http_api_handler
-typedef std::list<http_method_handler> http_method_handlers;
+// method => http_sync_handler
+typedef std::list<http_method_handler>                                  http_method_handlers;
 // path => http_method_handlers
-typedef std::map<std::string, std::shared_ptr<http_method_handlers>> http_api_handlers;
+typedef std::map<std::string, std::shared_ptr<http_method_handlers>>    http_api_handlers;
 
 struct HV_EXPORT HttpService {
-    // preprocessor -> api -> web -> postprocessor
-    http_api_handler    preprocessor;
-    http_api_handler    postprocessor;
+    // preprocessor -> api service -> file service -> indexof service -> postprocessor
+    http_sync_handler   preprocessor;
+    http_sync_handler   postprocessor;
     // api service (that is http.APIServer)
     std::string         base_url;
     http_api_handlers   api_handlers;
-    // web service (that is http.FileServer)
+    // file service (that is http.FileServer)
     std::string document_root;
     std::string home_page;
     std::string error_page;
@@ -57,13 +63,14 @@ struct HV_EXPORT HttpService {
         // base_url = DEFAULT_BASE_URL;
         document_root = DEFAULT_DOCUMENT_ROOT;
         home_page = DEFAULT_HOME_PAGE;
+        // error_page = DEFAULT_ERROR_PAGE;
     }
 
-    void AddApi(const char* path, http_method method, http_api_handler handler);
+    void AddApi(const char* path, http_method method, http_sync_handler handler = NULL, http_async_handler async_handler = NULL);
     // @retval 0 OK, else HTTP_STATUS_NOT_FOUND, HTTP_STATUS_METHOD_NOT_ALLOWED
-    int GetApi(const char* url, http_method method, http_api_handler* handler);
+    int GetApi(const char* url, http_method method, http_sync_handler* handler = NULL, http_async_handler* async_handler = NULL);
     // RESTful API /:field/ => req->query_params["field"]
-    int GetApi(HttpRequest* req, http_api_handler* handler);
+    int GetApi(HttpRequest* req, http_sync_handler* handler = NULL, http_async_handler* async_handler = NULL);
 
     StringList Paths() {
         StringList paths;
@@ -74,36 +81,72 @@ struct HV_EXPORT HttpService {
     }
 
     // github.com/gin-gonic/gin
-    void Handle(const char* httpMethod, const char* relativePath, http_api_handler handlerFunc) {
-        AddApi(relativePath, http_method_enum(httpMethod), handlerFunc);
+    void Handle(const char* httpMethod, const char* relativePath, http_sync_handler handlerFunc) {
+        AddApi(relativePath, http_method_enum(httpMethod), handlerFunc, NULL);
+    }
+    void Handle(const char* httpMethod, const char* relativePath, http_async_handler handlerFunc) {
+        AddApi(relativePath, http_method_enum(httpMethod), NULL, handlerFunc);
     }
 
-    void HEAD(const char* relativePath, http_api_handler handlerFunc) {
+    // HEAD
+    void HEAD(const char* relativePath, http_sync_handler handlerFunc) {
+        Handle("HEAD", relativePath, handlerFunc);
+    }
+    void HEAD(const char* relativePath, http_async_handler handlerFunc) {
         Handle("HEAD", relativePath, handlerFunc);
     }
 
-    void GET(const char* relativePath, http_api_handler handlerFunc) {
+    // GET
+    void GET(const char* relativePath, http_sync_handler handlerFunc) {
+        Handle("GET", relativePath, handlerFunc);
+    }
+    void GET(const char* relativePath, http_async_handler handlerFunc) {
         Handle("GET", relativePath, handlerFunc);
     }
 
-    void POST(const char* relativePath, http_api_handler handlerFunc) {
+    // POST
+    void POST(const char* relativePath, http_sync_handler handlerFunc) {
+        Handle("POST", relativePath, handlerFunc);
+    }
+    void POST(const char* relativePath, http_async_handler handlerFunc) {
         Handle("POST", relativePath, handlerFunc);
     }
 
-    void PUT(const char* relativePath, http_api_handler handlerFunc) {
+    // PUT
+    void PUT(const char* relativePath, http_sync_handler handlerFunc) {
+        Handle("PUT", relativePath, handlerFunc);
+    }
+    void PUT(const char* relativePath, http_async_handler handlerFunc) {
         Handle("PUT", relativePath, handlerFunc);
     }
 
+    // DELETE
     // NOTE: Windows <winnt.h> #define DELETE as a macro, we have to replace DELETE with Delete.
-    void Delete(const char* relativePath, http_api_handler handlerFunc) {
+    void Delete(const char* relativePath, http_sync_handler handlerFunc) {
+        Handle("DELETE", relativePath, handlerFunc);
+    }
+    void Delete(const char* relativePath, http_async_handler handlerFunc) {
         Handle("DELETE", relativePath, handlerFunc);
     }
 
-    void PATCH(const char* relativePath, http_api_handler handlerFunc) {
+    // PATCH
+    void PATCH(const char* relativePath, http_sync_handler handlerFunc) {
+        Handle("PATCH", relativePath, handlerFunc);
+    }
+    void PATCH(const char* relativePath, http_async_handler handlerFunc) {
         Handle("PATCH", relativePath, handlerFunc);
     }
 
-    void Any(const char* relativePath, http_api_handler handlerFunc) {
+    // Any
+    void Any(const char* relativePath, http_sync_handler handlerFunc) {
+        Handle("HEAD", relativePath, handlerFunc);
+        Handle("GET", relativePath, handlerFunc);
+        Handle("POST", relativePath, handlerFunc);
+        Handle("PUT", relativePath, handlerFunc);
+        Handle("DELETE", relativePath, handlerFunc);
+        Handle("PATCH", relativePath, handlerFunc);
+    }
+    void Any(const char* relativePath, http_async_handler handlerFunc) {
         Handle("HEAD", relativePath, handlerFunc);
         Handle("GET", relativePath, handlerFunc);
         Handle("POST", relativePath, handlerFunc);
@@ -113,4 +156,4 @@ struct HV_EXPORT HttpService {
     }
 };
 
-#endif // HTTP_SERVICE_H_
+#endif // HV_HTTP_SERVICE_H_

+ 3 - 3
http/server/http_page.h

@@ -1,5 +1,5 @@
-#ifndef HTTP_PAGE_H_
-#define HTTP_PAGE_H_
+#ifndef HV_HTTP_PAGE_H_
+#define HV_HTTP_PAGE_H_
 
 #include <string>
 
@@ -41,4 +41,4 @@ void make_http_status_page(http_status status_code, std::string& page);
  */
 void make_index_of_page(const char* dir, std::string& page, const char* url = "");
 
-#endif // HTTP_PAGE_H_
+#endif // HV_HTTP_PAGE_H_