1
0
ithewei 6 жил өмнө
parent
commit
12716505e2

+ 3 - 0
README.md

@@ -92,3 +92,6 @@ hw 是一套跨平台c/c++基础组件,函数名/类名以h/H开头
 
 #### compile WITH_CURL
 - make DEFINES="WITH_CURL CURL_STATICLIB"
+
+### compile WITH_NGHTTP2
+- make DEFINES=WITH_NGHTTP2

+ 7 - 0
examples/curl.cpp

@@ -10,6 +10,7 @@
 #include "http_client.h"
 
 static int  http_version = 1;
+static int  grpc         = 0;
 static bool verbose = false;
 static const char* url = NULL;
 static const char* method = NULL;
@@ -25,6 +26,7 @@ static const struct option long_options[] = {
     {"header",  required_argument,  NULL,   'H'},
     {"data",    required_argument,  NULL,   'd'},
     {"http2",   no_argument,        &http_version, 2},
+    {"grpc",    no_argument,        &grpc,  1},
     {NULL,      0,                  NULL,   0}
 };
 static const char* help = R"(Options:
@@ -35,6 +37,7 @@ static const char* help = R"(Options:
     -H|--header         Add http headers, format -H "Content-Type:application/json Accept:*/*"
     -d|--data           Set http body.
        --http2          Use http2
+       --grpc           Use grpc over http2
 Examples:
     curl -v localhost:8086
     curl -v localhost:8086/v1/api/query?page_no=1&page_size=10
