protorpc_client.cpp 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. /*
  2. * proto rpc client
  3. *
  4. * @build make protorpc
  5. * @server bin/protorpc_server 1234
  6. * @client bin/protorpc_client 127.0.0.1 1234 add 1 2
  7. *
  8. */
  9. #include "TcpClient.h"
  10. #include <mutex>
  11. #include <condition_variable>
  12. using namespace hv;
  13. #include "protorpc.h"
  14. #include "generated/base.pb.h"
  15. #include "generated/calc.pb.h"
  16. #include "generated/login.pb.h"
  17. namespace protorpc {
  18. typedef std::shared_ptr<protorpc::Request> RequestPtr;
  19. typedef std::shared_ptr<protorpc::Response> ResponsePtr;
  20. enum ProtoRpcResult {
  21. kRpcSuccess = 0,
  22. kRpcTimeout = -1,
  23. kRpcError = -2,
  24. kRpcNoResult = -3,
  25. kRpcParseError = -4,
  26. };
  27. class ProtoRpcContext {
  28. public:
  29. protorpc::RequestPtr req;
  30. protorpc::ResponsePtr res;
  31. private:
  32. std::mutex _mutex;
  33. std::condition_variable _cond;
  34. public:
  35. void wait(int timeout_ms) {
  36. std::unique_lock<std::mutex> locker(_mutex);
  37. _cond.wait_for(locker, std::chrono::milliseconds(timeout_ms));
  38. }
  39. void notify() {
  40. _cond.notify_one();
  41. }
  42. };
  43. typedef std::shared_ptr<ProtoRpcContext> ContextPtr;
  44. class ProtoRpcClient : public TcpClient {
  45. public:
  46. ProtoRpcClient() : TcpClient()
  47. {
  48. connect_state = kInitialized;
  49. setConnectTimeout(5000);
  50. ReconnectInfo reconn;
  51. reconn.min_delay = 1000;
  52. reconn.max_delay = 10000;
  53. reconn.delay_policy = 2;
  54. setReconnect(&reconn);
  55. // init protorpc_unpack_setting
  56. unpack_setting_t protorpc_unpack_setting;
  57. memset(&protorpc_unpack_setting, 0, sizeof(unpack_setting_t));
  58. protorpc_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
  59. protorpc_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
  60. protorpc_unpack_setting.body_offset = PROTORPC_HEAD_LENGTH;
  61. protorpc_unpack_setting.length_field_offset = 1;
  62. protorpc_unpack_setting.length_field_bytes = 4;
  63. protorpc_unpack_setting.length_field_coding = ENCODE_BY_BIG_ENDIAN;
  64. setUnpack(&protorpc_unpack_setting);
  65. onConnection = [this](const SocketChannelPtr& channel) {
  66. std::string peeraddr = channel->peeraddr();
  67. if (channel->isConnected()) {
  68. connect_state = kConnected;
  69. printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
  70. } else {
  71. connect_state = kDisconnectd;
  72. printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
  73. }
  74. };
  75. onMessage = [this](const SocketChannelPtr& channel, Buffer* buf) {
  76. // protorpc_unpack
  77. protorpc_message msg;
  78. memset(&msg, 0, sizeof(msg));
  79. int packlen = protorpc_unpack(&msg, buf->data(), buf->size());
  80. if (packlen < 0) {
  81. printf("protorpc_unpack failed!\n");
  82. return;
  83. }
  84. assert(packlen == buf->size());
  85. // Response::ParseFromArray
  86. protorpc::ResponsePtr res(new protorpc::Response);
  87. if (!res->ParseFromArray(msg.body, msg.head.length)) {
  88. return;
  89. }
  90. // id => res
  91. calls_mutex.lock();
  92. auto iter = calls.find(res->id());
  93. if (iter == calls.end()) {
  94. return;
  95. }
  96. auto ctx = iter->second;
  97. calls_mutex.unlock();
  98. ctx->res = res;
  99. ctx->notify();
  100. };
  101. }
  102. int connect(int port, const char* host = "127.0.0.1") {
  103. createsocket(port, host);
  104. connect_state = kConnecting;
  105. start();
  106. return 0;
  107. }
  108. protorpc::ResponsePtr call(protorpc::RequestPtr& req, int timeout_ms = 10000) {
  109. if (connect_state != kConnected) {
  110. return NULL;
  111. }
  112. static std::atomic<uint64_t> s_id = ATOMIC_VAR_INIT(0);
  113. req->set_id(++s_id);
  114. req->id();
  115. auto ctx = new protorpc::ProtoRpcContext;
  116. ctx->req = req;
  117. calls[req->id()] = protorpc::ContextPtr(ctx);
  118. // Request::SerializeToArray + protorpc_pack
  119. protorpc_message msg;
  120. memset(&msg, 0, sizeof(msg));
  121. msg.head.length = req->ByteSizeLong();
  122. int packlen = protorpc_package_length(&msg.head);
  123. unsigned char* writebuf = NULL;
  124. HV_ALLOC(writebuf, packlen);
  125. packlen = protorpc_pack(&msg, writebuf, packlen);
  126. if (packlen > 0) {
  127. printf("%s\n", req->DebugString().c_str());
  128. req->SerializeToArray(writebuf + PROTORPC_HEAD_LENGTH, msg.head.length);
  129. channel->write(writebuf, packlen);
  130. }
  131. HV_FREE(writebuf);
  132. // wait until response come or timeout
  133. ctx->wait(timeout_ms);
  134. auto res = ctx->res;
  135. calls_mutex.lock();
  136. calls.erase(req->id());
  137. calls_mutex.unlock();
  138. if (res == NULL) {
  139. printf("RPC timeout!\n");
  140. } else if (res->has_error()) {
  141. printf("RPC error:\n%s\n", res->error().DebugString().c_str());
  142. }
  143. return res;
  144. }
  145. int calc(const char* method, int num1, int num2, int& out) {
  146. protorpc::RequestPtr req(new protorpc::Request);
  147. // method
  148. req->set_method(method);
  149. // params
  150. protorpc::CalcParam param1, param2;
  151. param1.set_num(num1);
  152. param2.set_num(num2);
  153. req->add_params()->assign(param1.SerializeAsString());
  154. req->add_params()->assign(param2.SerializeAsString());
  155. auto res = call(req);
  156. if (res == NULL) return kRpcTimeout;
  157. if (res->has_error()) return kRpcError;
  158. if (!res->has_result()) return kRpcNoResult;
  159. protorpc::CalcResult result;
  160. if (!result.ParseFromString(res->result())) return kRpcParseError;
  161. out = result.num();
  162. return kRpcSuccess;
  163. }
  164. int login(const protorpc::LoginParam& param, protorpc::LoginResult* result) {
  165. protorpc::RequestPtr req(new protorpc::Request);
  166. // method
  167. req->set_method("login");
  168. // params
  169. req->add_params()->assign(param.SerializeAsString());
  170. auto res = call(req);
  171. if (res == NULL) return kRpcTimeout;
  172. if (res->has_error()) return kRpcError;
  173. if (!res->has_result()) return kRpcNoResult;
  174. if (!result->ParseFromString(res->result())) return kRpcParseError;
  175. return kRpcSuccess;
  176. }
  177. enum {
  178. kInitialized,
  179. kConnecting,
  180. kConnected,
  181. kDisconnectd,
  182. } connect_state;
  183. std::map<uint64_t, protorpc::ContextPtr> calls;
  184. std::mutex calls_mutex;
  185. };
  186. }
  187. int main(int argc, char** argv) {
  188. if (argc < 6) {
  189. printf("Usage: %s host port method param1 param2\n", argv[0]);
  190. printf("method = [add, sub, mul, div]\n");
  191. printf("Examples:\n");
  192. printf(" %s 127.0.0.1 1234 add 1 2\n", argv[0]);
  193. printf(" %s 127.0.0.1 1234 div 1 0\n", argv[0]);
  194. return -10;
  195. }
  196. const char* host = argv[1];
  197. int port = atoi(argv[2]);
  198. const char* method = argv[3];
  199. const char* param1 = argv[4];
  200. const char* param2 = argv[5];
  201. protorpc::ProtoRpcClient cli;
  202. cli.connect(port, host);
  203. while (cli.connect_state == protorpc::ProtoRpcClient::kConnecting) hv_msleep(1);
  204. if (cli.connect_state == protorpc::ProtoRpcClient::kDisconnectd) {
  205. return -20;
  206. }
  207. // test login
  208. {
  209. protorpc::LoginParam param;
  210. param.set_username("admin");
  211. param.set_password("123456");
  212. protorpc::LoginResult result;
  213. if (cli.login(param, &result) == protorpc::kRpcSuccess) {
  214. printf("login success!\n");
  215. printf("%s\n", result.DebugString().c_str());
  216. } else {
  217. printf("login failed!\n");
  218. }
  219. }
  220. // test calc
  221. {
  222. int num1 = atoi(param1);
  223. int num2 = atoi(param2);
  224. int result = 0;
  225. if (cli.calc(method, num1, num2, result) == protorpc::kRpcSuccess) {
  226. printf("calc success!\n");
  227. printf("%d %s %d = %d\n", num1, method, num2, result);
  228. } else {
  229. printf("calc failed!\n");
  230. }
  231. }
  232. return 0;
  233. }