Bläddra i källkod

Add mqtt_client_set_connect_timeout, mqtt_client_is_connected

ithewei 3 år sedan
förälder
incheckning
d989b5c1fc
3 ändrade filer med 23 tillägg och 1 borttagningar
  1. 2 0
      docs/API.md
  2. 16 0
      mqtt/mqtt_client.c
  3. 5 1
      mqtt/mqtt_client.h

+ 2 - 0
docs/API.md

@@ -624,7 +624,9 @@
 - mqtt_client_new_ssl_ctx
 - mqtt_client_new_ssl_ctx
 - mqtt_client_set_reconnect
 - mqtt_client_set_reconnect
 - mqtt_client_reconnect
 - mqtt_client_reconnect
+- mqtt_client_set_connect_timeout
 - mqtt_client_connect
 - mqtt_client_connect
+- mqtt_client_is_connected
 - mqtt_client_disconnect
 - mqtt_client_disconnect
 - mqtt_client_publish
 - mqtt_client_publish
 - mqtt_client_subscribe
 - mqtt_client_subscribe

+ 16 - 0
mqtt/mqtt_client.c

@@ -168,6 +168,7 @@ static void reconnect_timer_cb(htimer_t* timer) {
 
 
 static void on_close(hio_t* io) {
 static void on_close(hio_t* io) {
     mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
     mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
+    cli->connected = 0;
     if (cli->cb) {
     if (cli->cb) {
         cli->head.type = MQTT_TYPE_DISCONNECT;
         cli->head.type = MQTT_TYPE_DISCONNECT;
         cli->cb(cli, cli->head.type);
         cli->cb(cli, cli->head.type);
@@ -206,6 +207,7 @@ static void on_packet(hio_t* io, void* buf, int len) {
             hio_close(io);
             hio_close(io);
             return;
             return;
         }
         }
+        cli->connected = 1;
         if (cli->keepalive) {
         if (cli->keepalive) {
             hio_set_heartbeat(io, cli->keepalive * 1000, mqtt_send_ping);
             hio_set_heartbeat(io, cli->keepalive * 1000, mqtt_send_ping);
         }
         }
@@ -452,6 +454,10 @@ int mqtt_client_reconnect(mqtt_client_t* cli) {
     return 0;
     return 0;
 }
 }
 
 
+void mqtt_client_set_connect_timeout(mqtt_client_t* cli, int ms) {
+    cli->connect_timeout = ms;
+}
+
 int mqtt_client_connect(mqtt_client_t* cli, const char* host, int port, int ssl) {
 int mqtt_client_connect(mqtt_client_t* cli, const char* host, int port, int ssl) {
     if (!cli) return -1;
     if (!cli) return -1;
     hv_strncpy(cli->host, host, sizeof(cli->host));
     hv_strncpy(cli->host, host, sizeof(cli->host));
@@ -465,6 +471,9 @@ int mqtt_client_connect(mqtt_client_t* cli, const char* host, int port, int ssl)
         }
         }
         hio_enable_ssl(io);
         hio_enable_ssl(io);
     }
     }
+    if (cli->connect_timeout > 0) {
+        hio_set_connect_timeout(io, cli->connect_timeout);
+    }
     cli->io = io;
     cli->io = io;
     hevent_set_userdata(io, cli);
     hevent_set_userdata(io, cli);
     hio_setcb_connect(io, on_connect);
     hio_setcb_connect(io, on_connect);
@@ -472,6 +481,10 @@ int mqtt_client_connect(mqtt_client_t* cli, const char* host, int port, int ssl)
     return hio_connect(io);
     return hio_connect(io);
 }
 }
 
 
