Răsfoiți Sursa

http_client_send_async

ithewei 5 ani în urmă
părinte
comite
cd30f2e9dd

+ 4 - 4
Makefile

@@ -85,17 +85,17 @@ httpd: prepare
 	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/server examples/httpd"
 
 curl: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base utils http http/client" SRCS="examples/curl.cpp"
-	# $(MAKEF) TARGET=$@ SRCDIRS=". base utils http http/client" SRCS="examples/curl.cpp" WITH_CURL=yes DEFINES="CURL_STATICLIB"
+	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/client" SRCS="examples/curl.cpp"
+	# $(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/client" SRCS="examples/curl.cpp" WITH_CURL=yes DEFINES="CURL_STATICLIB"
 
 http_server_test: prepare
 	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/server" SRCS="examples/http_server_test.cpp"
 
 http_client_test: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base utils http http/client" SRCS="examples/http_client_test.cpp"
+	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/client" SRCS="examples/http_client_test.cpp"
 
 consul_cli: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base utils http http/client consul" SRCS="examples/consul_cli.cpp" DEFINES="PRINT_DEBUG"
+	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/client consul" SRCS="examples/consul_cli.cpp" DEFINES="PRINT_DEBUG"
 
 unittest: prepare
 	$(CC)  -g -Wall -std=c99   -I. -Ibase            -o bin/mkdir_p           unittest/mkdir_test.c         base/hbase.c

+ 3 - 2
Makefile.in

@@ -67,9 +67,10 @@ endif
 
 # CFLAGS, CXXFLAGS, ARFLAGS
 ifeq ($(BUILD_TYPE), DEBUG)
-	DEFAULT_CFLAGS = -g -Wall
+	DEFAULT_CFLAGS = -g -Wall -O0
+else
+	DEFAULT_CFLAGS += -O2
 endif
-DEFAULT_CFLAGS += -O2
 
 CFLAGS ?= $(DEFAULT_CFLAGS)
 CXXFLAGS ?= $(DEFAULT_CFLAGS)

+ 21 - 23
event/hloop.c

@@ -202,6 +202,16 @@ static void hloop_cleanup(hloop_t* loop) {
         loop->pendings[i] = NULL;
     }
 
+    // ios
+    printd("cleanup ios...\n");
+    for (int i = 0; i < loop->ios.maxsize; ++i) {
+        hio_t* io = loop->ios.ptr[i];
+        if (io) {
+            hio_free(io);
+        }
+    }
+    io_array_cleanup(&loop->ios);
+
     // idles
     printd("cleanup idles...\n");
     struct list_node* node = loop->idles.next;
@@ -223,16 +233,6 @@ static void hloop_cleanup(hloop_t* loop) {
     }
     heap_init(&loop->timers, NULL);
 
