Explorar el Código

refactor HttpHandler for proxy and upgrade

ithewei hace 2 años
padre
commit
75ae0bd5a8
Se han modificado 2 ficheros con 385 adiciones y 205 borrados
  1. 343 192
      http/server/HttpHandler.cpp
  2. 42 13
      http/server/HttpHandler.h

+ 343 - 192
http/server/HttpHandler.cpp

@@ -27,10 +27,13 @@ HttpHandler::HttpHandler(hio_t* io) :
     state(WANT_RECV),
     error(0),
     // flags
-    ssl(false),
-    keepalive(true),
-    proxy(false),
-    upgrade(false),
+    ssl(0),
+    keepalive(1),
+    upgrade(0),
+    proxy(0),
+    proxy_connected(0),
+    forward_proxy(0),
+    reverse_proxy(0),
     ip{'\0'},
     port(0),
     pid(0),
@@ -45,7 +48,9 @@ HttpHandler::HttpHandler(hio_t* io) :
     last_recv_pong_time(0),
     // for sendfile
     files(NULL),
-    file(NULL)
+    file(NULL),
+    // for proxy
+    proxy_port(0)
 {
     // Init();
 }
@@ -81,30 +86,22 @@ bool HttpHandler::Init(int http_version) {
     parser->InitRequest(req.get());
     // NOTE: hook http_cb
     req->http_cb = [this](HttpMessage* msg, http_parser_state state, const char* data, size_t size) {
-        if (this->state == WANT_CLOSE || this->error != 0) return;
+        if (this->state == WANT_CLOSE) return;
         switch (state) {
         case HP_HEADERS_COMPLETE:
+            if (this->error != 0) return;
             onHeadersComplete();
             break;
         case HP_BODY:
-            if (api_handler && api_handler->state_handler) {
-                break;
-            }
-            msg->body.append(data, size);
-            return;
+            if (this->error != 0) return;
+            onBody(data, size);
+            break;
         case HP_MESSAGE_COMPLETE:
-            if (proxy) {
-                break;
-            }
             onMessageComplete();
-            return;
+            break;
         default:
             break;
         }
-
-        if (api_handler && api_handler->state_handler) {
-            api_handler->state_handler(getHttpContext(), state, data, size);
-        }
     };
     return true;
 }
@@ -130,11 +127,16 @@ void HttpHandler::Close() {
         writer->status = hv::SocketChannel::DISCONNECTED;
     }
 
