protorpc_client.cpp 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. /*
  2. * proto rpc client
  3. *
  4. * @build make protorpc
  5. * @server bin/protorpc_server 1234
  6. * @client bin/protorpc_client 127.0.0.1 1234 add 1 2
  7. *
  8. */
  9. #include "TcpClient.h"
  10. #include <mutex>
  11. #include <condition_variable>
  12. using namespace hv;
  13. #include "protorpc.h"
  14. #include "generated/base.pb.h"
  15. #include "generated/calc.pb.h"
  16. #include "generated/login.pb.h"
  17. // valgrind --leak-check=full --show-leak-kinds=all
  18. class ProtobufRAII {
  19. public:
  20. ProtobufRAII() {
  21. }
  22. ~ProtobufRAII() {
  23. google::protobuf::ShutdownProtobufLibrary();
  24. }
  25. };
  26. static ProtobufRAII s_protobuf;
  27. namespace protorpc {
  28. typedef std::shared_ptr<protorpc::Request> RequestPtr;
  29. typedef std::shared_ptr<protorpc::Response> ResponsePtr;
  // Status codes returned by the typed RPC wrappers (calc, login).
  enum ProtoRpcResult {
      kRpcSuccess    =  0,  // a result was received and parsed
      kRpcTimeout    = -1,  // call() returned no response
      kRpcError      = -2,  // the response carries an error field
      kRpcNoResult   = -3,  // the response has an empty result payload
      kRpcParseError = -4,  // the result payload could not be parsed
  };
  37. class ProtoRpcContext {
  38. public:
  39. protorpc::RequestPtr req;
  40. protorpc::ResponsePtr res;
  41. private:
  42. std::mutex _mutex;
  43. std::condition_variable _cond;
  44. public:
  45. void wait(int timeout_ms) {
  46. std::unique_lock<std::mutex> locker(_mutex);
  47. _cond.wait_for(locker, std::chrono::milliseconds(timeout_ms));
  48. }
  49. void notify() {
  50. _cond.notify_one();
  51. }
  52. };
  53. typedef std::shared_ptr<ProtoRpcContext> ContextPtr;
  54. class ProtoRpcClient : public TcpClient {
  55. public:
  56. ProtoRpcClient() : TcpClient()
  57. {
  58. connect_state = kInitialized;
  59. setConnectTimeout(5000);
  60. reconn_setting_t reconn;
  61. reconn_setting_init(&reconn);
  62. reconn.min_delay = 1000;
  63. reconn.max_delay = 10000;
  64. reconn.delay_policy = 2;
  65. setReconnect(&reconn);
  66. // init protorpc_unpack_setting
  67. unpack_setting_t protorpc_unpack_setting;
  68. memset(&protorpc_unpack_setting, 0, sizeof(unpack_setting_t));
  69. protorpc_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
  70. protorpc_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
  71. protorpc_unpack_setting.body_offset = PROTORPC_HEAD_LENGTH;
  72. protorpc_unpack_setting.length_field_offset = PROTORPC_HEAD_LENGTH_FIELD_OFFSET;
  73. protorpc_unpack_setting.length_field_bytes = PROTORPC_HEAD_LENGTH_FIELD_BYTES;
  74. protorpc_unpack_setting.length_field_coding = ENCODE_BY_BIG_ENDIAN;
  75. setUnpack(&protorpc_unpack_setting);
  76. onConnection = [this](const SocketChannelPtr& channel) {
  77. std::string peeraddr = channel->peeraddr();
  78. if (channel->isConnected()) {
  79. connect_state = kConnected;
  80. printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
  81. } else {
  82. connect_state = kDisconnectd;
  83. printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
  84. }
  85. };
  86. onMessage = [this](const SocketChannelPtr& channel, Buffer* buf) {
  87. // protorpc_unpack
  88. protorpc_message msg;
  89. memset(&msg, 0, sizeof(msg));
  90. int packlen = protorpc_unpack(&msg, buf->data(), buf->size());
  91. if (packlen < 0) {
  92. printf("protorpc_unpack failed!\n");
  93. return;
  94. }
  95. assert(packlen == buf->size());
  96. if (protorpc_head_check(&msg.head) != 0) {
  97. printf("protorpc_head_check failed!\n");
  98. return;
  99. }
  100. // Response::ParseFromArray
  101. auto res = std::make_shared<protorpc::Response>();
  102. if (!res->ParseFromArray(msg.body, msg.head.length)) {
  103. return;
  104. }
  105. // id => res
  106. calls_mutex.lock();
  107. auto iter = calls.find(res->id());
  108. if (iter == calls.end()) {
  109. calls_mutex.unlock();
  110. return;
  111. }
  112. auto ctx = iter->second;
  113. calls_mutex.unlock();
  114. ctx->res = res;
  115. ctx->notify();
  116. };
  117. }
  118. int connect(int port, const char* host = "127.0.0.1") {
  119. createsocket(port, host);
  120. connect_state = kConnecting;
  121. start();
  122. return 0;
  123. }
  124. protorpc::ResponsePtr call(protorpc::RequestPtr& req, int timeout_ms = 10000) {
  125. if (connect_state != kConnected) {
  126. return NULL;
  127. }
  128. static std::atomic<uint64_t> s_id = ATOMIC_VAR_INIT(0);
  129. req->set_id(++s_id);
  130. req->id();
  131. auto ctx = std::make_shared<protorpc::ProtoRpcContext>();
  132. ctx->req = req;
  133. calls_mutex.lock();
  134. calls[req->id()] = ctx;
  135. calls_mutex.unlock();
  136. // Request::SerializeToArray + protorpc_pack
  137. protorpc_message msg;
  138. protorpc_message_init(&msg);
  139. msg.head.length = req->ByteSize();
  140. int packlen = protorpc_package_length(&msg.head);
  141. unsigned char* writebuf = NULL;
  142. HV_STACK_ALLOC(writebuf, packlen);
  143. packlen = protorpc_pack(&msg, writebuf, packlen);
  144. if (packlen > 0) {
  145. printf("%s\n", req->DebugString().c_str());
  146. req->SerializeToArray(writebuf + PROTORPC_HEAD_LENGTH, msg.head.length);
  147. channel->write(writebuf, packlen);
  148. }
  149. HV_STACK_FREE(writebuf);
  150. // wait until response come or timeout
  151. ctx->wait(timeout_ms);
  152. auto res = ctx->res;
  153. calls_mutex.lock();
  154. calls.erase(req->id());
  155. calls_mutex.unlock();
  156. if (res == NULL) {
  157. printf("RPC timeout!\n");
  158. } else if (res->has_error()) {
  159. printf("RPC error:\n%s\n", res->error().DebugString().c_str());
  160. }
  161. return res;
  162. }
  163. int calc(const char* method, int num1, int num2, int& out) {
  164. auto req = std::make_shared<protorpc::Request>();
  165. // method
  166. req->set_method(method);
  167. // params
  168. protorpc::CalcParam param1, param2;
  169. param1.set_num(num1);
  170. param2.set_num(num2);
  171. req->add_params()->assign(param1.SerializeAsString());
  172. req->add_params()->assign(param2.SerializeAsString());
  173. auto res = call(req);
  174. if (res == NULL) return kRpcTimeout;
  175. if (res->has_error()) return kRpcError;
  176. if (res->result().empty()) return kRpcNoResult;
  177. protorpc::CalcResult result;
  178. if (!result.ParseFromString(res->result())) return kRpcParseError;
  179. out = result.num();
  180. return kRpcSuccess;
  181. }
  182. int login(const protorpc::LoginParam& param, protorpc::LoginResult* result) {
  183. auto req = std::make_shared<protorpc::Request>();
  184. // method
  185. req->set_method("login");
  186. // params
  187. req->add_params()->assign(param.SerializeAsString());
  188. auto res = call(req);
  189. if (res == NULL) return kRpcTimeout;
  190. if (res->has_error()) return kRpcError;
  191. if (res->result().empty()) return kRpcNoResult;
  192. if (!result->ParseFromString(res->result())) return kRpcParseError;
  193. return kRpcSuccess;
  194. }
  195. enum {
  196. kInitialized,
  197. kConnecting,
  198. kConnected,
  199. kDisconnectd,
  200. } connect_state;
  201. std::map<uint64_t, protorpc::ContextPtr> calls;
  202. std::mutex calls_mutex;
  203. };
  204. }
  205. int main(int argc, char** argv) {
  206. if (argc < 6) {
  207. printf("Usage: %s host port method param1 param2\n", argv[0]);
  208. printf("method = [add, sub, mul, div]\n");
  209. printf("Examples:\n");
  210. printf(" %s 127.0.0.1 1234 add 1 2\n", argv[0]);
  211. printf(" %s 127.0.0.1 1234 div 1 0\n", argv[0]);
  212. return -10;
  213. }
  214. const char* host = argv[1];
  215. int port = atoi(argv[2]);
  216. const char* method = argv[3];
  217. const char* param1 = argv[4];
  218. const char* param2 = argv[5];
  219. protorpc::ProtoRpcClient cli;
  220. cli.connect(port, host);
  221. while (cli.connect_state == protorpc::ProtoRpcClient::kConnecting) hv_msleep(1);
  222. if (cli.connect_state == protorpc::ProtoRpcClient::kDisconnectd) {
  223. return -20;
  224. }
  225. // test login
  226. {
  227. protorpc::LoginParam param;
  228. param.set_username("admin");
  229. param.set_password("123456");
  230. protorpc::LoginResult result;
  231. if (cli.login(param, &result) == protorpc::kRpcSuccess) {
  232. printf("login success!\n");
  233. printf("%s\n", result.DebugString().c_str());
  234. } else {
  235. printf("login failed!\n");
  236. }
  237. }
  238. // test calc
  239. {
  240. int num1 = atoi(param1);
  241. int num2 = atoi(param2);
  242. int result = 0;
  243. if (cli.calc(method, num1, num2, result) == protorpc::kRpcSuccess) {
  244. printf("calc success!\n");
  245. printf("%d %s %d = %d\n", num1, method, num2, result);
  246. } else {
  247. printf("calc failed!\n");
  248. }
  249. }
  250. return 0;
  251. }