浏览代码

application/grpc

ithewei 6 年之前
父节点
当前提交
1c1b257818
共有 8 个文件被更改,包括 150 次插入122 次删除
  1. 13 0
      examples/http_api_test.h
  2. 2 62
      http/Http1Session.cpp
  3. 70 10
      http/Http1Session.h
  4. 14 42
      http/Http2Session.cpp
  5. 27 4
      http/Http2Session.h
  6. 16 0
      http/HttpSession.h
  7. 1 1
      http/client/http_client.cpp
  8. 7 3
      http/server/HttpServer.cpp

+ 13 - 0
examples/http_api_test.h

@@ -8,6 +8,7 @@
     XXX("/json",    POST,   http_api_json)      \
     XXX("/mp",      POST,   http_api_mp)        \
     XXX("/kv",      POST,   http_api_kv)        \
+    XXX("/grpc",    POST,   http_api_grpc)      \
     XXX("/query",   GET,    http_api_query)     \
     XXX("/echo",    POST,   http_api_echo)      \
 
@@ -42,6 +43,18 @@ inline int http_api_mp(HttpRequest* req, HttpResponse* res) {
     return 0;
 }
 
+inline int http_api_grpc(HttpRequest* req, HttpResponse* res) {
+    if (req->content_type != APPLICATION_GRPC) {
+        res->status_code = HTTP_STATUS_BAD_REQUEST;
+        return 0;
+    }
+    // parse protobuf: ParseFromString
+    // req->body;
+    // serailize protobuf: SerializeAsString
+    // res->body;
+    return 0;
+}
+
 inline int http_api_kv(HttpRequest*req, HttpResponse* res) {
     if (req->content_type != APPLICATION_URLENCODED) {
         res->status_code = HTTP_STATUS_BAD_REQUEST;

+ 2 - 62
http/Http1Session.cpp

@@ -9,6 +9,8 @@ static int on_message_begin(http_parser* parser);
 static int on_headers_complete(http_parser* parser);
 static int on_message_complete(http_parser* parser);
 
+http_parser_settings* Http1Session::cbs = NULL;
+
 Http1Session::Http1Session(http_session_type type) {
     if (cbs == NULL) {
         cbs = (http_parser_settings*)malloc(sizeof(http_parser_settings));
@@ -32,68 +34,6 @@ Http1Session::Http1Session(http_session_type type) {
 Http1Session::~Http1Session() {
 }
 
-int Http1Session::GetSendData(char** data, size_t* len) {
-    if (!submited) {
-        *data = NULL;
-        *len = 0;
-        return 0;
-    }
-    sendbuf = submited->Dump(true, true);
-    submited = NULL;
-    *data = (char*)sendbuf.data();
-    *len = sendbuf.size();
-    return sendbuf.size();
-}
-
-int Http1Session::FeedRecvData(const char* data, size_t len) {
-    return http_parser_execute(&parser, cbs, data, len);
-}
-
-bool Http1Session::WantRecv() {
-    return state != HP_MESSAGE_COMPLETE;
-}
-
-int Http1Session::SubmitRequest(HttpRequest* req) {
-    submited = req;
-    return 0;
-}
-
-int Http1Session::SubmitResponse(HttpResponse* res) {
-    submited = res;
-    return 0;
-}
-
-int Http1Session::InitRequest(HttpRequest* req) {
-    req->Reset();
-    parsed = req;
-    http_parser_init(&parser, HTTP_REQUEST);
-    state = HP_START_REQ_OR_RES;
-    url.clear();
-    header_field.clear();
-    header_value.clear();
-    return 0;
-}
-
-int Http1Session::InitResponse(HttpResponse* res) {
-    res->Reset();
-    parsed = res;
-    http_parser_init(&parser, HTTP_RESPONSE);
-    url.clear();
-    header_field.clear();
-    header_value.clear();
-    return 0;
-}
-
-int Http1Session::GetError() {
-    return parser.http_errno;
-}
-
-const char* Http1Session::StrError(int error) {
-    return http_errno_description((enum http_errno)error);
-}
-
-http_parser_settings* Http1Session::cbs = NULL;
-
 int on_url(http_parser* parser, const char *at, size_t length) {
     printd("on_url:%.*s\n", (int)length, at);
     Http1Session* hss = (Http1Session*)parser->data;

+ 70 - 10
http/Http1Session.h

@@ -6,7 +6,8 @@
 
 enum http_parser_state {
     HP_START_REQ_OR_RES,
-    HP_MESSAGE_BEGIN, HP_URL,
+    HP_MESSAGE_BEGIN,
+    HP_URL,
     HP_STATUS,
     HP_HEADER_FIELD,
     HP_HEADER_VALUE,
@@ -39,22 +40,81 @@ public:
         }
     }
 
-    virtual int GetSendData(char** data, size_t* len);
-    virtual int FeedRecvData(const char* data, size_t len);
-    virtual bool WantRecv();
+    virtual int GetSendData(char** data, size_t* len) {
+        if (!submited) {
+            *data = NULL;
+            *len = 0;
+            return 0;
+        }
+        sendbuf = submited->Dump(true, true);
+        submited = NULL;
+        *data = (char*)sendbuf.data();
+        *len = sendbuf.size();
+        return sendbuf.size();
+    }
+
+    virtual int FeedRecvData(const char* data, size_t len) {
+        return http_parser_execute(&parser, cbs, data, len);
+    }
+
+    virtual int  GetState() {
+        return (int)state;
+    }
+
+    virtual bool WantRecv() {
+        return state != HP_MESSAGE_COMPLETE;
+    }
+
+    virtual bool WantSend() {
+        return state == HP_MESSAGE_COMPLETE;
+    }
+
+    virtual bool IsComplete() {
+        return state == HP_MESSAGE_COMPLETE;
+    }
+
+    virtual int GetError() {
+        return parser.http_errno;
+    }
+
+    virtual const char* StrError(int error) {
+        return http_errno_description((enum http_errno)error);
+    }
 
     // client
     // SubmitRequest -> while(GetSendData) {send} -> InitResponse -> do {recv -> FeedRecvData} while(WantRecv)
-    virtual int SubmitRequest(HttpRequest* req);
-    virtual int InitResponse(HttpResponse* res);
+    virtual int SubmitRequest(HttpRequest* req) {
+        submited = req;
+        return 0;
+    }
+
+    virtual int InitResponse(HttpResponse* res) {
+        res->Reset();
+        parsed = res;
+        http_parser_init(&parser, HTTP_RESPONSE);
+        url.clear();
+        header_field.clear();
+        header_value.clear();
+        return 0;
+    }
 
     // server
     // InitRequest -> do {recv -> FeedRecvData} while(WantRecv) -> SubmitResponse -> while(GetSendData) {send}
-    virtual int InitRequest(HttpRequest* req);
-    virtual int SubmitResponse(HttpResponse* res);
+    virtual int InitRequest(HttpRequest* req) {
+        req->Reset();
+        parsed = req;
+        http_parser_init(&parser, HTTP_REQUEST);
+        state = HP_START_REQ_OR_RES;
+        url.clear();
+        header_field.clear();
+        header_value.clear();
+        return 0;
+    }
 
-    virtual int GetError();
-    virtual const char* StrError(int error);
+    virtual int SubmitResponse(HttpResponse* res) {
+        submited = res;
+        return 0;
+    }
 };
 
 #endif // HTTP1_SESSION_H_

+ 14 - 42
http/Http2Session.cpp

@@ -56,6 +56,7 @@ Http2Session::Http2Session(http_session_type type) {
     }
     else if (type == HTTP_SERVER) {
         nghttp2_session_server_new(&session, cbs, NULL);
+        state = HSS_WANT_RECV;
     }
     nghttp2_session_set_user_data(session, this);
     submited = NULL;
@@ -111,7 +112,7 @@ int Http2Session::GetSendData(char** data, size_t* len) {
                 // grpc server send grpc-status in HTTP2 header frame
                 framehd.flags = HTTP2_FLAG_NONE;
 
-#ifdef TEST_PROTOBUF
+                /*
                 // @test protobuf
                 // message StringMessage {
                 //     string str = 1;
@@ -127,7 +128,7 @@ int Http2Session::GetSendData(char** data, size_t* len) {
                 msghd.length += protobuf_taglen;
                 framehd.length += protobuf_taglen;
                 *len += protobuf_taglen;
-#endif
+                */
             }
 
             grpc_message_hd_pack(&msghd, frame_hdbuf + HTTP2_FRAME_HDLEN);
@@ -142,7 +143,8 @@ int Http2Session::GetSendData(char** data, size_t* len) {
         void* content = submited->Content();
         int content_length = submited->ContentLength();
         if (content_length == 0) {
-            state = HSS_SEND_DONE;
+            // skip send_data
+            goto send_done;
         }
         else {
             state = HSS_SEND_DATA;
@@ -151,6 +153,7 @@ int Http2Session::GetSendData(char** data, size_t* len) {
         }
     }
     else if (state == HSS_SEND_DATA) {
+send_done:
         state = HSS_SEND_DONE;
         if (submited->ContentType() == APPLICATION_GRPC) {
             if (type == HTTP_SERVER && stream_closed) {
@@ -170,7 +173,7 @@ int Http2Session::GetSendData(char** data, size_t* len) {
 
 int Http2Session::FeedRecvData(const char* data, size_t len) {
     printd("nghttp2_session_mem_recv %d\n", len);
-    state = HSS_RECVING;
+    state = HSS_WANT_RECV;
     size_t ret = nghttp2_session_mem_recv(session, (const uint8_t*)data, len);
     if (ret != len) {
         error = ret;
@@ -178,16 +181,6 @@ int Http2Session::FeedRecvData(const char* data, size_t len) {
     return (int)ret;
 }
 
-bool Http2Session::WantRecv() {
-    if (stream_id == -1) return true;
-    if (stream_closed) return false;
-    if (state == HSS_RECV_DATA ||
-        state == HSS_RECV_PING) {
-        return false;
-    }
-    return true;
-}
-
 int Http2Session::SubmitRequest(HttpRequest* req) {
     submited = req;
 
@@ -239,7 +232,6 @@ int Http2Session::SubmitRequest(HttpRequest* req) {
     // nghttp2_data_provider data_prd;
     // data_prd.read_callback = data_source_read_callback;
     //stream_id = nghttp2_submit_request(session, NULL, &nvs[0], nvs.size(), &data_prd, NULL);
-    stream_closed = 0;
     state = HSS_SEND_HEADERS;
     return 0;
 }
@@ -256,13 +248,9 @@ int Http2Session::SubmitResponse(HttpResponse* res) {
             res->headers["content-type"] = http_content_type_str(APPLICATION_GRPC);
         }
         //res->headers["accept-encoding"] = "identity";
-        //hss->state = HSS_RECV_PING;
-        //break;
         //res->headers["grpc-accept-encoding"] = "identity";
         //res->headers["grpc-status"] = "0";
-#ifdef TEST_PROTOBUF
-        res->status_code = HTTP_STATUS_OK;
-#endif
+        //res->status_code = HTTP_STATUS_OK;
     }
 
     std::vector<nghttp2_nv> nvs;
@@ -296,7 +284,6 @@ int Http2Session::SubmitResponse(HttpResponse* res) {
     // avoid DATA_SOURCE_COPY, we do not use nghttp2_submit_data
     // data_prd.read_callback = data_source_read_callback;
     //nghttp2_submit_response(session, stream_id, &nvs[0], nvs.size(), &data_prd);
-    stream_closed = 0;
     state = HSS_SEND_HEADERS;
     return 0;
 }
@@ -317,14 +304,6 @@ int Http2Session::InitRequest(HttpRequest* req) {
     return 0;
 }
 
-int Http2Session::GetError() {
-    return error;
-}
-
-const char* Http2Session::StrError(int error) {
-    return nghttp2_http2_strerror(error);
-}
-
 nghttp2_session_callbacks* Http2Session::cbs = NULL;
 
 int on_header_callback(nghttp2_session *session,
@@ -396,19 +375,6 @@ int on_frame_recv_callback(nghttp2_session *session,
     printd("on_frame_recv_callback\n");
     print_frame_hd(&frame->hd);
     Http2Session* hss = (Http2Session*)userdata;
-    switch (frame->hd.type) {
-    case NGHTTP2_DATA:
-    case NGHTTP2_HEADERS:
-        hss->stream_id = frame->hd.stream_id;
-        if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
-            printd("on_stream_closed stream_id=%d\n", hss->stream_id);
-            hss->stream_closed = 1;
-        }
-        break;
-    default:
-        break;
-    }
-
     switch (frame->hd.type) {
     case NGHTTP2_DATA:
         hss->state = HSS_RECV_DATA;
@@ -425,6 +391,12 @@ int on_frame_recv_callback(nghttp2_session *session,
     default:
         break;
     }
+    hss->stream_id = frame->hd.stream_id;
+    hss->stream_closed = 0;
+    if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
+        printd("on_stream_closed stream_id=%d\n", hss->stream_id);
+        hss->stream_closed = 1;
+    }
 
     return 0;
 }

+ 27 - 4
http/Http2Session.h

@@ -16,7 +16,8 @@ enum http2_session_state {
     HSS_SEND_DATA_FRAME_HD,
     HSS_SEND_DATA,
     HSS_SEND_DONE,
-    HSS_RECVING,
+
+    HSS_WANT_RECV,
     HSS_RECV_SETTINGS,
     HSS_RECV_PING,
     HSS_RECV_HEADERS,
@@ -42,7 +43,31 @@ public:
 
     virtual int GetSendData(char** data, size_t* len);
     virtual int FeedRecvData(const char* data, size_t len);
-    virtual bool WantRecv();
+
+    virtual int GetState() {
+        return (int)state;
+    }
+
+    virtual bool WantRecv() {
+        return state == HSS_WANT_RECV;
+    }
+
+    virtual bool WantSend() {
+        return state != HSS_WANT_RECV;
+    }
+
+    virtual bool IsComplete() {
+        // HTTP2_HEADERS / HTTP2_DATA EOS
+        return (state == HSS_RECV_HEADERS || state == HSS_RECV_DATA) && stream_closed == 1;
+    }
+
+    virtual int GetError() {
+        return error;
+    }
+
+    virtual const char* StrError(int error) {
+        return nghttp2_http2_strerror(error);
+    }
 
     // client
     // SubmitRequest -> while(GetSendData) {send} -> InitResponse -> do {recv -> FeedRecvData} while(WantRecv)
@@ -54,8 +79,6 @@ public:
     virtual int InitRequest(HttpRequest* req);
     virtual int SubmitResponse(HttpResponse* res);
 
-    virtual int GetError();
-    virtual const char* StrError(int error);
 };
 
 #endif

+ 16 - 0
http/HttpSession.h

@@ -13,8 +13,24 @@ public:
 
     virtual int GetSendData(char** data, size_t* len) = 0;
     virtual int FeedRecvData(const char* data, size_t len) = 0;
+
+    // Http1Session: http_parser_state
+    // Http2Session: http2_session_state
+    virtual int GetState() = 0;
+
+    // Http1Session: GetState() != HP_MESSAGE_COMPLETE
+    // Http2Session: GetState() == HSS_WANT_RECV
     virtual bool WantRecv() = 0;
 
+    // Http1Session: GetState() == HP_MESSAGE_COMPLETE
+    // Http2Session: GetState() == HSS_WANT_SEND
+    virtual bool WantSend() = 0;
+
+    // IsComplete: Is recved HttpRequest or HttpResponse complete?
+    // Http1Session: GetState() == HP_MESSAGE_COMPLETE
+    // Http2Session: (state == HSS_RECV_HEADERS || state == HSS_RECV_DATA) && stream_closed
+    virtual bool IsComplete() = 0;
+
     // client
     // SubmitRequest -> while(GetSendData) {send} -> InitResponse -> do {recv -> FeedRecvData} while(WantRecv)
     virtual int SubmitRequest(HttpRequest* req) = 0;

+ 1 - 1
http/client/http_client.cpp

@@ -437,7 +437,7 @@ recv:
         if (nparse != nrecv) {
             return ERR_PARSE;
         }
-    } while(cli->session->WantRecv());
+    } while(!cli->session->IsComplete());
     return err;
 }
 

+ 7 - 3
http/server/HttpServer.cpp

@@ -101,15 +101,17 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
             }
             return;
         }
-        else if (
-                h2s->state != HSS_RECV_HEADERS &&
-                h2s->state != HSS_RECV_DATA) {
+        else if ((h2s->state == HSS_RECV_HEADERS && req->method != HTTP_POST) || h2s->state == HSS_RECV_DATA) {
+            goto handle_request;
+        }
+        else {
             // ignore other http2 frame
             return;
         }
     }
 
     // Upgrade: h2
+    {
     auto iter_upgrade = req->headers.find("upgrade");
     if (iter_upgrade != req->headers.end()) {
         hlogi("[%s:%d] Upgrade: %s", handler->ip, handler->port, iter_upgrade->second.c_str());
@@ -136,8 +138,10 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
             return;
         }
     }
+    }
 #endif
 
+handle_request:
     int ret = handler->HandleRequest();
     // prepare headers body
     // Server: