protorpc_server.cpp 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. /*
  2. * proto rpc server
  3. *
  4. * @build make protorpc
  5. * @server bin/protorpc_server 1234
  6. * @client bin/protorpc_client 127.0.0.1 1234 add 1 2
  7. *
  8. */
  9. #include "TcpServer.h"
  10. using namespace hv;
  11. #include "protorpc.h"
  12. #include "router.h"
  13. #include "handler/handler.h"
  14. #include "handler/calc.h"
  15. #include "handler/login.h"
  16. // valgrind --leak-check=full --show-leak-kinds=all
  17. class ProtobufRAII {
  18. public:
  19. ProtobufRAII() {
  20. }
  21. ~ProtobufRAII() {
  22. google::protobuf::ShutdownProtobufLibrary();
  23. }
  24. };
  25. static ProtobufRAII s_protobuf;
  26. protorpc_router router[] = {
  27. {"add", calc_add},
  28. {"sub", calc_sub},
  29. {"mul", calc_mul},
  30. {"div", calc_div},
  31. {"login", login},
  32. };
  33. #define PROTORPC_ROUTER_NUM (sizeof(router)/sizeof(router[0]))
  34. class ProtoRpcServer : public TcpServer {
  35. public:
  36. ProtoRpcServer() : TcpServer()
  37. {
  38. onConnection = [](const SocketChannelPtr& channel) {
  39. std::string peeraddr = channel->peeraddr();
  40. if (channel->isConnected()) {
  41. printf("%s connected! connfd=%d\n", peeraddr.c_str(), channel->fd());
  42. } else {
  43. printf("%s disconnected! connfd=%d\n", peeraddr.c_str(), channel->fd());
  44. }
  45. };
  46. onMessage = handleMessage;
  47. // init protorpc_unpack_setting
  48. unpack_setting_t protorpc_unpack_setting;
  49. memset(&protorpc_unpack_setting, 0, sizeof(unpack_setting_t));
  50. protorpc_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
  51. protorpc_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
  52. protorpc_unpack_setting.body_offset = PROTORPC_HEAD_LENGTH;
  53. protorpc_unpack_setting.length_field_offset = PROTORPC_HEAD_LENGTH_FIELD_OFFSET;
  54. protorpc_unpack_setting.length_field_bytes = PROTORPC_HEAD_LENGTH_FIELD_BYTES;
  55. protorpc_unpack_setting.length_field_coding = ENCODE_BY_BIG_ENDIAN;
  56. setUnpack(&protorpc_unpack_setting);
  57. }
  58. int listen(int port) { return createsocket(port); }
  59. private:
  60. static void handleMessage(const SocketChannelPtr& channel, Buffer* buf) {
  61. // unpack -> Request::ParseFromArray -> router -> Response::SerializeToArray -> pack -> Channel::write
  62. // protorpc_unpack
  63. protorpc_message msg;
  64. memset(&msg, 0, sizeof(msg));
  65. int packlen = protorpc_unpack(&msg, buf->data(), buf->size());
  66. if (packlen < 0) {
  67. printf("protorpc_unpack failed!\n");
  68. return;
  69. }
  70. assert(packlen == buf->size());
  71. if (protorpc_head_check(&msg.head) != 0) {
  72. printf("protorpc_head_check failed!\n");
  73. return;
  74. }
  75. // Request::ParseFromArray
  76. protorpc::Request req;
  77. protorpc::Response res;
  78. if (req.ParseFromArray(msg.body, msg.head.length)) {
  79. printf("> %s\n", req.DebugString().c_str());
  80. res.set_id(req.id());
  81. // router
  82. const char* method = req.method().c_str();
  83. bool found = false;
  84. for (int i = 0; i < PROTORPC_ROUTER_NUM; ++i) {
  85. if (strcmp(method, router[i].method) == 0) {
  86. found = true;
  87. router[i].handler(req, &res);
  88. break;
  89. }
  90. }
  91. if (!found) {
  92. not_found(req, &res);
  93. }
  94. } else {
  95. bad_request(req, &res);
  96. }
  97. // Response::SerializeToArray + protorpc_pack
  98. protorpc_message_init(&msg);
  99. msg.head.length = res.ByteSize();
  100. packlen = protorpc_package_length(&msg.head);
  101. unsigned char* writebuf = NULL;
  102. HV_STACK_ALLOC(writebuf, packlen);
  103. packlen = protorpc_pack(&msg, writebuf, packlen);
  104. if (packlen > 0) {
  105. printf("< %s\n", res.DebugString().c_str());
  106. res.SerializeToArray(writebuf + PROTORPC_HEAD_LENGTH, msg.head.length);
  107. channel->write(writebuf, packlen);
  108. }
  109. HV_STACK_FREE(writebuf);
  110. }
  111. };
  112. int main(int argc, char** argv) {
  113. if (argc < 2) {
  114. printf("Usage: %s port\n", argv[0]);
  115. return -10;
  116. }
  117. int port = atoi(argv[1]);
  118. ProtoRpcServer srv;
  119. int listenfd = srv.listen(port);
  120. if (listenfd < 0) {
  121. return -20;
  122. }
  123. printf("protorpc_server listen on port %d, listenfd=%d ...\n", port, listenfd);
  124. srv.setThreadNum(4);
  125. srv.start();
  126. while (1) hv_sleep(1);
  127. return 0;
  128. }