+bool mqtt_client_is_connected(mqtt_client_t* cli) {
+    return cli && cli->connected;
+}
+
 int mqtt_client_disconnect(mqtt_client_t* cli) {
 int mqtt_client_disconnect(mqtt_client_t* cli) {
     if (!cli || !cli->io) return -1;
     if (!cli || !cli->io) return -1;
     // cancel reconnect first
     // cancel reconnect first
@@ -482,6 +495,7 @@ int mqtt_client_disconnect(mqtt_client_t* cli) {
 
 
 int mqtt_client_publish(mqtt_client_t* cli, mqtt_message_t* msg) {
 int mqtt_client_publish(mqtt_client_t* cli, mqtt_message_t* msg) {
     if (!cli || !cli->io || !msg) return -1;
     if (!cli || !cli->io || !msg) return -1;
+    if (!cli->connected) return -2;
     int topic_len = msg->topic_len ? msg->topic_len : strlen(msg->topic);
     int topic_len = msg->topic_len ? msg->topic_len : strlen(msg->topic);
     int payload_len = msg->payload_len ? msg->payload_len : strlen(msg->payload);
     int payload_len = msg->payload_len ? msg->payload_len : strlen(msg->payload);
     int len = 2 + topic_len + payload_len;
     int len = 2 + topic_len + payload_len;
@@ -527,6 +541,7 @@ unlock:
 
 
 int mqtt_client_subscribe(mqtt_client_t* cli, const char* topic, int qos) {
 int mqtt_client_subscribe(mqtt_client_t* cli, const char* topic, int qos) {
     if (!cli || !cli->io || !topic) return -1;
     if (!cli || !cli->io || !topic) return -1;
+    if (!cli->connected) return -2;
     int topic_len = strlen(topic);
     int topic_len = strlen(topic);
     int len = 2 + 2 + topic_len + 1;
     int len = 2 + 2 + topic_len + 1;
 
 
@@ -554,6 +569,7 @@ int mqtt_client_subscribe(mqtt_client_t* cli, const char* topic, int qos) {
 
 
 int mqtt_client_unsubscribe(mqtt_client_t* cli, const char* topic) {
 int mqtt_client_unsubscribe(mqtt_client_t* cli, const char* topic) {
     if (!cli || !cli->io || !topic) return -1;
     if (!cli || !cli->io || !topic) return -1;
+    if (!cli->connected) return -2;
     int topic_len = strlen(topic);
     int topic_len = strlen(topic);
     int len = 2 + 2 + topic_len;
     int len = 2 + 2 + topic_len;
 
 

+ 5 - 1
mqtt/mqtt_client.h

@@ -18,6 +18,7 @@ struct mqtt_client_s {
     // connect: host:port
     // connect: host:port
     char host[256];
     char host[256];
     int  port;
     int  port;
+    int  connect_timeout; // ms
     // reconnect
     // reconnect
     reconn_setting_t* reconn_setting;
     reconn_setting_t* reconn_setting;
     // login: flags + keepalive + client_id + will + username + password
     // login: flags + keepalive + client_id + will + username + password
@@ -26,6 +27,7 @@ struct mqtt_client_s {
     unsigned char   clean_session:   1;
     unsigned char   clean_session:   1;
     unsigned char   ssl: 1; // Read Only
     unsigned char   ssl: 1; // Read Only
     unsigned char   alloced_ssl_ctx: 1; // intern
     unsigned char   alloced_ssl_ctx: 1; // intern
+    unsigned char   connected : 1;
     unsigned short  keepalive;
     unsigned short  keepalive;
     char client_id[64];
     char client_id[64];
     // will
     // will
@@ -98,10 +100,12 @@ HV_EXPORT int mqtt_client_reconnect(mqtt_client_t* cli);
 // hio_create_socket -> hio_connect ->
 // hio_create_socket -> hio_connect ->
 // on_connect -> mqtt_client_login ->
 // on_connect -> mqtt_client_login ->
 // on_connack
 // on_connack
-HV_EXPORT int mqtt_client_connect(mqtt_client_t* cli,
+HV_EXPORT void mqtt_client_set_connect_timeout(mqtt_client_t* cli, int ms);
+HV_EXPORT int  mqtt_client_connect(mqtt_client_t* cli,
         const char* host,
         const char* host,
         int port DEFAULT(DEFAULT_MQTT_PORT),
         int port DEFAULT(DEFAULT_MQTT_PORT),
         int ssl  DEFAULT(0));
         int ssl  DEFAULT(0));
+HV_EXPORT bool mqtt_client_is_connected(mqtt_client_t* cli);
 
 
 // disconnect
 // disconnect
 // @see hio_close
 // @see hio_close