Browse Source

Add examples/tinyproxyd

ithewei 4 years ago
parent
commit
1a5e017f77
11 changed files with 532 additions and 50 deletions
  1. 4 1
      Makefile
  2. 1 0
      README-CN.md
  3. 1 0
      README.md
  4. 16 4
      event/hevent.c
  5. 2 1
      event/hloop.h
  6. 7 1
      event/nio.c
  7. 4 0
      examples/CMakeLists.txt
  8. 26 19
      examples/tcp_proxy_server.c
  9. 19 13
      examples/tinyhttpd.c
  10. 438 0
      examples/tinyproxyd.c
  11. 14 11
      examples/udp_proxy_server.c

+ 4 - 1
Makefile

@@ -45,7 +45,7 @@ endif
 default: all
 all: libhv examples
 examples: hmain_test htimer_test hloop_test \
-	nc nmap tinyhttpd httpd curl wget wrk consul \
+	nc nmap tinyhttpd tinyproxyd httpd curl wget wrk consul \
 	tcp_echo_server \
 	tcp_chat_server \
 	tcp_proxy_server \
@@ -118,6 +118,9 @@ nc: prepare
 tinyhttpd: prepare
 	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/tinyhttpd.c"
 
+tinyproxyd: prepare
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/tinyproxyd.c"
+
 nmap: prepare
 	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) cpputil examples/nmap" DEFINES="PRINT_DEBUG"
 

+ 1 - 0
README-CN.md

