Browse Source

#201: Add class MqttClient

ithewei 3 years ago
parent
commit
68a19a5a01
6 changed files with 294 additions and 3 deletions
  1. 4 0
      Makefile
  2. 4 1
      examples/CMakeLists.txt
  3. 83 0
      examples/mqtt/mqtt_client_test.cpp
  4. 1 1
      examples/mqtt/mqtt_pub.c
  5. 1 1
      examples/mqtt/mqtt_sub.c
  6. 201 0
      mqtt/mqtt_client.h

+ 4 - 0
Makefile

@@ -69,6 +69,7 @@ examples: hmain_test htimer_test hloop_test \
 	websocket_client_test \
 	mqtt_sub \
 	mqtt_pub \
+	mqtt_client_test \
 	jsonrpc
 	@echo "make examples done."
 
@@ -181,6 +182,9 @@ mqtt_sub: prepare
 mqtt_pub: prepare
 	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) mqtt" SRCS="examples/mqtt/mqtt_pub.c"
 
+mqtt_client_test: prepare
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) mqtt" SRCS="examples/mqtt/mqtt_client_test.cpp"
+
 jsonrpc: jsonrpc_client jsonrpc_server
 
 jsonrpc_client: prepare

+ 4 - 1
examples/CMakeLists.txt

@@ -150,7 +150,10 @@ if(WITH_MQTT)
     add_executable(mqtt_pub mqtt/mqtt_pub.c)
     target_link_libraries(mqtt_pub ${HV_LIBRARIES})
 
-    list(APPEND EXAMPLES mqtt_sub mqtt_pub)
+    add_executable(mqtt_client_test mqtt/mqtt_client_test.cpp)
+    target_link_libraries(mqtt_client_test ${HV_LIBRARIES})
+
+    list(APPEND EXAMPLES mqtt_sub mqtt_pub mqtt_client_test)
 endif()
 
 add_custom_target(examples DEPENDS ${EXAMPLES})

+ 83 - 0
examples/mqtt/mqtt_client_test.cpp

@@ -0,0 +1,83 @@
+/*
+ * mqtt client
+ *
+ * @build   make examples
+ *
+ * @test    bin/mqtt_client_test 127.0.0.1 1883 topic payload
+ *
+ */
+
+#include "mqtt_client.h"
+using namespace hv;
+
+/*
+ * @test    MQTTS
+ * #define  TEST_SSL 1
+ *
+ * @build   ./configure --with-mqtt --with-openssl && make clean && make
+ *
+ */
+#define TEST_SSL        0
+#define TEST_AUTH       0
+#define TEST_RECONNECT  1
+#define TEST_QOS        0
+
+int main(int argc, char** argv) {
+    if (argc < 5) {
+        printf("Usage: %s host port topic payload\n", argv[0]);
+        return -10;
+    }
+    const char* host = argv[1];
+    int port = atoi(argv[2]);
+    const char* topic = argv[3];
+    const char* payload = argv[4];
+
+    MqttClient cli;
+
+    cli.onConnect = [topic, payload](MqttClient* cli) {
+        printf("connected!\n");
+#if TEST_QOS
+        cli->subscribe(topic, 1, [topic, payload](MqttClient* cli) {
+            printf("subscribe OK!\n");
+            cli->publish(topic, payload, 1, 0, [](MqttClient* cli) {
+                printf("publish OK!\n");
+            });
+        });
+#else
+        cli->subscribe(topic);
+        cli->publish(topic, payload);
+#endif
+    };
+
+    cli.onMessage = [](MqttClient* cli, mqtt_message_t* msg) {
+        printf("topic: %.*s\n", msg->topic_len, msg->topic);
+        printf("payload: %.*s\n", msg->payload_len, msg->payload);
+        cli->disconnect();
+        cli->stop();
+    };
+
+    cli.onClose = [](MqttClient* cli) {
+        printf("disconnected!\n");
+    };
+
+#if TEST_AUTH
+    cli.setAuth("test", "123456");
+#endif
+
+#if TEST_RECONNECT
+    reconn_setting_t reconn;
+    reconn_setting_init(&reconn);
+    reconn.min_delay = 1000;
+    reconn.max_delay = 10000;
+    reconn.delay_policy = 2;
+    cli.setReconnect(&reconn);
+#endif
+
+    int ssl = 0;
+#if TEST_SSL
+    ssl = 1;
+#endif
+    cli.connect(host, port, ssl);
+    cli.run();
+    return 0;
+}

