ithewei 3 lat temu
rodzic
commit
53fd583266

+ 72 - 0
event/hloop.h

@@ -539,6 +539,78 @@ unpack_setting_t grpc_unpack_setting = {
 };
 */
 
+//-----------------reconnect----------------------------------------
+#define DEFAULT_RECONNECT_MIN_DELAY     1000    // ms
+#define DEFAULT_RECONNECT_MAX_DELAY     60000   // ms
+#define DEFAULT_RECONNECT_DELAY_POLICY  2       // exponential
+#define DEFAULT_RECONNECT_MAX_RETRY_CNT INFINITE
+typedef struct reconn_setting_s {
+    uint32_t min_delay;  // ms
+    uint32_t max_delay;  // ms
+    uint32_t cur_delay;  // ms
+    /*
+     * @delay_policy
+     * 0: fixed
+     * min_delay=3s => 3,3,3...
+     * 1: linear
+     * min_delay=3s max_delay=10s => 3,6,9,10,10...
+     * other: exponential
+     * min_delay=3s max_delay=60s delay_policy=2 => 3,6,12,24,48,60,60...
+     */
+    uint32_t delay_policy;
+    uint32_t max_retry_cnt;
+    uint32_t cur_retry_cnt;
+
+#ifdef __cplusplus
+    reconn_setting_s() {
+        min_delay = DEFAULT_RECONNECT_MIN_DELAY;
+        max_delay = DEFAULT_RECONNECT_MAX_DELAY;
+        cur_delay = 0;
+        // 1,2,4,8,16,32,60,60...
+        delay_policy = DEFAULT_RECONNECT_DELAY_POLICY;
+        max_retry_cnt = DEFAULT_RECONNECT_MAX_RETRY_CNT;
+        cur_retry_cnt = 0;
+    }
+#endif
+} reconn_setting_t;
+
+HV_INLINE void reconn_setting_init(reconn_setting_t* reconn) {
+    reconn->min_delay = DEFAULT_RECONNECT_MIN_DELAY;
+    reconn->max_delay = DEFAULT_RECONNECT_MAX_DELAY;
+    reconn->cur_delay = 0;
+    // 1,2,4,8,16,32,60,60...
+    reconn->delay_policy = DEFAULT_RECONNECT_DELAY_POLICY;
+    reconn->max_retry_cnt = DEFAULT_RECONNECT_MAX_RETRY_CNT;
+    reconn->cur_retry_cnt = 0;
+}
+
+HV_INLINE void reconn_setting_reset(reconn_setting_t* reconn) {
+    reconn->cur_delay = 0;
+    reconn->cur_retry_cnt = 0;
+}
+
+HV_INLINE bool reconn_setting_can_retry(reconn_setting_t* reconn) {
+    ++reconn->cur_retry_cnt;
+    return reconn->max_retry_cnt == INFINITE ||
+           reconn->cur_retry_cnt < reconn->max_retry_cnt;
+}
+
+HV_INLINE uint32_t reconn_setting_calc_delay(reconn_setting_t* reconn) {
+    if (reconn->delay_policy == 0) {
+        // fixed
+        reconn->cur_delay = reconn->min_delay;
+    } else if (reconn->delay_policy == 1) {
+        // linear
+        reconn->cur_delay += reconn->min_delay;
+    } else {
+        // exponential
+        reconn->cur_delay *= reconn->delay_policy;
+    }
+    reconn->cur_delay = MAX(reconn->cur_delay, reconn->min_delay);
+    reconn->cur_delay = MIN(reconn->cur_delay, reconn->max_delay);
+    return reconn->cur_delay;
+}
+
 //-----------------rudp---------------------------------------------
 #if WITH_KCP
 #define WITH_RUDP 1

+ 38 - 65
evpp/TcpClient.h