@@ -90,6 +93,10 @@ int main(int argc, char* argv[]) {
 
     int ret = 0;
     HttpRequest req;
+    if (grpc) {
+        http_version = 2;
+        req.content_type = APPLICATION_GRPC;
+    }
     if (http_version == 2) {
         req.http_major = 2;
         req.http_minor = 0;

+ 166 - 0
http/Http1Session.cpp

@@ -0,0 +1,166 @@
+#include "Http1Session.h"
+
+static int on_url(http_parser* parser, const char *at, size_t length);
+static int on_status(http_parser* parser, const char *at, size_t length);
+static int on_header_field(http_parser* parser, const char *at, size_t length);
+static int on_header_value(http_parser* parser, const char *at, size_t length);
+static int on_body(http_parser* parser, const char *at, size_t length);
+static int on_message_begin(http_parser* parser);
+static int on_headers_complete(http_parser* parser);
+static int on_message_complete(http_parser* parser);
+
+Http1Session::Http1Session(http_session_type type) {
+    if (cbs == NULL) {
+        cbs = (http_parser_settings*)malloc(sizeof(http_parser_settings));
+        http_parser_settings_init(cbs);
+        cbs->on_message_begin    = on_message_begin;
+        cbs->on_url              = on_url;
+        cbs->on_status           = on_status;
+        cbs->on_header_field     = on_header_field;
+        cbs->on_header_value     = on_header_value;
+        cbs->on_headers_complete = on_headers_complete;
+        cbs->on_body             = on_body;
+        cbs->on_message_complete = on_message_complete;
+    }
+    http_parser_init(&parser, HTTP_BOTH);
+    parser.data = this;
+    state = HP_START_REQ_OR_RES;
+    submited = NULL;
+    parsed = NULL;
+}
+
+Http1Session::~Http1Session() {
+}
+
+int Http1Session::GetSendData(char** data, size_t* len) {
+    if (!submited) {
+        *data = NULL;
+        *len = 0;
+        return 0;
+    }
+    sendbuf = submited->Dump(true, true);
+    submited = NULL;
+    *data = (char*)sendbuf.data();
+    *len = sendbuf.size();
+    return sendbuf.size();
+}
+
+int Http1Session::FeedRecvData(const char* data, size_t len) {
+    return http_parser_execute(&parser, cbs, data, len);
+}
+
+bool Http1Session::WantRecv() {
+    return state != HP_MESSAGE_COMPLETE;
+}
+
+int Http1Session::SubmitRequest(HttpRequest* req) {
+    submited = req;
+    return 0;
+}
+
+int Http1Session::SubmitResponse(HttpResponse* res) {
+    submited = res;
+    return 0;
+}
+
+int Http1Session::InitRequest(HttpRequest* req) {
+    req->Reset();
+    parsed = req;
+    http_parser_init(&parser, HTTP_REQUEST);
+    return 0;
+}
+
+int Http1Session::InitResponse(HttpResponse* res) {
+    res->Reset();
+    parsed = res;
+    http_parser_init(&parser, HTTP_RESPONSE);
+    return 0;
+}
+
+int Http1Session::GetError() {
+    return parser.http_errno;
+}
+
+const char* Http1Session::StrError(int error) {
+    return http_errno_description((enum http_errno)error);
+}
+
+http_parser_settings* Http1Session::cbs = NULL;
+
+int on_url(http_parser* parser, const char *at, size_t length) {
+    printd("on_url:%.*s\n", (int)length, at);
+    Http1Session* hss = (Http1Session*)parser->data;
+    hss->state = HP_URL;
+    hss->url.insert(hss->url.size(), at, length);
+    return 0;
+}
+
+int on_status(http_parser* parser, const char *at, size_t length) {
+    printd("on_status:%.*s\n", (int)length, at);
+    Http1Session* hss = (Http1Session*)parser->data;
+    hss->state = HP_STATUS;
+    return 0;
+}
+
+int on_header_field(http_parser* parser, const char *at, size_t length) {
+    printd("on_header_field:%.*s\n", (int)length, at);
+    Http1Session* hss = (Http1Session*)parser->data;
+    hss->handle_header();
+    hss->state = HP_HEADER_FIELD;
+    hss->header_field.insert(hss->header_field.size(), at, length);
+    return 0;
+}
+
+int on_header_value(http_parser* parser, const char *at, size_t length) {
+    printd("on_header_value:%.*s""\n", (int)length, at);
+    Http1Session* hss = (Http1Session*)parser->data;
+    hss->state = HP_HEADER_VALUE;
+    hss->header_value.insert(hss->header_value.size(), at, length);
+    return 0;
+}
+
+int on_body(http_parser* parser, const char *at, size_t length) {
+    //printd("on_body:%.*s""\n", (int)length, at);
+    Http1Session* hss = (Http1Session*)parser->data;
+    hss->state = HP_BODY;
+    hss->parsed->body.insert(hss->parsed->body.size(), at, length);
+    return 0;
+}
+
+int on_message_begin(http_parser* parser) {
+    printd("on_message_begin\n");
+    Http1Session* hss = (Http1Session*)parser->data;
+    hss->state = HP_MESSAGE_BEGIN;
+    return 0;
+}
+
+int on_headers_complete(http_parser* parser) {
+    printd("on_headers_complete\n");
+    Http1Session* hss = (Http1Session*)parser->data;
+    hss->handle_header();
+    auto iter = hss->parsed->headers.find("content-type");
+    if (iter != hss->parsed->headers.end()) {
+        hss->parsed->content_type = http_content_type_enum(iter->second.c_str());
+    }
+    hss->parsed->http_major = parser->http_major;
+    hss->parsed->http_minor = parser->http_minor;
+    if (hss->parsed->type == HTTP_REQUEST) {
+        HttpRequest* req = (HttpRequest*)hss->parsed;
+        req->method = (http_method)parser->method;
+        req->url = hss->url;
+    }
+    else if (hss->parsed->type == HTTP_RESPONSE) {
+        HttpResponse* res = (HttpResponse*)hss->parsed;
+        res->status_code = (http_status)parser->status_code;
+    }
+    hss->state = HP_HEADERS_COMPLETE;
+    return 0;
+}
+
+int on_message_complete(http_parser* parser) {
+    printd("on_message_complete\n");
+    Http1Session* hss = (Http1Session*)parser->data;
+    hss->state = HP_MESSAGE_COMPLETE;
+    return 0;
+}
+

+ 60 - 0
http/Http1Session.h

@@ -0,0 +1,60 @@
+#ifndef HTTP1_SESSION_H_
+#define HTTP1_SESSION_H_
+
+#include "HttpSession.h"
+#include "http_parser.h"
+
+enum http_parser_state {
+    HP_START_REQ_OR_RES,
+    HP_MESSAGE_BEGIN, HP_URL,
+    HP_STATUS,
+    HP_HEADER_FIELD,
+    HP_HEADER_VALUE,
+    HP_HEADERS_COMPLETE,
+    HP_BODY,
+    HP_MESSAGE_COMPLETE
+};
+
+class Http1Session : public HttpSession {
+public:
+    static http_parser_settings*    cbs;
+    http_parser                     parser;
+    http_parser_state               state;
+    HttpPayload*                    submited;
+    HttpPayload*                    parsed;
+    // tmp
+    std::string url;          // for on_url
+    std::string header_field; // for on_header_field
+    std::string header_value; // for on_header_value
+    std::string sendbuf;      // for GetSendData
+
+    Http1Session(http_session_type type = HTTP_CLIENT);
+    virtual ~Http1Session();
+
+    void handle_header() {
+        if (header_field.size() != 0 && header_value.size() != 0) {
+            parsed->headers[header_field] = header_value;
+            header_field.clear();
+            header_value.clear();
+        }
+    }
+
+    virtual int GetSendData(char** data, size_t* len);
+    virtual int FeedRecvData(const char* data, size_t len);
+    virtual bool WantRecv();
+
+    // client
+    // SubmitRequest -> while(GetSendData) {send} -> InitResponse -> do {recv -> FeedRecvData} while(WantRecv)
+    virtual int SubmitRequest(HttpRequest* req);
+    virtual int InitResponse(HttpResponse* res);
+
+    // server
+    // InitRequest -> do {recv -> FeedRecvData} while(WantRecv) -> SubmitResponse -> while(GetSendData) {send}
+    virtual int InitRequest(HttpRequest* req);
+    virtual int SubmitResponse(HttpResponse* res);
+
+    virtual int GetError();
+    virtual const char* StrError(int error);
+};
+
+#endif // HTTP1_SESSION_H_

+ 373 - 0
http/Http2Session.cpp

@@ -0,0 +1,373 @@
+#ifdef WITH_NGHTTP2
+
+#include "Http2Session.h"
+
+static nghttp2_nv make_nv(const char* name, const char* value) {
+    nghttp2_nv nv;
+    nv.name = (uint8_t*)name;
+    nv.value = (uint8_t*)value;
+    nv.namelen = strlen(name);
+    nv.valuelen = strlen(value);
+    nv.flags = NGHTTP2_NV_FLAG_NONE;
+    return nv;
+}
+
+static nghttp2_nv make_nv2(const char* name, const char* value,
+        int namelen, int valuelen) {
+    nghttp2_nv nv;
+    nv.name = (uint8_t*)name;
+    nv.value = (uint8_t*)value;
+    nv.namelen = namelen;
+    nv.valuelen = valuelen;
+    nv.flags = NGHTTP2_NV_FLAG_NONE;
+    return nv;
+}
+
+static void print_frame_hd(const nghttp2_frame_hd* hd) {
+    printd("[frame] length=%d type=%x flags=%x stream_id=%d\n",
+        (int)hd->length, (int)hd->type, (int)hd->flags, hd->stream_id);
+}
+static int on_header_callback(nghttp2_session *session,
+        const nghttp2_frame *frame,
+        const uint8_t *name, size_t namelen,
+        const uint8_t *value, size_t valuelen,
+        uint8_t flags, void *userdata);
+static int on_data_chunk_recv_callback(nghttp2_session *session,
+        uint8_t flags, int32_t stream_id, const uint8_t *data,
+        size_t len, void *userdata);
+static int on_frame_recv_callback(nghttp2_session *session,
+        const nghttp2_frame *frame, void *userdata);
+/*
+static ssize_t data_source_read_callback(nghttp2_session *session,
+        int32_t stream_id, uint8_t *buf, size_t length,
+        uint32_t *data_flags, nghttp2_data_source *source, void *userdata);
+*/
+
+
+Http2Session::Http2Session(http_session_type type) {
+    this->type = type;
+    if (cbs == NULL) {
+        nghttp2_session_callbacks_new(&cbs);
+        nghttp2_session_callbacks_set_on_header_callback(cbs, on_header_callback);
+        nghttp2_session_callbacks_set_on_data_chunk_recv_callback(cbs, on_data_chunk_recv_callback);
+        nghttp2_session_callbacks_set_on_frame_recv_callback(cbs, on_frame_recv_callback);
+    }
+    if (type == HTTP_CLIENT) {
+        nghttp2_session_client_new(&session, cbs, NULL);
+        state = HSS_SEND_MAGIC;
+    }
+    else if (type == HTTP_SERVER) {
+        nghttp2_session_server_new(&session, cbs, NULL);
+        state = HSS_SEND_SETTINGS;
+    }
+    nghttp2_session_set_user_data(session, this);
+    submited = NULL;
+    parsed = NULL;
+    stream_id = -1;
+    stream_closed = 0;
+
+    nghttp2_settings_entry settings[] = {
+        {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}
+    };
+    nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, settings, ARRAY_SIZE(settings));
+    state = HSS_SEND_SETTINGS;
+}
+
+Http2Session::~Http2Session() {
+    if (session) {
+        nghttp2_session_del(session);
+        session = NULL;
+    }
+}
+
+int Http2Session::GetSendData(char** data, size_t* len) {
+    // HTTP2_MAGIC,HTTP2_SETTINGS,HTTP2_HEADERS
+    *len = nghttp2_session_mem_send(session, (const uint8_t**)data);
+    printd("nghttp2_session_mem_send %d\n", *len);
+    if (*len != 0) return *len;
+
+    if (submited == NULL) return 0;
+    // HTTP2_DATA
+    if (state == HSS_SEND_HEADERS) {
+        void* content = submited->Content();
+        int content_length = submited->ContentLength();
+        // HTTP2 DATA framehd
+        state = HSS_SEND_DATA_FRAME_HD;
+        http2_frame_hd  framehd;
+        framehd.length = content_length;
+        framehd.type = HTTP2_DATA;
+        framehd.flags = HTTP2_FLAG_END_STREAM;
+        framehd.stream_id = stream_id;
+        *data = (char*)frame_hdbuf;
+        *len = HTTP2_FRAME_HDLEN;
+        printd("HTTP2 DATA framehd-------------------\n");
+        if (submited->ContentType() == APPLICATION_GRPC) {
+            printd("grpc DATA framehd-----------------\n");
+            if (type == HTTP_SERVER) {
+                // grpc server send grpc_status in HTTP2 header frame
+                framehd.flags = HTTP2_FLAG_NONE;
+            }
+            framehd.length += GRPC_MESSAGE_HDLEN;
+            grpc_message_hd msghd;
+            msghd.flags = 0;
+            msghd.length = content_length;
+            grpc_message_hd_pack(&msghd, frame_hdbuf + HTTP2_FRAME_HDLEN);
+            *len = HTTP2_FRAME_HDLEN + GRPC_MESSAGE_HDLEN;
+        }
+        http2_frame_hd_pack(&framehd, frame_hdbuf);
+    }
+    else if (state == HSS_SEND_DATA_FRAME_HD) {
+        // HTTP2 DATA
+        printd("HTTP2 DATA-------------------\n");
+        void* content = submited->Content();
+        int content_length = submited->ContentLength();
+        if (content_length == 0) {
+            state = HSS_SEND_DONE;
+        }
+        else {
+            state = HSS_SEND_DATA;
+            *data = (char*)content;
+            *len = content_length;
+        }
+    }
+    else if (state == HSS_SEND_DATA) {
+        state = HSS_SEND_DONE;
+        if (submited->ContentType() == APPLICATION_GRPC) {
+            if (type == HTTP_SERVER) {
+                // grpc HEADERS grpc-status
+                printd("grpc HEADERS grpc-status-----------------\n");
+                int flags = NGHTTP2_FLAG_END_STREAM | NGHTTP2_FLAG_END_HEADERS;
+                nghttp2_nv nv = make_nv("grpc_status", "0");
+                nghttp2_submit_headers(session, flags, stream_id, NULL, &nv, 1, NULL);
+                *len = nghttp2_session_mem_send(session, (const uint8_t**)data);
+            }
+        }
+    }
+
+    printd("GetSendData %d\n", *len);
+    return *len;
+}
+
+int Http2Session::FeedRecvData(const char* data, size_t len) {
+    printd("nghttp2_session_mem_recv %d\n", len);
+    size_t ret = nghttp2_session_mem_recv(session, (const uint8_t*)data, len);
+    if (ret != len) {
+        error = ret;
+    }
+    return (int)ret;
+}
+
+bool Http2Session::WantRecv() {
+    return stream_id == -1 || stream_closed == 0;
+}
+
+int Http2Session::SubmitRequest(HttpRequest* req) {
+    submited = req;
+
+    std::vector<nghttp2_nv> nvs;
+    char c_str[256] = {0};
+    req->ParseUrl();
+    nvs.push_back(make_nv(":method", http_method_str(req->method)));
+    nvs.push_back(make_nv(":path", req->path.c_str()));
+    nvs.push_back(make_nv(":scheme", req->https ? "https" : "http"));
+    if (req->port == 0 ||
+        req->port == DEFAULT_HTTP_PORT ||
+        req->port == DEFAULT_HTTPS_PORT) {
+        nvs.push_back(make_nv(":authority", req->host.c_str()));
+    }
+    else {
+        snprintf(c_str, sizeof(c_str), "%s:%d", req->host.c_str(), req->port);
+        nvs.push_back(make_nv(":authority", c_str));
+    }
+    req->FillContentType();
+    req->FillContentLength();
+    const char* name;
+    const char* value;
+    for (auto& header : req->headers) {
+        name = header.first.c_str();
+        value = header.second.c_str();
+        strlower((char*)name);
+        if (strcmp(name, "connection") == 0) {
+            // HTTP2 default keep-alive
+            continue;
+        }
+        if (strcmp(name, "content-length") == 0) {
+            // HTTP2 have frame_hd.length
+            continue;
+        }
+        nvs.push_back(make_nv2(name, value, header.first.size(), header.second.size()));
+    }
+    int flags = NGHTTP2_FLAG_END_HEADERS;
+    // we set EOS on DATA frame
+    stream_id = nghttp2_submit_headers(session, flags, -1, NULL, &nvs[0], nvs.size(), NULL);
+    // avoid DATA_SOURCE_COPY, we do not use nghttp2_submit_data
+    // nghttp2_data_provider data_prd;
+    // data_prd.read_callback = data_source_read_callback;
+    //stream_id = nghttp2_submit_request(session, NULL, &nvs[0], nvs.size(), &data_prd, NULL);
+    stream_closed = 0;
+    state = HSS_SEND_HEADERS;
+    return 0;
+}
+
+int Http2Session::SubmitResponse(HttpResponse* res) {
+    submited = res;
+
+    std::vector<nghttp2_nv> nvs;
+    char c_str[256] = {0};
+    snprintf(c_str, sizeof(c_str), "%d", res->status_code);
+    nvs.push_back(make_nv(":status", c_str));
+    res->FillContentType();
+    res->FillContentLength();
+
+    const char* name;
+    const char* value;
+    if (parsed && parsed->ContentType() == APPLICATION_GRPC) {
+        // correct content_type: application/grpc
+        if (res->ContentType() != APPLICATION_GRPC) {
+            res->content_type = APPLICATION_GRPC;
+            res->headers["content-type"] = http_content_type_str(APPLICATION_GRPC);
+        }
+    }
+    for (auto& header : res->headers) {
+        name = header.first.c_str();
+        value = header.second.c_str();
+        strlower((char*)name);
+        if (strcmp(name, "connection") == 0) {
+            // HTTP2 default keep-alive
+            continue;
+        }
+        if (strcmp(name, "content-length") == 0) {
+            // HTTP2 have frame_hd.length
+            continue;
+        }
+        nvs.push_back(make_nv2(name, value, header.first.size(), header.second.size()));
+    }
+    int flags = NGHTTP2_FLAG_END_HEADERS;
+    // we set EOS on DATA frame
+    if (stream_id == -1) {
+        // upgrade
+        nghttp2_session_upgrade(session, NULL, 0, NULL);
+        stream_id = 1;
+    }
+    nghttp2_submit_headers(session, flags, stream_id, NULL, &nvs[0], nvs.size(), NULL);
+    // avoid DATA_SOURCE_COPY, we do not use nghttp2_submit_data
+    // data_prd.read_callback = data_source_read_callback;
+    //stream_id = nghttp2_submit_request(session, NULL, &nvs[0], nvs.size(), &data_prd, NULL);
+    //nghttp2_submit_response(session, stream_id, &nvs[0], nvs.size(), &data_prd);
+    stream_closed = 0;
+    state = HSS_SEND_HEADERS;
+    return 0;
+}
+
+int Http2Session::InitResponse(HttpResponse* res) {
+    res->Reset();
+    res->http_major = 2;
+    res->http_minor = 0;
+    parsed = res;
+    return 0;
+}
+
+int Http2Session::InitRequest(HttpRequest* req) {
+    req->Reset();
+    req->http_major = 2;
+    req->http_minor = 0;
+    parsed = req;
+    return 0;
+}
+
+int Http2Session::GetError() {
+    return error;
+}
+
+const char* Http2Session::StrError(int error) {
+    return nghttp2_http2_strerror(error);
+}
+
+nghttp2_session_callbacks* Http2Session::cbs = NULL;
+
+int on_header_callback(nghttp2_session *session,
+    const nghttp2_frame *frame,
+    const uint8_t *_name, size_t namelen,
+    const uint8_t *_value, size_t valuelen,
+    uint8_t flags, void *userdata) {
+    printd("on_header_callback\n");
+    print_frame_hd(&frame->hd);
+    const char* name = (const char*)_name;
+    const char* value = (const char*)_value;
+    printd("%s: %s\n", name, value);
+    Http2Session* hss = (Http2Session*)userdata;
+    if (*name == ':') {
+        if (hss->parsed->type == HTTP_REQUEST) {
+            // :method :path :scheme :authority
+            HttpRequest* req = (HttpRequest*)hss->parsed;
+            if (strcmp(name, ":method") == 0) {
+                req->method = http_method_enum(value);
+            }
+            else if (strcmp(name, ":path") == 0) {
+                req->url = value;
+            }
+            else if (strcmp(name, ":scheme") == 0) {
+                req->headers["Scheme"] = value;
+            }
+            else if (strcmp(name, ":authority") == 0) {
+                req->headers["Host"] = value;
+            }
+        }
+        else if (hss->parsed->type == HTTP_RESPONSE) {
+            HttpResponse* res = (HttpResponse*)hss->parsed;
+            if (strcmp(name, ":status") == 0) {
+                res->status_code = (http_status)atoi(value);
+            }
+        }
+    }
+    else {
+        hss->parsed->headers[name] = value;
+        if (strcmp(name, "content-type") == 0) {
+            hss->parsed->content_type = http_content_type_enum(value);
+        }
+    }
+    return 0;
+}
+
+int on_data_chunk_recv_callback(nghttp2_session *session,
+    uint8_t flags, int32_t stream_id, const uint8_t *data,
+    size_t len, void *userdata) {
+    printd("on_data_chunk_recv_callback\n");
+    printd("stream_id=%d length=%d\n", stream_id, (int)len);
+    //printd("%.*s\n", (int)len, data);
+    Http2Session* hss = (Http2Session*)userdata;
+
+    if (hss->parsed->ContentType() == APPLICATION_GRPC) {
+        // grpc_message_hd
+        printd("grpc Length-Prefixed-Message-----------------\n");
+        if (len >= GRPC_MESSAGE_HDLEN) {
+            data += GRPC_MESSAGE_HDLEN;
+            len -= GRPC_MESSAGE_HDLEN;
+        }
+    }
+    hss->parsed->body.insert(hss->parsed->body.size(), (const char*)data, len);
+    return 0;
+}
+
+int on_frame_recv_callback(nghttp2_session *session,
+    const nghttp2_frame *frame, void *userdata) {
+    printd("on_frame_recv_callback\n");
+    print_frame_hd(&frame->hd);
+    Http2Session* hss = (Http2Session*)userdata;
+    switch (frame->hd.type) {
+    case NGHTTP2_DATA:
+    case NGHTTP2_HEADERS:
+        hss->stream_id = frame->hd.stream_id;
+        if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
+            printd("on_stream_closed stream_id=%d\n", hss->stream_id);
+            hss->stream_closed = 1;
+        }
+        break;
+    default:
+        break;
+    }
+
+    return 0;
+}
+
+#endif

