ithewei il y a 6 ans
Parent
commit
e4238b7dbd
3 fichiers modifiés avec 130 ajouts et 20 suppressions
  1. 53 19
      http/Http2Session.cpp
  2. 2 1
      http/Http2Session.h
  3. 75 0
      http/grpcdef.h

+ 53 - 19
http/Http2Session.cpp

@@ -17,8 +17,7 @@ static nghttp2_nv make_nv2(const char* name, const char* value,
     nghttp2_nv nv;
     nv.name = (uint8_t*)name;
     nv.value = (uint8_t*)value;
-    nv.namelen = namelen;
-    nv.valuelen = valuelen;
+    nv.namelen = namelen; nv.valuelen = valuelen;
     nv.flags = NGHTTP2_NV_FLAG_NONE;
     return nv;
 }
@@ -103,16 +102,36 @@ int Http2Session::GetSendData(char** data, size_t* len) {
         printd("HTTP2 DATA framehd-------------------\n");
         if (submited->ContentType() == APPLICATION_GRPC) {
             printd("grpc DATA framehd-----------------\n");
-            if (type == HTTP_SERVER) {
-                // grpc server send grpc_status in HTTP2 header frame
-                framehd.flags = HTTP2_FLAG_NONE;
-            }
-            framehd.length += GRPC_MESSAGE_HDLEN;
             grpc_message_hd msghd;
             msghd.flags = 0;
             msghd.length = content_length;
+
+            if (type == HTTP_SERVER) {
+                // grpc server send grpc-status in HTTP2 header frame
+                framehd.flags = HTTP2_FLAG_NONE;
+
+#ifdef TEST_PROTOBUF
+                // @test protobuf
+                // message StringMessage {
+                //     string str = 1;
+                // }
+                int protobuf_taglen = 0;
+                int tag = PROTOBUF_MAKE_TAG(1, WIRE_TYPE_LENGTH_DELIMITED);
+                unsigned char* p = frame_hdbuf + HTTP2_FRAME_HDLEN + GRPC_MESSAGE_HDLEN;
+                int bytes = varint_encode(tag, p);
+                protobuf_taglen += bytes;
+                p += bytes;
+                bytes = varint_encode(content_length, p);
+                protobuf_taglen += bytes;
+                msghd.length += protobuf_taglen;
+                framehd.length += protobuf_taglen;
+                *len += protobuf_taglen;
+#endif
+            }
+
             grpc_message_hd_pack(&msghd, frame_hdbuf + HTTP2_FRAME_HDLEN);
-            *len = HTTP2_FRAME_HDLEN + GRPC_MESSAGE_HDLEN;
+            framehd.length += GRPC_MESSAGE_HDLEN;
+            *len += GRPC_MESSAGE_HDLEN;
         }
         http2_frame_hd_pack(&framehd, frame_hdbuf);
     }
@@ -137,7 +156,7 @@ int Http2Session::GetSendData(char** data, size_t* len) {
                 // grpc HEADERS grpc-status
                 printd("grpc HEADERS grpc-status-----------------\n");
                 int flags = NGHTTP2_FLAG_END_STREAM | NGHTTP2_FLAG_END_HEADERS;
-                nghttp2_nv nv = make_nv("grpc_status", "0");
+                nghttp2_nv nv = make_nv("grpc-status", "0");
                 nghttp2_submit_headers(session, flags, stream_id, NULL, &nv, 1, NULL);
                 *len = nghttp2_session_mem_send(session, (const uint8_t**)data);
             }
