| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- /*
- * tcp client demo
- *
- * @build make examples
- * @server bin/tcp_echo_server 1234
- * @client bin/tcp_client_test 127.0.0.1 1234
- *
- */
- #include "hloop.h"
- #include "hssl.h"
- #include "hmutex.h"
- #include "hbase.h"
- #include "herr.h"
- #define TEST_SSL 0
- #define TEST_UNPACK 0
- #define TEST_RECONNECT 1
- // @see mqtt/mqtt_client.h
- typedef struct tcp_client_s {
- // connect: host:port
- char host[256];
- int port;
- int connect_timeout; // ms
- // reconnect
- reconn_setting_t* reconn_setting;
- // flags
- unsigned char ssl: 1; // Read Only
- unsigned char alloced_ssl_ctx: 1; // intern
- unsigned char connected : 1;
- // privdata
- hloop_t* loop;
- hio_t* io;
- htimer_t* reconn_timer;
- // SSL/TLS
- hssl_ctx_t ssl_ctx;
- // thread-safe
- hmutex_t mutex_;
- // ...
- } tcp_client_t;
- static tcp_client_t* tcp_client_new(hloop_t* loop DEFAULT(NULL));
- static void tcp_client_run (tcp_client_t* cli);
- static void tcp_client_stop(tcp_client_t* cli);
- static void tcp_client_free(tcp_client_t* cli);
- // SSL/TLS
- static int tcp_client_set_ssl_ctx(tcp_client_t* cli, hssl_ctx_t ssl_ctx);
- static int tcp_client_new_ssl_ctx(tcp_client_t* cli, hssl_ctx_opt_t* opt);
- // reconnect
- static int tcp_client_set_reconnect(tcp_client_t* cli, reconn_setting_t* reconn);
- static int tcp_client_reconnect(tcp_client_t* cli);
- static void tcp_client_set_connnect_timeout(tcp_client_t* cli, int timeout_ms);
- static int tcp_client_connect(tcp_client_t* cli, const char* host, int port, int ssl);
- static int tcp_client_disconnect(tcp_client_t* cli);
- static bool tcp_client_is_connected(tcp_client_t* cli);
- static int tcp_client_send(tcp_client_t* cli, const void* buf, int len);
- static void reconnect_timer_cb(htimer_t* timer) {
- tcp_client_t* cli = (tcp_client_t*)hevent_userdata(timer);
- if (cli == NULL) return;
- cli->reconn_timer = NULL;
- tcp_client_reconnect(cli);
- }
- static void on_close(hio_t* io) {
- printf("onclose: connfd=%d error=%d\n", hio_fd(io), hio_error(io));
- tcp_client_t* cli = (tcp_client_t*)hevent_userdata(io);
- cli->connected = 0;
- // reconnect
- if (cli->reconn_setting && reconn_setting_can_retry(cli->reconn_setting)) {
- uint32_t delay = reconn_setting_calc_delay(cli->reconn_setting);
- printf("reconnect cnt=%d, delay=%d ...\n", cli->reconn_setting->cur_retry_cnt, cli->reconn_setting->cur_delay);
- cli->reconn_timer = htimer_add(cli->loop, reconnect_timer_cb, delay, 1);
- hevent_set_userdata(cli->reconn_timer, cli);
- }
- }
- static void on_message(hio_t* io, void* buf, int len) {
- printf("onmessage: %.*s\n", len, (char*)buf);
- tcp_client_t* cli = (tcp_client_t*)hevent_userdata(io);
- // ...
- }
- static void on_connect(hio_t* io) {
- printf("onconnect: connfd=%d\n", hio_fd(io));
- tcp_client_t* cli = (tcp_client_t*)hevent_userdata(io);
- cli->connected = 1;
- #if TEST_UNPACK
- static unpack_setting_t s_unpack_setting;
- s_unpack_setting.mode = UNPACK_BY_DELIMITER;
- s_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
- s_unpack_setting.delimiter_bytes = 2;
- s_unpack_setting.delimiter[0] = '\r';
- s_unpack_setting.delimiter[1] = '\n';
- hio_set_unpack(io, &s_unpack_setting);
- #endif
- hio_write(io, "hello\r\n", 7);
- hio_setcb_read(io, on_message);
- hio_read(io);
- }
- // hloop_new -> malloc(tcp_client_t)
- tcp_client_t* tcp_client_new(hloop_t* loop) {
- if (loop == NULL) {
- loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
- if (loop == NULL) return NULL;
- }
- tcp_client_t* cli = NULL;
- HV_ALLOC_SIZEOF(cli);
- if (cli == NULL) return NULL;
- cli->loop = loop;
- hmutex_init(&cli->mutex_);
- return cli;
- }
- // hloop_free -> free(tcp_client_t)
- void tcp_client_free(tcp_client_t* cli) {
- if (!cli) return;
- hmutex_destroy(&cli->mutex_);
- if (cli->ssl_ctx && cli->alloced_ssl_ctx) {
- hssl_ctx_free(cli->ssl_ctx);
- cli->ssl_ctx = NULL;
- }
- HV_FREE(cli->reconn_setting);
- HV_FREE(cli);
- }
- void tcp_client_run (tcp_client_t* cli) {
- if (!cli || !cli->loop) return;
- hloop_run(cli->loop);
- }
- void tcp_client_stop(tcp_client_t* cli) {
- if (!cli || !cli->loop) return;
- hloop_stop(cli->loop);
- }
- int tcp_client_set_ssl_ctx(tcp_client_t* cli, hssl_ctx_t ssl_ctx) {
- cli->ssl_ctx = ssl_ctx;
- return 0;
- }
- // hssl_ctx_new(opt) -> tcp_client_set_ssl_ctx
- int tcp_client_new_ssl_ctx(tcp_client_t* cli, hssl_ctx_opt_t* opt) {
- opt->endpoint = HSSL_CLIENT;
- hssl_ctx_t ssl_ctx = hssl_ctx_new(opt);
- if (ssl_ctx == NULL) return ERR_NEW_SSL_CTX;
- cli->alloced_ssl_ctx = true;
- return tcp_client_set_ssl_ctx(cli, ssl_ctx);
- }
- int tcp_client_set_reconnect(tcp_client_t* cli, reconn_setting_t* reconn) {
- if (reconn == NULL) {
- HV_FREE(cli->reconn_setting);
- return 0;
- }
- if (cli->reconn_setting == NULL) {
- HV_ALLOC_SIZEOF(cli->reconn_setting);
- }
- *cli->reconn_setting = *reconn;
- return 0;
- }
- int tcp_client_reconnect(tcp_client_t* cli) {
- tcp_client_connect(cli, cli->host, cli->port, cli->ssl);
- return 0;
- }
- int tcp_client_connect(tcp_client_t* cli, const char* host, int port, int ssl) {
- if (!cli) return -1;
- hv_strncpy(cli->host, host, sizeof(cli->host));
- cli->port = port;
- cli->ssl = ssl;
- hio_t* io = hio_create_socket(cli->loop, host, port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
- if (io == NULL) return -1;
- if (ssl) {
- if (cli->ssl_ctx) {
- hio_set_ssl_ctx(io, cli->ssl_ctx);
- }
- hio_enable_ssl(io);
- }
- if (cli->connect_timeout > 0) {
- hio_set_connect_timeout(io, cli->connect_timeout);
- }
- cli->io = io;
- hevent_set_userdata(io, cli);
- hio_setcb_connect(io, on_connect);
- hio_setcb_close(io, on_close);
- return hio_connect(io);
- }
- int tcp_client_disconnect(tcp_client_t* cli) {
- if (!cli || !cli->io) return -1;
- // cancel reconnect first
- tcp_client_set_reconnect(cli, NULL);
- return hio_close(cli->io);
- }
- bool tcp_client_is_connected(tcp_client_t* cli) {
- return cli && cli->connected;
- }
- int tcp_client_send(tcp_client_t* cli, const void* buf, int len) {
- if (!cli || !cli->io || !buf || len == 0) return -1;
- if (!cli->connected) return -2;
- // thread-safe
- hmutex_lock(&cli->mutex_);
- int nwrite = hio_write(cli->io, buf, len);
- hmutex_unlock(&cli->mutex_);
- return nwrite;
- }
- int main(int argc, char** argv) {
- if (argc < 3) {
- printf("Usage: %s host port\n", argv[0]);
- return -10;
- }
- const char* host = argv[1];
- int port = atoi(argv[2]);
- tcp_client_t* cli = tcp_client_new(NULL);
- if (!cli) return -20;
- #if TEST_RECONNECT
- reconn_setting_t reconn;
- reconn_setting_init(&reconn);
- reconn.min_delay = 1000;
- reconn.max_delay = 10000;
- reconn.delay_policy = 2;
- tcp_client_set_reconnect(cli, &reconn);
- #endif
- int ssl = 0;
- #if TEST_SSL
- ssl = 1;
- #endif
- tcp_client_connect(cli, host, port, ssl);
- tcp_client_run(cli);
- tcp_client_free(cli);
- return 0;
- }
|