+ 57 - 0
http/Http2Session.h

@@ -0,0 +1,57 @@
+#ifndef HTTP2_SESSION_H_
+#define HTTP2_SESSION_H_
+
+#ifdef WITH_NGHTTP2
+#include "HttpSession.h"
+#include "http2def.h"
+#include "grpcdef.h"
+
+#include "nghttp2/nghttp2.h"
+
+enum http2_session_state {
+    HSS_SEND_MAGIC,
+    HSS_SEND_SETTINGS,
+    HSS_SEND_HEADERS,
+    HSS_SEND_DATA_FRAME_HD,
+    HSS_SEND_DATA,
+    HSS_SEND_DONE
+};
+
+class Http2Session : public HttpSession {
+public:
+    http_session_type               type;
+    static nghttp2_session_callbacks* cbs;
+    nghttp2_session*                session;
+    http2_session_state             state;
+    HttpPayload*                    submited;
+    HttpPayload*                    parsed;
+    int error;
+    int stream_id;
+    int stream_closed;
+    // http2_frame_hd + grpc_message_hd
+    unsigned char                   frame_hdbuf[HTTP2_FRAME_HDLEN+GRPC_MESSAGE_HDLEN];
+
+    Http2Session(http_session_type type = HTTP_CLIENT);
+    virtual ~Http2Session();
+
+    virtual int GetSendData(char** data, size_t* len);
+    virtual int FeedRecvData(const char* data, size_t len);
+    virtual bool WantRecv();
+
+    // client
+    // SubmitRequest -> while(GetSendData) {send} -> InitResponse -> do {recv -> FeedRecvData} while(WantRecv)
+    virtual int SubmitRequest(HttpRequest* req);
+    virtual int InitResponse(HttpResponse* res);
+
+    // server
+    // InitRequest -> do {recv -> FeedRecvData} while(WantRecv) -> SubmitResponse -> while(GetSendData) {send}
+    virtual int InitRequest(HttpRequest* req);
+    virtual int SubmitResponse(HttpResponse* res);
+
+    virtual int GetError();
+    virtual const char* StrError(int error);
+};
+
+#endif
+
+#endif // HTTP2_SESSION_H_

