瀏覽代碼

Impl HttpService::Proxy

ithewei 3 年之前
父節點
當前提交
b06f15241f
共有 9 個文件被更改,包括 213 次插入33 次删除
  1. 1 1
      README-CN.md
  2. 1 1
      README.md
  3. 19 2
      event/nio.c
  4. 18 0
      examples/http_server_test.cpp
  5. 102 4
      http/server/HttpHandler.cpp
  6. 9 1
      http/server/HttpHandler.h
  7. 30 22
      http/server/HttpServer.cpp
  8. 15 0
      http/server/HttpService.cpp
  9. 18 2
      http/server/HttpService.h

+ 1 - 1
README-CN.md

@@ -55,7 +55,7 @@
 - 可靠UDP支持: WITH_KCP
 - SSL/TLS加密通信(可选WITH_OPENSSL、WITH_GNUTLS、WITH_MBEDTLS)
 - HTTP服务端/客户端(支持https http1/x http2 grpc)
-- HTTP支持静态文件服务、目录服务、同步/异步API处理函数
+- HTTP支持静态文件服务、目录服务、代理服务、同步/异步API处理函数
 - HTTP支持RESTful风格、URI路由、keep-alive长连接、chunked分块等特性
 - WebSocket服务端/客户端
 - MQTT客户端

+ 1 - 1
README.md

@@ -31,7 +31,7 @@ but simpler api and richer protocols.
 - RUDP support: WITH_KCP
 - SSL/TLS support: (via WITH_OPENSSL or WITH_GNUTLS or WITH_MBEDTLS)
 - HTTP client/server (support https http1/x http2 grpc)
-- HTTP static file service, indexof service, sync/async API handler
+- HTTP supports static service, indexof service, proxy service, sync/async API handler
 - HTTP supports RESTful, URI router, keep-alive, chunked, etc.
 - WebSocket client/server
 - MQTT client

+ 19 - 2
event/nio.c

@@ -226,6 +226,23 @@ connect_error:
     hio_close(io);
 }
 