@@ -10,33 +10,7 @@
 
 namespace hv {
 
-struct ReconnectInfo {
-    uint32_t min_delay;  // ms
-    uint32_t max_delay;  // ms
-    uint32_t cur_delay;  // ms
-    /*
-     * @delay_policy
-     * 0: fixed
-     * min_delay=3s => 3,3,3...
-     * 1: linear
-     * min_delay=3s max_delay=10s => 3,6,9,10,10...
-     * other: exponential
-     * min_delay=3s max_delay=60s delay_policy=2 => 3,6,12,24,48,60,60...
-     */
-    uint32_t delay_policy;
-    uint32_t max_retry_cnt;
-    uint32_t cur_retry_cnt;
-
-    ReconnectInfo() {
-        min_delay = 1000;
-        max_delay = 60000;
-        cur_delay = 0;
-        // 1,2,4,8,16,32,60,60...
-        delay_policy = 2;
-        max_retry_cnt = INFINITE;
-        cur_retry_cnt = 0;
-    }
-};
+typedef struct reconn_setting_s ReconnectInfo; // Deprecated
 
 template<class TSocketChannel = SocketChannel>
 class TcpClientTmpl {
@@ -46,11 +20,13 @@ public:
     TcpClientTmpl() {
         tls = false;
         connect_timeout = 5000;
-        enable_reconnect = false;
-        enable_unpack = false;
+        reconn_setting = NULL;
+        unpack_setting = NULL;
     }
 
     virtual ~TcpClientTmpl() {
+        HV_FREE(reconn_setting);
+        HV_FREE(unpack_setting);
     }
 
     const EventLoopPtr& loop() {
@@ -82,7 +58,7 @@ public:
     }
     // closesocket thread-safe
     void closesocket() {
-        enable_reconnect = false;
+        setReconnect(NULL);
         if (channel) {
             channel->close(true);
         }
@@ -97,13 +73,16 @@ public:
             channel->setConnectTimeout(connect_timeout);
         }
         channel->onconnect = [this]() {
-            if (enable_unpack) {
-                channel->setUnpack(&unpack_setting);
+            if (unpack_setting) {
+                channel->setUnpack(unpack_setting);
             }
             channel->startRead();
             if (onConnection) {
                 onConnection(channel);
             }
+            if (reconn_setting) {
+                reconn_setting_reset(reconn_setting);
+            }
         };
         channel->onread = [this](Buffer* buf) {
             if (onMessage) {
@@ -120,7 +99,7 @@ public:
                 onConnection(channel);
             }
             // reconnect
-            if (enable_reconnect) {
+            if (reconn_setting) {
                 startReconnect();
             } else {
                 channel = NULL;
@@ -132,22 +111,11 @@ public:
     }
 
     int startReconnect() {
-        if (++reconnect_info.cur_retry_cnt > reconnect_info.max_retry_cnt) return 0;
-        if (reconnect_info.delay_policy == 0) {
-            // fixed
-            reconnect_info.cur_delay = reconnect_info.min_delay;
-        } else if (reconnect_info.delay_policy == 1) {
-            // linear
-            reconnect_info.cur_delay += reconnect_info.min_delay;
-        } else {
-            // exponential
-            reconnect_info.cur_delay *= reconnect_info.delay_policy;
-        }
-        reconnect_info.cur_delay = MAX(reconnect_info.cur_delay, reconnect_info.min_delay);
-        reconnect_info.cur_delay = MIN(reconnect_info.cur_delay, reconnect_info.max_delay);
-        loop_thread.loop()->setTimeout(reconnect_info.cur_delay, [this](TimerID timerID){
-            hlogi("reconnect... cnt=%d, delay=%d", reconnect_info.cur_retry_cnt, reconnect_info.cur_delay);
-            // printf("reconnect... cnt=%d, delay=%d\n", reconnect_info.cur_retry_cnt, reconnect_info.cur_delay);
+        if (!reconn_setting) return -1;
+        if (!reconn_setting_can_retry(reconn_setting)) return -2;
+        uint32_t delay = reconn_setting_calc_delay(reconn_setting);
+        loop_thread.loop()->setTimeout(delay, [this](TimerID timerID){
+            hlogi("reconnect... cnt=%d, delay=%d", reconn_setting->cur_retry_cnt, reconn_setting->cur_delay);
             createsocket(&peeraddr.sa);
             startConnect();
         });
@@ -159,7 +127,7 @@ public:
     }
     // stop thread-safe
     void stop(bool wait_threads_stopped = true) {
-        enable_reconnect = false;
+        setReconnect(NULL);
         loop_thread.stop(wait_threads_stopped);
     }
 
@@ -206,22 +174,29 @@ public:
         connect_timeout = ms;
     }
 
-    void setReconnect(ReconnectInfo* info) {
-        if (info) {
-            enable_reconnect = true;
-            reconnect_info = *info;
-        } else {
-            enable_reconnect = false;
+    void setReconnect(reconn_setting_t* setting) {
+        if (setting == NULL) {
+            HV_FREE(reconn_setting);
+            return;
         }
+        if (reconn_setting == NULL) {
+            HV_ALLOC_SIZEOF(reconn_setting);
+        }
+        *reconn_setting = *setting;
+    }
+    bool isReconnect() {
+        return reconn_setting && reconn_setting->cur_retry_cnt > 0;
     }
 
     void setUnpack(unpack_setting_t* setting) {
-        if (setting) {
-            enable_unpack = true;
-            unpack_setting = *setting;
-        } else {
-            enable_unpack = false;
+        if (setting == NULL) {
+            HV_FREE(unpack_setting);
+            return;
+        }
+        if (unpack_setting == NULL) {
+            HV_ALLOC_SIZEOF(unpack_setting);
         }
+        *unpack_setting = *setting;
     }
 
 public:
@@ -230,10 +205,8 @@ public:
     sockaddr_u              peeraddr;
     bool                    tls;
     int                     connect_timeout;
-    bool                    enable_reconnect;
-    ReconnectInfo           reconnect_info;
-    bool                    enable_unpack;
-    unpack_setting_t        unpack_setting;
+    reconn_setting_t*       reconn_setting;
+    unpack_setting_t*       unpack_setting;
 
     // Callback
     std::function<void(const TSocketChannelPtr&)>           onConnection;

+ 5 - 2
evpp/TcpClient_test.cpp

@@ -23,7 +23,7 @@ int main(int argc, char* argv[]) {
         return -20;
     }
     printf("client connect to port %d, connfd=%d ...\n", port, connfd);
-    cli.onConnection = [](const SocketChannelPtr& channel) {
+    cli.onConnection = [&cli](const SocketChannelPtr& channel) {
         std::string peeraddr = channel->peeraddr();
         if (channel->isConnected()) {
             printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
@@ -41,12 +41,15 @@ int main(int argc, char* argv[]) {
         } else {
             printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
         }
+        if (cli.isReconnect()) {
+            printf("reconnect cnt=%d, delay=%d\n", cli.reconn_setting->cur_retry_cnt, cli.reconn_setting->cur_delay);
+        }
     };
     cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
         printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
     };
     // reconnect: 1,2,4,8,10,10,10...
-    ReconnectInfo reconn;
+    reconn_setting_t reconn;
     reconn.min_delay = 1000;
     reconn.max_delay = 10000;
     reconn.delay_policy = 2;

+ 16 - 1
examples/mqtt/mqtt_sub.c

@@ -19,6 +19,7 @@
  */
 #define TEST_SSL        0
 #define TEST_AUTH       0
+#define TEST_RECONNECT  1
 
 /*
  * workflow:
@@ -41,10 +42,15 @@ static void on_mqtt(mqtt_client_t* cli, int type) {
     switch(type) {
     case MQTT_TYPE_CONNECT:
         printf("mqtt connected!\n");
+        if (cli->reconn_setting && cli->reconn_setting->cur_retry_cnt > 0) {
+            printf("mqtt reconnect cnt=%d, delay=%d\n", cli->reconn_setting->cur_retry_cnt, cli->reconn_setting->cur_delay);
+        }
         break;
     case MQTT_TYPE_DISCONNECT:
         printf("mqtt disconnected!\n");
-        mqtt_client_stop(cli);
+        if (cli->reconn_setting && cli->reconn_setting->cur_retry_cnt > 0) {
+            printf("mqtt reconnect cnt=%d, delay=%d\n", cli->reconn_setting->cur_retry_cnt, cli->reconn_setting->cur_delay);
+        }
         break;
     case MQTT_TYPE_CONNACK:
         printf("mqtt connack!\n");
@@ -80,6 +86,15 @@ static int mqtt_subscribe(const char* host, int port, const char* topic) {
     mqtt_client_set_userdata(cli, (void*)topic);
     mqtt_client_set_callback(cli, on_mqtt);
 
+#if TEST_RECONNECT
+    reconn_setting_t reconn;
+    reconn_setting_init(&reconn);
+    reconn.min_delay = 1000;
+    reconn.max_delay = 10000;
+    reconn.delay_policy = 2;
+    mqtt_client_set_reconnect(cli, &reconn);
+#endif
+
     int ssl = 0;
 #if TEST_SSL
     ssl = 1;

+ 1 - 1
examples/protorpc/protorpc_client.cpp

@@ -70,7 +70,7 @@ public:
 
         setConnectTimeout(5000);
 
-        ReconnectInfo reconn;
+        reconn_setting_t reconn;
         reconn.min_delay = 1000;
         reconn.max_delay = 10000;
         reconn.delay_policy = 2;

+ 1 - 1
examples/websocket_client_test.cpp

@@ -31,7 +31,7 @@ int main(int argc, char** argv) {
     };
 
     // reconnect: 1,2,4,8,10,10,10...
-    ReconnectInfo reconn;
+    reconn_setting_t reconn;
     reconn.min_delay = 1000;
     reconn.max_delay = 10000;
     reconn.delay_policy = 2;

+ 43 - 1
mqtt/mqtt_client.c

@@ -131,12 +131,25 @@ static int mqtt_client_login(mqtt_client_t* cli) {
     return nwrite < 0 ? nwrite : 0;
 }
 
+static void reconnect_timer_cb(htimer_t* timer) {
+    mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(timer);
+    if (cli == NULL) return;
+    cli->reconn_timer = NULL;
+    mqtt_client_reconnect(cli);
+}
+
 static void on_close(hio_t* io) {
     mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
     if (cli->cb) {
         cli->head.type = MQTT_TYPE_DISCONNECT;
         cli->cb(cli, cli->head.type);
     }
+    // reconnect
+    if (cli->reconn_setting && reconn_setting_can_retry(cli->reconn_setting)) {
+        uint32_t delay = reconn_setting_calc_delay(cli->reconn_setting);
+        cli->reconn_timer = htimer_add(cli->loop, reconnect_timer_cb, delay, 1);
+        hevent_set_userdata(cli->reconn_timer, cli);
+    }
 }
 
 static void on_packet(hio_t* io, void* buf, int len) {
@@ -277,6 +290,9 @@ static void on_connect(hio_t* io) {
         cli->head.type = MQTT_TYPE_CONNECT;
         cli->cb(cli, cli->head.type);
     }
+    if (cli->reconn_setting) {
+        reconn_setting_reset(cli->reconn_setting);
+    }
 
     static unpack_setting_t mqtt_unpack_setting;
     mqtt_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
@@ -311,10 +327,15 @@ mqtt_client_t* mqtt_client_new(hloop_t* loop) {
 void mqtt_client_free(mqtt_client_t* cli) {
     if (!cli) return;
     hmutex_destroy(&cli->mutex_);
+    if (cli->reconn_timer) {
+        htimer_del(cli->reconn_timer);
+        cli->reconn_timer = NULL;
+    }
     if (cli->ssl_ctx && cli->alloced_ssl_ctx) {
         hssl_ctx_free(cli->ssl_ctx);
         cli->ssl_ctx = NULL;
     }
+    HV_FREE(cli->reconn_setting);
     HV_FREE(cli->will);
     HV_FREE(cli);
 }
@@ -385,12 +406,31 @@ int mqtt_client_new_ssl_ctx(mqtt_client_t* cli, hssl_ctx_opt_t* opt) {
     return mqtt_client_set_ssl_ctx(cli, ssl_ctx);
 }
 
+int mqtt_client_set_reconnect(mqtt_client_t* cli, reconn_setting_t* reconn) {
+    if (reconn == NULL) {
+        HV_FREE(cli->reconn_setting);
+        return 0;
+    }
+    if (cli->reconn_setting == NULL) {
+        HV_ALLOC_SIZEOF(cli->reconn_setting);
+    }
+    *cli->reconn_setting = *reconn;
+    return 0;
+}
+
+int mqtt_client_reconnect(mqtt_client_t* cli) {
+    mqtt_client_connect(cli, cli->host, cli->port, cli->ssl);
+    return 0;
+}
+
 int mqtt_client_connect(mqtt_client_t* cli, const char* host, int port, int ssl) {
     if (!cli) return -1;
+    safe_strncpy(cli->host, host, sizeof(cli->host));
+    cli->port = port;
+    cli->ssl = ssl;
     hio_t* io = hio_create_socket(cli->loop, host, port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
     if (io == NULL) return -1;
     if (ssl) {
-        cli->ssl = 1;
         if (cli->ssl_ctx) {
             hio_set_ssl_ctx(io, cli->ssl_ctx);
         }
@@ -405,6 +445,8 @@ int mqtt_client_connect(mqtt_client_t* cli, const char* host, int port, int ssl)
 
 int mqtt_client_disconnect(mqtt_client_t* cli) {
     if (!cli || !cli->io) return -1;
+    // cancel reconnect first
+    mqtt_client_set_reconnect(cli, NULL);
     mqtt_send_disconnect(cli->io);
     return hio_close(cli->io);
 }

+ 10 - 2
mqtt/mqtt_client.h

@@ -18,6 +18,8 @@ struct mqtt_client_s {
     // connect: host:port
     char host[256];
     int  port;
+    // reconnect
+    reconn_setting_t* reconn_setting;
     // login: flags + keepalive + client_id + will + username + password
     // flags
     unsigned short clean_session:   1;
@@ -40,8 +42,9 @@ struct mqtt_client_s {
     // userdata
     void* userdata;
     // privdata
-    hloop_t* loop;
-    hio_t*   io;
+    hloop_t*    loop;
+    hio_t*      io;
+    htimer_t*   reconn_timer;
     // SSL/TLS
     hssl_ctx_t ssl_ctx;
     // thread-safe
@@ -85,6 +88,11 @@ HV_EXPORT int mqtt_client_set_ssl_ctx(mqtt_client_t* cli, hssl_ctx_t ssl_ctx);
 // hssl_ctx_new(opt) -> mqtt_client_set_ssl_ctx
 HV_EXPORT int mqtt_client_new_ssl_ctx(mqtt_client_t* cli, hssl_ctx_opt_t* opt);
 
+// reconnect
+HV_EXPORT int mqtt_client_set_reconnect(mqtt_client_t* cli,
+        reconn_setting_t* reconn);
+HV_EXPORT int mqtt_client_reconnect(mqtt_client_t* cli);
+
 // connect
 // hio_create_socket -> hio_connect ->
 // on_connect -> mqtt_client_login ->