Prechádzať zdrojové kódy

New feature: WITH_MQTT

ithewei 3 rokov pred
rodič
commit
7bd7c293d6
21 zmenil súbory, kde vykonal 1058 pridanie a 4 odobranie
  1. 6 0
      BUILD.md
  2. 7 1
      CMakeLists.txt
  3. 14 1
      Makefile
  4. 3 0
      Makefile.vars
  5. 2 0
      README-CN.md
  6. 2 0
      README.md
  7. 1 0
      TREE.md
  8. 13 0
      base/hendian.h
  9. 5 0
      cmake/vars.cmake
  10. 1 0
      config.ini
  11. 2 1
      config.mk
  12. 1 0
      configure
  13. 20 0
      docs/API.md
  14. 1 1
      docs/PLAN.md
  15. 12 0
      examples/CMakeLists.txt
  16. 133 0
      examples/mqtt/mqtt_pub.c
  17. 103 0
      examples/mqtt/mqtt_sub.c
  18. 508 0
      mqtt/mqtt_client.c
  19. 115 0
      mqtt/mqtt_client.h
  20. 22 0
      mqtt/mqtt_protocol.c
  21. 87 0
      mqtt/mqtt_protocol.h

+ 6 - 0
BUILD.md

@@ -146,3 +146,9 @@ bin/curl -v http://localhost:8080 --http2
 ./configure --with-kcp
 make clean && make
 ```
+
+### compile WITH_MQTT
+```
+./configure --with-mqtt
+make clean && make
+```

+ 7 - 1
CMakeLists.txt

@@ -15,6 +15,7 @@ option(WITH_EVPP "compile evpp" ON)
 option(WITH_HTTP "compile http" ON)
 option(WITH_HTTP_SERVER "compile http/server" ON)
 option(WITH_HTTP_CLIENT "compile http/client" ON)
+option(WITH_MQTT "compile mqtt" OFF)
 
 option(ENABLE_IPV6 "ipv6" OFF)
 option(ENABLE_UDS "Unix Domain Socket" OFF)
@@ -163,7 +164,7 @@ if(APPLE)
 endif()
 
 # see Makefile
-set(ALL_SRCDIRS . base ssl event event/kcp util cpputil evpp protocol http http/client http/server)
+set(ALL_SRCDIRS . base ssl event event/kcp util cpputil evpp protocol http http/client http/server mqtt)
 set(CORE_SRCDIRS . base ssl event)
 if(WITH_KCP)
     set(CORE_SRCDIRS ${CORE_SRCDIRS} event/kcp)
@@ -201,6 +202,11 @@ if(WITH_EVPP)
     endif()
 endif()
 
+if(WITH_MQTT)
+    set(LIBHV_HEADERS ${LIBHV_HEADERS} ${MQTT_HEADERS})
+    set(LIBHV_SRCDIRS ${LIBHV_SRCDIRS} mqtt)
+endif()
+
 list_source_directories(LIBHV_SRCS ${LIBHV_SRCDIRS})
 
 if(BUILD_SHARED)

+ 14 - 1
Makefile

@@ -2,7 +2,7 @@ include config.mk
 include Makefile.vars
 
 MAKEF=$(MAKE) -f Makefile.in
-ALL_SRCDIRS=. base ssl event event/kcp util cpputil evpp protocol http http/client http/server
+ALL_SRCDIRS=. base ssl event event/kcp util cpputil evpp protocol http http/client http/server mqtt
 CORE_SRCDIRS=. base ssl event
 ifeq ($(WITH_KCP), yes)
 CORE_SRCDIRS += event/kcp
@@ -42,6 +42,11 @@ endif
 endif
 endif
 
+ifeq ($(WITH_MQTT), yes)
+LIBHV_HEADERS += $(MQTT_HEADERS)
+LIBHV_SRCDIRS += mqtt
+endif
+
 default: all
 all: libhv examples
 examples: hmain_test htimer_test hloop_test \
@@ -58,6 +63,8 @@ examples: hmain_test htimer_test hloop_test \
 	http_server_test http_client_test \
 	websocket_server_test \
 	websocket_client_test \
+	mqtt_sub \
+	mqtt_pub \
 	jsonrpc \
 
 clean:
@@ -157,6 +164,12 @@ websocket_server_test: prepare
 websocket_client_test: prepare
 	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) util cpputil evpp http http/client" SRCS="examples/websocket_client_test.cpp"
 
+mqtt_sub: prepare
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) mqtt" SRCS="examples/mqtt/mqtt_sub.c"
+
+mqtt_pub: prepare
+	$(MAKEF) TARGET=$@ SRCDIRS="$(CORE_SRCDIRS) mqtt" SRCS="examples/mqtt/mqtt_pub.c"
+
 jsonrpc: jsonrpc_client jsonrpc_server
 
 jsonrpc_client: prepare

+ 3 - 0
Makefile.vars

@@ -89,3 +89,6 @@ HTTP_SERVER_HEADERS =   http/server/HttpServer.h\
 						http/server/HttpContext.h\
 						http/server/HttpResponseWriter.h\
 						http/server/WebSocketServer.h\
+
+MQTT_HEADERS = mqtt/mqtt_protocol.h\
+			   mqtt/mqtt_client.h\

+ 2 - 0
README-CN.md

@@ -31,6 +31,7 @@
 - HTTP支持静态文件服务、目录服务、同步/异步API处理函数
 - HTTP支持RESTful风格、URI路由、keep-alive长连接、chunked分块等特性
 - WebSocket服务端/客户端
+- MQTT客户端
 
 ## ⌛️ 构建
 
@@ -379,6 +380,7 @@ int main() {
 - TinyHttpd示例:[examples/tinyhttpd.c](examples/tinyhttpd.c)
 - TinyProxyd示例:[examples/tinyproxyd.c](examples/tinyproxyd.c)
 - jsonRPC示例:  [examples/jsonrpc](examples/jsonrpc)
+- MQTT示例: [examples/mqtt](examples/mqtt)
 - 多accept进程模式: [examples/multi-thread/multi-acceptor-processes.c](examples/multi-thread/multi-acceptor-processes.c)
 - 多accept线程模式: [examples/multi-thread/multi-acceptor-threads.c](examples/multi-thread/multi-acceptor-threads.c)
 - 一个accept线程+多worker线程: [examples/multi-thread/one-acceptor-multi-workers.c](examples/multi-thread/one-acceptor-multi-workers.c)

+ 2 - 0
README.md

@@ -33,6 +33,7 @@ but simpler api and richer protocols.
 - HTTP static file service, indexof service, sync/async API handler
 - HTTP supports RESTful, URI router, keep-alive, chunked, etc.
 - WebSocket client/server
+- MQTT client
 
 ## ⌛️ Build
 
@@ -367,6 +368,7 @@ int main() {
 - [examples/tinyhttpd.c](examples/tinyhttpd.c)
 - [examples/tinyproxyd.c](examples/tinyproxyd.c)
 - [examples/jsonrpc](examples/jsonrpc)
+- [examples/mqtt](examples/mqtt)
 - [examples/multi-thread/multi-acceptor-processes.c](examples/multi-thread/multi-acceptor-processes.c)
 - [examples/multi-thread/multi-acceptor-threads.c](examples/multi-thread/multi-acceptor-threads.c)
 - [examples/multi-thread/one-acceptor-multi-workers.c](examples/multi-thread/one-acceptor-multi-workers.c)

+ 1 - 0
TREE.md

@@ -26,6 +26,7 @@
 ├── lib         库文件安装目录
 ├── logs        日志生成目录
 ├── misc        杂项
+├── mqtt        MQTT协议
 ├── protocol    包含icmp、dns、ftp、smtp等协议的实现
 ├── scripts     shell脚本存放目录
 ├── ssl         SSL/TLS加密通信

+ 13 - 0
base/hendian.h

@@ -94,6 +94,19 @@
 #define PUSH_LE32(p, v) PU32(p) = htole32(v); p += 4
 #define PUSH_LE64(p, v) PU64(p) = htole64(v); p += 8
 
+// NOTE: NET_ENDIAN = BIG_ENDIAN
+#define POP8(p, v)      POP_BE8(p, v)
+#define POP16(p, v)     POP_BE16(p, v)
+#define POP32(p, v)     POP_BE32(p, v)
+#define POP64(p, v)     POP_BE64(p, v)
+#define POP_N(p, v, n)  memcpy(v, p, n); p += n
+
+#define PUSH8(p, v)     PUSH_BE8(p, v)
+#define PUSH16(p, v)    PUSH_BE16(p, v)
+#define PUSH32(p, v)    PUSH_BE32(p, v)
+#define PUSH64(p, v)    PUSH_BE64(p, v)
+#define PUSH_N(p, v, n) memcpy(p, v, n); p += n
+
 static inline int detect_endian() {
     union {
         char c;

+ 5 - 0
cmake/vars.cmake

@@ -102,3 +102,8 @@ set(HTTP_SERVER_HEADERS
     http/server/HttpResponseWriter.h
     http/server/WebSocketServer.h
 )
+
+set(MQTT_HEADERS
+    mqtt/mqtt_protocol.h
+    mqtt/mqtt_client.h
+)

+ 1 - 0
config.ini

@@ -13,6 +13,7 @@ WITH_EVPP=yes
 WITH_HTTP=yes
 WITH_HTTP_SERVER=yes
 WITH_HTTP_CLIENT=yes
+WITH_MQTT=no
 
 # features
 # base/hsocket.c: replace gethostbyname with getaddrinfo

+ 2 - 1
config.mk

@@ -7,6 +7,7 @@ WITH_EVPP=yes
 WITH_HTTP=yes
 WITH_HTTP_SERVER=yes
 WITH_HTTP_CLIENT=yes
+WITH_MQTT=no
 ENABLE_IPV6=no
 ENABLE_UDS=no
 ENABLE_WINDUMP=no
@@ -17,4 +18,4 @@ WITH_OPENSSL=no
 WITH_GNUTLS=no
 WITH_MBEDTLS=no
 WITH_KCP=no
-CONFIG_DATE=20211124
+CONFIG_DATE=20220127

+ 1 - 0
configure

@@ -23,6 +23,7 @@ modules:
   --with-http           compile http module?            (DEFAULT: $WITH_HTTP)
   --with-http-client    compile http client module?     (DEFAULT: $WITH_HTTP_CLIENT)
   --with-http-server    compile http server module?     (DEFAULT: $WITH_HTTP_SERVER)
+  --with-mqtt           compile mqtt module?            (DEFAULT: $WITH_MQTT)
 
 features:
   --enable-ipv6         enable IPv6?                    (DEFAULT: $ENABLE_IPV6)

+ 20 - 0
docs/API.md

@@ -606,6 +606,26 @@
 - websocket_server_stop
 - class WebSocketServer
 
+## mqtt
+- mqtt_client_new
+- mqtt_client_free
+- mqtt_client_run
+- mqtt_client_stop
+- mqtt_client_set_id
+- mqtt_client_set_will
+- mqtt_client_set_auth
+- mqtt_client_set_callback
+- mqtt_client_set_userdata
+- mqtt_client_get_userdata
+- mqtt_client_get_last_error
+- mqtt_client_set_ssl_ctx
+- mqtt_client_new_ssl_ctx
+- mqtt_client_connect
+- mqtt_client_disconnect
+- mqtt_client_publish
+- mqtt_client_subscribe
+- mqtt_client_unsubscribe
+
 ## other
 - class HThreadPool
 - class HObjectPool

+ 1 - 1
docs/PLAN.md

@@ -6,6 +6,7 @@
 - evpp: c++ EventLoop interface similar to muduo and evpp
 - http client/server: include https http1/x http2
 - websocket client/server
+- mqtt client
 
 ## Improving
 
@@ -15,7 +16,6 @@
 
 ## Plan
 
-- mqtt client
 - redis client
 - async DNS
 - lua binding

+ 12 - 0
examples/CMakeLists.txt

@@ -137,4 +137,16 @@ endif()
 endif()
 endif()
 
+if(WITH_MQTT)
+    include_directories(../mqtt)
+
+    add_executable(mqtt_sub mqtt/mqtt_sub.c)
+    target_link_libraries(mqtt_sub ${HV_LIBRARIES})
+
+    add_executable(mqtt_pub mqtt/mqtt_pub.c)
+    target_link_libraries(mqtt_pub ${HV_LIBRARIES})
+
+    list(APPEND EXAMPLES mqtt_sub mqtt_pub)
+endif()
+
 add_custom_target(examples DEPENDS ${EXAMPLES})

+ 133 - 0
examples/mqtt/mqtt_pub.c

@@ -0,0 +1,133 @@
+/*
+ * mqtt publish
+ *
+ * @build   make examples
+ * @sub     bin/mqtt_sub 127.0.0.1 1883 topic
+ * @pub     bin/mqtt_pub 127.0.0.1 1883 topic payload
+ *
+ */
+
+#include "hv.h"
+#include "mqtt_client.h"
+
+/*
+ * @test    MQTTS
+ * #define  TEST_SSL 1
+ *
+ * @build   ./configure --with-openssl && make clean && make
+ *
+ */
+#define TEST_SSL        0
+#define TEST_AUTH       0
+
+/*
+ * workflow:
+ * mqtt_client_new -> mqtt_client_xxx -> mqtt_client_run
+ *
+ * mqtt_client_set_xxx ->
+ * mqtt_client_connect ->
+ * on_connack -> mqtt_client_publish ->
+ * on_puback -> mqtt_client_disconnect ->
+ * on_disconnect -> mqtt_client_stop
+ *
+ */
+
+static void on_mqtt(mqtt_client_t* cli, int type) {
+    printf("on_mqtt type=%d\n", type);
+    switch(type) {
+    case MQTT_TYPE_CONNECT:
+        printf("mqtt connected!\n");
+        break;
+    case MQTT_TYPE_DISCONNECT:
+        printf("mqtt disconnected!\n");
+    {
+        mqtt_message_t* msg = (mqtt_message_t*)mqtt_client_get_userdata(cli);
+        HV_FREE(msg);
+        mqtt_client_set_userdata(cli, NULL);
+        mqtt_client_stop(cli);
+    }
+        break;
+    case MQTT_TYPE_CONNACK:
+        printf("mqtt connack!\n");
+    {
+        mqtt_message_t* msg = (mqtt_message_t*)mqtt_client_get_userdata(cli);
+        if (msg == NULL) return;
+        int mid = mqtt_client_publish(cli, msg);
+        printf("mqtt publish mid=%d\n", mid);
+        if (msg->qos == 0) {
+            mqtt_client_disconnect(cli);
+        } else if (msg->qos == 1) {
+            // wait MQTT_TYPE_PUBACK
+        } else if (msg->qos == 2) {
+            // wait MQTT_TYPE_PUBREC
+        }
+    }
+        break;
+    case MQTT_TYPE_PUBACK: /* qos = 1 */
+        printf("mqtt puback mid=%d\n", cli->mid);
+        mqtt_client_disconnect(cli);
+        break;
+    case MQTT_TYPE_PUBREC: /* qos = 2 */
+        printf("mqtt pubrec mid=%d\n", cli->mid);
+        // wait MQTT_TYPE_PUBCOMP
+        break;
+    case MQTT_TYPE_PUBCOMP: /* qos = 2 */
+        printf("mqtt pubcomp mid=%d\n", cli->mid);
+        mqtt_client_disconnect(cli);
+        break;
+    default:
+        break;
+    }
+}
+
+static int mqtt_publish(const char* host, int port, const char* topic, const char* payload) {
+    mqtt_client_t* cli = mqtt_client_new(NULL);
+    if (cli == NULL) return -1;
+
+    // client_id
+    char client_id[64];
+    snprintf(client_id, sizeof(client_id), "mqtt_pub_%ld", hv_getpid());
+    printf("client_id: %s\n", client_id);
+    mqtt_client_set_id(cli, client_id);
+    // will
+    mqtt_message_t will;
+    memset(&will, 0, sizeof(will));
+    will.topic = "will";
+    will.payload = "This is a will.";
+    mqtt_client_set_will(cli, &will);
+#if TEST_AUTH
+    mqtt_client_set_auth(cli, "test", "123456");
+#endif
+
+    mqtt_message_t* msg = NULL;
+    HV_ALLOC_SIZEOF(msg);
+    msg->topic = topic;
+    msg->topic_len = strlen(topic);
+    msg->payload = payload;
+    msg->payload_len = strlen(payload);
+    msg->qos = 1;
+    mqtt_client_set_userdata(cli, msg);
+    mqtt_client_set_callback(cli, on_mqtt);
+
+    int ssl = 0;
+#if TEST_SSL
+    ssl = 1;
+#endif
+    mqtt_client_connect(cli, host, port, ssl);
+    mqtt_client_run(cli);
+    mqtt_client_free(cli);
+    return 0;
+}
+
+int main(int argc, char** argv) {
+    if (argc < 5) {
+        printf("Usage: %s host port topic payload\n", argv[0]);
+        return -10;
+    }
+    const char* host = argv[1];
+    int port = atoi(argv[2]);
+    const char* topic = argv[3];
+    const char* payload = argv[4];
+
+    return mqtt_publish(host, port, topic, payload);
+}

+ 103 - 0
examples/mqtt/mqtt_sub.c

@@ -0,0 +1,103 @@
+/*
+ * mqtt subscribe
+ *
+ * @build   make examples
+ * @sub     bin/mqtt_sub 127.0.0.1 1883 topic
+ * @pub     bin/mqtt_pub 127.0.0.1 1883 topic payload
+ *
+ */
+
+#include "hv.h"
+#include "mqtt_client.h"
+
+/*
+ * @test    MQTTS
+ * #define  TEST_SSL 1
+ *
+ * @build   ./configure --with-openssl && make clean && make
+ *
+ */
+#define TEST_SSL        0
+#define TEST_AUTH       0
+
+/*
+ * workflow:
+ * mqtt_client_new -> mqtt_client_xxx -> mqtt_client_run
+ *
+ * mqtt_client_set_xxx ->
+ * mqtt_client_connect ->
+ * on_connack -> mqtt_client_subscribe ->
+ * on_publish -> handle_message
+ *
+ */
+
+static void handle_message(mqtt_client_t* cli, mqtt_message_t* msg) {
+    printf("topic: %.*s\n", msg->topic_len, msg->topic);
+    printf("payload: %.*s\n", msg->payload_len, msg->payload);
+}
+
+static void on_mqtt(mqtt_client_t* cli, int type) {
+    printf("on_mqtt type=%d\n", type);
+    switch(type) {
+    case MQTT_TYPE_CONNECT:
+        printf("mqtt connected!\n");
+        break;
+    case MQTT_TYPE_DISCONNECT:
+        printf("mqtt disconnected!\n");
+        mqtt_client_stop(cli);
+        break;
+    case MQTT_TYPE_CONNACK:
+        printf("mqtt connack!\n");
+    {
+        const char* topic = (const char*)mqtt_client_get_userdata(cli);
+        int mid = mqtt_client_subscribe(cli, topic, 2);
+        printf("mqtt subscribe mid=%d\n", mid);
+    }
+        break;
+    case MQTT_TYPE_SUBACK:
+        printf("mqtt suback mid=%d\n", cli->mid);
+        break;
+    case MQTT_TYPE_PUBLISH:
+        handle_message(cli, &cli->message);
+    default:
+        break;
+    }
+}
+
+static int mqtt_subscribe(const char* host, int port, const char* topic) {
+    mqtt_client_t* cli = mqtt_client_new(NULL);
+    if (cli == NULL) return -1;
+
+    // client_id
+    char client_id[64];
+    snprintf(client_id, sizeof(client_id), "mqtt_sub_%ld", hv_getpid());
+    printf("client_id: %s\n", client_id);
+    mqtt_client_set_id(cli, client_id);
+#if TEST_AUTH
+    mqtt_client_set_auth(cli, "test", "123456");
+#endif
+
+    mqtt_client_set_userdata(cli, (void*)topic);
+    mqtt_client_set_callback(cli, on_mqtt);
+
+    int ssl = 0;
+#if TEST_SSL
+    ssl = 1;
+#endif
+    mqtt_client_connect(cli, host, port, ssl);
+    mqtt_client_run(cli);
+    mqtt_client_free(cli);
+    return 0;
+}
+
+int main(int argc, char** argv) {
+    if (argc < 4) {
+        printf("Usage: %s host port topic\n", argv[0]);
+        return -10;
+    }
+    const char* host = argv[1];
+    int port = atoi(argv[2]);
+    const char* topic = argv[3];
+
+    return mqtt_subscribe(host, port, topic);
+}

+ 508 - 0
mqtt/mqtt_client.c

@@ -0,0 +1,508 @@
+#include "mqtt_client.h"
+#include "hbase.h"
+#include "hlog.h"
+#include "hendian.h"
+
+static unsigned short mqtt_next_mid() {
+    static unsigned short s_mid = 0;
+    return ++s_mid;
+}
+
+static int mqtt_send_head(hio_t* io, int type, int length) {
+    mqtt_head_t head;
+    memset(&head, 0, sizeof(head));
+    head.type = type;
+    head.length = length;
+    unsigned char headbuf[8] = { 0 };
+    int headlen = mqtt_head_pack(&head, headbuf);
+    return hio_write(io, headbuf, headlen);
+}
+
+static int mqtt_send_head_with_mid(hio_t* io, int type, unsigned short mid) {
+    mqtt_head_t head;
+    memset(&head, 0, sizeof(head));
+    head.type = type;
+    if (head.type == MQTT_TYPE_PUBREL) {
+        head.qos = 1;
+    }
+    head.length = 2;
+    unsigned char headbuf[8] = { 0 };
+    unsigned char* p = headbuf;
+    int headlen = mqtt_head_pack(&head, p);
+    p += headlen;
+    PUSH16(p, mid);
+    return hio_write(io, headbuf, headlen + 2);
+}
+
+static void mqtt_send_ping(hio_t* io) {
+    mqtt_send_head(io, MQTT_TYPE_PINGREQ, 0);
+}
+
+static void mqtt_send_pong(hio_t* io) {
+    mqtt_send_head(io, MQTT_TYPE_PINGRESP, 0);
+}
+
+static void mqtt_send_disconnect(hio_t* io) {
+    mqtt_send_head(io, MQTT_TYPE_DISCONNECT, 0);
+}
+
+static int mqtt_client_login(mqtt_client_t* cli) {
+    int len = MQTT_CONN_HEAD_LEN;
+    unsigned short cid_len = 0,
+                   will_topic_len = 0,
+                   will_payload_len = 0,
+                   username_len = 0,
+                   password_len = 0;
+    unsigned char conn_flags = 0;
+
+    if (*cli->client_id) {
+        cid_len = strlen(cli->client_id);
+        len += cid_len;
+    }
+    if (cid_len == 0) cli->clean_session = 1;
+    if (cli->clean_session) {
+        conn_flags |= MQTT_CONN_CLEAN_SESSION;
+    }
+    if (cli->will && cli->will->topic && cli->will->payload) {
+        will_topic_len = cli->will->topic_len ? cli->will->topic_len : strlen(cli->will->topic);
+        will_payload_len = cli->will->payload_len ? cli->will->payload_len : strlen(cli->will->payload);
+        if (will_topic_len && will_payload_len) {
+            conn_flags |= MQTT_CONN_HAS_WILL;
+            conn_flags |= ((cli->will->qos & 3) << 3);
+            if (cli->will->retain) {
+                conn_flags |= MQTT_CONN_WILL_RETAIN;
+            }
+            len += 2 + will_topic_len;
+            len += 2 + will_payload_len;
+        }
+    }
+    if (*cli->username) {
+        username_len = strlen(cli->username);
+        if (username_len) {
+            conn_flags |= MQTT_CONN_HAS_USERNAME;
+            len += 2 + username_len;
+        }
+    }
+    if (*cli->password) {
+        password_len = strlen(cli->password);
+        if (password_len) {
+            conn_flags |= MQTT_CONN_HAS_PASSWORD;
+            len += 2 + password_len;
+        }
+    }
+
+    mqtt_head_t head;
+    memset(&head, 0, sizeof(head));
+    head.type = MQTT_TYPE_CONNECT;
+    head.length = len;
+    int buflen = mqtt_estimate_length(&head);
+    unsigned char* buf = NULL;
+    HV_STACK_ALLOC(buf, buflen);
+    unsigned char* p = buf;
+    int headlen = mqtt_head_pack(&head, p);
+    p += headlen;
+    // TODO: just implement MQTT_PROTOCOL_V311
+    PUSH16(p, 4);
+    PUSH_N(p, "MQTT", 4);
+    PUSH8(p, MQTT_PROTOCOL_V311);
+    PUSH8(p, conn_flags);
+    PUSH16(p, cli->keepalive);
+    PUSH16(p, cid_len);
+    if (cid_len > 0) {
+        PUSH_N(p, cli->client_id, cid_len);
+    }
+    if (conn_flags & MQTT_CONN_HAS_WILL) {
+        PUSH16(p, will_topic_len);
+        PUSH_N(p, cli->will->topic, will_topic_len);
+        PUSH16(p, will_payload_len);
+        PUSH_N(p, cli->will->payload, will_payload_len);
+    }
+    if (conn_flags & MQTT_CONN_HAS_USERNAME) {
+        PUSH16(p, username_len);
+        PUSH_N(p, cli->username, username_len);
+    }
+    if (conn_flags & MQTT_CONN_HAS_PASSWORD) {
+        PUSH16(p, password_len);
+        PUSH_N(p, cli->password, password_len);
+    }
+
+    int nwrite = hio_write(cli->io, buf, p - buf);
+    HV_STACK_FREE(buf);
+    return nwrite < 0 ? nwrite : 0;
+}
+
+static void on_close(hio_t* io) {
+    mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
+    if (cli->cb) {
+        cli->head.type = MQTT_TYPE_DISCONNECT;
+        cli->cb(cli, cli->head.type);
+    }
+}
+
+static void on_packet(hio_t* io, void* buf, int len) {
+    mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
+    unsigned char* p = (unsigned char*)buf;
+    unsigned char* end = p + len;
+    memset(&cli->head, 0, sizeof(mqtt_head_t));
+    int headlen = mqtt_head_unpack(&cli->head, p, len);
+    if (headlen <= 0) return;
+    p += headlen;
+    switch (cli->head.type) {
+    // case MQTT_TYPE_CONNECT:
+    case MQTT_TYPE_CONNACK:
+    {
+        if (cli->head.length < 2) {
+            hloge("MQTT CONNACK malformed!");
+            hio_close(io);
+            return;
+        }
+        unsigned char conn_flags = 0, rc = 0;
+        POP8(p, conn_flags);
+        POP8(p, rc);
+        if (rc != MQTT_CONNACK_ACCEPTED) {
+            cli->error = rc;
+            hloge("MQTT CONNACK error=%d", cli->error);
+            hio_close(io);
+            return;
+        }
+        if (cli->keepalive) {
+            hio_set_heartbeat(io, cli->keepalive * 1000, mqtt_send_ping);
+        }
+    }
+        break;
+    case MQTT_TYPE_PUBLISH:
+    {
+        if (cli->head.length < 2) {
+            hloge("MQTT PUBLISH malformed!");
+            hio_close(io);
+            return;
+        }
+        memset(&cli->message, 0, sizeof(mqtt_message_t));
+        POP16(p, cli->message.topic_len);
+        if (end - p < cli->message.topic_len) {
+            hloge("MQTT PUBLISH malformed!");
+            hio_close(io);
+            return;
+        }
+        // NOTE: Not deep copy
+        cli->message.topic = (char*)p;
+        p += cli->message.topic_len;
+        if (cli->head.qos > 0) {
+            if (end - p < 2) {
+                hloge("MQTT PUBLISH malformed!");
+                hio_close(io);
+                return;
+            }
+            POP16(p, cli->mid);
+        }
+        cli->message.payload_len = end - p;
+        if (cli->message.payload_len > 0) {
+            // NOTE: Not deep copy
+            cli->message.payload = (char*)p;
+        }
+        cli->message.qos = cli->head.qos;
+        if (cli->message.qos == 0) {
+            // Do nothing
+        } else if (cli->message.qos == 1) {
+            mqtt_send_head_with_mid(io, MQTT_TYPE_PUBACK, cli->mid);
+        } else if (cli->message.qos == 2) {
+            mqtt_send_head_with_mid(io, MQTT_TYPE_PUBREC, cli->mid);
+        }
+    }
+        break;
+    case MQTT_TYPE_PUBACK:
+    case MQTT_TYPE_PUBREC:
+    case MQTT_TYPE_PUBREL:
+    case MQTT_TYPE_PUBCOMP:
+    {
+        if (cli->head.length < 2) {
+            hloge("MQTT PUBACK malformed!");
+            hio_close(io);
+            return;
+        }
+        POP16(p, cli->mid);
+        if (cli->head.type == MQTT_TYPE_PUBREC) {
+            mqtt_send_head_with_mid(io, MQTT_TYPE_PUBREL, cli->mid);
+        } else if (cli->head.type == MQTT_TYPE_PUBREL) {
+            mqtt_send_head_with_mid(io, MQTT_TYPE_PUBCOMP, cli->mid);
+        }
+    }
+        break;
+    // case MQTT_TYPE_SUBSCRIBE:
+    //     break;
+    case MQTT_TYPE_SUBACK:
+    {
+        if (cli->head.length < 2) {
+            hloge("MQTT SUBACK malformed!");
+            hio_close(io);
+            return;
+        }
+        POP16(p, cli->mid);
+    }
+        break;
+    // case MQTT_TYPE_UNSUBSCRIBE:
+    //     break;
+    case MQTT_TYPE_UNSUBACK:
+    {
+        if (cli->head.length < 2) {
+            hloge("MQTT UNSUBACK malformed!");
+            hio_close(io);
+            return;
+        }
+        POP16(p, cli->mid);
+    }
+        break;
+    case MQTT_TYPE_PINGREQ:
+        mqtt_send_pong(io);
+        return;
+    case MQTT_TYPE_PINGRESP:
+        return;
+    case MQTT_TYPE_DISCONNECT:
+        hio_close(io);
+        return;
+    default:
+        hloge("MQTT client received wrong type=%d", (int)cli->head.type);
+        hio_close(io);
+        return;
+    }
+
+    if (cli->cb) {
+        cli->cb(cli, cli->head.type);
+    }
+}
+
+static void on_connect(hio_t* io) {
+    mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
+    if (cli->cb) {
+        cli->head.type = MQTT_TYPE_CONNECT;
+        cli->cb(cli, cli->head.type);
+    }
+
+    static unpack_setting_t mqtt_unpack_setting;
+    mqtt_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
+    mqtt_unpack_setting.package_max_length = DEFAULT_MQTT_PACKAGE_MAX_LENGTH;
+    mqtt_unpack_setting.body_offset = 2;
+    mqtt_unpack_setting.length_field_offset = 1;
+    mqtt_unpack_setting.length_field_bytes = 1;
+    mqtt_unpack_setting.length_field_coding = ENCODE_BY_VARINT;
+    hio_set_unpack(io, &mqtt_unpack_setting);
+
+    // start recv packet
+    hio_setcb_read(io, on_packet);
+    hio_read(io);
+
+    mqtt_client_login(cli);
+}
+
+mqtt_client_t* mqtt_client_new(hloop_t* loop) {
+    if (loop == NULL) {
+        loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
+        if (loop == NULL) return NULL;
+    }
+    mqtt_client_t* cli = NULL;
+    HV_ALLOC_SIZEOF(cli);
+    if (cli == NULL) return NULL;
+    cli->loop = loop;
+    cli->keepalive = DEFAULT_MQTT_KEEPALIVE;
+    hmutex_init(&cli->mutex_);
+    return cli;
+}
+
+void mqtt_client_free(mqtt_client_t* cli) {
+    if (!cli) return;
+    hmutex_destroy(&cli->mutex_);
+    if (cli->ssl_ctx && cli->alloced_ssl_ctx) {
+        hssl_ctx_free(cli->ssl_ctx);
+        cli->ssl_ctx = NULL;
+    }
+    HV_FREE(cli->will);
+    HV_FREE(cli);
+}
+
+void mqtt_client_run (mqtt_client_t* cli) {
+    if (!cli || !cli->loop) return;
+    hloop_run(cli->loop);
+}
+
+void mqtt_client_stop(mqtt_client_t* cli) {
+    if (!cli || !cli->loop) return;
+    hloop_stop(cli->loop);
+}
+
+void mqtt_client_set_id(mqtt_client_t* cli, const char* id) {
+    if (!cli || !id) return;
+    safe_strncpy(cli->client_id, id, sizeof(cli->client_id));
+}
+
+void mqtt_client_set_will(mqtt_client_t* cli, mqtt_message_t* will) {
+    if (!cli || !will) return;
+    if (cli->will == NULL) {
+        HV_ALLOC_SIZEOF(cli->will);
+    }
+    memcpy(cli->will, will, sizeof(mqtt_message_t));
+}
+
+void mqtt_client_set_auth(mqtt_client_t* cli, const char* username, const char* password) {
+    if (!cli) return;
+    if (username) {
+        safe_strncpy(cli->username, username, sizeof(cli->username));
+    }
+    if (password) {
+        safe_strncpy(cli->password, password, sizeof(cli->password));
+    }
+}
+
+void mqtt_client_set_callback(mqtt_client_t* cli, mqtt_client_cb cb) {
+    if (!cli) return;
+    cli->cb = cb;
+}
+
+void  mqtt_client_set_userdata(mqtt_client_t* cli, void* userdata) {
+    if (!cli) return;
+    cli->userdata = userdata;
+}
+
+void* mqtt_client_get_userdata(mqtt_client_t* cli) {
+    if (!cli) return NULL;
+    return cli->userdata;
+}
+
+int mqtt_client_get_last_error(mqtt_client_t* cli) {
+    if (!cli) return -1;
+    return cli->error;
+}
+
+int mqtt_client_set_ssl_ctx(mqtt_client_t* cli, hssl_ctx_t ssl_ctx) {
+    cli->ssl_ctx = ssl_ctx;
+    return 0;
+}
+
+int mqtt_client_new_ssl_ctx(mqtt_client_t* cli, hssl_ctx_opt_t* opt) {
+    opt->endpoint = HSSL_CLIENT;
+    hssl_ctx_t ssl_ctx = hssl_ctx_new(opt);
+    if (ssl_ctx == NULL) return HSSL_ERROR;
+    cli->alloced_ssl_ctx = true;
+    return mqtt_client_set_ssl_ctx(cli, ssl_ctx);
+}
+
+int mqtt_client_connect(mqtt_client_t* cli, const char* host, int port, int ssl) {
+    if (!cli) return -1;
+    hio_t* io = hio_create_socket(cli->loop, host, port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
+    if (io == NULL) return -1;
+    if (ssl) {
+        cli->ssl = 1;
+        if (cli->ssl_ctx) {
+            hio_set_ssl_ctx(io, cli->ssl_ctx);
+        }
+        hio_enable_ssl(io);
+    }
+    cli->io = io;
+    hevent_set_userdata(io, cli);
+    hio_setcb_connect(io, on_connect);
+    hio_setcb_close(io, on_close);
+    return hio_connect(io);
+}
+
+int mqtt_client_disconnect(mqtt_client_t* cli) {
+    if (!cli || !cli->io) return -1;
+    mqtt_send_disconnect(cli->io);
+    return hio_close(cli->io);
+}
+
+int mqtt_client_publish(mqtt_client_t* cli, mqtt_message_t* msg) {
+    if (!cli || !cli->io || !msg) return -1;
+    int topic_len = msg->topic_len ? msg->topic_len : strlen(msg->topic);
+    int payload_len = msg->payload_len ? msg->payload_len : strlen(msg->payload);
+    int len = 2 + topic_len + payload_len;
+    if (msg->qos > 0) len += 2; // mid
+    unsigned short mid = 0;
+
+    mqtt_head_t head;
+    memset(&head, 0, sizeof(head));
+    head.type = MQTT_TYPE_PUBLISH;
+    head.qos = msg->qos & 3;
+    head.retain = msg->retain;
+    head.length = len;
+    int buflen = mqtt_estimate_length(&head);
+    // NOTE: send payload alone
+    buflen -= payload_len;
+    unsigned char* buf = NULL;
+    HV_STACK_ALLOC(buf, buflen);
+    unsigned char* p = buf;
+    int headlen = mqtt_head_pack(&head, p);
+    p += headlen;
+    PUSH16(p, topic_len);
+    PUSH_N(p, msg->topic, topic_len);
+    if (msg->qos) {
+        mid = mqtt_next_mid();
+        PUSH16(p, mid);
+    }
+
+    hmutex_lock(&cli->mutex_);
+    // send head + topic + mid
+    int nwrite = hio_write(cli->io, buf, p - buf);
+    HV_STACK_FREE(buf);
+    if (nwrite < 0) {
+        goto unlock;
+    }
+
+    // send payload
+    nwrite = hio_write(cli->io, msg->payload, payload_len);
+
+unlock:
+    hmutex_unlock(&cli->mutex_);
+    return nwrite < 0 ? nwrite : mid;
+}
+
+int mqtt_client_subscribe(mqtt_client_t* cli, const char* topic, int qos) {
+    if (!cli || !cli->io || !topic) return -1;
+    int topic_len = strlen(topic);
+    int len = 2 + 2 + topic_len + 1;
+
+    mqtt_head_t head;
+    memset(&head, 0, sizeof(head));
+    head.type = MQTT_TYPE_SUBSCRIBE;
+    head.qos = 1;
+    head.length = len;
+    int buflen = mqtt_estimate_length(&head);
+    unsigned char* buf = NULL;
+    HV_STACK_ALLOC(buf, buflen);
+    unsigned char* p = buf;
+    int headlen = mqtt_head_pack(&head, p);
+    p += headlen;
+    unsigned short mid = mqtt_next_mid();
+    PUSH16(p, mid);
+    PUSH16(p, topic_len);
+    PUSH_N(p, topic, topic_len);
+    PUSH8(p, qos & 3);
+    // send head + mid + topic + qos
+    int nwrite = hio_write(cli->io, buf, p - buf);
+    HV_STACK_FREE(buf);
+    return nwrite < 0 ? nwrite : mid;
+}
+
+int mqtt_client_unsubscribe(mqtt_client_t* cli, const char* topic) {
+    if (!cli || !cli->io || !topic) return -1;
+    int topic_len = strlen(topic);
+    int len = 2 + 2 + topic_len;
+
+    mqtt_head_t head;
+    memset(&head, 0, sizeof(head));
+    head.type = MQTT_TYPE_UNSUBSCRIBE;
+    head.qos = 1;
+    head.length = len;
+    int buflen = mqtt_estimate_length(&head);
+    unsigned char* buf = NULL;
+    HV_STACK_ALLOC(buf, buflen);
+    unsigned char* p = buf;
+    int headlen = mqtt_head_pack(&head, p);
+    p += headlen;
+    unsigned short mid = mqtt_next_mid();
+    PUSH16(p, mid);
+    PUSH16(p, topic_len);
+    PUSH_N(p, topic, topic_len);
+    // send head + mid + topic
+    int nwrite = hio_write(cli->io, buf, p - buf);
+    HV_STACK_FREE(buf);
+    return nwrite < 0 ? nwrite : mid;
+}

+ 115 - 0
mqtt/mqtt_client.h

@@ -0,0 +1,115 @@
+#ifndef HV_MQTT_CLIENT_H_
+#define HV_MQTT_CLIENT_H_
+
+#include "mqtt_protocol.h"
+#include "hloop.h"
+#include "hssl.h"
+#include "hmutex.h"
+
+#define DEFAULT_MQTT_KEEPALIVE  60 // s
+
+typedef struct mqtt_client_s mqtt_client_t;
+
+// @type    mqtt_type_e
+// @example examples/mqtt
+typedef void (*mqtt_client_cb)(mqtt_client_t* cli, int type);
+
+struct mqtt_client_s {
+    // connect: host:port
+    char host[256];
+    int  port;
+    // login: flags + keepalive + client_id + will + username + password
+    // flags
+    unsigned short clean_session:   1;
+    unsigned short ssl: 1; // Read Only
+    unsigned short alloced_ssl_ctx: 1; // intern
+    unsigned short keepalive;
+    char client_id[64];
+    // will
+    mqtt_message_t* will;
+    // auth
+    char username[64];
+    char password[64];
+    // message
+    mqtt_head_t head;
+    int error;              // for MQTT_TYPE_CONNACK
+    int mid;                // for MQTT_TYPE_SUBACK, MQTT_TYPE_PUBACK
+    mqtt_message_t message; // for MQTT_TYPE_PUBLISH
+    // callback
+    mqtt_client_cb cb;
+    // userdata
+    void* userdata;
+    // privdata
+    hloop_t* loop;
+    hio_t*   io;
+    // SSL/TLS
+    hssl_ctx_t ssl_ctx;
+    // thread-safe
+    hmutex_t mutex_;
+};
+
+BEGIN_EXTERN_C
+
+// hloop_new -> malloc(mqtt_client_t)
+HV_EXPORT mqtt_client_t* mqtt_client_new(hloop_t* loop DEFAULT(NULL));
+// @see hloop_run
+HV_EXPORT void           mqtt_client_run (mqtt_client_t* cli);
+// @see hloop_stop
+HV_EXPORT void           mqtt_client_stop(mqtt_client_t* cli);
+// hloop_free -> free(mqtt_client_t)
+HV_EXPORT void           mqtt_client_free(mqtt_client_t* cli);
+
+// id
+HV_EXPORT void mqtt_client_set_id(mqtt_client_t* cli, const char* id);
+
+// will
+HV_EXPORT void mqtt_client_set_will(mqtt_client_t* cli,
+        mqtt_message_t* will);
+
+// auth
+HV_EXPORT void mqtt_client_set_auth(mqtt_client_t* cli,
+        const char* username, const char* password);
+
+// callback
+HV_EXPORT void mqtt_client_set_callback(mqtt_client_t* cli, mqtt_client_cb cb);
+
+// userdata
+HV_EXPORT void  mqtt_client_set_userdata(mqtt_client_t* cli, void* userdata);
+HV_EXPORT void* mqtt_client_get_userdata(mqtt_client_t* cli);
+
+// error
+HV_EXPORT int mqtt_client_get_last_error(mqtt_client_t* cli);
+
+// SSL/TLS
+HV_EXPORT int mqtt_client_set_ssl_ctx(mqtt_client_t* cli, hssl_ctx_t ssl_ctx);
+// hssl_ctx_new(opt) -> mqtt_client_set_ssl_ctx
+HV_EXPORT int mqtt_client_new_ssl_ctx(mqtt_client_t* cli, hssl_ctx_opt_t* opt);
+
+// connect
+// hio_create_socket -> hio_connect ->
+// on_connect -> mqtt_client_login ->
+// on_connack
+HV_EXPORT int mqtt_client_connect(mqtt_client_t* cli,
+        const char* host,
+        int port DEFAULT(DEFAULT_MQTT_PORT),
+        int ssl  DEFAULT(0));
+
+// disconnect
+// @see hio_close
+HV_EXPORT int mqtt_client_disconnect(mqtt_client_t* cli);
+
+// publish
+HV_EXPORT int mqtt_client_publish(mqtt_client_t* cli,
+        mqtt_message_t* msg);
+
+// subscribe
+HV_EXPORT int mqtt_client_subscribe(mqtt_client_t* cli,
+        const char* topic, int qos);
+
+// unsubscribe
+HV_EXPORT int mqtt_client_unsubscribe(mqtt_client_t* cli,
+        const char* topic);
+
+END_EXTERN_C
+
+#endif // HV_MQTT_CLIENT_H_

+ 22 - 0
mqtt/mqtt_protocol.c

@@ -0,0 +1,22 @@
+#include "mqtt_protocol.h"
+#include "hmath.h"
+
+int mqtt_head_pack(mqtt_head_t* head, unsigned char buf[]) {
+    buf[0] = (head->type << 4) |
+             (head->dup  << 3) |
+             (head->qos  << 1) |
+             (head->retain);
+    int bytes = varint_encode(head->length, buf + 1);
+    return 1 + bytes;
+}
+
+int mqtt_head_unpack(mqtt_head_t* head, const unsigned char* buf, int len) {
+    head->type   = (buf[0] >> 4) & 0x0F;
+    head->dup    = (buf[0] >> 3) & 0x01;
+    head->qos    = (buf[0] >> 1) & 0x03;
+    head->retain =  buf[0] & 0x01;
+    int bytes = len - 1;
+    head->length = varint_decode(buf + 1, &bytes);
+    if (bytes <= 0) return bytes;
+    return 1 + bytes;
+}

+ 87 - 0
mqtt/mqtt_protocol.h

@@ -0,0 +1,87 @@
+#ifndef HV_MQTT_PROTOCOL_H_
+#define HV_MQTT_PROTOCOL_H_
+
+#include "hexport.h"
+
+#define DEFAULT_MQTT_PORT   1883
+
+#define MQTT_PROTOCOL_V31   3 // Deprecated
+#define MQTT_PROTOCOL_V311  4
+#define MQTT_PROTOCOL_V5    5 // Not yet supproted
+
+/*
+ * MQTT connect
+ * 2 + 4 protocol_name + 1 protocol_version + 1 conn_flags + 2 keepalive + 2 + [client_id] +
+ * [2 + will_topic + 2 + will_payload] +
+ * [2 + username] + [2 + password]
+ */
+#define MQTT_CONN_HEAD_LEN  12
+
+/*
+ * connect flags
+ * 0        1               2       3-4         5           6           7
+ * reserved clean_session has_will will_qos will_retain has_password has_username
+ */
+#define MQTT_CONN_CLEAN_SESSION 0x02
+#define MQTT_CONN_HAS_WILL      0x04
+#define MQTT_CONN_WILL_RETAIN   0x20
+#define MQTT_CONN_HAS_PASSWORD  0x40
+#define MQTT_CONN_HAS_USERNAME  0x80
+
+typedef enum {
+    MQTT_TYPE_CONNECT       = 1,
+    MQTT_TYPE_CONNACK       = 2,
+    MQTT_TYPE_PUBLISH       = 3,
+    MQTT_TYPE_PUBACK        = 4,
+    MQTT_TYPE_PUBREC        = 5,
+    MQTT_TYPE_PUBREL        = 6,
+    MQTT_TYPE_PUBCOMP       = 7,
+    MQTT_TYPE_SUBSCRIBE     = 8,
+    MQTT_TYPE_SUBACK        = 9,
+    MQTT_TYPE_UNSUBSCRIBE   = 10,
+    MQTT_TYPE_UNSUBACK      = 11,
+    MQTT_TYPE_PINGREQ       = 12,
+    MQTT_TYPE_PINGRESP      = 13,
+    MQTT_TYPE_DISCONNECT    = 14,
+} mqtt_type_e;
+
+typedef enum {
+    MQTT_CONNACK_ACCEPTED                       = 0,
+    MQTT_CONNACK_REFUSED_PROTOCOL_VERSION       = 1,
+    MQTT_CONNACK_REFUSED_IDENTIFIER_REJECTED    = 2,
+    MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE     = 3,
+    MQTT_CONNACK_REFUSED_BAD_USERNAME_PASSWORD  = 4,
+    MQTT_CONNACK_REFUSED_NOT_AUTHORIZED         = 5,
+} mqtt_connack_e;
+
+typedef struct mqtt_head_s {
+    unsigned char type:     4;
+    unsigned char dup:      1;
+    unsigned char qos:      2;
+    unsigned char retain:   1;
+    unsigned int  length;
+} mqtt_head_t;
+
+typedef struct mqtt_message_s {
+    unsigned int    topic_len;
+    const char*     topic;
+    unsigned int    payload_len;
+    const char*     payload;
+    unsigned char   qos;
+    unsigned char   retain;
+} mqtt_message_t;
+
+BEGIN_EXTERN_C
+
+#define DEFAULT_MQTT_PACKAGE_MAX_LENGTH (1 << 28)   // 256M
+HV_INLINE int mqtt_estimate_length(mqtt_head_t* head) {
+    // 28 bits => 4*7 bits varint
+    return 1 + 4 + head->length;
+}
+
+HV_EXPORT int mqtt_head_pack(mqtt_head_t* head, unsigned char buf[]);
+HV_EXPORT int mqtt_head_unpack(mqtt_head_t* head, const unsigned char* buf, int len);
+
+END_EXTERN_C
+
+#endif // HV_MQTT_PROTOCOL_H_