Forráskód Böngészése

Update jsonrpc protorpc

ithewei 4 éve
szülő
commit
423f3d2615

+ 2 - 2
Makefile

@@ -149,11 +149,11 @@ websocket_client_test: prepare
 jsonrpc: jsonrpc_client jsonrpc_server
 
 jsonrpc_client: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/jsonrpc/jsonrpc_client.c examples/jsonrpc/jsonrpc.c examples/jsonrpc/cJSON.c"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/jsonrpc/jsonrpc_client.c examples/jsonrpc/cJSON.c"
 
 jsonrpc_server: prepare
 	$(RM) examples/jsonrpc/*.o
-	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/jsonrpc/jsonrpc_server.c examples/jsonrpc/jsonrpc.c examples/jsonrpc/cJSON.c"
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS)" SRCS="examples/jsonrpc/jsonrpc_server.c examples/jsonrpc/cJSON.c"
 
 protorpc: protorpc_client protorpc_server
 

+ 5 - 2
event/hloop.h

@@ -317,6 +317,7 @@ HV_EXPORT int hio_read_once (hio_t* io);
 HV_EXPORT int hio_read_until_length(hio_t* io, unsigned int len);
 // hio_read_once => hread_cb(...delim)
 HV_EXPORT int hio_read_until_delim (hio_t* io, unsigned char delim);
+// @see examples/tinyhttpd.c
 #define hio_readline(io)        hio_read_until_delim(io, '\n')
 #define hio_readstring(io)      hio_read_until_delim(io, '\0')
 #define hio_readbytes(io, len)  hio_read_until_length(io, len)
@@ -413,13 +414,13 @@ HV_EXPORT hio_t* hio_get_upstream(hio_t* io);
 
 // @tcp_upstream: hio_create -> hio_setup_upstream -> hio_setcb_close(hio_close_upstream) -> hconnect -> on_connect -> hio_read_upstream
 // @return upstream_io
-// @see examples/tcp_proxy_server
+// @see examples/tcp_proxy_server.c
 HV_EXPORT hio_t* hio_setup_tcp_upstream(hio_t* io, const char* host, int port, int ssl DEFAULT(0));
 #define hio_setup_ssl_upstream(io, host, port) hio_setup_tcp_upstream(io, host, port, 1)
 
 // @udp_upstream: hio_create -> hio_setup_upstream -> hio_read_upstream
 // @return upstream_io
-// @see examples/udp_proxy_server
+// @see examples/udp_proxy_server.c
 HV_EXPORT hio_t* hio_setup_udp_upstream(hio_t* io, const char* host, int port);
 
 //-----------------unpack---------------------------------------------
@@ -488,6 +489,7 @@ typedef struct unpack_setting_s {
 #endif
 } unpack_setting_t;
 
+// @see examples/jsonrpc examples/protorpc
 HV_EXPORT void hio_set_unpack(hio_t* io, unpack_setting_t* setting);
 HV_EXPORT void hio_unset_unpack(hio_t* io);
 
@@ -569,6 +571,7 @@ typedef struct kcp_setting_s {
 #endif
 } kcp_setting_t;
 
+// @see examples/udp_echo_server.c => #define TEST_KCP 1
 HV_EXPORT int hio_set_kcp(hio_t* io, kcp_setting_t* setting DEFAULT(NULL));
 #endif
 

+ 2 - 2
examples/CMakeLists.txt

@@ -41,10 +41,10 @@ target_link_libraries(udp_echo_server ${HV_LIBRARIES})
 add_executable(udp_proxy_server udp_proxy_server.c)
 target_link_libraries(udp_proxy_server ${HV_LIBRARIES})
 
-add_executable(jsonrpc_client jsonrpc/jsonrpc_client.c jsonrpc/jsonrpc.c jsonrpc/cJSON.c)
+add_executable(jsonrpc_client jsonrpc/jsonrpc_client.c jsonrpc/cJSON.c)
 target_link_libraries(jsonrpc_client ${HV_LIBRARIES})
 
-add_executable(jsonrpc_server jsonrpc/jsonrpc_server.c jsonrpc/jsonrpc.c jsonrpc/cJSON.c)
+add_executable(jsonrpc_server jsonrpc/jsonrpc_server.c jsonrpc/cJSON.c)
 target_link_libraries(jsonrpc_server ${HV_LIBRARIES})
 
 if(WITH_EVPP)

+ 0 - 49
examples/jsonrpc/jsonrpc.c

@@ -1,49 +0,0 @@
-#include "jsonrpc.h"
-
-#include <string.h> // import memcpy
-
-int jsonrpc_pack(const jsonrpc_message* msg, void* buf, int len) {
-    if (!msg || !buf || !len) return -1;
-    const jsonrpc_head* head = &(msg->head);
-    unsigned int packlen = jsonrpc_package_length(head);
-    // Check is buffer enough
-    if (len < packlen) {
-        return -2;
-    }
-    unsigned char* p = (unsigned char*)buf;
-    // flags
-    *p++ = head->flags;
-    // hton length
-    unsigned int length = head->length;
-    *p++ = (length >> 24) & 0xFF;
-    *p++ = (length >> 16) & 0xFF;
-    *p++ = (length >>  8) & 0xFF;
-    *p++ =  length        & 0xFF;
-    // memcpy body
-    memcpy(p, msg->body, head->length);
-
-    return packlen;
-}
-
-int jsonrpc_unpack(jsonrpc_message* msg, const void* buf, int len) {
-    if (!msg || !buf || !len) return -1;
-    if (len < JSONRPC_HEAD_LENGTH) return -2;
-    jsonrpc_head* head = &(msg->head);
-    const unsigned char* p = (const unsigned char*)buf;
-    // flags
-    head->flags = *p++;
-    // ntoh length
-    head->length  = ((unsigned int)*p++) << 24;
-    head->length |= ((unsigned int)*p++) << 16;
-    head->length |= ((unsigned int)*p++) << 8;
-    head->length |= *p++;
-    // Check is buffer enough
-    unsigned int packlen = jsonrpc_package_length(head);
-    if (len < packlen) {
-        return -3;
-    }
-    // NOTE: just shadow copy
-    msg->body = (const char*)buf + JSONRPC_HEAD_LENGTH;
-
-    return packlen;
-}

+ 0 - 27
examples/jsonrpc/jsonrpc.h

@@ -1,27 +0,0 @@
-#ifndef HV_JSON_RPC_H_
-#define HV_JSON_RPC_H_
-
-// flags:1byte + length:4bytes = 5bytes
-#define JSONRPC_HEAD_LENGTH  5
-typedef struct {
-    unsigned char   flags;
-    unsigned int    length;
-} jsonrpc_head;
-
-typedef const char* jsonrpc_body;
-
-typedef struct {
-    jsonrpc_head   head;
-    jsonrpc_body   body;
-} jsonrpc_message;
-
-static inline unsigned int jsonrpc_package_length(const jsonrpc_head* head) {
-    return JSONRPC_HEAD_LENGTH + head->length;
-}
-
-// @retval >0 package_length, <0 error
-int jsonrpc_pack(const jsonrpc_message* msg, void* buf, int len);
-// @retval >0 package_length, <0 error
-int jsonrpc_unpack(jsonrpc_message* msg, const void* buf, int len);
-
-#endif // HV_JSON_RPC_H_

+ 12 - 35
examples/jsonrpc/jsonrpc_client.c

@@ -12,7 +12,6 @@
 #include "hbase.h"
 #include "hsocket.h"
 
-#include "jsonrpc.h"
 #include "cJSON.h"
 
 // hloop_create_tcp_client -> on_connect -> hio_write -> hio_read -> on_recv
@@ -41,19 +40,10 @@ static void on_recv(hio_t* io, void* readbuf, int readbytes) {
                 SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
     }
 
-    // unpack
-    jsonrpc_message msg;
-    memset(&msg, 0, sizeof(msg));
-    int packlen = jsonrpc_unpack(&msg, readbuf, readbytes);
-    if (packlen < 0) {
-        printf("jsonrpc_unpack failed!\n");
-        return;
-    }
-    assert(packlen == readbytes);
-
-    printf("< %.*s\n", msg.head.length, msg.body);
+    char* resp_str = (char*)readbuf;
+    printf("< %s\n", resp_str);
     // cJSON_Parse
-    cJSON* jres = cJSON_ParseWithLength(msg.body, msg.head.length);
+    cJSON* jres = cJSON_Parse(resp_str);
     cJSON* jerror = cJSON_GetObjectItem(jres, "error");
     cJSON* jresult = cJSON_GetObjectItem(jres, "result");
     // ...
@@ -80,25 +70,14 @@ static void on_connect(hio_t* io) {
     hevent_set_userdata(io, NULL);
     assert(jreq != NULL);
 
-    // cJSON_Print -> pack -> hio_write
-    jsonrpc_message msg;
-    memset(&msg, 0, sizeof(msg));
-    msg.body = cJSON_PrintUnformatted(jreq);
-    msg.head.length = strlen(msg.body);
-    printf("> %.*s\n", msg.head.length, msg.body);
-
-    // pack
-    unsigned int packlen = jsonrpc_package_length(&msg.head);
-    unsigned char* writebuf = NULL;
-    HV_ALLOC(writebuf, packlen);
-    packlen = jsonrpc_pack(&msg, writebuf, packlen);
-    if (packlen > 0) {
-        hio_write(io, writebuf, packlen);
-    }
+    // cJSON_Print -> hio_write
+    char* req_str = cJSON_PrintUnformatted(jreq);
+    printf("> %s\n", req_str);
+    // NOTE: +1 for \0
+    hio_write(io, req_str, strlen(req_str) + 1);
 
     cJSON_Delete(jreq);
-    cJSON_free((void*)msg.body);
-    HV_FREE(writebuf);
+    cJSON_free(req_str);
 }
 
 static int jsonrpc_call(hloop_t* loop, const char* host, int port, const char* method, const char* param1, const char* param2) {
@@ -142,12 +121,10 @@ int main(int argc, char** argv) {
 
     // init jsonrpc_unpack_setting
     memset(&jsonrpc_unpack_setting, 0, sizeof(unpack_setting_t));
-    jsonrpc_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
+    jsonrpc_unpack_setting.mode = UNPACK_BY_DELIMITER;
     jsonrpc_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
-    jsonrpc_unpack_setting.body_offset = JSONRPC_HEAD_LENGTH;
-    jsonrpc_unpack_setting.length_field_offset = 1;
-    jsonrpc_unpack_setting.length_field_bytes = 4;
-    jsonrpc_unpack_setting.length_field_coding = ENCODE_BY_BIG_ENDIAN;
+    jsonrpc_unpack_setting.delimiter[0] = '\0';
+    jsonrpc_unpack_setting.delimiter_bytes = 1;
 
     hloop_t* loop = hloop_new(0);
 

+ 12 - 36
examples/jsonrpc/jsonrpc_server.c

@@ -11,7 +11,6 @@
 #include "hbase.h"
 #include "hsocket.h"
 
-#include "jsonrpc.h"
 #include "cJSON.h"
 #include "router.h"
 #include "handler.h"
@@ -43,20 +42,10 @@ static void on_recv(hio_t* io, void* readbuf, int readbytes) {
                 SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
     }
 
-    // unpack -> cJSON_Parse -> router -> cJSON_Print -> pack -> hio_write
-    // unpack
-    jsonrpc_message msg;
-    memset(&msg, 0, sizeof(msg));
-    int packlen = jsonrpc_unpack(&msg, readbuf, readbytes);
-    if (packlen < 0) {
-        printf("jsonrpc_unpack failed!\n");
-        return;
-    }
-    assert(packlen == readbytes);
-
-    // cJSON_Parse
-    printf("> %.*s\n", msg.head.length, msg.body);
-    cJSON* jreq = cJSON_ParseWithLength(msg.body, msg.head.length);
+    // cJSON_Parse -> router -> cJSON_Print -> hio_write
+    char* req_str = (char*)readbuf;
+    printf("> %s\n", req_str);
+    cJSON* jreq = cJSON_Parse(req_str);
     cJSON* jres = cJSON_CreateObject();
     cJSON* jid = cJSON_GetObjectItem(jreq, "id");
     cJSON* jmethod = cJSON_GetObjectItem(jreq, "method");
@@ -82,25 +71,14 @@ static void on_recv(hio_t* io, void* readbuf, int readbytes) {
         bad_request(jreq, jres);
     }
 
-    // cJSON_Print
-    memset(&msg, 0, sizeof(msg));
-    msg.body = cJSON_PrintUnformatted(jres);
-    msg.head.length = strlen(msg.body);
-    printf("< %.*s\n", msg.head.length, msg.body);
-
-    // pack
-    packlen = jsonrpc_package_length(&msg.head);
-    unsigned char* writebuf = NULL;
-    HV_ALLOC(writebuf, packlen);
-    packlen = jsonrpc_pack(&msg, writebuf, packlen);
-    if (packlen > 0) {
-        hio_write(io, writebuf, packlen);
-    }
+    char* resp_str = cJSON_PrintUnformatted(jres);
+    printf("< %s\n", resp_str);
+    // NOTE: +1 for \0
+    hio_write(io, resp_str, strlen(resp_str) + 1);
 
     cJSON_Delete(jreq);
     cJSON_Delete(jres);
-    cJSON_free((void*)msg.body);
-    HV_FREE(writebuf);
+    cJSON_free(resp_str);
 }
 
 static void on_accept(hio_t* io) {
@@ -128,12 +106,10 @@ int main(int argc, char** argv) {
 
     // init jsonrpc_unpack_setting
     memset(&jsonrpc_unpack_setting, 0, sizeof(unpack_setting_t));
-    jsonrpc_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
+    jsonrpc_unpack_setting.mode = UNPACK_BY_DELIMITER;
     jsonrpc_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
-    jsonrpc_unpack_setting.body_offset = JSONRPC_HEAD_LENGTH;
-    jsonrpc_unpack_setting.length_field_offset = 1;
-    jsonrpc_unpack_setting.length_field_bytes = 4;
-    jsonrpc_unpack_setting.length_field_coding = ENCODE_BY_BIG_ENDIAN;
+    jsonrpc_unpack_setting.delimiter[0] = '\0';
+    jsonrpc_unpack_setting.delimiter_bytes = 1;
 
     hloop_t* loop = hloop_new(0);
     hio_t* listenio = hloop_create_tcp_server(loop, "0.0.0.0", port, on_accept);

+ 2 - 2
examples/protorpc/proto/base.proto

@@ -15,6 +15,6 @@ message Request {
 
 message Response {
     uint64  id      = 1;
-    optional bytes  result  = 2;
-    optional Error  error   = 3;
+    bytes   result  = 2;
+    Error   error   = 3;
 }

+ 14 - 2
examples/protorpc/protorpc.c

@@ -11,8 +11,14 @@ int protorpc_pack(const protorpc_message* msg, void* buf, int len) {
         return -2;
     }
     unsigned char* p = (unsigned char*)buf;
-    // flags
+    *p++ = head->protocol[0];
+    *p++ = head->protocol[1];
+    *p++ = head->protocol[2];
+    *p++ = head->protocol[3];
+    *p++ = head->version;
     *p++ = head->flags;
+    *p++ = head->reserved[0];
+    *p++ = head->reserved[1];
     // hton length
     unsigned int length = head->length;
     *p++ = (length >> 24) & 0xFF;
@@ -32,8 +38,14 @@ int protorpc_unpack(protorpc_message* msg, const void* buf, int len) {
     if (len < PROTORPC_HEAD_LENGTH) return -2;
     protorpc_head* head = &(msg->head);
     const unsigned char* p = (const unsigned char*)buf;
-    // flags
+    head->protocol[0] = *p++;
+    head->protocol[1] = *p++;
+    head->protocol[2] = *p++;
+    head->protocol[3] = *p++;
+    head->version = *p++;
     head->flags = *p++;
+    head->reserved[0] = *p++;
+    head->reserved[1] = *p++;
     // ntoh length
     head->length  = ((unsigned int)*p++) << 24;
     head->length |= ((unsigned int)*p++) << 16;

+ 35 - 2
examples/protorpc/protorpc.h

@@ -1,14 +1,24 @@
 #ifndef HV_PROTO_RPC_H_
 #define HV_PROTO_RPC_H_
 
+#include <string.h>
+
 #ifdef __cplusplus
 extern "C" {
 #endif
 
-// flags:1byte + length:4bytes = 5bytes
-#define PROTORPC_HEAD_LENGTH  5
+#define PROTORPC_NAME       "HRPC"
+#define PROTORPC_VERSION    1
+
+// protocol:4bytes + version:1byte + flags:1byte + reserved:2bytes + length:4bytes = 12bytes
+#define PROTORPC_HEAD_LENGTH                12
+#define PROTORPC_HEAD_LENGTH_FIELD_OFFSET   8
+#define PROTORPC_HEAD_LENGTH_FIELD_BYTES    4
 typedef struct {
+    unsigned char   protocol[4];
+    unsigned char   version;
     unsigned char   flags;
+    unsigned char   reserved[2];
     unsigned int    length;
 } protorpc_head;
 
@@ -23,6 +33,29 @@ static inline unsigned int protorpc_package_length(const protorpc_head* head) {
     return PROTORPC_HEAD_LENGTH + head->length;
 }
 
+static inline void protorpc_head_init(protorpc_head* head) {
+    // protocol = HRPC
+    memcpy(head->protocol, PROTORPC_NAME, 4);
+    head->version = PROTORPC_VERSION;
+    head->reserved[0] = head->reserved[1] = 0;
+    head->length = 0;
+}
+
+static inline void protorpc_message_init(protorpc_message* msg) {
+    protorpc_head_init(&msg->head);
+    msg->body = NULL;
+}
+
+static inline int protorpc_head_check(protorpc_head* head) {
+    if (memcmp(head->protocol, PROTORPC_NAME, 4) != 0) {
+        return -1;
+    }
+    if (head->version != PROTORPC_VERSION) {
+        return -2;
+    }
+    return 0;
+}
+
 // @retval >0 package_length, <0 error
 int protorpc_pack(const protorpc_message* msg, void* buf, int len);
 // @retval >0 package_length, <0 error

+ 10 - 6
examples/protorpc/protorpc_client.cpp

@@ -71,8 +71,8 @@ public:
         protorpc_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
         protorpc_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
         protorpc_unpack_setting.body_offset = PROTORPC_HEAD_LENGTH;
-        protorpc_unpack_setting.length_field_offset = 1;
-        protorpc_unpack_setting.length_field_bytes = 4;
+        protorpc_unpack_setting.length_field_offset = PROTORPC_HEAD_LENGTH_FIELD_OFFSET;
+        protorpc_unpack_setting.length_field_bytes = PROTORPC_HEAD_LENGTH_FIELD_BYTES;
         protorpc_unpack_setting.length_field_coding = ENCODE_BY_BIG_ENDIAN;
         setUnpack(&protorpc_unpack_setting);
 
@@ -97,6 +97,10 @@ public:
                 return;
             }
             assert(packlen == buf->size());
+            if (protorpc_head_check(&msg.head) != 0) {
+                printf("protorpc_head_check failed!\n");
+                return;
+            }
             // Response::ParseFromArray
             protorpc::ResponsePtr res(new protorpc::Response);
             if (!res->ParseFromArray(msg.body, msg.head.length)) {
@@ -134,8 +138,8 @@ public:
         calls[req->id()] = protorpc::ContextPtr(ctx);
         // Request::SerializeToArray + protorpc_pack
         protorpc_message msg;
-        memset(&msg, 0, sizeof(msg));
-        msg.head.length = req->ByteSizeLong();
+        protorpc_message_init(&msg);
+        msg.head.length = req->ByteSize();
         int packlen = protorpc_package_length(&msg.head);
         unsigned char* writebuf = NULL;
         HV_ALLOC(writebuf, packlen);
@@ -175,7 +179,7 @@ public:
 
         if (res == NULL) return kRpcTimeout;
         if (res->has_error()) return kRpcError;
-        if (!res->has_result()) return kRpcNoResult;
+        if (res->result().empty()) return kRpcNoResult;
         protorpc::CalcResult result;
         if (!result.ParseFromString(res->result())) return kRpcParseError;
         out = result.num();
@@ -193,7 +197,7 @@ public:
 
         if (res == NULL) return kRpcTimeout;
         if (res->has_error()) return kRpcError;
-        if (!res->has_result()) return kRpcNoResult;
+        if (res->result().empty()) return kRpcNoResult;
         if (!result->ParseFromString(res->result())) return kRpcParseError;
         return kRpcSuccess;
     }

+ 8 - 4
examples/protorpc/protorpc_server.cpp

@@ -45,8 +45,8 @@ public:
         protorpc_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
         protorpc_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
         protorpc_unpack_setting.body_offset = PROTORPC_HEAD_LENGTH;
-        protorpc_unpack_setting.length_field_offset = 1;
-        protorpc_unpack_setting.length_field_bytes = 4;
+        protorpc_unpack_setting.length_field_offset = PROTORPC_HEAD_LENGTH_FIELD_OFFSET;
+        protorpc_unpack_setting.length_field_bytes = PROTORPC_HEAD_LENGTH_FIELD_BYTES;
         protorpc_unpack_setting.length_field_coding = ENCODE_BY_BIG_ENDIAN;
         setUnpack(&protorpc_unpack_setting);
     }
@@ -65,6 +65,10 @@ private:
             return;
         }
         assert(packlen == buf->size());
+        if (protorpc_head_check(&msg.head) != 0) {
+            printf("protorpc_head_check failed!\n");
+            return;
+        }
 
         // Request::ParseFromArray
         protorpc::Request req;
@@ -90,8 +94,8 @@ private:
         }
 
         // Response::SerializeToArray + protorpc_pack
-        memset(&msg, 0, sizeof(msg));
-        msg.head.length = res.ByteSizeLong();
+        protorpc_message_init(&msg);
+        msg.head.length = res.ByteSize();
         packlen = protorpc_package_length(&msg.head);
         unsigned char* writebuf = NULL;
         HV_ALLOC(writebuf, packlen);