1
0

mqtt_client.c 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629
  1. #include "mqtt_client.h"
  2. #include "hbase.h"
  3. #include "hlog.h"
  4. #include "herr.h"
  5. #include "hendian.h"
  6. #include "hsocket.h"
  7. static unsigned short mqtt_next_mid() {
  8. static unsigned short s_mid = 0;
  9. return ++s_mid;
  10. }
  11. static int mqtt_client_send(mqtt_client_t* cli, const void* buf, int len) {
  12. // thread-safe
  13. hmutex_lock(&cli->mutex_);
  14. int nwrite = hio_write(cli->io, buf, len);
  15. hmutex_unlock(&cli->mutex_);
  16. return nwrite;
  17. }
  18. static int mqtt_send_head(hio_t* io, int type, int length) {
  19. mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
  20. mqtt_head_t head;
  21. memset(&head, 0, sizeof(head));
  22. head.type = type;
  23. head.length = length;
  24. unsigned char headbuf[8] = { 0 };
  25. int headlen = mqtt_head_pack(&head, headbuf);
  26. return mqtt_client_send(cli, headbuf, headlen);
  27. }
  28. static int mqtt_send_head_with_mid(hio_t* io, int type, unsigned short mid) {
  29. mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
  30. mqtt_head_t head;
  31. memset(&head, 0, sizeof(head));
  32. head.type = type;
  33. if (head.type == MQTT_TYPE_PUBREL) {
  34. head.qos = 1;
  35. }
  36. head.length = 2;
  37. unsigned char headbuf[8] = { 0 };
  38. unsigned char* p = headbuf;
  39. int headlen = mqtt_head_pack(&head, p);
  40. p += headlen;
  41. PUSH16(p, mid);
  42. return mqtt_client_send(cli, headbuf, headlen + 2);
  43. }
  44. static void mqtt_send_ping(hio_t* io) {
  45. mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
  46. if (cli->ping_cnt++ == 3) {
  47. hloge("mqtt no pong!");
  48. hio_close(io);
  49. return;
  50. }
  51. mqtt_send_head(io, MQTT_TYPE_PINGREQ, 0);
  52. }
  53. static void mqtt_send_pong(hio_t* io) {
  54. mqtt_send_head(io, MQTT_TYPE_PINGRESP, 0);
  55. }
  56. static void mqtt_send_disconnect(hio_t* io) {
  57. mqtt_send_head(io, MQTT_TYPE_DISCONNECT, 0);
  58. }
  59. /*
  60. * MQTT_TYPE_CONNECT
  61. * 2 + protocol_name + 1 protocol_version + 1 conn_flags + 2 keepalive + 2 + [client_id] +
  62. * [2 + will_topic + 2 + will_payload] +
  63. * [2 + username] + [2 + password]
  64. */
  65. static int mqtt_client_login(mqtt_client_t* cli) {
  66. int len = 2 + 1 + 1 + 2 + 2;
  67. unsigned short cid_len = 0,
  68. will_topic_len = 0,
  69. will_payload_len = 0,
  70. username_len = 0,
  71. password_len = 0;
  72. unsigned char conn_flags = 0;
  73. // protocol_name_len
  74. len += cli->protocol_version == MQTT_PROTOCOL_V31 ? 6 : 4;
  75. if (*cli->client_id) {
  76. cid_len = strlen(cli->client_id);
  77. } else {
  78. cid_len = 20;
  79. hv_random_string(cli->client_id, cid_len);
  80. hlogi("MQTT client_id: %.*s", (int)cid_len, cli->client_id);
  81. }
  82. len += cid_len;
  83. if (cid_len == 0) cli->clean_session = 1;
  84. if (cli->clean_session) {
  85. conn_flags |= MQTT_CONN_CLEAN_SESSION;
  86. }
  87. if (cli->will && cli->will->topic && cli->will->payload) {
  88. will_topic_len = cli->will->topic_len ? cli->will->topic_len : strlen(cli->will->topic);
  89. will_payload_len = cli->will->payload_len ? cli->will->payload_len : strlen(cli->will->payload);
  90. if (will_topic_len && will_payload_len) {
  91. conn_flags |= MQTT_CONN_HAS_WILL;
  92. conn_flags |= ((cli->will->qos & 3) << 3);
  93. if (cli->will->retain) {
  94. conn_flags |= MQTT_CONN_WILL_RETAIN;
  95. }
  96. len += 2 + will_topic_len;
  97. len += 2 + will_payload_len;
  98. }
  99. }
  100. if (*cli->username) {
  101. username_len = strlen(cli->username);
  102. if (username_len) {
  103. conn_flags |= MQTT_CONN_HAS_USERNAME;
  104. len += 2 + username_len;
  105. }
  106. }
  107. if (*cli->password) {
  108. password_len = strlen(cli->password);
  109. if (password_len) {
  110. conn_flags |= MQTT_CONN_HAS_PASSWORD;
  111. len += 2 + password_len;
  112. }
  113. }
  114. mqtt_head_t head;
  115. memset(&head, 0, sizeof(head));
  116. head.type = MQTT_TYPE_CONNECT;
  117. head.length = len;
  118. int buflen = mqtt_estimate_length(&head);
  119. unsigned char* buf = NULL;
  120. HV_STACK_ALLOC(buf, buflen);
  121. unsigned char* p = buf;
  122. int headlen = mqtt_head_pack(&head, p);
  123. p += headlen;
  124. // TODO: Not implement MQTT_PROTOCOL_V5
  125. if (cli->protocol_version == MQTT_PROTOCOL_V31) {
  126. PUSH16(p, 6);
  127. PUSH_N(p, MQTT_PROTOCOL_NAME_v31, 6);
  128. } else {
  129. PUSH16(p, 4);
  130. PUSH_N(p, MQTT_PROTOCOL_NAME, 4);
  131. }
  132. PUSH8(p, cli->protocol_version);
  133. PUSH8(p, conn_flags);
  134. PUSH16(p, cli->keepalive);
  135. PUSH16(p, cid_len);
  136. if (cid_len > 0) {
  137. PUSH_N(p, cli->client_id, cid_len);
  138. }
  139. if (conn_flags & MQTT_CONN_HAS_WILL) {
  140. PUSH16(p, will_topic_len);
  141. PUSH_N(p, cli->will->topic, will_topic_len);
  142. PUSH16(p, will_payload_len);
  143. PUSH_N(p, cli->will->payload, will_payload_len);
  144. }
  145. if (conn_flags & MQTT_CONN_HAS_USERNAME) {
  146. PUSH16(p, username_len);
  147. PUSH_N(p, cli->username, username_len);
  148. }
  149. if (conn_flags & MQTT_CONN_HAS_PASSWORD) {
  150. PUSH16(p, password_len);
  151. PUSH_N(p, cli->password, password_len);
  152. }
  153. int nwrite = mqtt_client_send(cli, buf, p - buf);
  154. HV_STACK_FREE(buf);
  155. return nwrite < 0 ? nwrite : 0;
  156. }
  157. static void connect_timeout_cb(htimer_t* timer) {
  158. mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(timer);
  159. if (cli == NULL) return;
  160. cli->timer = NULL;
  161. cli->error = ETIMEDOUT;
  162. hio_t* io = cli->io;
  163. if (io == NULL) return;
  164. char localaddrstr[SOCKADDR_STRLEN] = {0};
  165. char peeraddrstr[SOCKADDR_STRLEN] = {0};
  166. hlogw("connect timeout [%s] <=> [%s]",
  167. SOCKADDR_STR(hio_localaddr(io), localaddrstr),
  168. SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
  169. hio_close(io);
  170. }
  171. static void reconnect_timer_cb(htimer_t* timer) {
  172. mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(timer);
  173. if (cli == NULL) return;
  174. cli->timer = NULL;
  175. mqtt_client_reconnect(cli);
  176. }
  177. static void on_close(hio_t* io) {
  178. mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
  179. cli->connected = 0;
  180. if (cli->cb) {
  181. cli->head.type = MQTT_TYPE_DISCONNECT;
  182. cli->cb(cli, cli->head.type);
  183. }
  184. // reconnect
  185. if (cli->reconn_setting && reconn_setting_can_retry(cli->reconn_setting)) {
  186. uint32_t delay = reconn_setting_calc_delay(cli->reconn_setting);
  187. cli->timer = htimer_add(cli->loop, reconnect_timer_cb, delay, 1);
  188. hevent_set_userdata(cli->timer, cli);
  189. }
  190. }
  191. static void on_packet(hio_t* io, void* buf, int len) {
  192. mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
  193. unsigned char* p = (unsigned char*)buf;
  194. unsigned char* end = p + len;
  195. memset(&cli->head, 0, sizeof(mqtt_head_t));
  196. int headlen = mqtt_head_unpack(&cli->head, p, len);
  197. if (headlen <= 0) return;
  198. p += headlen;
  199. switch (cli->head.type) {
  200. // case MQTT_TYPE_CONNECT:
  201. case MQTT_TYPE_CONNACK:
  202. {
  203. if (cli->head.length < 2) {
  204. hloge("MQTT CONNACK malformed!");
  205. hio_close(io);
  206. return;
  207. }
  208. unsigned char conn_flags = 0, rc = 0;
  209. POP8(p, conn_flags);
  210. POP8(p, rc);
  211. if (rc != MQTT_CONNACK_ACCEPTED) {
  212. cli->error = rc;
  213. hloge("MQTT CONNACK error=%d", cli->error);
  214. hio_close(io);
  215. return;
  216. }
  217. cli->connected = 1;
  218. if (cli->timer) {
  219. htimer_del(cli->timer);
  220. cli->timer = NULL;
  221. }
  222. if (cli->keepalive) {
  223. cli->ping_cnt = 0;
  224. hio_set_heartbeat(io, cli->keepalive * 1000, mqtt_send_ping);
  225. }
  226. }
  227. break;
  228. case MQTT_TYPE_PUBLISH:
  229. {
  230. if (cli->head.length < 2) {
  231. hloge("MQTT PUBLISH malformed!");
  232. hio_close(io);
  233. return;
  234. }
  235. memset(&cli->message, 0, sizeof(mqtt_message_t));
  236. POP16(p, cli->message.topic_len);
  237. if (end - p < cli->message.topic_len) {
  238. hloge("MQTT PUBLISH malformed!");
  239. hio_close(io);
  240. return;
  241. }
  242. // NOTE: Not deep copy
  243. cli->message.topic = (char*)p;
  244. p += cli->message.topic_len;
  245. if (cli->head.qos > 0) {
  246. if (end - p < 2) {
  247. hloge("MQTT PUBLISH malformed!");
  248. hio_close(io);
  249. return;
  250. }
  251. POP16(p, cli->mid);
  252. }
  253. cli->message.payload_len = end - p;
  254. if (cli->message.payload_len > 0) {
  255. // NOTE: Not deep copy
  256. cli->message.payload = (char*)p;
  257. }
  258. cli->message.qos = cli->head.qos;
  259. if (cli->message.qos == 0) {
  260. // Do nothing
  261. } else if (cli->message.qos == 1) {
  262. mqtt_send_head_with_mid(io, MQTT_TYPE_PUBACK, cli->mid);
  263. } else if (cli->message.qos == 2) {
  264. mqtt_send_head_with_mid(io, MQTT_TYPE_PUBREC, cli->mid);
  265. }
  266. }
  267. break;
  268. case MQTT_TYPE_PUBACK:
  269. case MQTT_TYPE_PUBREC:
  270. case MQTT_TYPE_PUBREL:
  271. case MQTT_TYPE_PUBCOMP:
  272. {
  273. if (cli->head.length < 2) {
  274. hloge("MQTT PUBACK malformed!");
  275. hio_close(io);
  276. return;
  277. }
  278. POP16(p, cli->mid);
  279. if (cli->head.type == MQTT_TYPE_PUBREC) {
  280. mqtt_send_head_with_mid(io, MQTT_TYPE_PUBREL, cli->mid);
  281. } else if (cli->head.type == MQTT_TYPE_PUBREL) {
  282. mqtt_send_head_with_mid(io, MQTT_TYPE_PUBCOMP, cli->mid);
  283. }
  284. }
  285. break;
  286. // case MQTT_TYPE_SUBSCRIBE:
  287. // break;
  288. case MQTT_TYPE_SUBACK:
  289. {
  290. if (cli->head.length < 2) {
  291. hloge("MQTT SUBACK malformed!");
  292. hio_close(io);
  293. return;
  294. }
  295. POP16(p, cli->mid);
  296. }
  297. break;
  298. // case MQTT_TYPE_UNSUBSCRIBE:
  299. // break;
  300. case MQTT_TYPE_UNSUBACK:
  301. {
  302. if (cli->head.length < 2) {
  303. hloge("MQTT UNSUBACK malformed!");
  304. hio_close(io);
  305. return;
  306. }
  307. POP16(p, cli->mid);
  308. }
  309. break;
  310. case MQTT_TYPE_PINGREQ:
  311. // printf("recv ping\n");
  312. // printf("send pong\n");
  313. mqtt_send_pong(io);
  314. return;
  315. case MQTT_TYPE_PINGRESP:
  316. // printf("recv pong\n");
  317. cli->ping_cnt = 0;
  318. return;
  319. case MQTT_TYPE_DISCONNECT:
  320. hio_close(io);
  321. return;
  322. default:
  323. hloge("MQTT client received wrong type=%d", (int)cli->head.type);
  324. hio_close(io);
  325. return;
  326. }
  327. if (cli->cb) {
  328. cli->cb(cli, cli->head.type);
  329. }
  330. }
  331. static void on_connect(hio_t* io) {
  332. mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
  333. if (cli->cb) {
  334. cli->head.type = MQTT_TYPE_CONNECT;
  335. cli->cb(cli, cli->head.type);
  336. }
  337. if (cli->reconn_setting) {
  338. reconn_setting_reset(cli->reconn_setting);
  339. }
  340. static unpack_setting_t mqtt_unpack_setting;
  341. mqtt_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
  342. mqtt_unpack_setting.package_max_length = DEFAULT_MQTT_PACKAGE_MAX_LENGTH;
  343. mqtt_unpack_setting.body_offset = 2;
  344. mqtt_unpack_setting.length_field_offset = 1;
  345. mqtt_unpack_setting.length_field_bytes = 1;
  346. mqtt_unpack_setting.length_field_coding = ENCODE_BY_VARINT;
  347. hio_set_unpack(io, &mqtt_unpack_setting);
  348. // start recv packet
  349. hio_setcb_read(io, on_packet);
  350. hio_read(io);
  351. mqtt_client_login(cli);
  352. }
  353. mqtt_client_t* mqtt_client_new(hloop_t* loop) {
  354. if (loop == NULL) {
  355. loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
  356. if (loop == NULL) return NULL;
  357. }
  358. mqtt_client_t* cli = NULL;
  359. HV_ALLOC_SIZEOF(cli);
  360. if (cli == NULL) return NULL;
  361. cli->loop = loop;
  362. cli->protocol_version = MQTT_PROTOCOL_V311;
  363. cli->keepalive = DEFAULT_MQTT_KEEPALIVE;
  364. hmutex_init(&cli->mutex_);
  365. return cli;
  366. }
  367. void mqtt_client_free(mqtt_client_t* cli) {
  368. if (!cli) return;
  369. hmutex_destroy(&cli->mutex_);
  370. if (cli->ssl_ctx && cli->alloced_ssl_ctx) {
  371. hssl_ctx_free(cli->ssl_ctx);
  372. cli->ssl_ctx = NULL;
  373. }
  374. HV_FREE(cli->reconn_setting);
  375. HV_FREE(cli->will);
  376. HV_FREE(cli);
  377. }
  378. void mqtt_client_run (mqtt_client_t* cli) {
  379. if (!cli || !cli->loop) return;
  380. hloop_run(cli->loop);
  381. }
  382. void mqtt_client_stop(mqtt_client_t* cli) {
  383. if (!cli || !cli->loop) return;
  384. hloop_stop(cli->loop);
  385. }
  386. void mqtt_client_set_id(mqtt_client_t* cli, const char* id) {
  387. if (!cli || !id) return;
  388. hv_strncpy(cli->client_id, id, sizeof(cli->client_id));
  389. }
  390. void mqtt_client_set_will(mqtt_client_t* cli, mqtt_message_t* will) {
  391. if (!cli || !will) return;
  392. if (cli->will == NULL) {
  393. HV_ALLOC_SIZEOF(cli->will);
  394. }
  395. memcpy(cli->will, will, sizeof(mqtt_message_t));
  396. }
  397. void mqtt_client_set_auth(mqtt_client_t* cli, const char* username, const char* password) {
  398. if (!cli) return;
  399. if (username) {
  400. hv_strncpy(cli->username, username, sizeof(cli->username));
  401. }
  402. if (password) {
  403. hv_strncpy(cli->password, password, sizeof(cli->password));
  404. }
  405. }
  406. void mqtt_client_set_callback(mqtt_client_t* cli, mqtt_client_cb cb) {
  407. if (!cli) return;
  408. cli->cb = cb;
  409. }
  410. void mqtt_client_set_userdata(mqtt_client_t* cli, void* userdata) {
  411. if (!cli) return;
  412. cli->userdata = userdata;
  413. }
  414. void* mqtt_client_get_userdata(mqtt_client_t* cli) {
  415. if (!cli) return NULL;
  416. return cli->userdata;
  417. }
  418. int mqtt_client_get_last_error(mqtt_client_t* cli) {
  419. if (!cli) return -1;
  420. return cli->error;
  421. }
  422. int mqtt_client_set_ssl_ctx(mqtt_client_t* cli, hssl_ctx_t ssl_ctx) {
  423. cli->ssl_ctx = ssl_ctx;
  424. return 0;
  425. }
  426. int mqtt_client_new_ssl_ctx(mqtt_client_t* cli, hssl_ctx_opt_t* opt) {
  427. opt->endpoint = HSSL_CLIENT;
  428. hssl_ctx_t ssl_ctx = hssl_ctx_new(opt);
  429. if (ssl_ctx == NULL) return ERR_NEW_SSL_CTX;
  430. cli->alloced_ssl_ctx = true;
  431. return mqtt_client_set_ssl_ctx(cli, ssl_ctx);
  432. }
  433. int mqtt_client_set_reconnect(mqtt_client_t* cli, reconn_setting_t* reconn) {
  434. if (reconn == NULL) {
  435. HV_FREE(cli->reconn_setting);
  436. return 0;
  437. }
  438. if (cli->reconn_setting == NULL) {
  439. HV_ALLOC_SIZEOF(cli->reconn_setting);
  440. }
  441. *cli->reconn_setting = *reconn;
  442. return 0;
  443. }
  444. int mqtt_client_reconnect(mqtt_client_t* cli) {
  445. mqtt_client_connect(cli, cli->host, cli->port, cli->ssl);
  446. return 0;
  447. }
  448. void mqtt_client_set_connect_timeout(mqtt_client_t* cli, int ms) {
  449. cli->connect_timeout = ms;
  450. }
  451. void mqtt_client_set_host(mqtt_client_t* cli, const char* host, int port, int ssl) {
  452. hv_strncpy(cli->host, host, sizeof(cli->host));
  453. cli->port = port;
  454. cli->ssl = ssl;
  455. }
  456. int mqtt_client_connect(mqtt_client_t* cli, const char* host, int port, int ssl) {
  457. if (!cli) return -1;
  458. hv_strncpy(cli->host, host, sizeof(cli->host));
  459. cli->port = port;
  460. cli->ssl = ssl;
  461. hio_t* io = hio_create_socket(cli->loop, host, port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
  462. if (io == NULL) return -1;
  463. if (ssl) {
  464. if (cli->ssl_ctx) {
  465. hio_set_ssl_ctx(io, cli->ssl_ctx);
  466. }
  467. hio_enable_ssl(io);
  468. }
  469. cli->io = io;
  470. hevent_set_userdata(io, cli);
  471. hio_setcb_connect(io, on_connect);
  472. hio_setcb_close(io, on_close);
  473. if (cli->connect_timeout > 0) {
  474. cli->timer = htimer_add(cli->loop, connect_timeout_cb, cli->connect_timeout, 1);
  475. hevent_set_userdata(cli->timer, cli);
  476. }
  477. return hio_connect(io);
  478. }
  479. bool mqtt_client_is_connected(mqtt_client_t* cli) {
  480. return cli && cli->connected;
  481. }
  482. int mqtt_client_disconnect(mqtt_client_t* cli) {
  483. if (!cli || !cli->io) return -1;
  484. // cancel reconnect first
  485. mqtt_client_set_reconnect(cli, NULL);
  486. mqtt_send_disconnect(cli->io);
  487. return hio_close(cli->io);
  488. }
  489. int mqtt_client_publish(mqtt_client_t* cli, mqtt_message_t* msg) {
  490. if (!cli || !cli->io || !msg) return -1;
  491. if (!cli->connected) return -2;
  492. int topic_len = msg->topic_len ? msg->topic_len : strlen(msg->topic);
  493. int payload_len = msg->payload_len ? msg->payload_len : strlen(msg->payload);
  494. int len = 2 + topic_len + payload_len;
  495. if (msg->qos > 0) len += 2; // mid
  496. unsigned short mid = 0;
  497. mqtt_head_t head;
  498. memset(&head, 0, sizeof(head));
  499. head.type = MQTT_TYPE_PUBLISH;
  500. head.qos = msg->qos & 3;
  501. head.retain = msg->retain;
  502. head.length = len;
  503. int buflen = mqtt_estimate_length(&head);
  504. // NOTE: send payload alone
  505. buflen -= payload_len;
  506. unsigned char* buf = NULL;
  507. HV_STACK_ALLOC(buf, buflen);
  508. unsigned char* p = buf;
  509. int headlen = mqtt_head_pack(&head, p);
  510. p += headlen;
  511. PUSH16(p, topic_len);
  512. PUSH_N(p, msg->topic, topic_len);
  513. if (msg->qos) {
  514. mid = mqtt_next_mid();
  515. PUSH16(p, mid);
  516. }
  517. hmutex_lock(&cli->mutex_);
  518. // send head + topic + mid
  519. int nwrite = hio_write(cli->io, buf, p - buf);
  520. HV_STACK_FREE(buf);
  521. if (nwrite < 0) {
  522. goto unlock;
  523. }
  524. // send payload
  525. nwrite = hio_write(cli->io, msg->payload, payload_len);
  526. unlock:
  527. hmutex_unlock(&cli->mutex_);
  528. return nwrite < 0 ? nwrite : mid;
  529. }
  530. int mqtt_client_subscribe(mqtt_client_t* cli, const char* topic, int qos) {
  531. if (!cli || !cli->io || !topic) return -1;
  532. if (!cli->connected) return -2;
  533. int topic_len = strlen(topic);
  534. int len = 2 + 2 + topic_len + 1;
  535. mqtt_head_t head;
  536. memset(&head, 0, sizeof(head));
  537. head.type = MQTT_TYPE_SUBSCRIBE;
  538. head.qos = 1;
  539. head.length = len;
  540. int buflen = mqtt_estimate_length(&head);
  541. unsigned char* buf = NULL;
  542. HV_STACK_ALLOC(buf, buflen);
  543. unsigned char* p = buf;
  544. int headlen = mqtt_head_pack(&head, p);
  545. p += headlen;
  546. unsigned short mid = mqtt_next_mid();
  547. PUSH16(p, mid);
  548. PUSH16(p, topic_len);
  549. PUSH_N(p, topic, topic_len);
  550. PUSH8(p, qos & 3);
  551. // send head + mid + topic + qos
  552. int nwrite = mqtt_client_send(cli, buf, p - buf);
  553. HV_STACK_FREE(buf);
  554. return nwrite < 0 ? nwrite : mid;
  555. }
  556. int mqtt_client_unsubscribe(mqtt_client_t* cli, const char* topic) {
  557. if (!cli || !cli->io || !topic) return -1;
  558. if (!cli->connected) return -2;
  559. int topic_len = strlen(topic);
  560. int len = 2 + 2 + topic_len;
  561. mqtt_head_t head;
  562. memset(&head, 0, sizeof(head));
  563. head.type = MQTT_TYPE_UNSUBSCRIBE;
  564. head.qos = 1;
  565. head.length = len;
  566. int buflen = mqtt_estimate_length(&head);
  567. unsigned char* buf = NULL;
  568. HV_STACK_ALLOC(buf, buflen);
  569. unsigned char* p = buf;
  570. int headlen = mqtt_head_pack(&head, p);
  571. p += headlen;
  572. unsigned short mid = mqtt_next_mid();
  573. PUSH16(p, mid);
  574. PUSH16(p, topic_len);
  575. PUSH_N(p, topic, topic_len);
  576. // send head + mid + topic
  577. int nwrite = mqtt_client_send(cli, buf, p - buf);
  578. HV_STACK_FREE(buf);
  579. return nwrite < 0 ? nwrite : mid;
  580. }