1
0

TcpClient.h 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. #ifndef HV_TCP_CLIENT_HPP_
  2. #define HV_TCP_CLIENT_HPP_
  3. #include "hsocket.h"
  4. #include "hssl.h"
  5. #include "hlog.h"
  6. #include "EventLoopThread.h"
  7. #include "Channel.h"
  8. namespace hv {
  9. template<class TSocketChannel = SocketChannel>
  10. class TcpClientEventLoopTmpl {
  11. public:
  12. typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
  13. TcpClientEventLoopTmpl(EventLoopPtr loop = NULL) {
  14. loop_ = loop ? loop : std::make_shared<EventLoop>();
  15. remote_port = 0;
  16. connect_timeout = HIO_DEFAULT_CONNECT_TIMEOUT;
  17. tls = false;
  18. tls_setting = NULL;
  19. reconn_setting = NULL;
  20. unpack_setting = NULL;
  21. }
  22. virtual ~TcpClientEventLoopTmpl() {
  23. HV_FREE(tls_setting);
  24. HV_FREE(reconn_setting);
  25. HV_FREE(unpack_setting);
  26. }
  27. const EventLoopPtr& loop() {
  28. return loop_;
  29. }
  30. // delete thread-safe
  31. void deleteInLoop() {
  32. loop_->runInLoop([this](){
  33. delete this;
  34. });
  35. }
  36. // NOTE: By default, not bind local port. If necessary, you can call bind() after createsocket().
  37. // @retval >=0 connfd, <0 error
  38. int createsocket(int remote_port, const char* remote_host = "127.0.0.1") {
  39. memset(&remote_addr, 0, sizeof(remote_addr));
  40. int ret = sockaddr_set_ipport(&remote_addr, remote_host, remote_port);
  41. if (ret != 0) {
  42. return NABS(ret);
  43. }
  44. this->remote_host = remote_host;
  45. this->remote_port = remote_port;
  46. return createsocket(&remote_addr.sa);
  47. }
  48. int createsocket(struct sockaddr* remote_addr) {
  49. int connfd = ::socket(remote_addr->sa_family, SOCK_STREAM, 0);
  50. // SOCKADDR_PRINT(remote_addr);
  51. if (connfd < 0) {
  52. perror("socket");
  53. return -2;
  54. }
  55. hio_t* io = hio_get(loop_->loop(), connfd);
  56. assert(io != NULL);
  57. hio_set_peeraddr(io, remote_addr, SOCKADDR_LEN(remote_addr));
  58. channel = std::make_shared<TSocketChannel>(io);
  59. return connfd;
  60. }
  61. int bind(int local_port, const char* local_host = "0.0.0.0") {
  62. sockaddr_u local_addr;
  63. memset(&local_addr, 0, sizeof(local_addr));
  64. int ret = sockaddr_set_ipport(&local_addr, local_host, local_port);
  65. if (ret != 0) {
  66. return NABS(ret);
  67. }
  68. return bind(&local_addr.sa);
  69. }
  70. int bind(struct sockaddr* local_addr) {
  71. if (channel == NULL || channel->isClosed()) {
  72. return -1;
  73. }
  74. int ret = ::bind(channel->fd(), local_addr, SOCKADDR_LEN(local_addr));
  75. if (ret != 0) {
  76. perror("bind");
  77. }
  78. return ret;
  79. }
  80. // closesocket thread-safe
  81. void closesocket() {
  82. if (channel && channel->status != SocketChannel::CLOSED) {
  83. loop_->runInLoop([this](){
  84. if (channel) {
  85. setReconnect(NULL);
  86. channel->close();
  87. }
  88. });
  89. }
  90. }
  91. int startConnect() {
  92. if (channel == NULL || channel->isClosed()) {
  93. int connfd = createsocket(&remote_addr.sa);
  94. if (connfd < 0) {
  95. hloge("createsocket %s:%d return %d!\n", remote_host.c_str(), remote_port, connfd);
  96. return connfd;
  97. }
  98. }
  99. if (channel == NULL || channel->status >= SocketChannel::CONNECTING) {
  100. return -1;
  101. }
  102. if (connect_timeout) {
  103. channel->setConnectTimeout(connect_timeout);
  104. }
  105. if (tls) {
  106. channel->enableSSL();
  107. if (tls_setting) {
  108. int ret = channel->newSslCtx(tls_setting);
  109. if (ret != 0) {
  110. hloge("new SSL_CTX failed: %d", ret);
  111. closesocket();
  112. return ret;
  113. }
  114. }
  115. if (!is_ipaddr(remote_host.c_str())) {
  116. channel->setHostname(remote_host);
  117. }
  118. }
  119. channel->onconnect = [this]() {
  120. if (unpack_setting) {
  121. channel->setUnpack(unpack_setting);
  122. }
  123. channel->startRead();
  124. if (onConnection) {
  125. onConnection(channel);
  126. }
  127. if (reconn_setting) {
  128. reconn_setting_reset(reconn_setting);
  129. }
  130. };
  131. channel->onread = [this](Buffer* buf) {
  132. if (onMessage) {
  133. onMessage(channel, buf);
  134. }
  135. };
  136. channel->onwrite = [this](Buffer* buf) {
  137. if (onWriteComplete) {
  138. onWriteComplete(channel, buf);
  139. }
  140. };
  141. channel->onclose = [this]() {
  142. bool reconnect = reconn_setting != NULL;
  143. if (onConnection) {
  144. onConnection(channel);
  145. }
  146. if (reconnect) {
  147. startReconnect();
  148. }
  149. };
  150. return channel->startConnect();
  151. }
  152. int startReconnect() {
  153. if (!reconn_setting) return -1;
  154. if (!reconn_setting_can_retry(reconn_setting)) return -2;
  155. uint32_t delay = reconn_setting_calc_delay(reconn_setting);
  156. hlogi("reconnect... cnt=%d, delay=%d", reconn_setting->cur_retry_cnt, reconn_setting->cur_delay);
  157. loop_->setTimeout(delay, [this](TimerID timerID){
  158. startConnect();
  159. });
  160. return 0;
  161. }
  162. // start thread-safe
  163. void start() {
  164. loop_->runInLoop(std::bind(&TcpClientEventLoopTmpl::startConnect, this));
  165. }
  166. bool isConnected() {
  167. if (channel == NULL) return false;
  168. return channel->isConnected();
  169. }
  170. // send thread-safe
  171. int send(const void* data, int size) {
  172. if (!isConnected()) return -1;
  173. return channel->write(data, size);
  174. }
  175. int send(Buffer* buf) {
  176. return send(buf->data(), buf->size());
  177. }
  178. int send(const std::string& str) {
  179. return send(str.data(), str.size());
  180. }
  181. int withTLS(hssl_ctx_opt_t* opt = NULL) {
  182. tls = true;
  183. if (opt) {
  184. if (tls_setting == NULL) {
  185. HV_ALLOC_SIZEOF(tls_setting);
  186. }
  187. opt->endpoint = HSSL_CLIENT;
  188. *tls_setting = *opt;
  189. }
  190. return 0;
  191. }
  192. void setConnectTimeout(int ms) {
  193. connect_timeout = ms;
  194. }
  195. void setReconnect(reconn_setting_t* setting) {
  196. if (setting == NULL) {
  197. HV_FREE(reconn_setting);
  198. return;
  199. }
  200. if (reconn_setting == NULL) {
  201. HV_ALLOC_SIZEOF(reconn_setting);
  202. }
  203. *reconn_setting = *setting;
  204. }
  205. bool isReconnect() {
  206. return reconn_setting && reconn_setting->cur_retry_cnt > 0;
  207. }
  208. void setUnpack(unpack_setting_t* setting) {
  209. if (setting == NULL) {
  210. HV_FREE(unpack_setting);
  211. return;
  212. }
  213. if (unpack_setting == NULL) {
  214. HV_ALLOC_SIZEOF(unpack_setting);
  215. }
  216. *unpack_setting = *setting;
  217. }
  218. public:
  219. TSocketChannelPtr channel;
  220. std::string remote_host;
  221. int remote_port;
  222. sockaddr_u remote_addr;
  223. int connect_timeout;
  224. bool tls;
  225. hssl_ctx_opt_t* tls_setting;
  226. reconn_setting_t* reconn_setting;
  227. unpack_setting_t* unpack_setting;
  228. // Callback
  229. std::function<void(const TSocketChannelPtr&)> onConnection;
  230. std::function<void(const TSocketChannelPtr&, Buffer*)> onMessage;
  231. // NOTE: Use Channel::isWriteComplete in onWriteComplete callback to determine whether all data has been written.
  232. std::function<void(const TSocketChannelPtr&, Buffer*)> onWriteComplete;
  233. private:
  234. EventLoopPtr loop_;
  235. };
  236. template<class TSocketChannel = SocketChannel>
  237. class TcpClientTmpl : private EventLoopThread, public TcpClientEventLoopTmpl<TSocketChannel> {
  238. public:
  239. TcpClientTmpl(EventLoopPtr loop = NULL)
  240. : EventLoopThread(loop)
  241. , TcpClientEventLoopTmpl<TSocketChannel>(EventLoopThread::loop())
  242. , is_loop_owner(loop == NULL)
  243. {}
  244. virtual ~TcpClientTmpl() {
  245. stop(true);
  246. }
  247. const EventLoopPtr& loop() {
  248. return EventLoopThread::loop();
  249. }
  250. // start thread-safe
  251. void start(bool wait_threads_started = true) {
  252. if (isRunning()) {
  253. TcpClientEventLoopTmpl<TSocketChannel>::start();
  254. } else {
  255. EventLoopThread::start(wait_threads_started, [this]() {
  256. TcpClientTmpl::startConnect();
  257. return 0;
  258. });
  259. }
  260. }
  261. // stop thread-safe
  262. void stop(bool wait_threads_stopped = true) {
  263. TcpClientEventLoopTmpl<TSocketChannel>::closesocket();
  264. if (is_loop_owner) {
  265. EventLoopThread::stop(wait_threads_stopped);
  266. }
  267. }
  268. private:
  269. bool is_loop_owner;
  270. };
  271. typedef TcpClientTmpl<SocketChannel> TcpClient;
  272. }
  273. #endif // HV_TCP_CLIENT_HPP_