+ 1 - 1
examples/mqtt/mqtt_pub.c

@@ -14,7 +14,7 @@
  * @test    MQTTS
  * #define  TEST_SSL 1
  *
- * @build   ./configure --with-openssl && make clean && make
+ * @build   ./configure --with-mqtt --with-openssl && make clean && make
  *
  */
 #define TEST_SSL        0

+ 1 - 1
examples/mqtt/mqtt_sub.c

@@ -14,7 +14,7 @@
  * @test    MQTTS
  * #define  TEST_SSL 1
  *
- * @build   ./configure --with-openssl && make clean && make
+ * @build   ./configure --with-mqtt --with-openssl && make clean && make
  *
  */
 #define TEST_SSL        0

+ 201 - 0
mqtt/mqtt_client.h

@@ -125,4 +125,205 @@ HV_EXPORT int mqtt_client_unsubscribe(mqtt_client_t* cli,
 
 END_EXTERN_C
 
+#ifdef __cplusplus
+
+#include <functional>
+#include <map>
+#include <mutex>
+
+namespace hv {
+
+// @usage examples/mqtt/mqtt_client_test.cpp
+class MqttClient {
+public:
+    mqtt_client_t*  client;
+    // callbacks
+    typedef std::function<void(MqttClient*)>                    MqttCallback;
+    typedef std::function<void(MqttClient*, mqtt_message_t*)>   MqttMessageCallback;
+    MqttCallback        onConnect;
+    MqttCallback        onClose;
+    MqttMessageCallback onMessage;
+
+    MqttClient(hloop_t* loop = NULL) {
+        client = mqtt_client_new(loop);
+    }
+
+    ~MqttClient() {
+        if (client) {
+            mqtt_client_free(client);
+            client = NULL;
+        }
+    }
+
+    void run() {
+        mqtt_client_set_callback(client, on_mqtt);
+        mqtt_client_set_userdata(client, this);
+        mqtt_client_run(client);
+    }
+
+    void stop() {
+        mqtt_client_stop(client);
+    }
+
+    void setID(const char* id) {
+        mqtt_client_set_id(client, id);
+    }
+
+    void setWill(mqtt_message_t* will) {
+        mqtt_client_set_will(client, will);
+    }
+
+    void setAuth(const char* username, const char* password) {
+        mqtt_client_set_auth(client, username, password);
+    }
+
+    int lastError() {
+        return mqtt_client_get_last_error(client);
+    }
+
+    // SSL/TLS
+    int setSslCtx(hssl_ctx_t ssl_ctx) {
+        return mqtt_client_set_ssl_ctx(client, ssl_ctx);
+    }
+    int newSslCtx(hssl_ctx_opt_t* opt) {
+        return mqtt_client_new_ssl_ctx(client, opt);
+    }
+
+    void setReconnect(reconn_setting_t* reconn) {
+        mqtt_client_set_reconnect(client, reconn);
+    }
+
+    void setConnectTimeout(int ms) {
+        mqtt_client_set_connect_timeout(client, ms);
+    }
+
+    int connect(const char* host, int port = DEFAULT_MQTT_PORT, int ssl = 0) {
+        return mqtt_client_connect(client, host, port);
+    }
+
+    int reconnect() {
+        return mqtt_client_reconnect(client);
+    }
+
+    int disconnect() {
+        return mqtt_client_disconnect(client);
+    }
+
+    bool isConnected() {
+        return mqtt_client_is_connected(client);
+    }
+
+    int publish(mqtt_message_t* msg, MqttCallback ack_cb = NULL) {
+        int mid = mqtt_client_publish(client, msg);
+        if (msg->qos > 0 && mid >= 0 && ack_cb) {
+            setAckCallback(mid, ack_cb);
+        }
+        return mid;
+    }
+
+    int publish(const std::string& topic, const std::string& payload, int qos = 0, int retain = 0, MqttCallback ack_cb = NULL) {
+        mqtt_message_t msg;
+        memset(&msg, 0, sizeof(msg));
+        msg.topic_len = topic.size();
+        msg.topic = topic.c_str();
+        msg.payload_len = payload.size();
+        msg.payload = payload.c_str();
+        msg.qos = qos;
+        msg.retain = retain;
+        return publish(&msg, ack_cb);
+    }
+
+    int subscribe(const char* topic, int qos = 0, MqttCallback ack_cb = NULL) {
+        int mid = mqtt_client_subscribe(client, topic, qos);
+        if (qos > 0 && mid >= 0 && ack_cb) {
+            setAckCallback(mid, ack_cb);
+        }
+        return mid;
+    }
+
+    int unsubscribe(const char* topic, MqttCallback ack_cb = NULL) {
+        int mid = mqtt_client_subscribe(client, topic);
+        if (mid >= 0 && ack_cb) {
+            setAckCallback(mid, ack_cb);
+        }
+        return mid;
+    }
+
+protected:
+    void setAckCallback(int mid, MqttCallback cb) {
+        ack_cbs_mutex.lock();
+        ack_cbs[mid] = std::move(cb);
+        ack_cbs_mutex.unlock();
+    }
+
+    void invokeAckCallback(int mid) {
+        MqttCallback ack_cb = NULL;
+        ack_cbs_mutex.lock();
+        auto iter = ack_cbs.find(mid);
+        if (iter != ack_cbs.end()) {
+            ack_cb = std::move(iter->second);
+            ack_cbs.erase(iter);
+        }
+        ack_cbs_mutex.unlock();
+        if (ack_cb) ack_cb(this);
+    }
+
+    static void on_mqtt(mqtt_client_t* cli, int type) {
+        MqttClient* client = (MqttClient*)mqtt_client_get_userdata(cli);
+        // printf("on_mqtt type=%d\n", type);
+        switch(type) {
+        case MQTT_TYPE_CONNECT:
+            // printf("mqtt connected!\n");
+            break;
+        case MQTT_TYPE_DISCONNECT:
+            // printf("mqtt disconnected!\n");
+            if (client->onClose) {
+                client->onClose(client);
+            }
+            break;
+        case MQTT_TYPE_CONNACK:
+            // printf("mqtt connack!\n");
+            if (client->onConnect) {
+                client->onConnect(client);
+            }
+            break;
+        case MQTT_TYPE_PUBLISH:
+            if (client->onMessage) {
+                client->onMessage(client, &cli->message);
+            }
+            break;
+        case MQTT_TYPE_PUBACK: /* qos = 1 */
+            // printf("mqtt puback mid=%d\n", cli->mid);
+            client->invokeAckCallback(cli->mid);
+            break;
+        case MQTT_TYPE_PUBREC: /* qos = 2 */
+            // printf("mqtt pubrec mid=%d\n", cli->mid);
+            // wait MQTT_TYPE_PUBCOMP
+            break;
+        case MQTT_TYPE_PUBCOMP: /* qos = 2 */
+            // printf("mqtt pubcomp mid=%d\n", cli->mid);
+            client->invokeAckCallback(cli->mid);
+            break;
+        case MQTT_TYPE_SUBACK:
+            // printf("mqtt suback mid=%d\n", cli->mid);
+            client->invokeAckCallback(cli->mid);
+            break;
+        case MQTT_TYPE_UNSUBACK:
+            // printf("mqtt unsuback mid=%d\n", cli->mid);
+            client->invokeAckCallback(cli->mid);
+            break;
+        default:
+            break;
+        }
+    }
+
+private:
+    // mid => ack callback
+    std::map<int, MqttCallback> ack_cbs;
+    std::mutex                  ack_cbs_mutex;
+};
+
+}
+#endif
+
 #endif // HV_MQTT_CLIENT_H_