-    // ios
-    printd("cleanup ios...\n");
-    for (int i = 0; i < loop->ios.maxsize; ++i) {
-        hio_t* io = loop->ios.ptr[i];
-        if (io) {
-            hio_free(io);
-        }
-    }
-    io_array_cleanup(&loop->ios);
-
     // readbuf
     if (loop->readbuf.base && loop->readbuf.len) {
         HV_FREE(loop->readbuf.base);
@@ -660,7 +660,7 @@ int hio_del(hio_t* io, int events) {
 
 hio_t* hread(hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
     hio_t* io = hio_get(loop, fd);
-    if (io == NULL) return NULL;
+    assert(io != NULL);
     io->readbuf.base = (char*)buf;
     io->readbuf.len = len;
     if (read_cb) {
@@ -672,7 +672,7 @@ hio_t* hread(hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
 
 hio_t* hwrite(hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb) {
     hio_t* io = hio_get(loop, fd);
-    if (io == NULL) return NULL;
+    assert(io != NULL);
     if (write_cb) {
         io->write_cb = write_cb;
     }
@@ -682,8 +682,7 @@ hio_t* hwrite(hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb writ
 
 hio_t* haccept(hloop_t* loop, int listenfd, haccept_cb accept_cb) {
     hio_t* io = hio_get(loop, listenfd);
-    if (io == NULL) return NULL;
-    io->accept = 1;
+    assert(io != NULL);
     if (accept_cb) {
         io->accept_cb = accept_cb;
     }
@@ -693,8 +692,7 @@ hio_t* haccept(hloop_t* loop, int listenfd, haccept_cb accept_cb) {
 
 hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb) {
     hio_t* io = hio_get(loop, connfd);
-    if (io == NULL) return NULL;
-    io->connect = 1;
+    assert(io != NULL);
     if (connect_cb) {
         io->connect_cb = connect_cb;
     }
@@ -704,13 +702,13 @@ hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb) {
 
 void hclose (hloop_t* loop, int fd) {
     hio_t* io = hio_get(loop, fd);
-    if (io == NULL) return;
+    assert(io != NULL);
     hio_close(io);
 }
 
 hio_t* hrecv (hloop_t* loop, int connfd, void* buf, size_t len, hread_cb read_cb) {
     //hio_t* io = hio_get(loop, connfd);
-    //if (io == NULL) return NULL;
+    //assert(io != NULL);
     //io->recv = 1;
     //if (io->io_type != HIO_TYPE_SSL) {
         //io->io_type = HIO_TYPE_TCP;
@@ -720,7 +718,7 @@ hio_t* hrecv (hloop_t* loop, int connfd, void* buf, size_t len, hread_cb read_cb
 
 hio_t* hsend (hloop_t* loop, int connfd, const void* buf, size_t len, hwrite_cb write_cb) {
     //hio_t* io = hio_get(loop, connfd);
-    //if (io == NULL) return NULL;
+    //assert(io != NULL);
     //io->send = 1;
     //if (io->io_type != HIO_TYPE_SSL) {
         //io->io_type = HIO_TYPE_TCP;
@@ -730,7 +728,7 @@ hio_t* hsend (hloop_t* loop, int connfd, const void* buf, size_t len, hwrite_cb
 
 hio_t* hrecvfrom (hloop_t* loop, int sockfd, void* buf, size_t len, hread_cb read_cb) {
     //hio_t* io = hio_get(loop, sockfd);
-    //if (io == NULL) return NULL;
+    //assert(io != NULL);
     //io->recvfrom = 1;
     //io->io_type = HIO_TYPE_UDP;
     return hread(loop, sockfd, buf, len, read_cb);
@@ -738,7 +736,7 @@ hio_t* hrecvfrom (hloop_t* loop, int sockfd, void* buf, size_t len, hread_cb rea
 
 hio_t* hsendto (hloop_t* loop, int sockfd, const void* buf, size_t len, hwrite_cb write_cb) {
     //hio_t* io = hio_get(loop, sockfd);
-    //if (io == NULL) return NULL;
+    //assert(io != NULL);
     //io->sendto = 1;
     //io->io_type = HIO_TYPE_UDP;
     return hwrite(loop, sockfd, buf, len, write_cb);
@@ -771,7 +769,7 @@ hio_t* hloop_create_tcp_client (hloop_t* loop, const char* host, int port, hconn
     }
 
     hio_t* io = hio_get(loop, connfd);
-    if (io == NULL) return NULL;
+    assert(io != NULL);
     hio_set_peeraddr(io, &peeraddr.sa, sockaddr_len(&peeraddr));
     hconnect(loop, connfd, connect_cb);
     return io;
@@ -803,7 +801,7 @@ hio_t* hloop_create_udp_client(hloop_t* loop, const char* host, int port) {
     }
 
     hio_t* io = hio_get(loop, sockfd);
-    if (io == NULL) return NULL;
+    assert(io != NULL);
     hio_set_peeraddr(io, &peeraddr.sa, sockaddr_len(&peeraddr));
     return io;
 }

+ 2 - 0
event/nio.c

@@ -460,6 +460,7 @@ static void hio_handle_events(hio_t* io) {
 }
 
 int hio_accept(hio_t* io) {
+    io->accept = 1;
     hio_add(io, hio_handle_events, HV_READ);
     return 0;
 }
@@ -483,6 +484,7 @@ int hio_connect(hio_t* io) {
     int timeout = io->connect_timeout ? io->connect_timeout : HIO_DEFAULT_CONNECT_TIMEOUT;
     io->connect_timer = htimer_add(io->loop, __connect_timeout_cb, timeout, 1);
     io->connect_timer->privdata = io;
+    io->connect = 1;
     return hio_add(io, hio_handle_events, HV_WRITE);
 }
 

+ 2 - 0
event/overlapio.c

@@ -247,6 +247,7 @@ int hio_accept (hio_t* io) {
     for (int i = 0; i < ACCEPTEX_NUM; ++i) {
         post_acceptex(io, NULL);
     }
+    io->accept = 1;
     return hio_add(io, hio_handle_events, HV_READ);
 }
 
@@ -286,6 +287,7 @@ int hio_connect (hio_t* io) {
             goto error;
         }
     }
+    io->connect = 1;
     return hio_add(io, hio_handle_events, HV_WRITE);
 error:
     hio_close(io);

+ 46 - 2
examples/http_client_test.cpp

@@ -1,7 +1,38 @@
 #include "requests.h"
 
-int main() {
-    auto resp = requests::get("http://www.example.com");
+#include "hthread.h" // import hv_gettid
+
+static void onResponse(int state, HttpRequestPtr req, HttpResponsePtr resp, void* userdata) {
+    printf("test_http_async_client response thread tid=%ld\n", hv_gettid());
+    if (state != 0) {
+        printf("onError: %s:%d\n", http_client_strerror(state), state);
+    } else {
+        printf("onSuccess\n");
+        printf("%d %s\r\n", resp->status_code, resp->status_message());
+        printf("%s\n", resp->body.c_str());
+    }
+
+    int* finished = (int*)userdata;
+    *finished = 1;
+}
+
+static void test_http_async_client(int* finished) {
+    printf("test_http_async_client request thread tid=%ld\n", hv_gettid());
+    HttpRequestPtr req = HttpRequestPtr(new HttpRequest);
+    HttpResponsePtr resp = HttpResponsePtr(new HttpResponse);
+    req->method = HTTP_POST;
+    req->url = "127.0.0.1:8080/echo";
+    req->body = "this is an async request.";
+    req->timeout = 10;
+    int ret = http_client_send_async(req, resp, onResponse, (void*)finished);
+    if (ret != 0) {
+        printf("http_client_send_async error: %s:%d\n", http_client_strerror(ret), ret);
+        *finished = 1;
+    }
+}
+
+static void test_http_sync_client() {
+    auto resp = requests::get("http://127.0.0.1:8080/ping");
     if (resp == NULL) {
         printf("request failed!\n");
     } else {
@@ -16,6 +47,19 @@ int main() {
         printf("%d %s\r\n", resp2->status_code, resp2->status_message());
         printf("%s\n", resp2->body.c_str());
     }
+}
+
+int main() {
+    int finished = 0;
+    test_http_async_client(&finished);
+
+    test_http_sync_client();
+
+    // demo wait async ResponseCallback
+    while (!finished) {
+        hv_delay(100);
+    }
+    printf("finished!\n");
 
     return 0;
 }

+ 9 - 1
http/HttpMessage.h

@@ -1,6 +1,7 @@
 #ifndef HTTP_MESSAGE_H_
 #define HTTP_MESSAGE_H_
 
+#include <memory>
 #include <string>
 #include <map>
 
@@ -150,7 +151,8 @@ public:
     std::string         path;
     QueryParams         query_params;
     // client_addr
-    HNetAddr            client_addr;
+    HNetAddr            client_addr; // for http server save client addr of request
+    int                 timeout; // for http client timeout
 
     HttpRequest() : HttpMessage() {
         type = HTTP_REQUEST;
@@ -165,6 +167,7 @@ public:
         host = "127.0.0.1";
         port = DEFAULT_HTTP_PORT;
         path = "/";
+        timeout = 0;
     }
 
     virtual void Reset() {
@@ -214,4 +217,9 @@ public:
     virtual std::string Dump(bool is_dump_headers = true, bool is_dump_body = false);
 };
 
+typedef std::shared_ptr<HttpRequest>    HttpRequestPtr;
+typedef std::shared_ptr<HttpResponse>   HttpResponsePtr;
+// state: 0 onSucceed other onError
+typedef void (*HttpResponseCallback)(int state, HttpRequestPtr req, HttpResponsePtr resp, void* userdata);
+
 #endif // HTTP_MESSAGE_H_

+ 2 - 0
http/HttpParser.h

@@ -46,4 +46,6 @@ public:
     virtual const char* StrError(int error) = 0;
 };
 
+typedef std::shared_ptr<HttpParser> HttpParserPtr;
+
 #endif // HTTP_PARSER_H_

+ 270 - 66
http/client/http_client.cpp

@@ -1,46 +1,52 @@
 #include "http_client.h"
 
-#define MAX_CONNECT_TIMEOUT 3000 // ms
-
-#include "hstring.h" // import asprintf,trim
+#include <mutex>
 
 #ifdef WITH_CURL
 #include "curl/curl.h"
-#else
+#endif
+
 #include "herr.h"
+#include "hstring.h"
 #include "hsocket.h"
 #include "hssl.h"
 #include "HttpParser.h"
-#endif
 
+// for async
+#include "hthread.h"
+#include "hloop.h"
 struct http_client_s {
     std::string  host;
     int          port;
     int          https;
-    int          http_version;
     int          timeout; // s
     http_headers headers;
 //private:
 #ifdef WITH_CURL
     CURL* curl;
-#else
-    int fd;
-    hssl_t ssl;
-    HttpParser*  parser;
 #endif
+    int fd;
+    // for sync
+    hssl_t          ssl;
+    HttpParserPtr   parser;
+    // for async
+    std::mutex  mutex_;
+    hthread_t   thread_;
+    hloop_t*    loop_;
 
     http_client_s() {
+        host = LOCALHOST;
         port = DEFAULT_HTTP_PORT;
         https = 0;
-        http_version = 1;
         timeout = DEFAULT_HTTP_TIMEOUT;
 #ifdef WITH_CURL
         curl = NULL;
-#else
+#endif
         fd = -1;
         ssl = NULL;
-        parser = NULL;
-#endif
+
+        thread_ = 0;
+        loop_ = NULL;
     }
 
     ~http_client_s() {
@@ -48,12 +54,20 @@ struct http_client_s {
     }
 
     void Close() {
+        if (loop_) {
+            hloop_stop(loop_);
+            loop_ = NULL;
+        }
+        if (thread_) {
+            hthread_join(thread_);
+            thread_ = 0;
+        }
 #ifdef WITH_CURL
         if (curl) {
             curl_easy_cleanup(curl);
             curl = NULL;
         }
-#else
+#endif
         if (ssl) {
             hssl_free(ssl);
             ssl = NULL;
@@ -62,24 +76,16 @@ struct http_client_s {
             closesocket(fd);
             fd = -1;
         }
-        if (parser) {
-            delete parser;
-            parser = NULL;
-        }
-#endif
     }
 };
 
-static int __http_client_send(http_client_t* cli, HttpRequest* req, HttpResponse* res);
+static int __http_client_send(http_client_t* cli, HttpRequest* req, HttpResponse* resp);
 
 http_client_t* http_client_new(const char* host, int port, int https) {
     http_client_t* cli = new http_client_t;
-    cli->https = https;
+    if (host) cli->host = host;
     cli->port = port;
-    if (host) {
-        cli->host = host;
-        cli->headers["Host"] = asprintf("%s:%d", host, port);
-    }
+    cli->https = https;
     cli->headers["Connection"] = "keep-alive";
     return cli;
 }
@@ -121,26 +127,40 @@ const char* http_client_get_header(http_client_t* cli, const char* key) {
     return NULL;
 }
 
-int http_client_send(http_client_t* cli, HttpRequest* req, HttpResponse* res) {
+int http_client_send(http_client_t* cli, HttpRequest* req, HttpResponse* resp) {
+    if (!cli || !req || !resp) return ERR_NULL_POINTER;
+
+    if (req->url.empty() || *req->url.c_str() == '/') {
+        req->host = cli->host;
+        req->port = cli->port;
+        req->https = cli->https;
+    }
+
+    if (req->timeout == 0) {
+        req->timeout = cli->timeout;
+    }
+
     for (auto& pair : cli->headers) {
         if (req->headers.find(pair.first) == req->headers.end()) {
             req->headers[pair.first] = pair.second;
         }
     }
-    return __http_client_send(cli, req, res);
+
+    return __http_client_send(cli, req, resp);
 }
 
-int http_client_send(HttpRequest* req, HttpResponse* res, int timeout) {
+int http_client_send(HttpRequest* req, HttpResponse* resp) {
+    if (!req || !resp) return ERR_NULL_POINTER;
+
     http_client_t cli;
-    cli.timeout = timeout;
-    return __http_client_send(&cli, req, res);
+    return __http_client_send(&cli, req, resp);
 }
 
 #ifdef WITH_CURL
 static size_t s_header_cb(char* buf, size_t size, size_t cnt, void* userdata) {
     if (buf == NULL || userdata == NULL)    return 0;
 
-    HttpResponse* res = (HttpResponse*)userdata;
+    HttpResponse* resp = (HttpResponse*)userdata;
 
     std::string str(buf);
     std::string::size_type pos = str.find_first_of(':');
@@ -158,16 +178,16 @@ static size_t s_header_cb(char* buf, size_t size, size_t cnt, void* userdata) {
                 sscanf(buf, "HTTP/%d %d", &http_major, &status_code);
                 http_minor = 0;
             }
-            res->http_major = http_major;
-            res->http_minor = http_minor;
-            res->status_code = (http_status)status_code;
+            resp->http_major = http_major;
+            resp->http_minor = http_minor;
+            resp->status_code = (http_status)status_code;
         }
     }
     else {
         // headers
         std::string key = trim(str.substr(0, pos));
         std::string value = trim(str.substr(pos+1));
-        res->headers[key] = value;
+        resp->headers[key] = value;
     }
     return size*cnt;
 }
@@ -175,16 +195,12 @@ static size_t s_header_cb(char* buf, size_t size, size_t cnt, void* userdata) {
 static size_t s_body_cb(char *buf, size_t size, size_t cnt, void *userdata) {
     if (buf == NULL || userdata == NULL)    return 0;
 
-    HttpResponse* res = (HttpResponse*)userdata;
-    res->body.append(buf, size*cnt);
+    HttpResponse* resp = (HttpResponse*)userdata;
+    resp->body.append(buf, size*cnt);
     return size*cnt;
 }
 
-int __http_client_send(http_client_t* cli, HttpRequest* req, HttpResponse* res) {
-    if (req == NULL || res == NULL) {
-        return -1;
-    }
-
+int __http_client_send(http_client_t* cli, HttpRequest* req, HttpResponse* resp) {
     if (cli->curl == NULL) {
         cli->curl = curl_easy_init();
     }
@@ -264,19 +280,19 @@ int __http_client_send(http_client_t* cli, HttpRequest* req, HttpResponse* res)
     }
 
     curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, s_body_cb);
-    curl_easy_setopt(curl, CURLOPT_WRITEDATA, res);
+    curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp);
 
     curl_easy_setopt(curl, CURLOPT_HEADER, 0);
     curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, s_header_cb);
-    curl_easy_setopt(curl, CURLOPT_HEADERDATA, res);
+    curl_easy_setopt(curl, CURLOPT_HEADERDATA, resp);
 
     int ret = curl_easy_perform(curl);
     /*
     if (ret != 0) {
         hloge("curl error: %d: %s", ret, curl_easy_strerror((CURLcode)ret));
     }
-    if (res->body.length() != 0) {
-        hlogd("[Response]\n%s", res->body.c_str());
+    if (resp->body.length() != 0) {
+        hlogd("[Response]\n%s", resp->body.c_str());
     }
     double total_time, name_time, conn_time, pre_time;
     curl_easy_getinfo(curl, CURLINFO_TOTAL_TIME, &total_time);
@@ -302,12 +318,13 @@ const char* http_client_strerror(int errcode) {
     return curl_easy_strerror((CURLcode)errcode);
 }
 #else
-static int __http_client_connect(http_client_t* cli) {
-    int blocktime = MAX_CONNECT_TIMEOUT;
-    if (cli->timeout > 0) {
-        blocktime = MIN(cli->timeout*1000, blocktime);
+static int __http_client_connect(http_client_t* cli, HttpRequest* req) {
+    int blocktime = DEFAULT_CONNECT_TIMEOUT;
+    if (req->timeout > 0) {
+        blocktime = MIN(req->timeout*1000, blocktime);
     }
-    int connfd = ConnectTimeout(cli->host.c_str(), cli->port, blocktime);
+    req->ParseUrl();
+    int connfd = ConnectTimeout(req->host.c_str(), req->port, blocktime);
     if (connfd < 0) {
         return connfd;
     }
@@ -334,35 +351,25 @@ static int __http_client_connect(http_client_t* cli) {
     }
 
     if (cli->parser == NULL) {
-        cli->parser = HttpParser::New(HTTP_CLIENT, (http_version)cli->http_version);
+        cli->parser = HttpParserPtr(HttpParser::New(HTTP_CLIENT, (http_version)req->http_major));
     }
 
     cli->fd = connfd;
     return 0;
 }
 
-int __http_client_send(http_client_t* cli, HttpRequest* req, HttpResponse* res) {
+int __http_client_send(http_client_t* cli, HttpRequest* req, HttpResponse* resp) {
     // connect -> send -> recv -> http_parser
     int err = 0;
     int timeout = cli->timeout;
     int connfd = cli->fd;
 
-    req->ParseUrl();
-    if (cli->host.size() == 0) {
-        cli->host = req->host;
-        cli->port = req->port;
-    }
-    if (cli->https == 0) {
-        cli->https = req->https;
-    }
-    cli->http_version = req->http_major;
-
     time_t start_time = time(NULL);
     time_t cur_time;
     int fail_cnt = 0;
 connect:
     if (connfd <= 0) {
-        int ret = __http_client_connect(cli);
+        int ret = __http_client_connect(cli, req);
         if (ret != 0) {
             return ret;
         }
@@ -408,7 +415,7 @@ send:
             }
         }
     }
-    cli->parser->InitResponse(res);
+    cli->parser->InitResponse(resp);
 recv:
     do {
         if (timeout > 0) {
@@ -439,3 +446,200 @@ const char* http_client_strerror(int errcode) {
     return socket_strerror(errcode);
 }
 #endif
+
+struct HttpContext {
+    HttpRequestPtr  req;
+    HttpResponsePtr resp;
+    HttpParserPtr   parser;
+
+    hio_t*          io;
+    htimer_t*       timer;
+
+    HttpResponseCallback cb;
+    void*                userdata;
+
+    HttpContext() {
+        io = NULL;
+        timer = NULL;
+        cb = NULL;
+        userdata = NULL;
+    }
+
+    ~HttpContext() {
+        killTimer();
+    }
+
+    void closeIO() {
+        if (io) {
+            hio_close(io);
+            io = NULL;
+        }
+    }
+
+    void killTimer() {
+        if (timer) {
+            htimer_del(timer);
+            timer = NULL;
+        }
+    }
+
+    void callback(int state) {
+        if (cb) {
+            cb(state, req, resp, userdata);
+            // NOTE: ensure cb only called once
+            cb = NULL;
+        }
+    }
+
+    void successCallback() {
+        killTimer();
+        callback(0);
+    }
+
+    void errorCallback(int error) {
+        closeIO();
+        callback(error);
+    }
+};
+
+static void on_close(hio_t* io) {
+    HttpContext* ctx = (HttpContext*)hevent_userdata(io);
+    if (ctx) {
+        int error = hio_error(io);
+        ctx->callback(error);
+        delete ctx;
+        hevent_set_userdata(io, NULL);
+    }
+}
+
+static void on_recv(hio_t* io, void* buf, int readbytes) {
+    HttpContext* ctx = (HttpContext*)hevent_userdata(io);
+
+    int nparse = ctx->parser->FeedRecvData((const char*)buf, readbytes);
+    if (nparse != readbytes) {
+        ctx->errorCallback(ERR_PARSE);
+        return;
+    }
+    if (ctx->parser->IsComplete()) {
+        ctx->successCallback();
+        return;
+    }
+}
+
+static void on_connect(hio_t* io) {
+    HttpContext* ctx = (HttpContext*)hevent_userdata(io);
+
+    ctx->parser = HttpParserPtr(HttpParser::New(HTTP_CLIENT, (http_version)ctx->req->http_major));
+    ctx->parser->InitResponse(ctx->resp.get());
+    ctx->parser->SubmitRequest(ctx->req.get());
+
+    char* data = NULL;
+    size_t len = 0;
+    while (ctx->parser->GetSendData(&data, &len)) {
+        hio_write(io, data, len);
+    }
+
+    hio_setcb_read(io, on_recv);
+    hio_read(io);
+}
+
+static void on_timeout(htimer_t* timer) {
+    HttpContext* ctx = (HttpContext*)hevent_userdata(timer);
+    ctx->errorCallback(ERR_TASK_TIMEOUT);
+}
+
+static HTHREAD_ROUTINE(http_client_loop_thread) {
+    hloop_t* loop = (hloop_t*)userdata;
+    assert(loop != NULL);
+    hloop_run(loop);
+    return 0;
+}
+
+// hloop_new -> htread_create -> hloop_run ->
+// hio_connect -> on_connect -> hio_write -> hio_read -> on_recv ->
+// HttpResponseCallback -> on_close
+static int __http_client_send_async(http_client_t* cli, HttpRequestPtr req, HttpResponsePtr resp,
+        HttpResponseCallback cb, void* userdata) {
+    sockaddr_u peeraddr;
+    memset(&peeraddr, 0, sizeof(peeraddr));
+    req->ParseUrl();
+    int ret = sockaddr_set_ipport(&peeraddr, req->host.c_str(), req->port);
+    if (ret != 0) {
+        return ERR_INVALID_PARAM;
+    }
+    int connfd = socket(peeraddr.sa.sa_family, SOCK_STREAM, 0);
+    if (connfd < 0) {
+        return ERR_SOCKET;
+    }
+
+    cli->mutex_.lock();
+    if (cli->loop_ == NULL) {
+        cli->loop_ = hloop_new(HLOOP_FLAG_AUTO_FREE);
+    }
+    if (cli->thread_ == 0) {
+        cli->thread_ = hthread_create(http_client_loop_thread, cli->loop_);
+    }
+    cli->mutex_.unlock();
+
+    hio_t* connio = hio_get(cli->loop_, connfd);
+    assert(connio != NULL);
+
+    hio_set_peeraddr(connio, &peeraddr.sa, sockaddr_len(&peeraddr));
+    hio_setcb_connect(connio, on_connect);
+    hio_setcb_close(connio, on_close);
+
+    // https
+    if (req->https) {
+        hio_enable_ssl(connio);
+    }
+
+    // new HttpContext
+    // delete on_close
+    HttpContext* ctx = new HttpContext;
+    ctx->io = connio;
+    ctx->req = req;
+    ctx->resp = resp;
+    ctx->cb = cb;
+    ctx->userdata = userdata;
+    hevent_set_userdata(connio, ctx);
+
+    // timeout
+    if (req->timeout != 0) {
+        ctx->timer = htimer_add(cli->loop_, on_timeout, req->timeout * 1000, 1);
+        assert(ctx->timer != NULL);
+        hevent_set_userdata(ctx->timer, ctx);
+    }
+
+    return hio_connect(connio);
+}
+
+int http_client_send_async(http_client_t* cli, HttpRequestPtr req, HttpResponsePtr resp,
+        HttpResponseCallback cb, void* userdata) {
+    if (!cli || !req || !resp) return ERR_NULL_POINTER;
+
+    if (req->url.empty() || *req->url.c_str() == '/') {
+        req->host = cli->host;
+        req->port = cli->port;
+        req->https = cli->https;
+    }
+
+    if (req->timeout == 0) {
+        req->timeout = cli->timeout;
+    }
+
+    for (auto& pair : cli->headers) {
+        if (req->headers.find(pair.first) == req->headers.end()) {
+            req->headers[pair.first] = pair.second;
+        }
+    }
+
+    return __http_client_send_async(cli, req, resp, cb, userdata);
+}
+
+int http_client_send_async(HttpRequestPtr req, HttpResponsePtr resp,
+        HttpResponseCallback cb, void* userdata) {
+    if (!req || !resp) return ERR_NULL_POINTER;
+
+    static http_client_t s_default_async_client;
+    return __http_client_send_async(&s_default_async_client, req, resp, cb, userdata);
+}

+ 15 - 3
http/client/http_client.h

@@ -12,7 +12,7 @@
 int main(int argc, char* argv[]) {
     HttpRequest req;
     req.method = HTTP_GET;
-    req.url = "http://ftp.sjtu.edu.cn/sites/ftp.kernel.org/pub/linux/kernel/";
+    req.url = "http://www.example.com";
     HttpResponse res;
     int ret = http_client_send(&req, &res);
     printf("%s\n", req.Dump(true,true).c_str());
@@ -35,14 +35,26 @@ HV_EXPORT const char* http_client_strerror(int errcode);
 
 HV_EXPORT int http_client_set_timeout(http_client_t* cli, int timeout);
 
+// common headers
 HV_EXPORT int http_client_clear_headers(http_client_t* cli);
 HV_EXPORT int http_client_set_header(http_client_t* cli, const char* key, const char* value);
 HV_EXPORT int http_client_del_header(http_client_t* cli, const char* key);
 HV_EXPORT const char* http_client_get_header(http_client_t* cli, const char* key);
 
-HV_EXPORT int http_client_send(http_client_t* cli, HttpRequest* req, HttpResponse* res);
+// sync
+HV_EXPORT int http_client_send(http_client_t* cli, HttpRequest* req, HttpResponse* resp);
 
+// async
+// Intern will start an event-loop thread when http_client_send_async first called,
+// http_client_del will destroy the thread.
+HV_EXPORT int http_client_send_async(http_client_t* cli, HttpRequestPtr req, HttpResponsePtr resp,
+                                    HttpResponseCallback cb = NULL, void* userdata = NULL);
+
+// top-level api
 // http_client_new -> http_client_send -> http_client_del
-HV_EXPORT int http_client_send(HttpRequest* req, HttpResponse* res, int timeout = DEFAULT_HTTP_TIMEOUT);
+HV_EXPORT int http_client_send(HttpRequest* req, HttpResponse* resp);
+// http_client_send_async(&default_async_client, ...)
+HV_EXPORT int http_client_send_async(HttpRequestPtr req, HttpResponsePtr resp,
+                                    HttpResponseCallback cb = NULL, void* userdata = NULL);
 
 #endif  // HTTP_CLIENT_H_

+ 7 - 6
http/client/requests.h

@@ -41,9 +41,14 @@ typedef std::shared_ptr<HttpResponse> Response;
 static http_headers DefaultHeaders;
 static http_body    NoBody;
 
+Response request(Request req) {
+    Response resp = Response(new HttpResponse);
+    int ret = http_client_send(req.get(), resp.get());
+    return ret ? NULL : resp;
+}
+
 Response request(http_method method, const char* url, const http_body& body = NoBody, const http_headers& headers = DefaultHeaders) {
     Request req = Request(new HttpRequest);
-    Response resp = Response(new HttpResponse);
     req->method = method;
     req->url = url;
     if (&body != &NoBody) {
@@ -52,11 +57,7 @@ Response request(http_method method, const char* url, const http_body& body = No
     if (&headers != &DefaultHeaders) {
         req->headers = headers;
     }
-    int ret = http_client_send(req.get(), resp.get());
-    if (ret != 0) {
-        return NULL;
-    }
-    return resp;
+    return request(req);
 }
 
 Response get(const char* url, const http_headers& headers = DefaultHeaders) {

+ 3 - 3
http/server/HttpHandler.h

@@ -12,15 +12,15 @@ public:
     int                     port;
     // for handle_request
     HttpService*            service;
-    HttpParser*             parser;
     FileCache*              files;
+    file_cache_t*           fc;
+
     HttpRequest             req;
     HttpResponse            res;
-    file_cache_t*           fc;
+    HttpParserPtr           parser;
 
     HttpHandler() {
         service = NULL;
-        parser = NULL;
         files = NULL;
         fc = NULL;
     }

+ 6 - 6
http/server/HttpServer.cpp

@@ -4,9 +4,10 @@
 #include "hmain.h"
 #include "hloop.h"
 
-#include "http2def.h"
 #include "FileCache.h"
 #include "HttpHandler.h"
+
+#include "http2def.h"
 #include "Http2Parser.h"
 
 #define MIN_HTTP_REQUEST        "GET / HTTP/1.1\r\n\r\n"
@@ -47,7 +48,7 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
             handler->req.http_major = 2;
             handler->req.http_minor = 0;
         }
-        handler->parser = HttpParser::New(HTTP_SERVER, version);
+        handler->parser = HttpParserPtr(HttpParser::New(HTTP_SERVER, version));
         if (handler->parser == NULL) {
             hloge("[%s:%d] unsupported HTTP%d", handler->ip, handler->port, (int)version);
             hio_close(io);
@@ -56,7 +57,7 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
         handler->parser->InitRequest(&handler->req);
     }
 
-    HttpParser* parser = handler->parser;
+    HttpParser* parser = handler->parser.get();
     HttpRequest* req = &handler->req;
     HttpResponse* res = &handler->res;
 
@@ -100,13 +101,13 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
             // h2/h2c
             if (strnicmp(iter_upgrade->second.c_str(), "h2", 2) == 0) {
                 hio_write(io, HTTP2_UPGRADE_RESPONSE, strlen(HTTP2_UPGRADE_RESPONSE));
-                SAFE_DELETE(handler->parser);
-                parser = handler->parser = HttpParser::New(HTTP_SERVER, HTTP_V2);
+                parser = HttpParser::New(HTTP_SERVER, HTTP_V2);
                 if (parser == NULL) {
                     hloge("[%s:%d] unsupported HTTP2", handler->ip, handler->port);
                     hio_close(io);
                     return;
                 }
+                handler->parser.reset(parser);
                 HttpRequest http1_req = *req;
                 parser->InitRequest(req);
                 *req = http1_req;
@@ -206,7 +207,6 @@ handle_request:
 static void on_close(hio_t* io) {
     HttpHandler* handler = (HttpHandler*)hevent_userdata(io);
     if (handler) {
-        SAFE_DELETE(handler->parser);
         delete handler;
         hevent_set_userdata(io, NULL);
     }