protorpc_client.cpp 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. /*
  2. * proto rpc client
  3. *
  4. * @build make protorpc
  5. * @server bin/protorpc_server 1234
  6. * @client bin/protorpc_client 127.0.0.1 1234 add 1 2
  7. *
  8. */
  9. #include "TcpClient.h"
  10. #include <mutex>
  11. #include <condition_variable>
  12. using namespace hv;
  13. #include "protorpc.h"
  14. #include "generated/base.pb.h"
  15. #include "generated/calc.pb.h"
  16. #include "generated/login.pb.h"
  17. namespace protorpc {
  18. typedef std::shared_ptr<protorpc::Request> RequestPtr;
  19. typedef std::shared_ptr<protorpc::Response> ResponsePtr;
  20. enum ProtoRpcResult {
  21. kRpcSuccess = 0,
  22. kRpcTimeout = -1,
  23. kRpcError = -2,
  24. kRpcNoResult = -3,
  25. kRpcParseError = -4,
  26. };
  27. class ProtoRpcContext {
  28. public:
  29. protorpc::RequestPtr req;
  30. protorpc::ResponsePtr res;
  31. private:
  32. std::mutex _mutex;
  33. std::condition_variable _cond;
  34. public:
  35. void wait(int timeout_ms) {
  36. std::unique_lock<std::mutex> locker(_mutex);
  37. _cond.wait_for(locker, std::chrono::milliseconds(timeout_ms));
  38. }
  39. void notify() {
  40. _cond.notify_one();
  41. }
  42. };
  43. typedef std::shared_ptr<ProtoRpcContext> ContextPtr;
  44. class ProtoRpcClient : public TcpClient {
  45. public:
  46. ProtoRpcClient() : TcpClient()
  47. {
  48. connect_state = kInitialized;
  49. setConnectTimeout(5000);
  50. ReconnectInfo reconn;
  51. reconn.min_delay = 1000;
  52. reconn.max_delay = 10000;
  53. reconn.delay_policy = 2;
  54. setReconnect(&reconn);
  55. // init protorpc_unpack_setting
  56. unpack_setting_t protorpc_unpack_setting;
  57. memset(&protorpc_unpack_setting, 0, sizeof(unpack_setting_t));
  58. protorpc_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
  59. protorpc_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
  60. protorpc_unpack_setting.body_offset = PROTORPC_HEAD_LENGTH;
  61. protorpc_unpack_setting.length_field_offset = PROTORPC_HEAD_LENGTH_FIELD_OFFSET;
  62. protorpc_unpack_setting.length_field_bytes = PROTORPC_HEAD_LENGTH_FIELD_BYTES;
  63. protorpc_unpack_setting.length_field_coding = ENCODE_BY_BIG_ENDIAN;
  64. setUnpack(&protorpc_unpack_setting);
  65. onConnection = [this](const SocketChannelPtr& channel) {
  66. std::string peeraddr = channel->peeraddr();
  67. if (channel->isConnected()) {
  68. connect_state = kConnected;
  69. printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
  70. } else {
  71. connect_state = kDisconnectd;
  72. printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
  73. }
  74. };
  75. onMessage = [this](const SocketChannelPtr& channel, Buffer* buf) {
  76. // protorpc_unpack
  77. protorpc_message msg;
  78. memset(&msg, 0, sizeof(msg));
  79. int packlen = protorpc_unpack(&msg, buf->data(), buf->size());
  80. if (packlen < 0) {
  81. printf("protorpc_unpack failed!\n");
  82. return;
  83. }
  84. assert(packlen == buf->size());
  85. if (protorpc_head_check(&msg.head) != 0) {
  86. printf("protorpc_head_check failed!\n");
  87. return;
  88. }
  89. // Response::ParseFromArray
  90. protorpc::ResponsePtr res(new protorpc::Response);
  91. if (!res->ParseFromArray(msg.body, msg.head.length)) {
  92. return;
  93. }
  94. // id => res
  95. calls_mutex.lock();
  96. auto iter = calls.find(res->id());
  97. if (iter == calls.end()) {
  98. return;
  99. }
  100. auto ctx = iter->second;
  101. calls_mutex.unlock();
  102. ctx->res = res;
  103. ctx->notify();
  104. };
  105. }
  106. int connect(int port, const char* host = "127.0.0.1") {
  107. createsocket(port, host);
  108. connect_state = kConnecting;
  109. start();
  110. return 0;
  111. }
  112. protorpc::ResponsePtr call(protorpc::RequestPtr& req, int timeout_ms = 10000) {
  113. if (connect_state != kConnected) {
  114. return NULL;
  115. }
  116. static std::atomic<uint64_t> s_id = ATOMIC_VAR_INIT(0);
  117. req->set_id(++s_id);
  118. req->id();
  119. auto ctx = new protorpc::ProtoRpcContext;
  120. ctx->req = req;
  121. calls[req->id()] = protorpc::ContextPtr(ctx);
  122. // Request::SerializeToArray + protorpc_pack
  123. protorpc_message msg;
  124. protorpc_message_init(&msg);
  125. msg.head.length = req->ByteSize();
  126. int packlen = protorpc_package_length(&msg.head);
  127. unsigned char* writebuf = NULL;
  128. HV_ALLOC(writebuf, packlen);
  129. packlen = protorpc_pack(&msg, writebuf, packlen);
  130. if (packlen > 0) {
  131. printf("%s\n", req->DebugString().c_str());
  132. req->SerializeToArray(writebuf + PROTORPC_HEAD_LENGTH, msg.head.length);
  133. channel->write(writebuf, packlen);
  134. }
  135. HV_FREE(writebuf);
  136. // wait until response come or timeout
  137. ctx->wait(timeout_ms);
  138. auto res = ctx->res;
  139. calls_mutex.lock();
  140. calls.erase(req->id());
  141. calls_mutex.unlock();
  142. if (res == NULL) {
  143. printf("RPC timeout!\n");
  144. } else if (res->has_error()) {
  145. printf("RPC error:\n%s\n", res->error().DebugString().c_str());
  146. }
  147. return res;
  148. }
  149. int calc(const char* method, int num1, int num2, int& out) {
  150. protorpc::RequestPtr req(new protorpc::Request);
  151. // method
  152. req->set_method(method);
  153. // params
  154. protorpc::CalcParam param1, param2;
  155. param1.set_num(num1);
  156. param2.set_num(num2);
  157. req->add_params()->assign(param1.SerializeAsString());
  158. req->add_params()->assign(param2.SerializeAsString());
  159. auto res = call(req);
  160. if (res == NULL) return kRpcTimeout;
  161. if (res->has_error()) return kRpcError;
  162. if (res->result().empty()) return kRpcNoResult;
  163. protorpc::CalcResult result;
  164. if (!result.ParseFromString(res->result())) return kRpcParseError;
  165. out = result.num();
  166. return kRpcSuccess;
  167. }
  168. int login(const protorpc::LoginParam& param, protorpc::LoginResult* result) {
  169. protorpc::RequestPtr req(new protorpc::Request);
  170. // method
  171. req->set_method("login");
  172. // params
  173. req->add_params()->assign(param.SerializeAsString());
  174. auto res = call(req);
  175. if (res == NULL) return kRpcTimeout;
  176. if (res->has_error()) return kRpcError;
  177. if (res->result().empty()) return kRpcNoResult;
  178. if (!result->ParseFromString(res->result())) return kRpcParseError;
  179. return kRpcSuccess;
  180. }
  181. enum {
  182. kInitialized,
  183. kConnecting,
  184. kConnected,
  185. kDisconnectd,
  186. } connect_state;
  187. std::map<uint64_t, protorpc::ContextPtr> calls;
  188. std::mutex calls_mutex;
  189. };
  190. }
  191. int main(int argc, char** argv) {
  192. if (argc < 6) {
  193. printf("Usage: %s host port method param1 param2\n", argv[0]);
  194. printf("method = [add, sub, mul, div]\n");
  195. printf("Examples:\n");
  196. printf(" %s 127.0.0.1 1234 add 1 2\n", argv[0]);
  197. printf(" %s 127.0.0.1 1234 div 1 0\n", argv[0]);
  198. return -10;
  199. }
  200. const char* host = argv[1];
  201. int port = atoi(argv[2]);
  202. const char* method = argv[3];
  203. const char* param1 = argv[4];
  204. const char* param2 = argv[5];
  205. protorpc::ProtoRpcClient cli;
  206. cli.connect(port, host);
  207. while (cli.connect_state == protorpc::ProtoRpcClient::kConnecting) hv_msleep(1);
  208. if (cli.connect_state == protorpc::ProtoRpcClient::kDisconnectd) {
  209. return -20;
  210. }
  211. // test login
  212. {
  213. protorpc::LoginParam param;
  214. param.set_username("admin");
  215. param.set_password("123456");
  216. protorpc::LoginResult result;
  217. if (cli.login(param, &result) == protorpc::kRpcSuccess) {
  218. printf("login success!\n");
  219. printf("%s\n", result.DebugString().c_str());
  220. } else {
  221. printf("login failed!\n");
  222. }
  223. }
  224. // test calc
  225. {
  226. int num1 = atoi(param1);
  227. int num2 = atoi(param2);
  228. int result = 0;
  229. if (cli.calc(method, num1, num2, result) == protorpc::kRpcSuccess) {
  230. printf("calc success!\n");
  231. printf("%d %s %d = %d\n", num1, method, num2, result);
  232. } else {
  233. printf("calc failed!\n");
  234. }
  235. }
  236. return 0;
  237. }