@@ -164,6 +183,16 @@ bool Http2Session::WantRecv() {
 int Http2Session::SubmitRequest(HttpRequest* req) {
     submited = req;
 
+    req->FillContentType();
+    req->FillContentLength();
+    if (req->ContentType() == APPLICATION_GRPC) {
+        req->method = HTTP_POST;
+        req->headers["te"] = "trailers";
+        req->headers["user-agent"] = "grpc-c++/1.16.0 grpc-c/6.0.0 (linux; nghttp2; hw)";
+        req->headers["accept-encoding"] = "identity";
+        req->headers["grpc-accept-encoding"] = "identity";
+    }
+
     std::vector<nghttp2_nv> nvs;
     char c_str[256] = {0};
     req->ParseUrl();
@@ -179,8 +208,6 @@ int Http2Session::SubmitRequest(HttpRequest* req) {
         snprintf(c_str, sizeof(c_str), "%s:%d", req->host.c_str(), req->port);
         nvs.push_back(make_nv(":authority", c_str));
     }
-    req->FillContentType();
-    req->FillContentLength();
     const char* name;
     const char* value;
     for (auto& header : req->headers) {
@@ -212,22 +239,28 @@ int Http2Session::SubmitRequest(HttpRequest* req) {
 int Http2Session::SubmitResponse(HttpResponse* res) {
     submited = res;
 
-    std::vector<nghttp2_nv> nvs;
-    char c_str[256] = {0};
-    snprintf(c_str, sizeof(c_str), "%d", res->status_code);
-    nvs.push_back(make_nv(":status", c_str));
     res->FillContentType();
     res->FillContentLength();
-
-    const char* name;
-    const char* value;
     if (parsed && parsed->ContentType() == APPLICATION_GRPC) {
         // correct content_type: application/grpc
         if (res->ContentType() != APPLICATION_GRPC) {
             res->content_type = APPLICATION_GRPC;
             res->headers["content-type"] = http_content_type_str(APPLICATION_GRPC);
         }
+        //res->headers["accept-encoding"] = "identity";
+        //res->headers["grpc-accept-encoding"] = "identity";
+        //res->headers["grpc-status"] = "0";
+#ifdef TEST_PROTOBUF
+        res->status_code = HTTP_STATUS_OK;
+#endif
     }
+
+    std::vector<nghttp2_nv> nvs;
+    char c_str[256] = {0};
+    snprintf(c_str, sizeof(c_str), "%d", res->status_code);
+    nvs.push_back(make_nv(":status", c_str));
+    const char* name;
+    const char* value;
     for (auto& header : res->headers) {
         name = header.first.c_str();
         value = header.second.c_str();
@@ -252,7 +285,6 @@ int Http2Session::SubmitResponse(HttpResponse* res) {
     nghttp2_submit_headers(session, flags, stream_id, NULL, &nvs[0], nvs.size(), NULL);
     // avoid DATA_SOURCE_COPY, we do not use nghttp2_submit_data
     // data_prd.read_callback = data_source_read_callback;
-    //stream_id = nghttp2_submit_request(session, NULL, &nvs[0], nvs.size(), &data_prd, NULL);
     //nghttp2_submit_response(session, stream_id, &nvs[0], nvs.size(), &data_prd);
     stream_closed = 0;
     state = HSS_SEND_HEADERS;
@@ -363,6 +395,8 @@ int on_frame_recv_callback(nghttp2_session *session,
             hss->stream_closed = 1;
         }
         break;
+    case NGHTTP2_PING:
+        break;
     default:
         break;
     }

+ 2 - 1
http/Http2Session.h

@@ -29,7 +29,8 @@ public:
     int stream_id;
     int stream_closed;
     // http2_frame_hd + grpc_message_hd
-    unsigned char                   frame_hdbuf[HTTP2_FRAME_HDLEN+GRPC_MESSAGE_HDLEN];
+    // at least HTTP2_FRAME_HDLEN + GRPC_MESSAGE_HDLEN = 9 + 5 = 14
+    unsigned char                   frame_hdbuf[32];
 
     Http2Session(http_session_type type = HTTP_CLIENT);
     virtual ~Http2Session();

+ 75 - 0
http/grpcdef.h

@@ -42,6 +42,81 @@ static inline void grpc_message_hd_unpack(const unsigned char* buf, grpc_message
     hd->length += *p++;
 }
 
+// protobuf
+// tag = field_num << 3 | wire_type
+// varint(tag) [+ varint(length_delimited)] + value;
+typedef enum {
+    WIRE_TYPE_VARINT           = 0,
+    WIRE_TYPE_FIXED64          = 1,
+    WIRE_TYPE_LENGTH_DELIMITED = 2,
+    WIRE_TYPE_START_GROUP      = 3,
+    WIRE_TYPE_END_GROUP        = 4,
+    WIRE_TYPE_FIXED32          = 5,
+} wire_type;
+
+typedef enum {
+    FIELD_TYPE_DOUBLE         = 1,
+    FIELD_TYPE_FLOAT          = 2,
+    FIELD_TYPE_INT64          = 3,
+    FIELD_TYPE_UINT64         = 4,
+    FIELD_TYPE_INT32          = 5,
+    FIELD_TYPE_FIXED64        = 6,
+    FIELD_TYPE_FIXED32        = 7,
+    FIELD_TYPE_BOOL           = 8,
+    FIELD_TYPE_STRING         = 9,
+    FIELD_TYPE_GROUP          = 10,
+    FIELD_TYPE_MESSAGE        = 11,
+    FIELD_TYPE_BYTES          = 12,
+    FIELD_TYPE_UINT32         = 13,
+    FIELD_TYPE_ENUM           = 14,
+    FIELD_TYPE_SFIXED32       = 15,
+    FIELD_TYPE_SFIXED64       = 16,
+    FIELD_TYPE_SINT32         = 17,
+    FIELD_TYPE_SINT64         = 18,
+    MAX_FIELD_TYPE            = 18,
+} field_type;
+
+#define PROTOBUF_MAKE_TAG(field_number, wire_type)  ((field_number) << 3 | (wire_type))
+#define PROTOBUF_FILED_NUMBER(tag)                  ((tag) >> 3)
+#define PROTOBUF_WIRE_TYPE(tag)                     ((tag) & 0x07)
+
+// varint little-endian
+// MSB
+static inline int varint_encode(long long value, unsigned char* buf) {
+    unsigned char ch;
+    unsigned char *p = buf;
+    int bytes = 0;
+    do {
+        ch = value & 0x7F;
+        value >>= 7;
+        *p++ = value == 0 ? ch : (ch | 0x80);
+        ++bytes;
+    } while (value);
+    return bytes;
+}
+
+// @param[IN|OUT] len: in=>buflen, out=>varint bytesize
+static inline long long varint_decode(const unsigned char* buf, int* len) {
+    long long ret = 0;
+    int bytes = 0;
+    int bits = 0;
+    const unsigned char *p = buf;
+    unsigned char ch;
+    do {
+        if (len && *len && bytes == *len) {
+            break;
+        }
+        ch = *p & 0x7F;
+        ret |= (ch << bits);
+        bits += 7;
+        ++bytes;
+        if (!(*p & 0x80)) break;
+        ++p;
+    } while(bytes < 10);
+    *len = bytes;
+    return ret;
+}
+
 #ifdef __cplusplus
 }
 #endif