+ 3 - 532
http/HttpSession.cpp

@@ -1,540 +1,11 @@
 #include "HttpSession.h"
 
-#include "http_parser.h"
-static int on_url(http_parser* parser, const char *at, size_t length);
-static int on_status(http_parser* parser, const char *at, size_t length);
-static int on_header_field(http_parser* parser, const char *at, size_t length);
-static int on_header_value(http_parser* parser, const char *at, size_t length);
-static int on_body(http_parser* parser, const char *at, size_t length);
-static int on_message_begin(http_parser* parser);
-static int on_headers_complete(http_parser* parser);
-static int on_message_complete(http_parser* parser);
-
-enum http_parser_state {
-    HP_START_REQ_OR_RES,
-    HP_MESSAGE_BEGIN, HP_URL,
-    HP_STATUS,
-    HP_HEADER_FIELD,
-    HP_HEADER_VALUE,
-    HP_HEADERS_COMPLETE,
-    HP_BODY,
-    HP_MESSAGE_COMPLETE
-};
-
-class Http1Session : public HttpSession {
-public:
-    static http_parser_settings*    cbs;
-    http_parser                     parser;
-    http_parser_state               state;
-    HttpPayload*                    submited;
-    HttpPayload*                    parsed;
-    // tmp
-    std::string url;          // for on_url
-    std::string header_field; // for on_header_field
-    std::string header_value; // for on_header_value
-    std::string sendbuf;      // for GetSendData
-
-    void handle_header() {
-        if (header_field.size() != 0 && header_value.size() != 0) {
-            parsed->headers[header_field] = header_value;
-            header_field.clear();
-            header_value.clear();
-        }
-    }
-
-    Http1Session() {
-        if (cbs == NULL) {
-            cbs = (http_parser_settings*)malloc(sizeof(http_parser_settings));
-            http_parser_settings_init(cbs);
-            cbs->on_message_begin    = on_message_begin;
-            cbs->on_url              = on_url;
-            cbs->on_status           = on_status;
-            cbs->on_header_field     = on_header_field;
-            cbs->on_header_value     = on_header_value;
-            cbs->on_headers_complete = on_headers_complete;
-            cbs->on_body             = on_body;
-            cbs->on_message_complete = on_message_complete;
-        }
-        http_parser_init(&parser, HTTP_BOTH);
-        parser.data = this;
-        state = HP_START_REQ_OR_RES;
-        submited = NULL;
-        parsed = NULL;
-    }
-
-    virtual ~Http1Session() {
-    }
-
-    virtual int GetSendData(char** data, size_t* len) {
-        if (!submited) {
-            *data = NULL;
-            *len = 0;
-            return 0;
-        }
-        sendbuf = submited->Dump(true, true);
-        submited = NULL;
-        *data = (char*)sendbuf.data();
-        *len = sendbuf.size();
-        return sendbuf.size();
-    }
-
-    virtual int FeedRecvData(const char* data, size_t len) {
-        return http_parser_execute(&parser, cbs, data, len);
-    }
-
-    virtual bool WantRecv() {
-        return state != HP_MESSAGE_COMPLETE;
-    }
-
-    virtual int SubmitRequest(HttpRequest* req) {
-        submited = req;
-        return 0;
-    }
-
-    virtual int SubmitResponse(HttpResponse* res) {
-        submited = res;
-        return 0;
-    }
-
-    virtual int InitRequest(HttpRequest* req) {
-        req->Reset();
-        parsed = req;
-        http_parser_init(&parser, HTTP_REQUEST);
-        return 0;
-    }
-
-    virtual int InitResponse(HttpResponse* res) {
-        res->Reset();
-        parsed = res;
-        http_parser_init(&parser, HTTP_RESPONSE);
-        return 0;
-    }
-
-    virtual int GetError() {
-        return parser.http_errno;
-    }
-
-    virtual const char* StrError(int error) {
-        return http_errno_description((enum http_errno)error);
-    }
-};
-
-http_parser_settings* Http1Session::cbs = NULL;
-
-int on_url(http_parser* parser, const char *at, size_t length) {
-    printd("on_url:%.*s\n", (int)length, at);
-    Http1Session* hss = (Http1Session*)parser->data;
-    hss->state = HP_URL;
-    hss->url.insert(hss->url.size(), at, length);
-    return 0;
-}
-
-int on_status(http_parser* parser, const char *at, size_t length) {
-    printd("on_status:%.*s\n", (int)length, at);
-    Http1Session* hss = (Http1Session*)parser->data;
-    hss->state = HP_STATUS;
-    return 0;
-}
-
-int on_header_field(http_parser* parser, const char *at, size_t length) {
-    printd("on_header_field:%.*s\n", (int)length, at);
-    Http1Session* hss = (Http1Session*)parser->data;
-    hss->handle_header();
-    hss->state = HP_HEADER_FIELD;
-    hss->header_field.insert(hss->header_field.size(), at, length);
-    return 0;
-}
-
-int on_header_value(http_parser* parser, const char *at, size_t length) {
-    printd("on_header_value:%.*s""\n", (int)length, at);
-    Http1Session* hss = (Http1Session*)parser->data;
-    hss->state = HP_HEADER_VALUE;
-    hss->header_value.insert(hss->header_value.size(), at, length);
-    return 0;
-}
-
-int on_body(http_parser* parser, const char *at, size_t length) {
-    //printd("on_body:%.*s""\n", (int)length, at);
-    Http1Session* hss = (Http1Session*)parser->data;
-    hss->state = HP_BODY;
-    hss->parsed->body.insert(hss->parsed->body.size(), at, length);
-    return 0;
-}
-
-int on_message_begin(http_parser* parser) {
-    printd("on_message_begin\n");
-    Http1Session* hss = (Http1Session*)parser->data;
-    hss->state = HP_MESSAGE_BEGIN;
-    return 0;
-}
-
-int on_headers_complete(http_parser* parser) {
-    printd("on_headers_complete\n");
-    Http1Session* hss = (Http1Session*)parser->data;
-    hss->handle_header();
-    auto iter = hss->parsed->headers.find("content-type");
-    if (iter != hss->parsed->headers.end()) {
-        hss->parsed->content_type = http_content_type_enum(iter->second.c_str());
-    }
-    hss->parsed->http_major = parser->http_major;
-    hss->parsed->http_minor = parser->http_minor;
-    if (hss->parsed->type == HTTP_REQUEST) {
-        HttpRequest* req = (HttpRequest*)hss->parsed;
-        req->method = (http_method)parser->method;
-        req->url = hss->url;
-    }
-    else if (hss->parsed->type == HTTP_RESPONSE) {
-        HttpResponse* res = (HttpResponse*)hss->parsed;
-        res->status_code = (http_status)parser->status_code;
-    }
-    hss->state = HP_HEADERS_COMPLETE;
-    return 0;
-}
-
-int on_message_complete(http_parser* parser) {
-    printd("on_message_complete\n");
-    Http1Session* hss = (Http1Session*)parser->data;
-    hss->state = HP_MESSAGE_COMPLETE;
-    return 0;
-}
-
-#ifdef WITH_NGHTTP2
-#include "nghttp2/nghttp2.h"
-#include "http2def.h"
-static nghttp2_nv make_nv(const char* name, const char* value) {
-    nghttp2_nv nv;
-    nv.name = (uint8_t*)name;
-    nv.value = (uint8_t*)value;
-    nv.namelen = strlen(name);
-    nv.valuelen = strlen(value);
-    nv.flags = NGHTTP2_NV_FLAG_NONE;
-    return nv;
-}
-
-static nghttp2_nv make_nv2(const char* name, const char* value,
-        int namelen, int valuelen) {
-    nghttp2_nv nv;
-    nv.name = (uint8_t*)name;
-    nv.value = (uint8_t*)value;
-    nv.namelen = namelen;
-    nv.valuelen = valuelen;
-    nv.flags = NGHTTP2_NV_FLAG_NONE;
-    return nv;
-}
-
-static void print_frame_hd(const nghttp2_frame_hd* hd) {
-    printd("[frame] length=%d type=%x flags=%x stream_id=%d\n",
-        (int)hd->length, (int)hd->type, (int)hd->flags, hd->stream_id);
-}
-static int on_header_callback(nghttp2_session *session,
-        const nghttp2_frame *frame,
-        const uint8_t *name, size_t namelen,
-        const uint8_t *value, size_t valuelen,
-        uint8_t flags, void *userdata);
-static int on_data_chunk_recv_callback(nghttp2_session *session,
-        uint8_t flags, int32_t stream_id, const uint8_t *data,
-        size_t len, void *userdata);
-static int on_frame_recv_callback(nghttp2_session *session,
-        const nghttp2_frame *frame, void *userdata);
-/*
-static ssize_t data_source_read_callback(nghttp2_session *session,
-        int32_t stream_id, uint8_t *buf, size_t length,
-        uint32_t *data_flags, nghttp2_data_source *source, void *userdata);
-*/
-
-enum http2_session_state {
-    HSS_SEND_MAGIC,
-    HSS_SEND_SETTINGS,
-    HSS_SEND_HEADERS,
-    HSS_SEND_DATA_FRAME_HD,
-    HSS_SEND_DATA
-};
-
-class Http2Session : public HttpSession {
-public:
-    static nghttp2_session_callbacks* cbs;
-    nghttp2_session*                session;
-    http2_session_state             state;
-    HttpPayload*                    submited;
-    HttpPayload*                    parsed;
-    int error;
-    int stream_id;
-    int stream_closed;
-    unsigned char                   frame_hdbuf[HTTP2_FRAME_HDLEN];
-
-    Http2Session(http_session_type type) {
-        if (cbs == NULL) {
-            nghttp2_session_callbacks_new(&cbs);
-            nghttp2_session_callbacks_set_on_header_callback(cbs, on_header_callback);
-            nghttp2_session_callbacks_set_on_data_chunk_recv_callback(cbs, on_data_chunk_recv_callback);
-            nghttp2_session_callbacks_set_on_frame_recv_callback(cbs, on_frame_recv_callback);
-        }
-        if (type == HTTP_CLIENT) {
-            nghttp2_session_client_new(&session, cbs, NULL);
-            state = HSS_SEND_MAGIC;
-        }
-        else if (type == HTTP_SERVER) {
-            nghttp2_session_server_new(&session, cbs, NULL);
-            state = HSS_SEND_SETTINGS;
-        }
-        nghttp2_session_set_user_data(session, this);
-        submited = NULL;
-        parsed = NULL;
-        stream_id = -1;
-        stream_closed = 0;
-
-        nghttp2_settings_entry settings[] = {
-            {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}
-        };
-        nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, settings, ARRAY_SIZE(settings));
-        state = HSS_SEND_SETTINGS;
-    }
-
-    virtual ~Http2Session() {
-        if (session) {
-            nghttp2_session_del(session);
-            session = NULL;
-        }
-    }
-
-    virtual int GetSendData(char** data, size_t* len) {
-        // HTTP2_MAGIC,HTTP2_SETTINGS,HTTP2_HEADERS
-        *len = nghttp2_session_mem_send(session, (const uint8_t**)data);
-        if (*len == 0) {
-            if (submited) {
-                void* content = submited->Content();
-                int content_length = submited->ContentLength();
-                if (content_length != 0) {
-                    if (state == HSS_SEND_HEADERS) {
-                        // HTTP2_DATA_FRAME_HD
-                        state = HSS_SEND_DATA_FRAME_HD;
-                        http2_frame_hd hd;
-                        hd.length = content_length;
-                        hd.type = HTTP2_DATA;
-                        hd.flags = HTTP2_FLAG_END_STREAM;
-                        hd.stream_id = stream_id;
-                        http2_frame_hd_pack(&hd, frame_hdbuf);
-                        *data = (char*)frame_hdbuf;
-                        *len = HTTP2_FRAME_HDLEN;
-                    }
-                    else if (state == HSS_SEND_DATA_FRAME_HD) {
-                        // HTTP2_DATA
-                        state = HSS_SEND_DATA;
-                        *data = (char*)content;
-                        *len = content_length;
-                    }
-                }
-            }
-        }
-        return *len;
-    }
-
-    virtual int FeedRecvData(const char* data, size_t len) {
-        size_t ret = nghttp2_session_mem_recv(session, (const uint8_t*)data, len);
-        if (ret != len) {
-            error = ret;
-        }
-        return (int)ret;
-    }
-
-    virtual bool WantRecv() {
-        return stream_id == -1 || stream_closed == 0;
-    }
-
-    virtual int SubmitRequest(HttpRequest* req) {
-        submited = req;
-
-        std::vector<nghttp2_nv> nvs;
-        char c_str[256] = {0};
-        req->ParseUrl();
-        nvs.push_back(make_nv(":method", http_method_str(req->method)));
-        nvs.push_back(make_nv(":path", req->path.c_str()));
-        nvs.push_back(make_nv(":scheme", req->https ? "https" : "http"));
-        if (req->port == 0 ||
-            req->port == DEFAULT_HTTP_PORT ||
-            req->port == DEFAULT_HTTPS_PORT) {
-            nvs.push_back(make_nv(":authority", req->host.c_str()));
-        }
-        else {
-            snprintf(c_str, sizeof(c_str), "%s:%d", req->host.c_str(), req->port);
-            nvs.push_back(make_nv(":authority", c_str));
-        }
-        req->FillContentType();
-        req->FillContentLength();
-        const char* name;
-        const char* value;
-        for (auto& header : req->headers) {
-            name = header.first.c_str();
-            value = header.second.c_str();
-            strlower((char*)name);
-            if (strcmp(name, "connection") == 0) {
-                // HTTP2 use stream
-                continue;
-            }
-            nvs.push_back(make_nv2(name, value, header.first.size(), header.second.size()));
-        }
-        int flags = NGHTTP2_FLAG_END_HEADERS;
-        if (req->ContentLength() == 0) {
-            flags |= NGHTTP2_FLAG_END_STREAM;
-        }
-        stream_id = nghttp2_submit_headers(session, flags, -1, NULL, &nvs[0], nvs.size(), NULL);
-        // avoid DATA_SOURCE_COPY, we do not use nghttp2_submit_data
-        // nghttp2_data_provider data_prd;
-        // data_prd.read_callback = data_source_read_callback;
-        //stream_id = nghttp2_submit_request(session, NULL, &nvs[0], nvs.size(), &data_prd, NULL);
-        stream_closed = 0;
-        state = HSS_SEND_HEADERS;
-        return 0;
-    }
-
-    virtual int SubmitResponse(HttpResponse* res) {
-        submited = res;
-
-        std::vector<nghttp2_nv> nvs;
-        char c_str[256] = {0};
-        snprintf(c_str, sizeof(c_str), "%d", res->status_code);
-        nvs.push_back(make_nv(":status", c_str));
-        res->FillContentType();
-        res->FillContentLength();
-
-        const char* name;
-        const char* value;
-        for (auto& header : res->headers) {
-            name = header.first.c_str();
-            value = header.second.c_str();
-            strlower((char*)name);
-            if (strcmp(name, "connection") == 0) {
-                // HTTP2 use stream
-                continue;
-            }
-            nvs.push_back(make_nv2(name, value, header.first.size(), header.second.size()));
-        }
-        int flags = NGHTTP2_FLAG_END_HEADERS;
-        if (res->ContentLength() == 0) {
-            flags |= NGHTTP2_FLAG_END_STREAM;
-        }
-        if (stream_id == -1) {
-            // upgrade
-            nghttp2_session_upgrade(session, NULL, 0, NULL);
-            stream_id = 1;
-        }
-        nghttp2_submit_headers(session, flags, stream_id, NULL, &nvs[0], nvs.size(), NULL);
-        // avoid DATA_SOURCE_COPY, we do not use nghttp2_submit_data
-        // data_prd.read_callback = data_source_read_callback;
-        //stream_id = nghttp2_submit_request(session, NULL, &nvs[0], nvs.size(), &data_prd, NULL);
-        //nghttp2_submit_response(session, stream_id, &nvs[0], nvs.size(), &data_prd);
-        stream_closed = 0;
-        state = HSS_SEND_HEADERS;
-        return 0;
-    }
-
-    virtual int InitResponse(HttpResponse* res) {
-        res->Reset();
-        res->http_major = 2;
-        res->http_minor = 0;
-        parsed = res;
-        return 0;
-    }
-
-    virtual int InitRequest(HttpRequest* req) {
-        req->Reset();
-        req->http_major = 2;
-        req->http_minor = 0;
-        parsed = req;
-        return 0;
-    }
-
-    virtual int GetError() {
-        return error;
-    }
-
-    virtual const char* StrError(int error) {
-        return nghttp2_http2_strerror(error);
-    }
-};
-
-nghttp2_session_callbacks* Http2Session::cbs = NULL;
-
-int on_header_callback(nghttp2_session *session,
-        const nghttp2_frame *frame,
-        const uint8_t *_name, size_t namelen,
-        const uint8_t *_value, size_t valuelen,
-        uint8_t flags, void *userdata) {
-    printd("on_header_callback\n");
-    print_frame_hd(&frame->hd);
-    const char* name = (const char*)_name;
-    const char* value = (const char*)_value;
-    printd("%s: %s\n", name, value);
-    Http2Session* hss = (Http2Session*)userdata;
-    if (*name == ':') {
-        if (hss->parsed->type == HTTP_REQUEST) {
-            // :method :path :scheme :authority
-            HttpRequest* req = (HttpRequest*)hss->parsed;
-            if (strcmp(name, ":method") == 0) {
-                req->method = http_method_enum(value);
-            }
-            else if (strcmp(name, ":path") == 0) {
-                req->url = value;
-            }
-            else if (strcmp(name, ":scheme") == 0) {
-                req->headers["Scheme"] = value;
-            }
-            else if (strcmp(name, ":authority") == 0) {
-                req->headers["Host"] = value;
-            }
-        }
-        else if (hss->parsed->type == HTTP_RESPONSE) {
-            HttpResponse* res = (HttpResponse*)hss->parsed;
-            if (strcmp(name, ":status") == 0) {
-                res->status_code = (http_status)atoi(value);
-            }
-        }
-    }
-    else {
-        hss->parsed->headers[name] = value;
-        if (strcmp(name, "content-type") == 0) {
-            hss->parsed->content_type = http_content_type_enum(value);
-        }
-    }
-    return 0;
-}
-
-int on_data_chunk_recv_callback(nghttp2_session *session,
-        uint8_t flags, int32_t stream_id, const uint8_t *data,
-        size_t len, void *userdata) {
-    printd("on_data_chunk_recv_callback\n");
-    printd("stream_id=%d length=%d\n", stream_id, (int)len);
-    //printd("%.*s\n", (int)len, data);
-    Http2Session* hss = (Http2Session*)userdata;
-    hss->parsed->body.insert(hss->parsed->body.size(), (const char*)data, len);
-    return 0;
-}
-
-static int on_frame_recv_callback(nghttp2_session *session,
-        const nghttp2_frame *frame, void *userdata) {
-    printd("on_frame_recv_callback\n");
-    print_frame_hd(&frame->hd);
-    Http2Session* hss = (Http2Session*)userdata;
-    switch (frame->hd.type) {
-    case NGHTTP2_DATA:
-    case NGHTTP2_HEADERS:
-        hss->stream_id = frame->hd.stream_id;
-        if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
-            printd("on_stream_closed stream_id=%d\n", hss->stream_id);
-            hss->stream_closed = 1;
-        }
-        break;
-    default:
-        break;
-    }
-
-    return 0;
-}
-#endif
+#include "Http1Session.h"
+#include "Http2Session.h"
 
 HttpSession* HttpSession::New(http_session_type type, http_version version) {
     if (version == HTTP_V1) {
-        return new Http1Session;
+        return new Http1Session(type);
     }
     else if (version == HTTP_V2) {
 #ifdef WITH_NGHTTP2

+ 49 - 0
http/grpcdef.h

@@ -0,0 +1,49 @@
+#ifndef GRPC_DEF_H_
+#define GRPC_DEF_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// Length-Prefixed-Message
+
+// flags:1byte + length:4bytes = 5bytes
+#define GRPC_MESSAGE_HDLEN  5
+
+typedef struct {
+    unsigned char   flags;
+    int             length;
+} grpc_message_hd;
+
+typedef struct {
+    unsigned char   flags;
+    int             length;
+    unsigned char*  message;
+} grpc_message;
+
+static inline void grpc_message_hd_pack(const grpc_message_hd* hd, unsigned char* buf) {
+    // hton
+    int length = hd->length;
+    unsigned char* p = buf;
+    *p++ = hd->flags;
+    *p++ = (length >> 24) & 0xFF;
+    *p++ = (length >> 16) & 0xFF;
+    *p++ = (length >>  8) & 0xFF;
+    *p++ =  length        & 0xFF;
+}
+
+static inline void grpc_message_hd_unpack(const unsigned char* buf, grpc_message_hd* hd) {
+    // ntoh
+    const unsigned char* p = buf;
+    hd->flags = *p++;
+    hd->length  = *p++ << 24;
+    hd->length += *p++ << 16;
+    hd->length += *p++ << 8;
+    hd->length += *p++;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // GRPC_DEF_H_