1
0

protorpc_client.cpp 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. /*
  2. * proto rpc client
  3. *
  4. * @build make protorpc
  5. * @server bin/protorpc_server 1234
  6. * @client bin/protorpc_client 127.0.0.1 1234 add 1 2
  7. *
  8. */
  9. #include "TcpClient.h"
  10. #include <atomic>
  11. #include <mutex>
  12. #include <future>
  13. using namespace hv;
  14. #include "protorpc.h"
  15. #include "generated/base.pb.h"
  16. #include "generated/calc.pb.h"
  17. #include "generated/login.pb.h"
  18. // valgrind --leak-check=full --show-leak-kinds=all
  19. class ProtobufRAII {
  20. public:
  21. ProtobufRAII() {
  22. }
  23. ~ProtobufRAII() {
  24. google::protobuf::ShutdownProtobufLibrary();
  25. }
  26. };
  27. static ProtobufRAII s_protobuf;
  28. namespace protorpc {
  29. typedef std::shared_ptr<protorpc::Request> RequestPtr;
  30. typedef std::shared_ptr<protorpc::Response> ResponsePtr;
  31. enum ProtoRpcResult {
  32. kRpcSuccess = 0,
  33. kRpcTimeout = -1,
  34. kRpcError = -2,
  35. kRpcNoResult = -3,
  36. kRpcParseError = -4,
  37. };
  38. class ProtoRpcContext {
  39. public:
  40. protorpc::RequestPtr req;
  41. std::promise<protorpc::ResponsePtr> res;
  42. };
  43. typedef std::shared_ptr<ProtoRpcContext> ContextPtr;
  44. class ProtoRpcClient : public TcpClient {
  45. public:
  46. ProtoRpcClient() : TcpClient()
  47. {
  48. connect_status = kInitialized;
  49. // reconnect setting
  50. reconn_setting_t reconn;
  51. reconn_setting_init(&reconn);
  52. reconn.min_delay = 1000;
  53. reconn.max_delay = 10000;
  54. reconn.delay_policy = 2;
  55. setReconnect(&reconn);
  56. // init protorpc_unpack_setting
  57. unpack_setting_t protorpc_unpack_setting;
  58. memset(&protorpc_unpack_setting, 0, sizeof(unpack_setting_t));
  59. protorpc_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
  60. protorpc_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
  61. protorpc_unpack_setting.body_offset = PROTORPC_HEAD_LENGTH;
  62. protorpc_unpack_setting.length_field_offset = PROTORPC_HEAD_LENGTH_FIELD_OFFSET;
  63. protorpc_unpack_setting.length_field_bytes = PROTORPC_HEAD_LENGTH_FIELD_BYTES;
  64. protorpc_unpack_setting.length_field_coding = ENCODE_BY_BIG_ENDIAN;
  65. setUnpack(&protorpc_unpack_setting);
  66. onConnection = [this](const SocketChannelPtr& channel) {
  67. std::string peeraddr = channel->peeraddr();
  68. if (channel->isConnected()) {
  69. connect_status = kConnected;
  70. printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
  71. } else {
  72. connect_status = kDisconnectd;
  73. printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
  74. }
  75. };
  76. onMessage = [this](const SocketChannelPtr& channel, Buffer* buf) {
  77. // protorpc_unpack
  78. protorpc_message msg;
  79. memset(&msg, 0, sizeof(msg));
  80. int packlen = protorpc_unpack(&msg, buf->data(), buf->size());
  81. if (packlen < 0) {
  82. printf("protorpc_unpack failed!\n");
  83. return;
  84. }
  85. assert(packlen == buf->size());
  86. if (protorpc_head_check(&msg.head) != 0) {
  87. printf("protorpc_head_check failed!\n");
  88. return;
  89. }
  90. // Response::ParseFromArray
  91. auto res = std::make_shared<protorpc::Response>();
  92. if (!res->ParseFromArray(msg.body, msg.head.length)) {
  93. return;
  94. }
  95. // id => res
  96. calls_mutex.lock();
  97. auto iter = calls.find(res->id());
  98. if (iter == calls.end()) {
  99. calls_mutex.unlock();
  100. return;
  101. }
  102. auto ctx = iter->second;
  103. calls_mutex.unlock();
  104. ctx->res.set_value(res);
  105. };
  106. }
  107. // @retval >0 connfd, <0 error, =0 connecting
  108. int connect(int port, const char* host = "127.0.0.1", bool wait_connect = true, int connect_timeout = 5000) {
  109. int fd = createsocket(port, host);
  110. if (fd < 0) {
  111. return fd;
  112. }
  113. setConnectTimeout(connect_timeout);
  114. connect_status = kConnecting;
  115. start();
  116. if (wait_connect) {
  117. while (connect_status == kConnecting) hv_msleep(1);
  118. return connect_status == kConnected ? fd : -1;
  119. }
  120. return 0;
  121. }
  122. bool isConnected() {
  123. return connect_status == kConnected;
  124. }
  125. protorpc::ResponsePtr call(protorpc::RequestPtr& req, int timeout_ms = 10000) {
  126. if (!isConnected()) {
  127. printf("RPC not connected!\n");
  128. return NULL;
  129. }
  130. static std::atomic<uint64_t> s_id = ATOMIC_VAR_INIT(0);
  131. req->set_id(++s_id);
  132. req->id();
  133. auto ctx = std::make_shared<protorpc::ProtoRpcContext>();
  134. ctx->req = req;
  135. calls_mutex.lock();
  136. calls[req->id()] = ctx;
  137. calls_mutex.unlock();
  138. // Request::SerializeToArray + protorpc_pack
  139. protorpc_message msg;
  140. protorpc_message_init(&msg);
  141. msg.head.length = req->ByteSize();
  142. int packlen = protorpc_package_length(&msg.head);
  143. unsigned char* writebuf = NULL;
  144. HV_STACK_ALLOC(writebuf, packlen);
  145. packlen = protorpc_pack(&msg, writebuf, packlen);
  146. if (packlen > 0) {
  147. printf("%s\n", req->DebugString().c_str());
  148. req->SerializeToArray(writebuf + PROTORPC_HEAD_LENGTH, msg.head.length);
  149. channel->write(writebuf, packlen);
  150. }
  151. HV_STACK_FREE(writebuf);
  152. protorpc::ResponsePtr res;
  153. if (timeout_ms > 0) {
  154. // wait until response come or timeout
  155. auto fut = ctx->res.get_future();
  156. auto status = fut.wait_for(std::chrono::milliseconds(timeout_ms));
  157. if (status == std::future_status::ready) {
  158. res = fut.get();
  159. if (res->has_error()) {
  160. printf("RPC error:\n%s\n", res->error().DebugString().c_str());
  161. }
  162. } else if (status == std::future_status::timeout) {
  163. printf("RPC timeout!\n");
  164. } else {
  165. printf("RPC unexpected status: %d!\n", (int)status);
  166. }
  167. }
  168. calls_mutex.lock();
  169. calls.erase(req->id());
  170. calls_mutex.unlock();
  171. return res;
  172. }
  173. int calc(const char* method, int num1, int num2, int& out) {
  174. auto req = std::make_shared<protorpc::Request>();
  175. // method
  176. req->set_method(method);
  177. // params
  178. protorpc::CalcParam param1, param2;
  179. param1.set_num(num1);
  180. param2.set_num(num2);
  181. req->add_params()->assign(param1.SerializeAsString());
  182. req->add_params()->assign(param2.SerializeAsString());
  183. auto res = call(req);
  184. if (res == NULL) return kRpcTimeout;
  185. if (res->has_error()) return kRpcError;
  186. if (res->result().empty()) return kRpcNoResult;
  187. protorpc::CalcResult result;
  188. if (!result.ParseFromString(res->result())) return kRpcParseError;
  189. out = result.num();
  190. return kRpcSuccess;
  191. }
  192. int login(const protorpc::LoginParam& param, protorpc::LoginResult* result) {
  193. auto req = std::make_shared<protorpc::Request>();
  194. // method
  195. req->set_method("login");
  196. // params
  197. req->add_params()->assign(param.SerializeAsString());
  198. auto res = call(req);
  199. if (res == NULL) return kRpcTimeout;
  200. if (res->has_error()) return kRpcError;
  201. if (res->result().empty()) return kRpcNoResult;
  202. if (!result->ParseFromString(res->result())) return kRpcParseError;
  203. return kRpcSuccess;
  204. }
  205. private:
  206. enum ConnectStatus {
  207. kInitialized,
  208. kConnecting,
  209. kConnected,
  210. kDisconnectd,
  211. };
  212. std::atomic<ConnectStatus> connect_status;
  213. std::map<uint64_t, protorpc::ContextPtr> calls;
  214. std::mutex calls_mutex;
  215. };
  216. }
  217. int main(int argc, char** argv) {
  218. if (argc < 6) {
  219. printf("Usage: %s host port method param1 param2\n", argv[0]);
  220. printf("method = [add, sub, mul, div]\n");
  221. printf("Examples:\n");
  222. printf(" %s 127.0.0.1 1234 add 1 2\n", argv[0]);
  223. printf(" %s 127.0.0.1 1234 div 1 0\n", argv[0]);
  224. return -10;
  225. }
  226. const char* host = argv[1];
  227. int port = atoi(argv[2]);
  228. const char* method = argv[3];
  229. const char* param1 = argv[4];
  230. const char* param2 = argv[5];
  231. protorpc::ProtoRpcClient cli;
  232. int ret = cli.connect(port, host, true);
  233. if (ret < 0) {
  234. return -20;
  235. }
  236. // test login
  237. {
  238. protorpc::LoginParam param;
  239. param.set_username("admin");
  240. param.set_password("123456");
  241. protorpc::LoginResult result;
  242. if (cli.login(param, &result) == protorpc::kRpcSuccess) {
  243. printf("login success!\n");
  244. printf("%s\n", result.DebugString().c_str());
  245. } else {
  246. printf("login failed!\n");
  247. }
  248. }
  249. // test calc
  250. {
  251. int num1 = atoi(param1);
  252. int num2 = atoi(param2);
  253. int result = 0;
  254. if (cli.calc(method, num1, num2, result) == protorpc::kRpcSuccess) {
  255. printf("calc success!\n");
  256. printf("%d %s %d = %d\n", num1, method, num2, result);
  257. } else {
  258. printf("calc failed!\n");
  259. }
  260. }
  261. return 0;
  262. }