protorpc_server.cpp 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. /*
  2. * proto rpc server
  3. *
  4. * @build make protorpc
  5. * @server bin/protorpc_server 1234
  6. * @client bin/protorpc_client 127.0.0.1 1234 add 1 2
  7. *
  8. */
  9. #include "TcpServer.h"
  10. using namespace hv;
  11. #include "protorpc.h"
  12. #include "router.h"
  13. #include "handler/handler.h"
  14. #include "handler/calc.h"
  15. #include "handler/login.h"
  16. protorpc_router router[] = {
  17. {"add", calc_add},
  18. {"sub", calc_sub},
  19. {"mul", calc_mul},
  20. {"div", calc_div},
  21. {"login", login},
  22. };
  23. #define PROTORPC_ROUTER_NUM (sizeof(router)/sizeof(router[0]))
  24. class ProtoRpcServer : public TcpServer {
  25. public:
  26. ProtoRpcServer() : TcpServer()
  27. {
  28. onConnection = [](const SocketChannelPtr& channel) {
  29. std::string peeraddr = channel->peeraddr();
  30. if (channel->isConnected()) {
  31. printf("%s connected! connfd=%d\n", peeraddr.c_str(), channel->fd());
  32. } else {
  33. printf("%s disconnected! connfd=%d\n", peeraddr.c_str(), channel->fd());
  34. }
  35. };
  36. onMessage = handleMessage;
  37. // init protorpc_unpack_setting
  38. unpack_setting_t protorpc_unpack_setting;
  39. memset(&protorpc_unpack_setting, 0, sizeof(unpack_setting_t));
  40. protorpc_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
  41. protorpc_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
  42. protorpc_unpack_setting.body_offset = PROTORPC_HEAD_LENGTH;
  43. protorpc_unpack_setting.length_field_offset = PROTORPC_HEAD_LENGTH_FIELD_OFFSET;
  44. protorpc_unpack_setting.length_field_bytes = PROTORPC_HEAD_LENGTH_FIELD_BYTES;
  45. protorpc_unpack_setting.length_field_coding = ENCODE_BY_BIG_ENDIAN;
  46. setUnpack(&protorpc_unpack_setting);
  47. }
  48. int listen(int port) { return createsocket(port); }
  49. private:
  50. static void handleMessage(const SocketChannelPtr& channel, Buffer* buf) {
  51. // unpack -> Request::ParseFromArray -> router -> Response::SerializeToArray -> pack -> Channel::write
  52. // protorpc_unpack
  53. protorpc_message msg;
  54. memset(&msg, 0, sizeof(msg));
  55. int packlen = protorpc_unpack(&msg, buf->data(), buf->size());
  56. if (packlen < 0) {
  57. printf("protorpc_unpack failed!\n");
  58. return;
  59. }
  60. assert(packlen == buf->size());
  61. if (protorpc_head_check(&msg.head) != 0) {
  62. printf("protorpc_head_check failed!\n");
  63. return;
  64. }
  65. // Request::ParseFromArray
  66. protorpc::Request req;
  67. protorpc::Response res;
  68. if (req.ParseFromArray(msg.body, msg.head.length)) {
  69. printf("> %s\n", req.DebugString().c_str());
  70. res.set_id(req.id());
  71. // router
  72. const char* method = req.method().c_str();
  73. bool found = false;
  74. for (int i = 0; i < PROTORPC_ROUTER_NUM; ++i) {
  75. if (strcmp(method, router[i].method) == 0) {
  76. found = true;
  77. router[i].handler(req, &res);
  78. break;
  79. }
  80. }
  81. if (!found) {
  82. not_found(req, &res);
  83. }
  84. } else {
  85. bad_request(req, &res);
  86. }
  87. // Response::SerializeToArray + protorpc_pack
  88. protorpc_message_init(&msg);
  89. msg.head.length = res.ByteSize();
  90. packlen = protorpc_package_length(&msg.head);
  91. unsigned char* writebuf = NULL;
  92. HV_ALLOC(writebuf, packlen);
  93. packlen = protorpc_pack(&msg, writebuf, packlen);
  94. if (packlen > 0) {
  95. printf("< %s\n", res.DebugString().c_str());
  96. res.SerializeToArray(writebuf + PROTORPC_HEAD_LENGTH, msg.head.length);
  97. channel->write(writebuf, packlen);
  98. }
  99. HV_FREE(writebuf);
  100. }
  101. };
  102. int main(int argc, char** argv) {
  103. if (argc < 2) {
  104. printf("Usage: %s port\n", argv[0]);
  105. return -10;
  106. }
  107. int port = atoi(argv[1]);
  108. ProtoRpcServer srv;
  109. int listenfd = srv.listen(port);
  110. if (listenfd < 0) {
  111. return -20;
  112. }
  113. printf("protorpc_server listen on port %d, listenfd=%d ...\n", port, listenfd);
  114. srv.setThreadNum(4);
  115. srv.start();
  116. while (1) hv_sleep(1);
  117. return 0;
  118. }