+static void nio_connect_event_cb(hevent_t* ev) {
+    hio_t* io = (hio_t*)ev->userdata;
+    uint32_t id = (uintptr_t)ev->privdata;
+    if (io->id != id) return;
+    nio_connect(io);
+}
+
+static int nio_connect_async(hio_t* io) {
+    hevent_t ev;
+    memset(&ev, 0, sizeof(ev));
+    ev.cb = nio_connect_event_cb;
+    ev.userdata = io;
+    ev.privdata = (void*)(uintptr_t)io->id;
+    hloop_post_event(io->loop, &ev);
+    return 0;
+}
+
 static int __nio_read(hio_t* io, void* buf, int len) {
     int nread = 0;
     switch (io->io_type) {
@@ -431,12 +448,12 @@ int hio_connect(hio_t* io) {
 #endif
         perror("connect");
         io->error = socket_errno();
-        hio_close(io);
+        hio_close_async(io);
         return ret;
     }
     if (ret == 0) {
         // connect ok
-        nio_connect(io);
+        nio_connect_async(io);
         return 0;
     }
     int timeout = io->connect_timeout ? io->connect_timeout : HIO_DEFAULT_CONNECT_TIMEOUT;

+ 18 - 0
examples/http_server_test.cpp

@@ -32,20 +32,30 @@ int main(int argc, char** argv) {
     if (port == 0) port = 8080;
 
     HttpService router;
+
+    // curl -v http://ip:port/
     router.Static("/", "./html");
+
+    // curl -v http://ip:port/proxy/get
+    router.Proxy("/proxy/", "http://httpbin.org/");
+
+    // curl -v http://ip:port/ping
     router.GET("/ping", [](HttpRequest* req, HttpResponse* resp) {
         return resp->String("pong");
     });
 
+    // curl -v http://ip:port/data
     router.GET("/data", [](HttpRequest* req, HttpResponse* resp) {
         static char data[] = "0123456789";
         return resp->Data(data, 10 /*, false */);
     });
 
+    // curl -v http://ip:port/paths
     router.GET("/paths", [&router](HttpRequest* req, HttpResponse* resp) {
         return resp->Json(router.Paths());
     });
 
+    // curl -v http://ip:port/get?env=1
     router.GET("/get", [](HttpRequest* req, HttpResponse* resp) {
         resp->json["origin"] = req->client_addr.ip;
         resp->json["url"] = req->url;
@@ -54,10 +64,18 @@ int main(int argc, char** argv) {
         return 200;
     });
 
+    // curl -v http://ip:port/echo -d "hello,world!"
     router.POST("/echo", [](const HttpContextPtr& ctx) {
         return ctx->send(ctx->body(), ctx->type());
     });
 
+    // curl -v http://ip:port/user/123
+    router.GET("/user/{id}", [](const HttpContextPtr& ctx) {
+        hv::Json resp;
+        resp["id"] = ctx->param("id");
+        return ctx->send(resp.dump(2));
+    });
+
     http_server_t server;
     server.service = &router;
     server.port = port;

+ 102 - 4
http/server/HttpHandler.cpp

@@ -4,6 +4,7 @@
 #include "herr.h"
 #include "hlog.h"
 #include "htime.h"
+#include "hurl.h"
 #include "hasync.h" // import hv::async for http_async_handler
 #include "http_page.h"
 
@@ -182,6 +183,31 @@ void HttpHandler::onHeadersComplete() {
     pReq->scheme = ssl ? "https" : "http";
     pReq->client_addr.ip = ip;
     pReq->client_addr.port = port;
+
+    // keepalive
+    keepalive = pReq->IsKeepAlive();
+
+    // NOTE: Detect proxy before ParseUrl
+    proxy = 0;
+    if (hv::startswith(pReq->url, "http")) {
+        // forward proxy
+        proxy = 1;
+        auto iter = pReq->headers.find("Proxy-Connection");
+        if (iter != pReq->headers.end()) {
+            const char* keepalive_value = iter->second.c_str();
+            if (stricmp(keepalive_value, "keep-alive") == 0) {
+                keepalive = true;
+            }
+            else if (stricmp(keepalive_value, "close") == 0) {
+                keepalive = false;
+            }
+            else if (stricmp(keepalive_value, "upgrade") == 0) {
+                keepalive = true;
+            }
+        }
+    }
+
+    // printf("url=%s\n", pReq->url.c_str());
     pReq->ParseUrl();
 
     if (service->api_handlers.size() != 0) {
@@ -197,14 +223,86 @@ void HttpHandler::onHeadersComplete() {
             }
         };
     } else {
-        // TODO: forward proxy
-        // TODO: reverse proxy
-        // TODO: rewrite
         // NOTE: not hook http_cb
-        req->http_cb = NULL;
+        pReq->http_cb = NULL;
+
+        if (!proxy && service->proxies.size() != 0) {
+            // reverse proxy
+            std::string proxy_url = service->GetProxyUrl(pReq->path.c_str());
+            if (!proxy_url.empty()) {
+                proxy = 1;
+                pReq->url = proxy_url;
+            }
+        }
+
+        if (proxy) {
+            proxyConnect(pReq->url);
+        } else {
+            // TODO: rewrite
+        }
     }
 }
 
+void HttpHandler::onProxyConnect(hio_t* upstream_io) {
+    // printf("onProxyConnect\n");
+    HttpHandler* handler = (HttpHandler*)hevent_userdata(upstream_io);
+    hio_t* io = hio_get_upstream(upstream_io);
+    assert(handler != NULL && io != NULL);
+
+    HttpRequest* req = handler->req.get();
+    // NOTE: send head + received body
+    req->headers.erase("Proxy-Connection");
+    req->headers["Connection"] = handler->keepalive ? "keep-alive" : "close";
+    req->headers["X-Origin-IP"] = handler->ip;
+    std::string msg = req->Dump(true, true);
+    // printf("%s\n", msg.c_str());
+    hio_write(upstream_io, msg.c_str(), msg.size());
+
+    // NOTE: start recv body continue then upstream
+    hio_setcb_read(io, hio_write_upstream);
+    hio_read_start(io);
+    hio_setcb_read(upstream_io, hio_write_upstream);
+    hio_read_start(upstream_io);
+}
+
+int HttpHandler::proxyConnect(const std::string& strUrl) {
+    if (!writer) return ERR_NULL_POINTER;
+    hio_t* io = writer->io();
+    hloop_t* loop = hevent_loop(io);
+
+    HUrl url;
+    if (!url.parse(strUrl)) {
+        return ERR_PARSE;
+    }
+
+    hlogi("proxy_pass %s", strUrl.c_str());
+    hio_t* upstream_io = hio_create_socket(loop, url.host.c_str(), url.port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
+    if (upstream_io == NULL) {
+        hio_close_async(io);
+        return ERR_SOCKET;
+    }
+    if (url.scheme == "https") {
+        hio_enable_ssl(upstream_io);
+    }
+    hevent_set_userdata(upstream_io, this);
+    hio_setup_upstream(io, upstream_io);
+    hio_setcb_connect(upstream_io, HttpHandler::onProxyConnect);
+    hio_setcb_close(upstream_io, hio_close_upstream);
+    if (service->proxy_connect_timeout > 0) {
+        hio_set_connect_timeout(upstream_io, service->proxy_connect_timeout);
+    }
+    if (service->proxy_read_timeout > 0) {
+        hio_set_read_timeout(io, service->proxy_read_timeout);
+    }
+    if (service->proxy_write_timeout > 0) {
+        hio_set_write_timeout(io, service->proxy_write_timeout);
+    }
+    hio_connect(upstream_io);
+    // NOTE: wait upstream_io connected then start read
+    hio_read_stop(io);
+    return 0;
+}
+
 int HttpHandler::HandleHttpRequest() {
     // preprocessor -> processor -> postprocessor
     int status_code = HTTP_STATUS_OK;

+ 9 - 1
http/server/HttpHandler.h

@@ -28,8 +28,12 @@ public:
         SEND_DONE,
     } state;
 
+    // flags
+    unsigned ssl:           1;
+    unsigned keepalive:     1;
+    unsigned proxy:         1;
+
     // peeraddr
-    bool                    ssl;
     char                    ip[64];
     int                     port;
 
@@ -101,6 +105,10 @@ private:
     void initRequest();
     void onHeadersComplete();
 
+    // proxy
+    int proxyConnect(const std::string& url);
+    static void onProxyConnect(hio_t* upstream_io);
+
     int defaultRequestHandler();
     int defaultStaticHandler();
     int defaultLargeFileHandler();

+ 30 - 22
http/server/HttpServer.cpp

@@ -70,18 +70,23 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
         return;
     }
 
+    hloop_t* loop = hevent_loop(io);
+    HttpParser* parser = handler->parser.get();
+    HttpRequest* req = handler->req.get();
+    HttpResponse* resp = handler->resp.get();
+
+    if (handler->proxy) {
+        return;
+    }
+
     if (protocol == HttpHandler::WEBSOCKET) {
         return;
     }
 
-    HttpParser* parser = handler->parser.get();
     if (parser->WantRecv()) {
         return;
     }
 
-    HttpRequest* req = handler->req.get();
-    HttpResponse* resp = handler->resp.get();
-
     // Server:
     static char s_Server[64] = {'\0'};
     if (s_Server[0] == '\0') {
@@ -90,12 +95,8 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
     resp->headers["Server"] = s_Server;
 
     // Connection:
-    bool keepalive = req->IsKeepAlive();
-    if (keepalive) {
-        resp->headers["Connection"] = "keep-alive";
-    } else {
-        resp->headers["Connection"] = "close";
-    }
+    bool keepalive = handler->keepalive;
+    resp->headers["Connection"] = keepalive ? "keep-alive" : "close";
 
     // Upgrade:
     bool upgrade = false;
@@ -161,7 +162,6 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
     }
 
     // LOG
-    hloop_t* loop = hevent_loop(io);
     hlogi("[%ld-%ld][%s:%d][%s %s]=>[%d %s]",
         hloop_pid(loop), hloop_tid(loop),
         handler->ip, handler->port,
@@ -187,18 +187,24 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
 
 static void on_close(hio_t* io) {
     HttpHandler* handler = (HttpHandler*)hevent_userdata(io);
-    if (handler) {
-        if (handler->protocol == HttpHandler::WEBSOCKET) {
-            // onclose
-            handler->WebSocketOnClose();
-        } else {
-            if (handler->writer && handler->writer->onclose) {
-                handler->writer->onclose();
-            }
+    if (handler == NULL) return;
+
+    // close proxy
+    if (handler->proxy) {
+        hio_close_upstream(io);
+    }
+
+    // onclose
+    if (handler->protocol == HttpHandler::WEBSOCKET) {
+        handler->WebSocketOnClose();
+    } else {
+        if (handler->writer && handler->writer->onclose) {
+            handler->writer->onclose();
         }
-        hevent_set_userdata(io, NULL);
-        delete handler;
     }
+
+    hevent_set_userdata(io, NULL);
+    delete handler;
 }
 
 static void on_accept(hio_t* io) {
@@ -216,7 +222,9 @@ static void on_accept(hio_t* io) {
     hio_setcb_close(io, on_close);
     hio_setcb_read(io, on_recv);
     hio_read(io);
-    hio_set_keepalive_timeout(io, service->keepalive_timeout);
+    if (service->keepalive_timeout > 0) {
+        hio_set_keepalive_timeout(io, service->keepalive_timeout);
+    }
 
     // new HttpHandler, delete on_close
     HttpHandler* handler = new HttpHandler;

+ 15 - 0
http/server/HttpService.cpp

@@ -155,4 +155,19 @@ std::string HttpService::GetStaticFilepath(const char* path) {
     return filepath;
 }
 
+void HttpService::Proxy(const char* path, const char* url) {
+    proxies[path] = url;
+}
+
+std::string HttpService::GetProxyUrl(const char* path) {
+    std::string url;
+    for (auto iter = proxies.begin(); iter != proxies.end(); ++iter) {
+        if (hv_strstartswith(path, iter->first.c_str())) {
+            url = iter->second + (path + iter->first.length());
+            break;
+        }
+    }
+    return url;
+}
+
 }

+ 18 - 2
http/server/HttpService.h

@@ -109,7 +109,7 @@ struct HV_EXPORT HttpService {
     http_handler        processor;
     http_handler        postprocessor;
 
-    // api service (that is http.APIServer)
+    // api service (that is http.ApiServer)
     std::string         base_url;
     http_api_handlers   api_handlers;
 
@@ -119,12 +119,19 @@ struct HV_EXPORT HttpService {
     std::string     document_root;
     std::string     home_page;
     std::string     error_page;
-    // location => root
+    // nginx: location => root
     std::map<std::string, std::string, std::greater<std::string>> staticDirs;
     // indexof service (that is http.DirectoryServer)
     std::string     index_of;
     http_handler    errorHandler;
 
+    // proxy service (that is http.ProxyServer)
+    // nginx: location => proxy_pass
+    std::map<std::string, std::string, std::greater<std::string>> proxies;
+    int proxy_connect_timeout;
+    int proxy_read_timeout;
+    int proxy_write_timeout;
+
     // options
     int keepalive_timeout;
     int max_file_cache_size;        // cache small file
@@ -146,6 +153,10 @@ struct HV_EXPORT HttpService {
         // error_page = DEFAULT_ERROR_PAGE;
         // index_of = DEFAULT_INDEXOF_DIR;
 
+        proxy_connect_timeout = DEFAULT_CONNECT_TIMEOUT;
+        proxy_read_timeout = 0;
+        proxy_write_timeout = 0;
+
         keepalive_timeout = DEFAULT_KEEPALIVE_TIMEOUT;
         max_file_cache_size = MAX_FILE_CACHE_SIZE;
         file_cache_stat_interval = DEFAULT_FILE_CACHE_STAT_INTERVAL;
@@ -164,6 +175,11 @@ struct HV_EXPORT HttpService {
     // @retval / => /var/www/html/index.html
     std::string GetStaticFilepath(const char* path);
 
+    // Proxy("/api/v1/", "http://www.httpbin.org/");
+    void Proxy(const char* path, const char* url);
+    // @retval /api/v1/test => http://www.httpbin.org/test
+    std::string GetProxyUrl(const char* path);
+
     hv::StringList Paths() {
         hv::StringList paths;
         for (auto& pair : api_handlers) {