@@ -373,6 +373,7 @@ int main() {
 - UDP回显服务:  [examples/udp_echo_server.c](examples/udp_echo_server.c)
 - UDP代理服务:  [examples/udp_proxy_server.c](examples/udp_proxy_server.c)
 - TinyHttpd示例:[examples/tinyhttpd.c](examples/tinyhttpd.c)
+- TinyProxyd示例:[examples/tinyproxyd.c](examples/tinyproxyd.c)
 - jsonRPC示例:  [examples/jsonrpc](examples/jsonrpc)
 - 多accept进程模式: [examples/multi-thread/multi-acceptor-processes.c](examples/multi-thread/multi-acceptor-processes.c)
 - 多accept线程模式: [examples/multi-thread/multi-acceptor-threads.c](examples/multi-thread/multi-acceptor-threads.c)

+ 1 - 0
README.md

@@ -361,6 +361,7 @@ int main() {
 - [examples/udp_echo_server.c](examples/udp_echo_server.c)
 - [examples/udp_proxy_server.c](examples/udp_proxy_server.c)
 - [examples/tinyhttpd.c](examples/tinyhttpd.c)
+- [examples/tinyproxyd.c](examples/tinyproxyd.c)
 - [examples/jsonrpc](examples/jsonrpc)
 - [examples/multi-thread/multi-acceptor-processes.c](examples/multi-thread/multi-acceptor-processes.c)
 - [examples/multi-thread/multi-acceptor-threads.c](examples/multi-thread/multi-acceptor-threads.c)

+ 16 - 4
event/hevent.c

@@ -747,6 +747,16 @@ int hio_read_until_delim(hio_t* io, unsigned char delim) {
     return hio_read_once(io);
 }
 
+int hio_read_remain(hio_t* io) {
+    int remain = io->readbuf.tail - io->readbuf.head;
+    if (remain > 0) {
+        void* buf = io->readbuf.base + io->readbuf.head;
+        io->readbuf.head = io->readbuf.tail = 0;
+        hio_read_cb(io, buf, remain);
+    }
+    return remain;
+}
+
 //-----------------unpack---------------------------------------------
 void hio_set_unpack(hio_t* io, unpack_setting_t* setting) {
     hio_unset_unpack(io);
@@ -814,8 +824,6 @@ void hio_close_upstream(hio_t* io) {
 void hio_setup_upstream(hio_t* io1, hio_t* io2) {
     io1->upstream_io = io2;
     io2->upstream_io = io1;
-    hio_setcb_read(io1, hio_write_upstream);
-    hio_setcb_read(io2, hio_write_upstream);
 }
 
 hio_t* hio_get_upstream(hio_t* io) {
@@ -827,9 +835,12 @@ hio_t* hio_setup_tcp_upstream(hio_t* io, const char* host, int port, int ssl) {
     if (upstream_io == NULL) return NULL;
     if (ssl) hio_enable_ssl(upstream_io);
     hio_setup_upstream(io, upstream_io);
+    hio_setcb_read(io, hio_write_upstream);
+    hio_setcb_read(upstream_io, hio_write_upstream);
     hio_setcb_close(io, hio_close_upstream);
     hio_setcb_close(upstream_io, hio_close_upstream);
-    hconnect(io->loop, upstream_io->fd, hio_read_upstream);
+    hio_setcb_connect(upstream_io, hio_read_upstream);
+    hio_connect(upstream_io);
     return upstream_io;
 }
 
@@ -837,7 +848,8 @@ hio_t* hio_setup_udp_upstream(hio_t* io, const char* host, int port) {
     hio_t* upstream_io = hio_create_socket(io->loop, host, port, HIO_TYPE_UDP, HIO_CLIENT_SIDE);
     if (upstream_io == NULL) return NULL;
     hio_setup_upstream(io, upstream_io);
+    hio_setcb_read(io, hio_write_upstream);
+    hio_setcb_read(upstream_io, hio_write_upstream);
     hio_read_upstream(io);
     return upstream_io;
 }
-

+ 2 - 1
event/hloop.h

@@ -329,7 +329,8 @@ HV_EXPORT int hio_read_once (hio_t* io);
 HV_EXPORT int hio_read_until_length(hio_t* io, unsigned int len);
 // hio_read_once => hread_cb(...delim)
 HV_EXPORT int hio_read_until_delim (hio_t* io, unsigned char delim);
-// @see examples/tinyhttpd.c
+HV_EXPORT int hio_read_remain(hio_t* io);
+// @see examples/tinyhttpd.c examples/tinyproxyd.c
 #define hio_readline(io)        hio_read_until_delim(io, '\n')
 #define hio_readstring(io)      hio_read_until_delim(io, '\0')
 #define hio_readbytes(io, len)  hio_read_until_length(io, len)

+ 7 - 1
event/nio.c

@@ -416,7 +416,13 @@ int hio_read (hio_t* io) {
         hloge("hio_read called but fd[%d] already closed!", io->fd);
         return -1;
     }
-    return hio_add(io, hio_handle_events, HV_READ);
+    hio_add(io, hio_handle_events, HV_READ);
+    if (io->readbuf.tail > io->readbuf.head &&
+        io->unpack_setting == NULL &&
+        io->read_flags == 0) {
+        hio_read_remain(io);
+    }
+    return 0;
 }
 
 int hio_write (hio_t* io, const void* buf, size_t len) {

+ 4 - 0
examples/CMakeLists.txt

@@ -3,6 +3,7 @@ list(APPEND EXAMPLES
     htimer_test
     nc
     tinyhttpd
+    tinyproxyd
     tcp_echo_server
     tcp_chat_server
     tcp_proxy_server
@@ -26,6 +27,9 @@ target_link_libraries(nc ${HV_LIBRARIES})
 add_executable(tinyhttpd tinyhttpd.c)
 target_link_libraries(tinyhttpd ${HV_LIBRARIES})
 
+add_executable(tinyproxyd tinyproxyd.c)
+target_link_libraries(tinyproxyd ${HV_LIBRARIES})
+
 add_executable(tcp_echo_server tcp_echo_server.c)
 target_link_libraries(tcp_echo_server ${HV_LIBRARIES})
 

+ 26 - 19
examples/tcp_proxy_server.c

@@ -3,12 +3,12 @@
  *
  * @build:        make clean && make examples WITH_OPENSSL=yes
  * @http_server:  bin/httpd -s restart -d
- * @proxy_server: bin/tcp_proxy_server 8888 127.0.0.1:8080
- *                bin/tcp_proxy_server 8888 127.0.0.1:8443
- *                bin/tcp_proxy_server 8888 www.baidu.com
- *                bin/tcp_proxy_server 8888 www.baidu.com:443
- * @client:       bin/curl -v 127.0.0.1:8888
- *                bin/nc 127.0.0.1 8888
+ * @proxy_server: bin/tcp_proxy_server 1080 127.0.0.1:8080
+ *                bin/tcp_proxy_server 1080 127.0.0.1:8443
+ *                bin/tcp_proxy_server 1080 www.baidu.com
+ *                bin/tcp_proxy_server 1080 www.baidu.com:443
+ * @client:       bin/curl -v 127.0.0.1:1080
+ *                bin/nc 127.0.0.1 1080
  *                > GET / HTTP/1.1
  *                > Connection: close
  *                > [Enter]
@@ -20,10 +20,14 @@
 #include "hloop.h"
 #include "hsocket.h"
 
-static char proxy_host[64] = "127.0.0.1";
-static int  proxy_port = 80;
+static char proxy_host[64] = "0.0.0.0";
+static int  proxy_port = 1080;
 static int  proxy_ssl = 0;
 
+static char backend_host[64] = "127.0.0.1";
+static int  backend_port = 80;
+static int  backend_ssl = 0;
+
 // hloop_create_tcp_server -> on_accept -> hio_setup_tcp_upstream
 
 static void on_accept(hio_t* io) {
@@ -36,36 +40,39 @@ static void on_accept(hio_t* io) {
             SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
     */
 
-    if (proxy_port % 1000 == 443) proxy_ssl = 1;
-    hio_setup_tcp_upstream(io, proxy_host, proxy_port, proxy_ssl);
+    if (backend_port % 1000 == 443) backend_ssl = 1;
+    hio_setup_tcp_upstream(io, backend_host, backend_port, backend_ssl);
 }
 
 int main(int argc, char** argv) {
     if (argc < 3) {
-        printf("Usage: %s port proxy_host:proxy_port\n", argv[0]);
+        printf("Usage: %s proxy_port backend_host:backend_port\n", argv[0]);
         return -10;
     }
-    int port = atoi(argv[1]);
+    proxy_port = atoi(argv[1]);
     char* pos = strchr(argv[2], ':');
     if (pos) {
         int len = pos - argv[2];
         if (len > 0) {
-            memcpy(proxy_host, argv[2], len);
-            proxy_host[len] = '\0';
+            memcpy(backend_host, argv[2], len);
+            backend_host[len] = '\0';
         }
-        proxy_port = atoi(pos + 1);
+        backend_port = atoi(pos + 1);
     } else {
-        strncpy(proxy_host, argv[2], sizeof(proxy_host));
+        strncpy(backend_host, argv[2], sizeof(backend_host));
     }
-    if (proxy_port == 0) proxy_port = 80;
-    printf("proxy: [%s:%d]\n", proxy_host, proxy_port);
+    if (backend_port == 0) backend_port = 80;
+    printf("%s:%d proxy %s:%d\n", proxy_host, proxy_port, backend_host, backend_port);
 
     hloop_t* loop = hloop_new(0);
-    hio_t* listenio = hloop_create_tcp_server(loop, "0.0.0.0", port, on_accept);
+    hio_t* listenio = hloop_create_tcp_server(loop, proxy_host, proxy_port, on_accept);
     if (listenio == NULL) {
         return -20;
     }
     printf("listenfd=%d\n", hio_fd(listenio));
+    if (proxy_ssl) {
+        hio_enable_ssl(listenio);
+    }
     hloop_run(loop);
     hloop_free(&loop);
     return 0;

+ 19 - 13
examples/tinyhttpd.c

@@ -1,5 +1,5 @@
 /*
- * tinyhttpd
+ * tinyhttpd tiny http server
  *
  * @build    make examples
  *
@@ -34,7 +34,8 @@ static hloop_t*  accept_loop = NULL;
 static hloop_t** worker_loops = NULL;
 
 #define HTTP_KEEPALIVE_TIMEOUT  60000 // ms
-#define HTTP_HEAD_MAX_LENGTH    1024
+#define HTTP_MAX_URL_LENGTH     256
+#define HTTP_MAX_HEAD_LENGTH    1024
 
 #define HTML_TAG_BEGIN  "<html><body><center><h1>"
 #define HTML_TAG_END    "</h1></center></body></html>"
@@ -67,7 +68,7 @@ typedef struct {
         // request line
         struct {
             char method[32];
-            char path[256];
+            char path[HTTP_MAX_URL_LENGTH];
         };
         // status line
         struct {
@@ -76,11 +77,15 @@ typedef struct {
         };
     };
     // headers
+    char        host[64];
     int         content_length;
     char        content_type[64];
     unsigned    keepalive:  1;
+//  char        head[HTTP_MAX_HEAD_LENGTH];
+//  int         head_len;
     // body
-    const char* body; // body_len = content_length
+    char*       body;
+    int         body_len; // body_len = content_length
 } http_msg_t;
 
 typedef struct {
@@ -126,7 +131,7 @@ static int http_reply(http_conn_t* conn,
             int status_code, const char* status_message,
             const char* content_type,
             const char* body, int body_len) {
-    char stackbuf[HTTP_HEAD_MAX_LENGTH + 1024] = {0};
+    char stackbuf[HTTP_MAX_HEAD_LENGTH + 1024] = {0};
     char* buf = stackbuf;
     int buflen = sizeof(stackbuf);
     http_msg_t* req  = &conn->request;
@@ -134,16 +139,16 @@ static int http_reply(http_conn_t* conn,
     resp->major_version = req->major_version;
     resp->minor_version = req->minor_version;
     resp->status_code = status_code;
-    if (status_message) strcpy(resp->status_message, status_message);
-    if (content_type)   strcpy(resp->content_type, content_type);
+    if (status_message) strncpy(resp->status_message, status_message, sizeof(req->status_message) - 1);
+    if (content_type)   strncpy(resp->content_type, content_type, sizeof(req->content_type) - 1);
     resp->keepalive = req->keepalive;
     if (body) {
         if (body_len <= 0) body_len = strlen(body);
         resp->content_length = body_len;
-        resp->body = body;
+        resp->body = (char*)body;
     }
-    if (resp->content_length > buflen - HTTP_HEAD_MAX_LENGTH) {
-        HV_ALLOC(buf, HTTP_HEAD_MAX_LENGTH + resp->content_length);
+    if (resp->content_length > buflen - HTTP_MAX_HEAD_LENGTH) {
+        HV_ALLOC(buf, HTTP_MAX_HEAD_LENGTH + resp->content_length);
     }
     int msglen = http_response_dump(resp, buf, buflen);
     int nwrite = hio_write(conn->io, buf, msglen);
@@ -220,13 +225,13 @@ static bool parse_http_head(http_conn_t* conn, char* buf, int len) {
     if (stricmp(key, "Content-Length") == 0) {
         req->content_length = atoi(val);
     } else if (stricmp(key, "Content-Type") == 0) {
-        strcpy(req->content_type, val);
+        strncpy(req->content_type, val, sizeof(req->content_type) - 1);
     } else if (stricmp(key, "Connection") == 0) {
         if (stricmp(val, "close") == 0) {
             req->keepalive = 0;
         }
     } else {
-        // TODO: save head
+        // TODO: save other head
     }
     return true;
 }
@@ -328,7 +333,8 @@ static void on_recv(hio_t* io, void* buf, int readbytes) {
     case s_body:
         // printf("s_body\n");
         req->body = str;
-        if (readbytes == req->content_length) {
+        req->body_len += readbytes;
+        if (req->body_len == req->content_length) {
             conn->state = s_end;
         } else {
             // WARN: too large content_length should be handled by streaming!

+ 438 - 0
examples/tinyproxyd.c

@@ -0,0 +1,438 @@
+/*
+ * tinyproxyd       tiny http proxy server
+ *
+ * @build           make examples
+ *
+ * @http_server     bin/tinyhttpd  8000
+ * @proxy_server    bin/tinyproxyd 1080
+ *
+ * @proxy_client    bin/curl -v www.httpbin.org/get --http-proxy 127.0.0.1:1080
+ *                  bin/curl -v www.httpbin.org/post -d hello --http-proxy 127.0.0.1:1080
+ *                      curl -v www.httpbin.org/get --proxy http://127.0.0.1:1080
+ *                      curl -v www.httpbin.org/post -d hello --proxy http://127.0.0.1:1080
+ *
+ */
+
+#include "hv.h"
+#include "hloop.h"
+
+/*
+ * workflow:
+ * hloop_new -> hloop_create_tcp_server -> hloop_run ->
+ * on_accept -> HV_ALLOC(http_conn_t) -> hio_readline ->
+ * on_recv -> parse_http_request_line -> hio_readline ->
+ * on_recv -> parse_http_head -> ...  -> hio_readline ->
+ * on_head_end -> hio_setup_upstream ->
+ * on_upstream_connect -> hio_write_upstream(head) ->
+ * on_body -> hio_write_upstream(body) ->
+ * on_upstream_close -> hio_close ->
+ * on_close -> HV_FREE(http_conn_t)
+ *
+ */
+
+static char proxy_host[64] = "0.0.0.0";
+static int  proxy_port = 1080;
+static int  proxy_ssl = 0;
+
+static int thread_num = 1;
+static hloop_t*  accept_loop = NULL;
+static hloop_t** worker_loops = NULL;
+
+#define HTTP_KEEPALIVE_TIMEOUT  60000 // ms
+#define HTTP_MAX_URL_LENGTH     256
+#define HTTP_MAX_HEAD_LENGTH    1024
+
+typedef enum {
+    s_begin,
+    s_first_line,
+    s_request_line = s_first_line,
+    s_status_line = s_first_line,
+    s_head,
+    s_head_end,
+    s_body,
+    s_end
+} http_state_e;
+
+typedef struct {
+    // first line
+    int             major_version;
+    int             minor_version;
+    union {
+        // request line
+        struct {
+            char method[32];
+            char path[HTTP_MAX_URL_LENGTH];
+        };
+        // status line
+        struct {
+            int  status_code;
+            char status_message[64];
+        };
+    };
+    // headers
+    char        host[64];
+    int         content_length;
+    char        content_type[64];
+    unsigned    keepalive:  1;
+    char        head[HTTP_MAX_HEAD_LENGTH];
+    int         head_len;
+    // body
+    char*       body;
+    int         body_len; // body_len = content_length
+} http_msg_t;
+
+typedef struct {
+    hio_t*          io;
+    http_state_e    state;
+    http_msg_t      request;
+//  http_msg_t      response;
+} http_conn_t;
+
+static int http_request_dump(http_conn_t* conn, char* buf, int len) {
+    http_msg_t* msg = &conn->request;
+    int offset = 0;
+    // request line
+    const char* pos = strstr(msg->path, "://");
+    pos = pos ? pos + 3 : msg->path;
+    const char* path = strchr(pos, '/');
+    if (path == NULL) path = "/";
+    offset += snprintf(buf + offset, len - offset, "%s %s HTTP/%d.%d\r\n", msg->method, path, msg->major_version, msg->minor_version);
+    // headers
+    /*
+    offset += snprintf(buf + offset, len - offset, "Connection: %s\r\n", msg->keepalive ? "keep-alive" : "close");
+    if (msg->content_length > 0) {
+        offset += snprintf(buf + offset, len - offset, "Content-Length: %d\r\n", msg->content_length);
+    }
+    if (*msg->content_type) {
+        offset += snprintf(buf + offset, len - offset, "Content-Type: %s\r\n", msg->content_type);
+    }
+    */
+    if (msg->head_len) {
+        memcpy(buf + offset, msg->head, msg->head_len);
+        offset += msg->head_len;
+    }
+    char peeraddrstr[SOCKADDR_STRLEN] = {0};
+    SOCKADDR_STR(hio_peeraddr(conn->io), peeraddrstr);
+    offset += snprintf(buf + offset, len - offset, "X-Origin-IP: %s\r\n", peeraddrstr);
+    // TODO: Add your headers
+    offset += snprintf(buf + offset, len - offset, "\r\n");
+    // body
+    if (msg->body && msg->content_length > 0) {
+        memcpy(buf + offset, msg->body, msg->content_length);
+        offset += msg->content_length;
+    }
+    return offset;
+}
+
+static bool parse_http_request_line(http_conn_t* conn, char* buf, int len) {
+    // GET / HTTP/1.1
+    http_msg_t* req = &conn->request;
+    sscanf(buf, "%s %s HTTP/%d.%d", req->method, req->path, &req->major_version, &req->minor_version);
+    if (req->major_version != 1) return false;
+    if (req->minor_version == 1) req->keepalive = 1;
+    // printf("%s %s HTTP/%d.%d\r\n", req->method, req->path, req->major_version, req->minor_version);
+    return true;
+}
+
+static bool parse_http_head(http_conn_t* conn, char* buf, int len) {
+    http_msg_t* req = &conn->request;
+    // Content-Type: text/html
+    const char* key = buf;
+    const char* val = buf;
+    char* delim = strchr(buf, ':');
+    if (!delim) return false;
+    *delim = '\0';
+    val = delim + 1;
+    // trim space
+    while (*val == ' ') ++val;
+    // printf("%s: %s\r\n", key, val);
+    if (stricmp(key, "Host") == 0) {
+        strncpy(req->host, val, sizeof(req->host) - 1);
+    } else if (stricmp(key, "Content-Length") == 0) {
+        req->content_length = atoi(val);
+    } else if (stricmp(key, "Content-Type") == 0) {
+        strncpy(req->content_type, val, sizeof(req->content_type) - 1);
+    } else if (stricmp(key, "Proxy-Connection") == 0) {
+        if (stricmp(val, "close") == 0) {
+            req->keepalive = 0;
+        }
+    }
+    return true;
+}
+
+static void on_upstream_connect(hio_t* upstream_io) {
+    // printf("on_upstream_connect\n");
+    http_conn_t* conn = (http_conn_t*)hevent_userdata(upstream_io);
+    http_msg_t* req = &conn->request;
+    // send head
+    char stackbuf[HTTP_MAX_HEAD_LENGTH + 1024] = {0};
+    char* buf = stackbuf;
+    int buflen = sizeof(stackbuf);
+    int msglen = http_request_dump(conn, buf, buflen);
+    hio_write(upstream_io, buf, msglen);
+    if (conn->state != s_end) {
+        // start recv body then upstream
+        hio_read_start(conn->io);
+    } else {
+        if (req->keepalive) {
+            // Connection: keep-alive\r\n
+            // reset and receive next request
+            memset(&conn->request,  0, sizeof(http_msg_t));
+            // memset(&conn->response, 0, sizeof(http_msg_t));
+            conn->state = s_first_line;
+            hio_readline(conn->io);
+        }
+    }
+    // start recv response
+    hio_read_start(upstream_io);
+}
+
+static int on_head_end(http_conn_t* conn) {
+    http_msg_t* req = &conn->request;
+    if (req->host[0] == '\0') {
+        fprintf(stderr, "No Host header!\n");
+        return -1;
+    }
+    char backend_host[64] = {0};
+    strcpy(backend_host, req->host);
+    int backend_port = 80;
+    int backend_ssl = strncmp(req->path, "https", 5) == 0 ? 1 : 0;
+    char* pos = strchr(backend_host, ':');
+    if (pos) {
+        *pos = '\0';
+        backend_port = atoi(pos + 1);
+    }
+    // printf("upstream %s:%d\n", backend_host, backend_port);
+    if (backend_port == proxy_port &&
+        (strcmp(backend_host, proxy_host) == 0 ||
+         strcmp(backend_host, "localhost") == 0 ||
+         strcmp(backend_host, "127.0.0.1") == 0)) {
+        fprintf(stderr, "Cound not to upstream proxy server itself!\n");
+        return -2;
+    }
+    hloop_t* loop = hevent_loop(conn->io);
+    // hio_t* upstream_io = hio_setup_tcp_upstream(conn->io, backend_host, backend_port, backend_ssl);
+    hio_t* upstream_io = hio_create_socket(loop, backend_host, backend_port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
+    if (upstream_io == NULL) {
+        fprintf(stderr, "Failed to upstream %s:%d!\n", backend_host, backend_port);
+        return -3;
+    }
+    if (backend_ssl) {
+        hio_enable_ssl(upstream_io);
+    }
+    hevent_set_userdata(upstream_io, conn);
+    hio_setup_upstream(conn->io, upstream_io);
+    hio_setcb_read(upstream_io, hio_write_upstream);
+    hio_setcb_close(upstream_io, hio_close_upstream);
+    hio_setcb_connect(upstream_io, on_upstream_connect);
+    hio_connect(upstream_io);
+    return 0;
+}
+
+static int on_body(http_conn_t* conn, void* buf, int readbytes) {
+    hio_write_upstream(conn->io, buf, readbytes);
+    return 0;
+}
+
+static void on_close(hio_t* io) {
+    // printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
+    http_conn_t* conn = (http_conn_t*)hevent_userdata(io);
+    if (conn) {
+        HV_FREE(conn);
+        hevent_set_userdata(io, NULL);
+    }
+    hio_close_upstream(io);
+}
+
+static void on_recv(hio_t* io, void* buf, int readbytes) {
+    char* str = (char*)buf;
+    // printf("on_recv fd=%d readbytes=%d\n", hio_fd(io), readbytes);
+    // printf("%.*s", readbytes, str);
+    http_conn_t* conn = (http_conn_t*)hevent_userdata(io);
+    http_msg_t* req = &conn->request;
+    switch (conn->state) {
+    case s_begin:
+        // printf("s_begin");
+        conn->state = s_first_line;
+    case s_first_line:
+        // printf("s_first_line\n");
+        if (readbytes < 2) {
+            fprintf(stderr, "Not match \r\n!");
+            hio_close(io);
+            return;
+        }
+        str[readbytes - 2] = '\0';
+        if (parse_http_request_line(conn, str, readbytes - 2) == false) {
+            fprintf(stderr, "Failed to parse http request line:\n%s\n", str);
+            hio_close(io);
+            return;
+        }
+        // start read head
+        conn->state = s_head;
+        hio_readline(io);
+        break;
+    case s_head:
+        // printf("s_head\n");
+        if (readbytes < 2) {
+            fprintf(stderr, "Not match \r\n!");
+            hio_close(io);
+            return;
+        }
+        if (readbytes == 2 && str[0] == '\r' && str[1] == '\n') {
+            conn->state = s_head_end;
+        } else {
+            // NOTE: save head
+            if (strnicmp(str, "Proxy-", 6) != 0) {
+                if (req->head_len + readbytes < HTTP_MAX_HEAD_LENGTH) {
+                    memcpy(req->head + req->head_len, buf, readbytes);
+                    req->head_len += readbytes;
+                }
+            }
+            str[readbytes - 2] = '\0';
+            if (parse_http_head(conn, str, readbytes - 2) == false) {
+                fprintf(stderr, "Failed to parse http head:\n%s\n", str);
+                hio_close(io);
+                return;
+            }
+            hio_readline(io);
+            break;
+        }
+    case s_head_end:
+        // printf("s_head_end\n");
+        if (on_head_end(conn) < 0) {
+            hio_close(io);
+            return;
+        }
+        if (req->content_length == 0) {
+            conn->state = s_end;
+        } else {
+            conn->state = s_body;
+            // NOTE: start read body on_upstream_connect
+            // hio_read_start(io);
+            break;
+        }
+    case s_body:
+        // printf("s_body\n");
+        if (on_body(conn, buf, readbytes) < 0) {
+            hio_close(io);
+            return;
+        }
+        req->body = str;
+        req->body_len += readbytes;
+        if (readbytes == req->content_length) {
+            conn->state = s_end;
+        } else {
+            // Not end
+            break;
+        }
+    case s_end:
+        // printf("s_end\n");
+        if (req->keepalive) {
+            // Connection: keep-alive\r\n
+            // reset and receive next request
+            memset(&conn->request,  0, sizeof(http_msg_t));
+            // memset(&conn->response, 0, sizeof(http_msg_t));
+            conn->state = s_first_line;
+            hio_readline(io);
+        } else {
+            // Connection: close\r\n
+            // NOTE: wait upstream close!
+            // hio_close(io);
+        }
+        break;
+    default: break;
+    }
+}
+
+static void new_conn_event(hevent_t* ev) {
+    hloop_t* loop = ev->loop;
+    hio_t* io = (hio_t*)hevent_userdata(ev);
+    hio_attach(loop, io);
+
+    /*
+    char localaddrstr[SOCKADDR_STRLEN] = {0};
+    char peeraddrstr[SOCKADDR_STRLEN] = {0};
+    printf("tid=%ld connfd=%d [%s] <= [%s]\n",
+            (long)hv_gettid(),
+            (int)hio_fd(io),
+            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
+            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
+    */
+
+    hio_setcb_close(io, on_close);
+    hio_setcb_read(io, on_recv);
+    hio_set_keepalive_timeout(io, HTTP_KEEPALIVE_TIMEOUT);
+
+    http_conn_t* conn = NULL;
+    HV_ALLOC_SIZEOF(conn);
+    conn->io = io;
+    hevent_set_userdata(io, conn);
+    // start read first line
+    conn->state = s_first_line;
+    hio_readline(io);
+}
+
+static hloop_t* get_next_loop() {
+    static int s_cur_index = 0;
+    if (s_cur_index == thread_num) {
+        s_cur_index = 0;
+    }
+    return worker_loops[s_cur_index++];
+}
+
+static void on_accept(hio_t* io) {
+    hio_detach(io);
+
+    hloop_t* worker_loop = get_next_loop();
+    hevent_t ev;
+    memset(&ev, 0, sizeof(ev));
+    ev.loop = worker_loop;
+    ev.cb = new_conn_event;
+    ev.userdata = io;
+    hloop_post_event(worker_loop, &ev);
+}
+
+static HTHREAD_RETTYPE worker_thread(void* userdata) {
+    hloop_t* loop = (hloop_t*)userdata;
+    hloop_run(loop);
+    return 0;
+}
+
+static HTHREAD_RETTYPE accept_thread(void* userdata) {
+    hloop_t* loop = (hloop_t*)userdata;
+    hio_t* listenio = hloop_create_tcp_server(loop, proxy_host, proxy_port, on_accept);
+    if (listenio == NULL) {
+        exit(1);
+    }
+    if (proxy_ssl) {
+        hio_enable_ssl(listenio);
+    }
+    printf("tinyproxyd listening on %s:%d, listenfd=%d, thread_num=%d\n",
+            proxy_host, proxy_port, hio_fd(listenio), thread_num);
+    hloop_run(loop);
+    return 0;
+}
+
+int main(int argc, char** argv) {
+    if (argc < 2) {
+        printf("Usage: %s proxy_port [thread_num]\n", argv[0]);
+        return -10;
+    }
+    proxy_port = atoi(argv[1]);
+    if (argc > 2) {
+        thread_num = atoi(argv[3]);
+    } else {
+        thread_num = get_ncpu();
+    }
+    if (thread_num == 0) thread_num = 1;
+
+    worker_loops = (hloop_t**)malloc(sizeof(hloop_t*) * thread_num);
+    for (int i = 0; i < thread_num; ++i) {
+        worker_loops[i] = hloop_new(HLOOP_FLAG_AUTO_FREE);
+        hthread_create(worker_thread, worker_loops[i]);
+    }
+
+    accept_loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
+    accept_thread(accept_loop);
+    return 0;
+}

+ 14 - 11
examples/udp_proxy_server.c

@@ -11,36 +11,39 @@
 
 #include "hloop.h"
 
-static char proxy_host[64] = "127.0.0.1";
-static int proxy_port = 1234;
+static char proxy_host[64] = "0.0.0.0";
+static int  proxy_port = 1080;
+
+static char backend_host[64] = "127.0.0.1";
+static int  backend_port = 80;
 
 // hloop_create_udp_server -> hio_setup_udp_upstream
 
 int main(int argc, char** argv) {
     if (argc < 3) {
-        printf("Usage: %s port proxy_host:proxy_port\n", argv[0]);
+        printf("Usage: %s proxy_port backend_host:backend_port\n", argv[0]);
         return -10;
     }
-    int port = atoi(argv[1]);
+    proxy_port = atoi(argv[1]);
     char* pos = strchr(argv[2], ':');
     if (pos) {
         int len = pos - argv[2];
         if (len > 0) {
-            memcpy(proxy_host, argv[2], len);
-            proxy_host[len] = '\0';
+            memcpy(backend_host, argv[2], len);
+            backend_host[len] = '\0';
         }
-        proxy_port = atoi(pos + 1);
+        backend_port = atoi(pos + 1);
     } else {
-        strncpy(proxy_host, argv[2], sizeof(proxy_host));
+        strncpy(backend_host, argv[2], sizeof(backend_host));
     }
-    printf("proxy: [%s:%d]\n", proxy_host, proxy_port);
+    printf("%s:%d proxy %s:%d\n", proxy_host, proxy_port, backend_host, backend_port);
 
     hloop_t* loop = hloop_new(0);
-    hio_t* io = hloop_create_udp_server(loop, "0.0.0.0", port);
+    hio_t* io = hloop_create_udp_server(loop, proxy_host, proxy_port);
     if (io == NULL) {
         return -20;
     }
-    hio_t* upstream_io = hio_setup_udp_upstream(io, proxy_host, proxy_port);
+    hio_t* upstream_io = hio_setup_udp_upstream(io, backend_host, backend_port);
     if (upstream_io == NULL) {
         return -30;
     }