||
- #include "mqtt_client.h"
- #include "hbase.h"
- #include "hlog.h"
- #include "herr.h"
- #include "hendian.h"
- #include "hsocket.h"
- static unsigned short mqtt_next_mid() {
- static unsigned short s_mid = 0;
- return ++s_mid;
- }
- static int mqtt_client_send(mqtt_client_t* cli, const void* buf, int len) {
- // thread-safe
- hmutex_lock(&cli->mutex_);
- int nwrite = hio_write(cli->io, buf, len);
- hmutex_unlock(&cli->mutex_);
- return nwrite;
- }
- static int mqtt_send_head(hio_t* io, int type, int length) {
- mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
- mqtt_head_t head;
- memset(&head, 0, sizeof(head));
- head.type = type;
- head.length = length;
- unsigned char headbuf[8] = { 0 };
- int headlen = mqtt_head_pack(&head, headbuf);
- return mqtt_client_send(cli, headbuf, headlen);
- }
- static int mqtt_send_head_with_mid(hio_t* io, int type, unsigned short mid) {
- mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
- mqtt_head_t head;
- memset(&head, 0, sizeof(head));
- head.type = type;
- if (head.type == MQTT_TYPE_PUBREL) {
- head.qos = 1;
- }
- head.length = 2;
- unsigned char headbuf[8] = { 0 };
- unsigned char* p = headbuf;
- int headlen = mqtt_head_pack(&head, p);
- p += headlen;
- PUSH16(p, mid);
- return mqtt_client_send(cli, headbuf, headlen + 2);
- }
- static void mqtt_send_ping(hio_t* io) {
- mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
- if (cli->ping_cnt++ == 3) {
- hloge("mqtt no pong!");
- hio_close(io);
- return;
- }
- mqtt_send_head(io, MQTT_TYPE_PINGREQ, 0);
- }
- static void mqtt_send_pong(hio_t* io) {
- mqtt_send_head(io, MQTT_TYPE_PINGRESP, 0);
- }
- static void mqtt_send_disconnect(hio_t* io) {
- mqtt_send_head(io, MQTT_TYPE_DISCONNECT, 0);
- }
- /*
- * MQTT_TYPE_CONNECT
- * 2 + protocol_name + 1 protocol_version + 1 conn_flags + 2 keepalive + 2 + [client_id] +
- * [2 + will_topic + 2 + will_payload] +
- * [2 + username] + [2 + password]
- */
- static int mqtt_client_login(mqtt_client_t* cli) {
- int len = 2 + 1 + 1 + 2 + 2;
- unsigned short cid_len = 0,
- will_topic_len = 0,
- will_payload_len = 0,
- username_len = 0,
- password_len = 0;
- unsigned char conn_flags = 0;
- // protocol_name_len
- len += cli->protocol_version == MQTT_PROTOCOL_V31 ? 6 : 4;
- if (*cli->client_id) {
- cid_len = strlen(cli->client_id);
- } else {
- cid_len = 20;
- hv_random_string(cli->client_id, cid_len);
- hlogi("MQTT client_id: %.*s", (int)cid_len, cli->client_id);
- }
- len += cid_len;
- if (cid_len == 0) cli->clean_session = 1;
- if (cli->clean_session) {
- conn_flags |= MQTT_CONN_CLEAN_SESSION;
- }
- if (cli->will && cli->will->topic && cli->will->payload) {
- will_topic_len = cli->will->topic_len ? cli->will->topic_len : strlen(cli->will->topic);
- will_payload_len = cli->will->payload_len ? cli->will->payload_len : strlen(cli->will->payload);
- if (will_topic_len && will_payload_len) {
- conn_flags |= MQTT_CONN_HAS_WILL;
- conn_flags |= ((cli->will->qos & 3) << 3);
- if (cli->will->retain) {
- conn_flags |= MQTT_CONN_WILL_RETAIN;
- }
- len += 2 + will_topic_len;
- len += 2 + will_payload_len;
- }
- }
- if (*cli->username) {
- username_len = strlen(cli->username);
- if (username_len) {
- conn_flags |= MQTT_CONN_HAS_USERNAME;
- len += 2 + username_len;
- }
- }
- if (*cli->password) {
- password_len = strlen(cli->password);
- if (password_len) {
- conn_flags |= MQTT_CONN_HAS_PASSWORD;
- len += 2 + password_len;
- }
- }
- mqtt_head_t head;
- memset(&head, 0, sizeof(head));
- head.type = MQTT_TYPE_CONNECT;
- head.length = len;
- int buflen = mqtt_estimate_length(&head);
- unsigned char* buf = NULL;
- HV_STACK_ALLOC(buf, buflen);
- unsigned char* p = buf;
- int headlen = mqtt_head_pack(&head, p);
- p += headlen;
- // TODO: Not implement MQTT_PROTOCOL_V5
- if (cli->protocol_version == MQTT_PROTOCOL_V31) {
- PUSH16(p, 6);
- PUSH_N(p, MQTT_PROTOCOL_NAME_v31, 6);
- } else {
- PUSH16(p, 4);
- PUSH_N(p, MQTT_PROTOCOL_NAME, 4);
- }
- PUSH8(p, cli->protocol_version);
- PUSH8(p, conn_flags);
- PUSH16(p, cli->keepalive);
- PUSH16(p, cid_len);
- if (cid_len > 0) {
- PUSH_N(p, cli->client_id, cid_len);
- }
- if (conn_flags & MQTT_CONN_HAS_WILL) {
- PUSH16(p, will_topic_len);
- PUSH_N(p, cli->will->topic, will_topic_len);
- PUSH16(p, will_payload_len);
- PUSH_N(p, cli->will->payload, will_payload_len);
- }
- if (conn_flags & MQTT_CONN_HAS_USERNAME) {
- PUSH16(p, username_len);
- PUSH_N(p, cli->username, username_len);
- }
- if (conn_flags & MQTT_CONN_HAS_PASSWORD) {
- PUSH16(p, password_len);
- PUSH_N(p, cli->password, password_len);
- }
- int nwrite = mqtt_client_send(cli, buf, p - buf);
- HV_STACK_FREE(buf);
- return nwrite < 0 ? nwrite : 0;
- }
- static void connect_timeout_cb(htimer_t* timer) {
- mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(timer);
- if (cli == NULL) return;
- cli->timer = NULL;
- cli->error = ETIMEDOUT;
- hio_t* io = cli->io;
- if (io == NULL) return;
- char localaddrstr[SOCKADDR_STRLEN] = {0};
- char peeraddrstr[SOCKADDR_STRLEN] = {0};
- hlogw("connect timeout [%s] <=> [%s]",
- SOCKADDR_STR(hio_localaddr(io), localaddrstr),
- SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
- hio_close(io);
- }
- static void reconnect_timer_cb(htimer_t* timer) {
- mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(timer);
- if (cli == NULL) return;
- cli->timer = NULL;
- mqtt_client_reconnect(cli);
- }
- static void on_close(hio_t* io) {
- mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
- cli->connected = 0;
- if (cli->cb) {
- cli->head.type = MQTT_TYPE_DISCONNECT;
- cli->cb(cli, cli->head.type);
- }
- // reconnect
- if (cli->reconn_setting && reconn_setting_can_retry(cli->reconn_setting)) {
- uint32_t delay = reconn_setting_calc_delay(cli->reconn_setting);
- cli->timer = htimer_add(cli->loop, reconnect_timer_cb, delay, 1);
- hevent_set_userdata(cli->timer, cli);
- }
- }
- static void on_packet(hio_t* io, void* buf, int len) {
- mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
- unsigned char* p = (unsigned char*)buf;
- unsigned char* end = p + len;
- memset(&cli->head, 0, sizeof(mqtt_head_t));
- int headlen = mqtt_head_unpack(&cli->head, p, len);
- if (headlen <= 0) return;
- p += headlen;
- switch (cli->head.type) {
- // case MQTT_TYPE_CONNECT:
- case MQTT_TYPE_CONNACK:
- {
- if (cli->head.length < 2) {
- hloge("MQTT CONNACK malformed!");
- hio_close(io);
- return;
- }
- unsigned char conn_flags = 0, rc = 0;
- POP8(p, conn_flags);
- POP8(p, rc);
- if (rc != MQTT_CONNACK_ACCEPTED) {
- cli->error = rc;
- hloge("MQTT CONNACK error=%d", cli->error);
- hio_close(io);
- return;
- }
- cli->connected = 1;
- if (cli->timer) {
- htimer_del(cli->timer);
- cli->timer = NULL;
- }
- if (cli->keepalive) {
- cli->ping_cnt = 0;
- hio_set_heartbeat(io, cli->keepalive * 1000, mqtt_send_ping);
- }
- }
- break;
- case MQTT_TYPE_PUBLISH:
- {
- if (cli->head.length < 2) {
- hloge("MQTT PUBLISH malformed!");
- hio_close(io);
- return;
- }
- memset(&cli->message, 0, sizeof(mqtt_message_t));
- POP16(p, cli->message.topic_len);
- if (end - p < cli->message.topic_len) {
- hloge("MQTT PUBLISH malformed!");
- hio_close(io);
- return;
- }
- // NOTE: Not deep copy
- cli->message.topic = (char*)p;
- p += cli->message.topic_len;
- if (cli->head.qos > 0) {
- if (end - p < 2) {
- hloge("MQTT PUBLISH malformed!");
- hio_close(io);
- return;
- }
- POP16(p, cli->mid);
- }
- cli->message.payload_len = end - p;
- if (cli->message.payload_len > 0) {
- // NOTE: Not deep copy
- cli->message.payload = (char*)p;
- }
- cli->message.qos = cli->head.qos;
- if (cli->message.qos == 0) {
- // Do nothing
- } else if (cli->message.qos == 1) {
- mqtt_send_head_with_mid(io, MQTT_TYPE_PUBACK, cli->mid);
- } else if (cli->message.qos == 2) {
- mqtt_send_head_with_mid(io, MQTT_TYPE_PUBREC, cli->mid);
- }
- }
- break;
- case MQTT_TYPE_PUBACK:
- case MQTT_TYPE_PUBREC:
- case MQTT_TYPE_PUBREL:
- case MQTT_TYPE_PUBCOMP:
- {
- if (cli->head.length < 2) {
- hloge("MQTT PUBACK malformed!");
- hio_close(io);
- return;
- }
- POP16(p, cli->mid);
- if (cli->head.type == MQTT_TYPE_PUBREC) {
- mqtt_send_head_with_mid(io, MQTT_TYPE_PUBREL, cli->mid);
- } else if (cli->head.type == MQTT_TYPE_PUBREL) {
- mqtt_send_head_with_mid(io, MQTT_TYPE_PUBCOMP, cli->mid);
- }
- }
- break;
- // case MQTT_TYPE_SUBSCRIBE:
- // break;
- case MQTT_TYPE_SUBACK:
- {
- if (cli->head.length < 2) {
- hloge("MQTT SUBACK malformed!");
- hio_close(io);
- return;
- }
- POP16(p, cli->mid);
- }
- break;
- // case MQTT_TYPE_UNSUBSCRIBE:
- // break;
- case MQTT_TYPE_UNSUBACK:
- {
- if (cli->head.length < 2) {
- hloge("MQTT UNSUBACK malformed!");
- hio_close(io);
- return;
- }
- POP16(p, cli->mid);
- }
- break;
- case MQTT_TYPE_PINGREQ:
- // printf("recv ping\n");
- // printf("send pong\n");
- mqtt_send_pong(io);
- return;
- case MQTT_TYPE_PINGRESP:
- // printf("recv pong\n");
- cli->ping_cnt = 0;
- return;
- case MQTT_TYPE_DISCONNECT:
- hio_close(io);
- return;
- default:
- hloge("MQTT client received wrong type=%d", (int)cli->head.type);
- hio_close(io);
- return;
- }
- if (cli->cb) {
- cli->cb(cli, cli->head.type);
- }
- }
- static void on_connect(hio_t* io) {
- mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
- if (cli->cb) {
- cli->head.type = MQTT_TYPE_CONNECT;
- cli->cb(cli, cli->head.type);
- }
- if (cli->reconn_setting) {
- reconn_setting_reset(cli->reconn_setting);
- }
- static unpack_setting_t mqtt_unpack_setting;
- mqtt_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
- mqtt_unpack_setting.package_max_length = DEFAULT_MQTT_PACKAGE_MAX_LENGTH;
- mqtt_unpack_setting.body_offset = 2;
- mqtt_unpack_setting.length_field_offset = 1;
- mqtt_unpack_setting.length_field_bytes = 1;
- mqtt_unpack_setting.length_field_coding = ENCODE_BY_VARINT;
- hio_set_unpack(io, &mqtt_unpack_setting);
- // start recv packet
- hio_setcb_read(io, on_packet);
- hio_read(io);
- mqtt_client_login(cli);
- }
- mqtt_client_t* mqtt_client_new(hloop_t* loop) {
- if (loop == NULL) {
- loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
- if (loop == NULL) return NULL;
- }
- mqtt_client_t* cli = NULL;
- HV_ALLOC_SIZEOF(cli);
- if (cli == NULL) return NULL;
- cli->loop = loop;
- cli->protocol_version = MQTT_PROTOCOL_V311;
- cli->keepalive = DEFAULT_MQTT_KEEPALIVE;
- hmutex_init(&cli->mutex_);
- return cli;
- }
- void mqtt_client_free(mqtt_client_t* cli) {
- if (!cli) return;
- hmutex_destroy(&cli->mutex_);
- if (cli->ssl_ctx && cli->alloced_ssl_ctx) {
- hssl_ctx_free(cli->ssl_ctx);
- cli->ssl_ctx = NULL;
- }
- HV_FREE(cli->reconn_setting);
- HV_FREE(cli->will);
- HV_FREE(cli);
- }
- void mqtt_client_run (mqtt_client_t* cli) {
- if (!cli || !cli->loop) return;
- hloop_run(cli->loop);
- }
- void mqtt_client_stop(mqtt_client_t* cli) {
- if (!cli || !cli->loop) return;
- hloop_stop(cli->loop);
- }
- void mqtt_client_set_id(mqtt_client_t* cli, const char* id) {
- if (!cli || !id) return;
- hv_strncpy(cli->client_id, id, sizeof(cli->client_id));
- }
- void mqtt_client_set_will(mqtt_client_t* cli, mqtt_message_t* will) {
- if (!cli || !will) return;
- if (cli->will == NULL) {
- HV_ALLOC_SIZEOF(cli->will);
- }
- memcpy(cli->will, will, sizeof(mqtt_message_t));
- }
- void mqtt_client_set_auth(mqtt_client_t* cli, const char* username, const char* password) {
- if (!cli) return;
- if (username) {
- hv_strncpy(cli->username, username, sizeof(cli->username));
- }
- if (password) {
- hv_strncpy(cli->password, password, sizeof(cli->password));
- }
- }
- void mqtt_client_set_callback(mqtt_client_t* cli, mqtt_client_cb cb) {
- if (!cli) return;
- cli->cb = cb;
- }
- void mqtt_client_set_userdata(mqtt_client_t* cli, void* userdata) {
- if (!cli) return;
- cli->userdata = userdata;
- }
- void* mqtt_client_get_userdata(mqtt_client_t* cli) {
- if (!cli) return NULL;
- return cli->userdata;
- }
- int mqtt_client_get_last_error(mqtt_client_t* cli) {
- if (!cli) return -1;
- return cli->error;
- }
- int mqtt_client_set_ssl_ctx(mqtt_client_t* cli, hssl_ctx_t ssl_ctx) {
- cli->ssl_ctx = ssl_ctx;
- return 0;
- }
- int mqtt_client_new_ssl_ctx(mqtt_client_t* cli, hssl_ctx_opt_t* opt) {
- opt->endpoint = HSSL_CLIENT;
- hssl_ctx_t ssl_ctx = hssl_ctx_new(opt);
- if (ssl_ctx == NULL) return ERR_NEW_SSL_CTX;
- cli->alloced_ssl_ctx = true;
- return mqtt_client_set_ssl_ctx(cli, ssl_ctx);
- }
- int mqtt_client_set_reconnect(mqtt_client_t* cli, reconn_setting_t* reconn) {
- if (reconn == NULL) {
- HV_FREE(cli->reconn_setting);
- return 0;
- }
- if (cli->reconn_setting == NULL) {
- HV_ALLOC_SIZEOF(cli->reconn_setting);
- }
- *cli->reconn_setting = *reconn;
- return 0;
- }
- int mqtt_client_reconnect(mqtt_client_t* cli) {
- mqtt_client_connect(cli, cli->host, cli->port, cli->ssl);
- return 0;
- }
- void mqtt_client_set_connect_timeout(mqtt_client_t* cli, int ms) {
- cli->connect_timeout = ms;
- }
- void mqtt_client_set_host(mqtt_client_t* cli, const char* host, int port, int ssl) {
- hv_strncpy(cli->host, host, sizeof(cli->host));
- cli->port = port;
- cli->ssl = ssl;
- }
- int mqtt_client_connect(mqtt_client_t* cli, const char* host, int port, int ssl) {
- if (!cli) return -1;
- hv_strncpy(cli->host, host, sizeof(cli->host));
- cli->port = port;
- cli->ssl = ssl;
- hio_t* io = hio_create_socket(cli->loop, host, port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
- if (io == NULL) return -1;
- if (ssl) {
- if (cli->ssl_ctx) {
- hio_set_ssl_ctx(io, cli->ssl_ctx);
- }
- hio_enable_ssl(io);
- }
- cli->io = io;
- hevent_set_userdata(io, cli);
- hio_setcb_connect(io, on_connect);
- hio_setcb_close(io, on_close);
- if (cli->connect_timeout > 0) {
- cli->timer = htimer_add(cli->loop, connect_timeout_cb, cli->connect_timeout, 1);
- hevent_set_userdata(cli->timer, cli);
- }
- return hio_connect(io);
- }
- bool mqtt_client_is_connected(mqtt_client_t* cli) {
- return cli && cli->connected;
- }
- int mqtt_client_disconnect(mqtt_client_t* cli) {
- if (!cli || !cli->io) return -1;
- // cancel reconnect first
- mqtt_client_set_reconnect(cli, NULL);
- mqtt_send_disconnect(cli->io);
- return hio_close(cli->io);
- }
- int mqtt_client_publish(mqtt_client_t* cli, mqtt_message_t* msg) {
- if (!cli || !cli->io || !msg) return -1;
- if (!cli->connected) return -2;
- int topic_len = msg->topic_len ? msg->topic_len : strlen(msg->topic);
- int payload_len = msg->payload_len ? msg->payload_len : strlen(msg->payload);
- int len = 2 + topic_len + payload_len;
- if (msg->qos > 0) len += 2; // mid
- unsigned short mid = 0;
- mqtt_head_t head;
- memset(&head, 0, sizeof(head));
- head.type = MQTT_TYPE_PUBLISH;
- head.qos = msg->qos & 3;
- head.retain = msg->retain;
- head.length = len;
- int buflen = mqtt_estimate_length(&head);
- // NOTE: send payload alone
- buflen -= payload_len;
- unsigned char* buf = NULL;
- HV_STACK_ALLOC(buf, buflen);
- unsigned char* p = buf;
- int headlen = mqtt_head_pack(&head, p);
- p += headlen;
- PUSH16(p, topic_len);
- PUSH_N(p, msg->topic, topic_len);
- if (msg->qos) {
- mid = mqtt_next_mid();
- PUSH16(p, mid);
- }
- hmutex_lock(&cli->mutex_);
- // send head + topic + mid
- int nwrite = hio_write(cli->io, buf, p - buf);
- HV_STACK_FREE(buf);
- if (nwrite < 0) {
- goto unlock;
- }
- // send payload
- nwrite = hio_write(cli->io, msg->payload, payload_len);
- unlock:
- hmutex_unlock(&cli->mutex_);
- return nwrite < 0 ? nwrite : mid;
- }
- int mqtt_client_subscribe(mqtt_client_t* cli, const char* topic, int qos) {
- if (!cli || !cli->io || !topic) return -1;
- if (!cli->connected) return -2;
- int topic_len = strlen(topic);
- int len = 2 + 2 + topic_len + 1;
- mqtt_head_t head;
- memset(&head, 0, sizeof(head));
- head.type = MQTT_TYPE_SUBSCRIBE;
- head.qos = 1;
- head.length = len;
- int buflen = mqtt_estimate_length(&head);
- unsigned char* buf = NULL;
- HV_STACK_ALLOC(buf, buflen);
- unsigned char* p = buf;
- int headlen = mqtt_head_pack(&head, p);
- p += headlen;
- unsigned short mid = mqtt_next_mid();
- PUSH16(p, mid);
- PUSH16(p, topic_len);
- PUSH_N(p, topic, topic_len);
- PUSH8(p, qos & 3);
- // send head + mid + topic + qos
- int nwrite = mqtt_client_send(cli, buf, p - buf);
- HV_STACK_FREE(buf);
- return nwrite < 0 ? nwrite : mid;
- }
- int mqtt_client_unsubscribe(mqtt_client_t* cli, const char* topic) {
- if (!cli || !cli->io || !topic) return -1;
- if (!cli->connected) return -2;
- int topic_len = strlen(topic);
- int len = 2 + 2 + topic_len;
- mqtt_head_t head;
- memset(&head, 0, sizeof(head));
- head.type = MQTT_TYPE_UNSUBSCRIBE;
- head.qos = 1;
- head.length = len;
- int buflen = mqtt_estimate_length(&head);
- unsigned char* buf = NULL;
- HV_STACK_ALLOC(buf, buflen);
- unsigned char* p = buf;
- int headlen = mqtt_head_pack(&head, p);
- p += headlen;
- unsigned short mid = mqtt_next_mid();
- PUSH16(p, mid);
- PUSH16(p, topic_len);
- PUSH_N(p, topic, topic_len);
- // send head + mid + topic
- int nwrite = mqtt_client_send(cli, buf, p - buf);
- HV_STACK_FREE(buf);
- return nwrite < 0 ? nwrite : mid;
- }
|