-    // close proxy
-    if (proxy) {
-        if (io) hio_close_upstream(io);
+    if (api_handler && api_handler->state_handler) {
+        if (parser && !parser->IsComplete()) {
+            api_handler->state_handler(context(), HP_ERROR, NULL, 0);
+        }
+        return;
     }
 
+    // close proxy
+    closeProxy();
+
     // close file
     closeFile();
 
@@ -210,7 +212,7 @@ bool HttpHandler::SwitchWebSocket() {
     return true;
 }
 
-const HttpContextPtr& HttpHandler::getHttpContext() {
+const HttpContextPtr& HttpHandler::context() {
     if (!ctx) {
         ctx = std::make_shared<hv::HttpContext>();
         ctx->service = service;
@@ -236,15 +238,100 @@ int HttpHandler::invokeHttpHandler(const http_handler* handler) {
         status_code = HTTP_STATUS_NEXT;
     } else if (handler->ctx_handler) {
         // NOTE: ctx_handler run on IO thread, you can easily post HttpContextPtr to your consumer thread for processing.
-        status_code = handler->ctx_handler(getHttpContext());
+        status_code = handler->ctx_handler(context());
     } else if (handler->state_handler) {
-        status_code = handler->state_handler(getHttpContext(), HP_MESSAGE_COMPLETE, NULL, 0);
+        status_code = handler->state_handler(context(), HP_MESSAGE_COMPLETE, NULL, 0);
     }
     return status_code;
 }
 
 void HttpHandler::onHeadersComplete() {
     // printf("onHeadersComplete\n");
+    handleRequestHeaders();
+
+    HttpRequest* pReq = req.get();
+    if (service && service->pathHandlers.size() != 0) {
+        service->GetRoute(pReq, &api_handler);
+    }
+
+    if (api_handler && api_handler->state_handler) {
+        api_handler->state_handler(context(), HP_HEADERS_COMPLETE, NULL, 0);
+        return;
+    }
+
+    if (proxy) {
+        handleProxy();
+        return;
+    }
+
+    // Expect: 100-continue
+    handleExpect100();
+}
+
+void HttpHandler::onBody(const char* data, size_t size) {
+    if (api_handler && api_handler->state_handler) {
+        api_handler->state_handler(context(), HP_BODY, data, size);
+        return;
+    }
+
+    if (proxy && proxy_connected) {
+        if (io) hio_write_upstream(io, (void*)data, size);
+        return;
+    }
+
+    req->body.append(data, size);
+    return;
+}
+
+void HttpHandler::onMessageComplete() {
+    // printf("onMessageComplete\n");
+    int status_code = HTTP_STATUS_OK;
+
+    if (error) {
+        SendHttpStatusResponse(resp->status_code);
+        return;
+    }
+
+    if (proxy) {
+        if (proxy_connected) Reset();
+        return;
+    }
+
+    addResponseHeaders();
+
+    // upgrade ? handleUpgrade : HandleHttpRequest
+    upgrade = 0;
+    auto iter_upgrade = req->headers.find("upgrade");
+    if (iter_upgrade != req->headers.end()) {
+        upgrade = 1;
+        handleUpgrade(iter_upgrade->second.c_str());
+        status_code = resp->status_code;
+    } else {
+        status_code = HandleHttpRequest();
+        if (status_code != HTTP_STATUS_NEXT) {
+            SendHttpResponse();
+        }
+    }
+
+    // access log
+    if (service && service->enable_access_log) {
+        hlogi("[%ld-%ld][%s:%d][%s %s]=>[%d %s]",
+            pid, tid, ip, port,
+            http_method_str(req->method), req->path.c_str(),
+            resp->status_code, resp->status_message());
+    }
+
+    if (status_code != HTTP_STATUS_NEXT) {
+        // keepalive ? Reset : Close
+        if (keepalive) {
+            Reset();
+        } else {
+            state = WANT_CLOSE;
+        }
+    }
+}
+
+void HttpHandler::handleRequestHeaders() {
     HttpRequest* pReq = req.get();
     pReq->scheme = ssl ? "https" : "http";
     pReq->client_addr.ip = ip;
@@ -253,11 +340,12 @@ void HttpHandler::onHeadersComplete() {
     // keepalive
     keepalive = pReq->IsKeepAlive();
 
-    // NOTE: Detect proxy before ParseUrl
-    bool proxy = false;
+    // proxy
+    proxy = 0;
     if (hv::startswith(pReq->url, "http")) {
         // forward proxy
-        proxy = true;
+        proxy = 1;
+        forward_proxy = 1;
         auto iter = pReq->headers.find("Proxy-Connection");
         if (iter != pReq->headers.end()) {
             const char* keepalive_value = iter->second.c_str();
@@ -276,150 +364,39 @@ void HttpHandler::onHeadersComplete() {
     // printf("url=%s\n", pReq->url.c_str());
     pReq->ParseUrl();
 
-    if (service && service->pathHandlers.size() != 0) {
-        service->GetRoute(pReq, &api_handler);
-    }
-
-    if (api_handler && api_handler->state_handler && writer) {
-        writer->onclose = [this](){
-            // HP_ERROR
-            if (!parser->IsComplete()) {
-                if (api_handler && api_handler->state_handler) {
-                    api_handler->state_handler(getHttpContext(), HP_ERROR, NULL, 0);
-                }
-            }
-        };
-        return;
-    }
-
-    if (proxy) {
-        // forward proxy
-        if (service && service->enable_forward_proxy) {
-            proxyConnect(pReq->url);
-        } else {
-            resp->status_code = HTTP_STATUS_FORBIDDEN;
-            hlogw("Forbidden to forward proxy %s", pReq->url.c_str());
-        }
-        return;
-    }
-
-    if (service && service->proxies.size() != 0) {
+    if (!proxy) {
         // reverse proxy
         std::string proxy_url = service->GetProxyUrl(pReq->path.c_str());
         if (!proxy_url.empty()) {
             pReq->url = proxy_url;
-            proxyConnect(pReq->url);
-            return;
+            proxy = 1;
+            reverse_proxy = 1;
         }
     }
 
+    // TODO: rewrite url
+}
+
+void HttpHandler::handleExpect100() {
     // Expect: 100-continue
-    auto iter = pReq->headers.find("Expect");
-    if (iter != pReq->headers.end() &&
+    auto iter = req->headers.find("Expect");
+    if (iter != req->headers.end() &&
         stricmp(iter->second.c_str(), "100-continue") == 0) {
         if (io) hio_write(io, HTTP_100_CONTINUE_RESPONSE, HTTP_100_CONTINUE_RESPONSE_LEN);
     }
-
-    // TODO: rewrite
 }
 
-void HttpHandler::onMessageComplete() {
-    // printf("onMessageComplete\n");
-    int status_code = 200;
-
+void HttpHandler::addResponseHeaders() {
+    HttpResponse* pResp = resp.get();
     // Server:
     static char s_Server[64] = {'\0'};
     if (s_Server[0] == '\0') {
         snprintf(s_Server, sizeof(s_Server), "httpd/%s", hv_version());
     }
-    resp->headers["Server"] = s_Server;
+    pResp->headers["Server"] = s_Server;
 
     // Connection:
-    resp->headers["Connection"] = keepalive ? "keep-alive" : "close";
-
-    // Upgrade ? SwitchHTTP2 / SwitchWebSocket : HandleHttpRequest -> SendHttpResponse
-    upgrade = false;
-    auto iter_upgrade = req->headers.find("upgrade");
-    if (iter_upgrade != req->headers.end()) {
-        upgrade = true;
-        const char* upgrade_proto = iter_upgrade->second.c_str();
-        hlogi("[%s:%d] Upgrade: %s", ip, port, upgrade_proto);
-        // websocket
-        if (stricmp(upgrade_proto, "websocket") == 0) {
-            /*
-            HTTP/1.1 101 Switching Protocols
-            Connection: Upgrade
-            Upgrade: websocket
-            Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
-            */
-            resp->status_code = HTTP_STATUS_SWITCHING_PROTOCOLS;
-            resp->headers["Connection"] = "Upgrade";
-            resp->headers["Upgrade"] = "websocket";
-            auto iter_key = req->headers.find(SEC_WEBSOCKET_KEY);
-            if (iter_key != req->headers.end()) {
-                char ws_accept[32] = {0};
-                ws_encode_key(iter_key->second.c_str(), ws_accept);
-                resp->headers[SEC_WEBSOCKET_ACCEPT] = ws_accept;
-            }
-            auto iter_protocol = req->headers.find(SEC_WEBSOCKET_PROTOCOL);
-            if (iter_protocol != req->headers.end()) {
-                hv::StringList subprotocols = hv::split(iter_protocol->second, ',');
-                if (subprotocols.size() > 0) {
-                    hlogw("%s: %s => just select first protocol %s", SEC_WEBSOCKET_PROTOCOL, iter_protocol->second.c_str(), subprotocols[0].c_str());
-                    resp->headers[SEC_WEBSOCKET_PROTOCOL] = subprotocols[0];
-                }
-            }
-            SendHttpResponse();
-            // switch protocol to websocket
-            if (!SwitchWebSocket()) {
-                hloge("[%s:%d] unsupported websocket", ip, port);
-                error = ERR_INVALID_PROTOCOL;
-                return;
-            }
-            // onopen
-            WebSocketOnOpen();
-        }
-        // h2/h2c
-        else if (strnicmp(upgrade_proto, "h2", 2) == 0) {
-            /*
-            HTTP/1.1 101 Switching Protocols
-            Connection: Upgrade
-            Upgrade: h2c
-            */
-            if (io) hio_write(io, HTTP2_UPGRADE_RESPONSE, strlen(HTTP2_UPGRADE_RESPONSE));
-            // switch protocol to http2
-            if (!SwitchHTTP2()) {
-                hloge("[%s:%d] unsupported HTTP2", ip, port);
-                error = ERR_INVALID_PROTOCOL;
-                return;
-            }
-        }
-        else {
-            hloge("[%s:%d] unsupported Upgrade: %s", upgrade_proto);
-            error = ERR_INVALID_PROTOCOL;
-            return;
-        }
-    } else {
-        status_code = HandleHttpRequest();
-    }
-
-    SendHttpResponse();
-
-    // access log
-    if (service && service->enable_access_log) {
-        hlogi("[%ld-%ld][%s:%d][%s %s]=>[%d %s]",
-            pid, tid, ip, port,
-            http_method_str(req->method), req->path.c_str(),
-            resp->status_code, resp->status_message());
-    }
-
-    if (status_code != HTTP_STATUS_NEXT) {
-        if (keepalive) {
-            Reset();
-        } else {
-            state = WANT_CLOSE;
-        }
-    }
+    pResp->headers["Connection"] = keepalive ? "keep-alive" : "close";
 }
 
 int HttpHandler::HandleHttpRequest() {
@@ -488,7 +465,6 @@ postprocessor:
         state = HANDLE_CONTINUE;
     } else {
         state = HANDLE_END;
-        parser->SubmitResponse(resp.get());
     }
     return status_code;
 }
@@ -825,15 +801,18 @@ return_header:
             return 0;
         }
     } else if (protocol == HTTP_V2) {
-        return parser->GetSendData(data, len);
+        int ret = parser->GetSendData(data, len);
+        if (ret == 0) state = SEND_DONE;
+        return ret;
     }
     return 0;
 }
 
-int HttpHandler::SendHttpResponse() {
-    if (!io) return -1;
+int HttpHandler::SendHttpResponse(bool submit) {
+    if (!io || !parser) return -1;
     char* data = NULL;
     size_t len = 0, total_len = 0;
+    if (submit) parser->SubmitResponse(resp.get());
     while (GetSendData(&data, &len)) {
         // printf("GetSendData %d\n", (int)len);
         if (data && len) {
@@ -845,11 +824,15 @@ int HttpHandler::SendHttpResponse() {
 }
 
 int HttpHandler::SendHttpStatusResponse(http_status status_code) {
+    if (state > WANT_SEND) return 0;
     resp->status_code = status_code;
+    addResponseHeaders();
+    HandleHttpRequest();
     state = WANT_SEND;
     return SendHttpResponse();
 }
 
+//------------------sendfile--------------------------------------
 int HttpHandler::openFile(const char* filepath) {
     closeFile();
     file = new LargeFile;
@@ -902,59 +885,169 @@ void HttpHandler::closeFile() {
     }
 }
 
-void HttpHandler::onProxyClose(hio_t* upstream_io) {
-    // printf("onProxyClose\n");
-    HttpHandler* handler = (HttpHandler*)hevent_userdata(upstream_io);
-    if (handler == NULL) return;
+//------------------upgrade--------------------------------------
+int HttpHandler::handleUpgrade(const char* upgrade_protocol) {
+    hlogi("[%s:%d] Upgrade: %s", ip, port, upgrade_protocol);
 
-    hevent_set_userdata(upstream_io, NULL);
+    // websocket
+    if (stricmp(upgrade_protocol, "websocket") == 0) {
+        return upgradeWebSocket();
+    }
 
-    int error = hio_error(upstream_io);
-    if (error == ETIMEDOUT) {
-        handler->SendHttpStatusResponse(HTTP_STATUS_GATEWAY_TIMEOUT);
+    // h2/h2c
+    if (strnicmp(upgrade_protocol, "h2", 2) == 0) {
+        return upgradeHTTP2();
     }
 
-    handler->error = error;
-    hio_close_upstream(upstream_io);
+    hloge("[%s:%d] unsupported Upgrade: %s", upgrade_protocol);
+    return SetError(ERR_INVALID_PROTOCOL);
 }
 
-void HttpHandler::onProxyConnect(hio_t* upstream_io) {
-    // printf("onProxyConnect\n");
-    HttpHandler* handler = (HttpHandler*)hevent_userdata(upstream_io);
-    hio_t* io = hio_get_upstream(upstream_io);
-    assert(handler != NULL && io != NULL);
+int HttpHandler::upgradeWebSocket() {
+    /*
+    HTTP/1.1 101 Switching Protocols
+    Connection: Upgrade
+    Upgrade: websocket
+    Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
+    */
+    resp->status_code = HTTP_STATUS_SWITCHING_PROTOCOLS;
+    resp->headers["Connection"] = "Upgrade";
+    resp->headers["Upgrade"] = "websocket";
+
+    // Sec-WebSocket-Accept:
+    auto iter_key = req->headers.find(SEC_WEBSOCKET_KEY);
+    if (iter_key != req->headers.end()) {
+        char ws_accept[32] = {0};
+        ws_encode_key(iter_key->second.c_str(), ws_accept);
+        resp->headers[SEC_WEBSOCKET_ACCEPT] = ws_accept;
+    }
 
-    HttpRequest* req = handler->req.get();
-    // NOTE: send head + received body
-    req->headers.erase("Proxy-Connection");
-    req->headers["Connection"] = handler->keepalive ? "keep-alive" : "close";
-    req->headers["X-Real-IP"] = handler->ip;
-    std::string msg = req->Dump(true, true);
-    // printf("%s\n", msg.c_str());
-    hio_write(upstream_io, msg.c_str(), msg.size());
+    // Sec-WebSocket-Protocol:
+    auto iter_protocol = req->headers.find(SEC_WEBSOCKET_PROTOCOL);
+    if (iter_protocol != req->headers.end()) {
+        hv::StringList subprotocols = hv::split(iter_protocol->second, ',');
+        if (subprotocols.size() > 0) {
+            hlogw("%s: %s => just select first protocol %s", SEC_WEBSOCKET_PROTOCOL, iter_protocol->second.c_str(), subprotocols[0].c_str());
+            resp->headers[SEC_WEBSOCKET_PROTOCOL] = subprotocols[0];
+        }
+    }
 
-    // NOTE: start recv body continue then upstream
-    hio_setcb_read(io, hio_write_upstream);
-    hio_read_start(io);
-    hio_setcb_read(upstream_io, hio_write_upstream);
-    hio_read_start(upstream_io);
+    SendHttpResponse();
+
+    if (!SwitchWebSocket()) {
+        hloge("[%s:%d] unsupported websocket", ip, port);
+        return SetError(ERR_INVALID_PROTOCOL);
+    }
+
+    // onopen
+    WebSocketOnOpen();
+    return 0;
+}
+
+int HttpHandler::upgradeHTTP2() {
+    /*
+    HTTP/1.1 101 Switching Protocols
+    Connection: Upgrade
+    Upgrade: h2c
+    */
+    resp->status_code = HTTP_STATUS_SWITCHING_PROTOCOLS;
+    resp->headers["Connection"] = "Upgrade";
+    resp->headers["Upgrade"] = "h2c";
+
+    SendHttpResponse();
+
+    if (!SwitchHTTP2()) {
+        hloge("[%s:%d] unsupported HTTP2", ip, port);
+        return SetError(ERR_INVALID_PROTOCOL);
+    }
+
+    // NOTE: send HTTP2_SETTINGS frame
+    SendHttpResponse(false);
+
+    return 0;
+}
+
+//------------------proxy--------------------------------------
+int HttpHandler::handleProxy() {
+    if (forward_proxy) {
+        return handleForwardProxy();
+    }
+
+    if (reverse_proxy) {
+        return handleReverseProxy();
+    }
+
+    return 0;
+}
+
+int HttpHandler::handleForwardProxy() {
+    if (service && service->enable_forward_proxy) {
+        return connectProxy(req->url);
+    } else {
+        proxy = 0;
+        SetError(HTTP_STATUS_FORBIDDEN, HTTP_STATUS_FORBIDDEN);
+        hlogw("Forbidden to forward proxy %s", req->url.c_str());
+    }
+    return 0;
 }
 
-int HttpHandler::proxyConnect(const std::string& strUrl) {
+int HttpHandler::handleReverseProxy() {
+    return connectProxy(req->url);
+}
+
+int HttpHandler::connectProxy(const std::string& strUrl) {
     if (!io) return ERR_NULL_POINTER;
-    proxy = true;
 
     HUrl url;
     url.parse(strUrl);
-    hlogi("proxy_pass %s", strUrl.c_str());
+    hlogi("[%s:%d] proxy_pass %s", ip, port, strUrl.c_str());
+
+    if (proxy_connected) {
+        if (url.host == proxy_host && url.port == proxy_port) {
+            // reuse keepalive connection
+            sendProxyRequest();
+            return 0;
+        } else {
+            // detach and close previous connection
+            hio_t* upstream_io = hio_get_upstream(io);
+            if (upstream_io) {
+                hio_setcb_close(upstream_io, NULL);
+                closeProxy();
+            }
+        }
+    }
+
+    bool allow_proxy = true;
+    if (service && service->trustProxies.size() != 0) {
+        allow_proxy = false;
+        for (const auto& trust_proxy : service->trustProxies) {
+            if (trust_proxy == url.host) {
+                allow_proxy = true;
+                break;
+            }
+        }
+    }
+    if (service && service->noProxies.size() != 0) {
+        for (const auto& no_proxy : service->noProxies) {
+            if (no_proxy == url.host) {
+                allow_proxy = false;
+                break;
+            }
+        }
+    }
+    if (!allow_proxy) {
+        SetError(HTTP_STATUS_FORBIDDEN, HTTP_STATUS_FORBIDDEN);
+        hlogw("Forbidden to proxy %s", url.host.c_str());
+        return 0;
+    }
 
     hloop_t* loop = hevent_loop(io);
-    hio_t* upstream_io = hio_create_socket(loop, url.host.c_str(), url.port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
+    proxy = 1;
+    proxy_host = url.host;
+    proxy_port = url.port;
+    hio_t* upstream_io = hio_create_socket(loop, proxy_host.c_str(), proxy_port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
     if (upstream_io == NULL) {
-        SendHttpStatusResponse(HTTP_STATUS_BAD_GATEWAY);
-        hio_close_async(io);
-        error = ERR_SOCKET;
-        return error;
+        return SetError(ERR_SOCKET, HTTP_STATUS_BAD_GATEWAY);
     }
     if (url.scheme == "https") {
         hio_enable_ssl(upstream_io);
@@ -977,3 +1070,61 @@ int HttpHandler::proxyConnect(const std::string& strUrl) {
     hio_read_stop(io);
     return 0;
 }
+
+int HttpHandler::closeProxy() {
+    if (proxy && proxy_connected) {
+        proxy_connected = 0;
+        if (io) hio_close_upstream(io);
+    }
+    return 0;
+}
+
+int HttpHandler::sendProxyRequest() {
+    if (!io || !proxy_connected) return -1;
+
+    req->headers.erase("Host");
+    req->FillHost(proxy_host.c_str(), proxy_port);
+    req->headers.erase("Proxy-Connection");
+    req->headers["Connection"] = keepalive ? "keep-alive" : "close";
+    req->headers["X-Real-IP"] = ip;
+    // NOTE: send head + received body
+    std::string msg = req->Dump(true, true);
+    // printf("%s\n", msg.c_str());
+    req->Reset();
+
+    hio_write_upstream(io, (void*)msg.c_str(), msg.size());
+    if (parser->IsComplete()) state = WANT_SEND;
+    return msg.size();
+}
+
+void HttpHandler::onProxyConnect(hio_t* upstream_io) {
+    // printf("onProxyConnect\n");
+    HttpHandler* handler = (HttpHandler*)hevent_userdata(upstream_io);
+    hio_t* io = hio_get_upstream(upstream_io);
+    assert(handler != NULL && io != NULL);
+    handler->proxy_connected = 1;
+
+    handler->sendProxyRequest();
+
+    // NOTE: start recv body continue then upstream
+    hio_setcb_read(upstream_io, hio_write_upstream);
+    hio_read_start(upstream_io);
+    hio_read_start(io);
+}
+
+void HttpHandler::onProxyClose(hio_t* upstream_io) {
+    // printf("onProxyClose\n");
+    HttpHandler* handler = (HttpHandler*)hevent_userdata(upstream_io);
+    if (handler == NULL) return;
+    handler->proxy_connected = 0;
+
+    hevent_set_userdata(upstream_io, NULL);
+
+    int error = hio_error(upstream_io);
+    if (error == ETIMEDOUT) {
+        handler->SendHttpStatusResponse(HTTP_STATUS_GATEWAY_TIMEOUT);
+    }
+
+    handler->error = error;
+    hio_close_upstream(upstream_io);
+}

+ 42 - 13
http/server/HttpHandler.h

@@ -33,10 +33,13 @@ public:
     int                     error;
 
     // flags
-    unsigned ssl:           1;
-    unsigned keepalive:     1;
-    unsigned proxy:         1;
-    unsigned upgrade:       1;
+    unsigned ssl                :1;
+    unsigned keepalive          :1;
+    unsigned upgrade            :1;
+    unsigned proxy              :1;
+    unsigned proxy_connected    :1;
+    unsigned forward_proxy      :1;
+    unsigned reverse_proxy      :1;
 
     // peeraddr
     char                    ip[64];
@@ -69,11 +72,15 @@ public:
 
     // for sendfile
     FileCache               *files;
-    file_cache_ptr          fc; // cache small file
+    file_cache_ptr          fc;     // cache small file
     struct LargeFile : public HFile {
-        HBuf                buf;
-        uint64_t            timer;
-    } *file; // for large file
+        HBuf        buf;
+        uint64_t    timer;
+    }                       *file;  // for large file
+
+    // for proxy
+    std::string             proxy_host;
+    int                     proxy_port;
 
     HttpHandler(hio_t* io = NULL);
     ~HttpHandler();
@@ -84,8 +91,8 @@ public:
 
     /* @workflow:
      * HttpServer::on_recv -> HttpHandler::FeedRecvData -> Init -> HttpParser::InitRequest -> HttpRequest::http_cb ->
-     * onHeadersComplete -> proxy ? proxyConnect -> hio_setup_upstream :
-     * onMessageComplete -> upgrade ? SwitchHTTP2 / SwitchWebSocket : HandleHttpRequest -> HttpParser::SubmitResponse ->
+     * onHeadersComplete -> proxy ? handleProxy -> connectProxy :
+     * onMessageComplete -> upgrade ? handleUpgrade : HandleHttpRequest -> HttpParser::SubmitResponse ->
      * SendHttpResponse -> while(GetSendData) hio_write ->
      * keepalive ? Reset : Close -> hio_close
      *
@@ -107,7 +114,7 @@ public:
 
     int GetSendData(char** data, size_t* len);
 
-    int SendHttpResponse();
+    int SendHttpResponse(bool submit = true);
     int SendHttpStatusResponse(http_status status_code);
 
     // HTTP2
@@ -128,10 +135,22 @@ public:
         }
     }
 
+    int SetError(int error_code, http_status status_code = HTTP_STATUS_BAD_REQUEST) {
+        error = error_code;
+        if (resp) resp->status_code = status_code;
+        return error;
+    }
+
 private:
-    const HttpContextPtr& getHttpContext();
+    const HttpContextPtr& context();
+    void  handleRequestHeaders();
+    // Expect: 100-continue
+    void  handleExpect100();
+    void  addResponseHeaders();
+
     // http_cb
     void onHeadersComplete();
+    void onBody(const char* data, size_t size);
     void onMessageComplete();
 
     // default handlers
@@ -148,8 +167,18 @@ private:
     void closeFile();
     bool isFileOpened();
 
+    // upgrade
+    int handleUpgrade(const char* upgrade_protocol);
+    int upgradeWebSocket();
+    int upgradeHTTP2();
+
     // proxy
-    int proxyConnect(const std::string& url);
+    int handleProxy();
+    int handleForwardProxy();
+    int handleReverseProxy();
+    int connectProxy(const std::string& url);
+    int closeProxy();
+    int sendProxyRequest();
     static void onProxyConnect(hio_t* upstream_io);
     static void onProxyClose(hio_t* upstream_io);
 };