1
0

mqtt_client.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. #include "mqtt_client.h"
  2. #include "hbase.h"
  3. #include "hlog.h"
  4. #include "hendian.h"
  5. static unsigned short mqtt_next_mid() {
  6. static unsigned short s_mid = 0;
  7. return ++s_mid;
  8. }
  9. static int mqtt_send_head(hio_t* io, int type, int length) {
  10. mqtt_head_t head;
  11. memset(&head, 0, sizeof(head));
  12. head.type = type;
  13. head.length = length;
  14. unsigned char headbuf[8] = { 0 };
  15. int headlen = mqtt_head_pack(&head, headbuf);
  16. return hio_write(io, headbuf, headlen);
  17. }
  18. static int mqtt_send_head_with_mid(hio_t* io, int type, unsigned short mid) {
  19. mqtt_head_t head;
  20. memset(&head, 0, sizeof(head));
  21. head.type = type;
  22. if (head.type == MQTT_TYPE_PUBREL) {
  23. head.qos = 1;
  24. }
  25. head.length = 2;
  26. unsigned char headbuf[8] = { 0 };
  27. unsigned char* p = headbuf;
  28. int headlen = mqtt_head_pack(&head, p);
  29. p += headlen;
  30. PUSH16(p, mid);
  31. return hio_write(io, headbuf, headlen + 2);
  32. }
  33. static void mqtt_send_ping(hio_t* io) {
  34. mqtt_send_head(io, MQTT_TYPE_PINGREQ, 0);
  35. }
  36. static void mqtt_send_pong(hio_t* io) {
  37. mqtt_send_head(io, MQTT_TYPE_PINGRESP, 0);
  38. }
  39. static void mqtt_send_disconnect(hio_t* io) {
  40. mqtt_send_head(io, MQTT_TYPE_DISCONNECT, 0);
  41. }
  42. static int mqtt_client_login(mqtt_client_t* cli) {
  43. int len = MQTT_CONN_HEAD_LEN;
  44. unsigned short cid_len = 0,
  45. will_topic_len = 0,
  46. will_payload_len = 0,
  47. username_len = 0,
  48. password_len = 0;
  49. unsigned char conn_flags = 0;
  50. if (*cli->client_id) {
  51. cid_len = strlen(cli->client_id);
  52. len += cid_len;
  53. }
  54. if (cid_len == 0) cli->clean_session = 1;
  55. if (cli->clean_session) {
  56. conn_flags |= MQTT_CONN_CLEAN_SESSION;
  57. }
  58. if (cli->will && cli->will->topic && cli->will->payload) {
  59. will_topic_len = cli->will->topic_len ? cli->will->topic_len : strlen(cli->will->topic);
  60. will_payload_len = cli->will->payload_len ? cli->will->payload_len : strlen(cli->will->payload);
  61. if (will_topic_len && will_payload_len) {
  62. conn_flags |= MQTT_CONN_HAS_WILL;
  63. conn_flags |= ((cli->will->qos & 3) << 3);
  64. if (cli->will->retain) {
  65. conn_flags |= MQTT_CONN_WILL_RETAIN;
  66. }
  67. len += 2 + will_topic_len;
  68. len += 2 + will_payload_len;
  69. }
  70. }
  71. if (*cli->username) {
  72. username_len = strlen(cli->username);
  73. if (username_len) {
  74. conn_flags |= MQTT_CONN_HAS_USERNAME;
  75. len += 2 + username_len;
  76. }
  77. }
  78. if (*cli->password) {
  79. password_len = strlen(cli->password);
  80. if (password_len) {
  81. conn_flags |= MQTT_CONN_HAS_PASSWORD;
  82. len += 2 + password_len;
  83. }
  84. }
  85. mqtt_head_t head;
  86. memset(&head, 0, sizeof(head));
  87. head.type = MQTT_TYPE_CONNECT;
  88. head.length = len;
  89. int buflen = mqtt_estimate_length(&head);
  90. unsigned char* buf = NULL;
  91. HV_STACK_ALLOC(buf, buflen);
  92. unsigned char* p = buf;
  93. int headlen = mqtt_head_pack(&head, p);
  94. p += headlen;
  95. // TODO: just implement MQTT_PROTOCOL_V311
  96. PUSH16(p, 4);
  97. PUSH_N(p, "MQTT", 4);
  98. PUSH8(p, MQTT_PROTOCOL_V311);
  99. PUSH8(p, conn_flags);
  100. PUSH16(p, cli->keepalive);
  101. PUSH16(p, cid_len);
  102. if (cid_len > 0) {
  103. PUSH_N(p, cli->client_id, cid_len);
  104. }
  105. if (conn_flags & MQTT_CONN_HAS_WILL) {
  106. PUSH16(p, will_topic_len);
  107. PUSH_N(p, cli->will->topic, will_topic_len);
  108. PUSH16(p, will_payload_len);
  109. PUSH_N(p, cli->will->payload, will_payload_len);
  110. }
  111. if (conn_flags & MQTT_CONN_HAS_USERNAME) {
  112. PUSH16(p, username_len);
  113. PUSH_N(p, cli->username, username_len);
  114. }
  115. if (conn_flags & MQTT_CONN_HAS_PASSWORD) {
  116. PUSH16(p, password_len);
  117. PUSH_N(p, cli->password, password_len);
  118. }
  119. int nwrite = hio_write(cli->io, buf, p - buf);
  120. HV_STACK_FREE(buf);
  121. return nwrite < 0 ? nwrite : 0;
  122. }
  123. static void on_close(hio_t* io) {
  124. mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
  125. if (cli->cb) {
  126. cli->head.type = MQTT_TYPE_DISCONNECT;
  127. cli->cb(cli, cli->head.type);
  128. }
  129. }
  130. static void on_packet(hio_t* io, void* buf, int len) {
  131. mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
  132. unsigned char* p = (unsigned char*)buf;
  133. unsigned char* end = p + len;
  134. memset(&cli->head, 0, sizeof(mqtt_head_t));
  135. int headlen = mqtt_head_unpack(&cli->head, p, len);
  136. if (headlen <= 0) return;
  137. p += headlen;
  138. switch (cli->head.type) {
  139. // case MQTT_TYPE_CONNECT:
  140. case MQTT_TYPE_CONNACK:
  141. {
  142. if (cli->head.length < 2) {
  143. hloge("MQTT CONNACK malformed!");
  144. hio_close(io);
  145. return;
  146. }
  147. unsigned char conn_flags = 0, rc = 0;
  148. POP8(p, conn_flags);
  149. POP8(p, rc);
  150. if (rc != MQTT_CONNACK_ACCEPTED) {
  151. cli->error = rc;
  152. hloge("MQTT CONNACK error=%d", cli->error);
  153. hio_close(io);
  154. return;
  155. }
  156. if (cli->keepalive) {
  157. hio_set_heartbeat(io, cli->keepalive * 1000, mqtt_send_ping);
  158. }
  159. }
  160. break;
  161. case MQTT_TYPE_PUBLISH:
  162. {
  163. if (cli->head.length < 2) {
  164. hloge("MQTT PUBLISH malformed!");
  165. hio_close(io);
  166. return;
  167. }
  168. memset(&cli->message, 0, sizeof(mqtt_message_t));
  169. POP16(p, cli->message.topic_len);
  170. if (end - p < cli->message.topic_len) {
  171. hloge("MQTT PUBLISH malformed!");
  172. hio_close(io);
  173. return;
  174. }
  175. // NOTE: Not deep copy
  176. cli->message.topic = (char*)p;
  177. p += cli->message.topic_len;
  178. if (cli->head.qos > 0) {
  179. if (end - p < 2) {
  180. hloge("MQTT PUBLISH malformed!");
  181. hio_close(io);
  182. return;
  183. }
  184. POP16(p, cli->mid);
  185. }
  186. cli->message.payload_len = end - p;
  187. if (cli->message.payload_len > 0) {
  188. // NOTE: Not deep copy
  189. cli->message.payload = (char*)p;
  190. }
  191. cli->message.qos = cli->head.qos;
  192. if (cli->message.qos == 0) {
  193. // Do nothing
  194. } else if (cli->message.qos == 1) {
  195. mqtt_send_head_with_mid(io, MQTT_TYPE_PUBACK, cli->mid);
  196. } else if (cli->message.qos == 2) {
  197. mqtt_send_head_with_mid(io, MQTT_TYPE_PUBREC, cli->mid);
  198. }
  199. }
  200. break;
  201. case MQTT_TYPE_PUBACK:
  202. case MQTT_TYPE_PUBREC:
  203. case MQTT_TYPE_PUBREL:
  204. case MQTT_TYPE_PUBCOMP:
  205. {
  206. if (cli->head.length < 2) {
  207. hloge("MQTT PUBACK malformed!");
  208. hio_close(io);
  209. return;
  210. }
  211. POP16(p, cli->mid);
  212. if (cli->head.type == MQTT_TYPE_PUBREC) {
  213. mqtt_send_head_with_mid(io, MQTT_TYPE_PUBREL, cli->mid);
  214. } else if (cli->head.type == MQTT_TYPE_PUBREL) {
  215. mqtt_send_head_with_mid(io, MQTT_TYPE_PUBCOMP, cli->mid);
  216. }
  217. }
  218. break;
  219. // case MQTT_TYPE_SUBSCRIBE:
  220. // break;
  221. case MQTT_TYPE_SUBACK:
  222. {
  223. if (cli->head.length < 2) {
  224. hloge("MQTT SUBACK malformed!");
  225. hio_close(io);
  226. return;
  227. }
  228. POP16(p, cli->mid);
  229. }
  230. break;
  231. // case MQTT_TYPE_UNSUBSCRIBE:
  232. // break;
  233. case MQTT_TYPE_UNSUBACK:
  234. {
  235. if (cli->head.length < 2) {
  236. hloge("MQTT UNSUBACK malformed!");
  237. hio_close(io);
  238. return;
  239. }
  240. POP16(p, cli->mid);
  241. }
  242. break;
  243. case MQTT_TYPE_PINGREQ:
  244. mqtt_send_pong(io);
  245. return;
  246. case MQTT_TYPE_PINGRESP:
  247. return;
  248. case MQTT_TYPE_DISCONNECT:
  249. hio_close(io);
  250. return;
  251. default:
  252. hloge("MQTT client received wrong type=%d", (int)cli->head.type);
  253. hio_close(io);
  254. return;
  255. }
  256. if (cli->cb) {
  257. cli->cb(cli, cli->head.type);
  258. }
  259. }
  260. static void on_connect(hio_t* io) {
  261. mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
  262. if (cli->cb) {
  263. cli->head.type = MQTT_TYPE_CONNECT;
  264. cli->cb(cli, cli->head.type);
  265. }
  266. static unpack_setting_t mqtt_unpack_setting;
  267. mqtt_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
  268. mqtt_unpack_setting.package_max_length = DEFAULT_MQTT_PACKAGE_MAX_LENGTH;
  269. mqtt_unpack_setting.body_offset = 2;
  270. mqtt_unpack_setting.length_field_offset = 1;
  271. mqtt_unpack_setting.length_field_bytes = 1;
  272. mqtt_unpack_setting.length_field_coding = ENCODE_BY_VARINT;
  273. hio_set_unpack(io, &mqtt_unpack_setting);
  274. // start recv packet
  275. hio_setcb_read(io, on_packet);
  276. hio_read(io);
  277. mqtt_client_login(cli);
  278. }
  279. mqtt_client_t* mqtt_client_new(hloop_t* loop) {
  280. if (loop == NULL) {
  281. loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
  282. if (loop == NULL) return NULL;
  283. }
  284. mqtt_client_t* cli = NULL;
  285. HV_ALLOC_SIZEOF(cli);
  286. if (cli == NULL) return NULL;
  287. cli->loop = loop;
  288. cli->keepalive = DEFAULT_MQTT_KEEPALIVE;
  289. hmutex_init(&cli->mutex_);
  290. return cli;
  291. }
  292. void mqtt_client_free(mqtt_client_t* cli) {
  293. if (!cli) return;
  294. hmutex_destroy(&cli->mutex_);
  295. if (cli->ssl_ctx && cli->alloced_ssl_ctx) {
  296. hssl_ctx_free(cli->ssl_ctx);
  297. cli->ssl_ctx = NULL;
  298. }
  299. HV_FREE(cli->will);
  300. HV_FREE(cli);
  301. }
  302. void mqtt_client_run (mqtt_client_t* cli) {
  303. if (!cli || !cli->loop) return;
  304. hloop_run(cli->loop);
  305. }
  306. void mqtt_client_stop(mqtt_client_t* cli) {
  307. if (!cli || !cli->loop) return;
  308. hloop_stop(cli->loop);
  309. }
  310. void mqtt_client_set_id(mqtt_client_t* cli, const char* id) {
  311. if (!cli || !id) return;
  312. safe_strncpy(cli->client_id, id, sizeof(cli->client_id));
  313. }
  314. void mqtt_client_set_will(mqtt_client_t* cli, mqtt_message_t* will) {
  315. if (!cli || !will) return;
  316. if (cli->will == NULL) {
  317. HV_ALLOC_SIZEOF(cli->will);
  318. }
  319. memcpy(cli->will, will, sizeof(mqtt_message_t));
  320. }
  321. void mqtt_client_set_auth(mqtt_client_t* cli, const char* username, const char* password) {
  322. if (!cli) return;
  323. if (username) {
  324. safe_strncpy(cli->username, username, sizeof(cli->username));
  325. }
  326. if (password) {
  327. safe_strncpy(cli->password, password, sizeof(cli->password));
  328. }
  329. }
  330. void mqtt_client_set_callback(mqtt_client_t* cli, mqtt_client_cb cb) {
  331. if (!cli) return;
  332. cli->cb = cb;
  333. }
  334. void mqtt_client_set_userdata(mqtt_client_t* cli, void* userdata) {
  335. if (!cli) return;
  336. cli->userdata = userdata;
  337. }
  338. void* mqtt_client_get_userdata(mqtt_client_t* cli) {
  339. if (!cli) return NULL;
  340. return cli->userdata;
  341. }
  342. int mqtt_client_get_last_error(mqtt_client_t* cli) {
  343. if (!cli) return -1;
  344. return cli->error;
  345. }
  346. int mqtt_client_set_ssl_ctx(mqtt_client_t* cli, hssl_ctx_t ssl_ctx) {
  347. cli->ssl_ctx = ssl_ctx;
  348. return 0;
  349. }
  350. int mqtt_client_new_ssl_ctx(mqtt_client_t* cli, hssl_ctx_opt_t* opt) {
  351. opt->endpoint = HSSL_CLIENT;
  352. hssl_ctx_t ssl_ctx = hssl_ctx_new(opt);
  353. if (ssl_ctx == NULL) return HSSL_ERROR;
  354. cli->alloced_ssl_ctx = true;
  355. return mqtt_client_set_ssl_ctx(cli, ssl_ctx);
  356. }
  357. int mqtt_client_connect(mqtt_client_t* cli, const char* host, int port, int ssl) {
  358. if (!cli) return -1;
  359. hio_t* io = hio_create_socket(cli->loop, host, port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
  360. if (io == NULL) return -1;
  361. if (ssl) {
  362. cli->ssl = 1;
  363. if (cli->ssl_ctx) {
  364. hio_set_ssl_ctx(io, cli->ssl_ctx);
  365. }
  366. hio_enable_ssl(io);
  367. }
  368. cli->io = io;
  369. hevent_set_userdata(io, cli);
  370. hio_setcb_connect(io, on_connect);
  371. hio_setcb_close(io, on_close);
  372. return hio_connect(io);
  373. }
  374. int mqtt_client_disconnect(mqtt_client_t* cli) {
  375. if (!cli || !cli->io) return -1;
  376. mqtt_send_disconnect(cli->io);
  377. return hio_close(cli->io);
  378. }
  379. int mqtt_client_publish(mqtt_client_t* cli, mqtt_message_t* msg) {
  380. if (!cli || !cli->io || !msg) return -1;
  381. int topic_len = msg->topic_len ? msg->topic_len : strlen(msg->topic);
  382. int payload_len = msg->payload_len ? msg->payload_len : strlen(msg->payload);
  383. int len = 2 + topic_len + payload_len;
  384. if (msg->qos > 0) len += 2; // mid
  385. unsigned short mid = 0;
  386. mqtt_head_t head;
  387. memset(&head, 0, sizeof(head));
  388. head.type = MQTT_TYPE_PUBLISH;
  389. head.qos = msg->qos & 3;
  390. head.retain = msg->retain;
  391. head.length = len;
  392. int buflen = mqtt_estimate_length(&head);
  393. // NOTE: send payload alone
  394. buflen -= payload_len;
  395. unsigned char* buf = NULL;
  396. HV_STACK_ALLOC(buf, buflen);
  397. unsigned char* p = buf;
  398. int headlen = mqtt_head_pack(&head, p);
  399. p += headlen;
  400. PUSH16(p, topic_len);
  401. PUSH_N(p, msg->topic, topic_len);
  402. if (msg->qos) {
  403. mid = mqtt_next_mid();
  404. PUSH16(p, mid);
  405. }
  406. hmutex_lock(&cli->mutex_);
  407. // send head + topic + mid
  408. int nwrite = hio_write(cli->io, buf, p - buf);
  409. HV_STACK_FREE(buf);
  410. if (nwrite < 0) {
  411. goto unlock;
  412. }
  413. // send payload
  414. nwrite = hio_write(cli->io, msg->payload, payload_len);
  415. unlock:
  416. hmutex_unlock(&cli->mutex_);
  417. return nwrite < 0 ? nwrite : mid;
  418. }
  419. int mqtt_client_subscribe(mqtt_client_t* cli, const char* topic, int qos) {
  420. if (!cli || !cli->io || !topic) return -1;
  421. int topic_len = strlen(topic);
  422. int len = 2 + 2 + topic_len + 1;
  423. mqtt_head_t head;
  424. memset(&head, 0, sizeof(head));
  425. head.type = MQTT_TYPE_SUBSCRIBE;
  426. head.qos = 1;
  427. head.length = len;
  428. int buflen = mqtt_estimate_length(&head);
  429. unsigned char* buf = NULL;
  430. HV_STACK_ALLOC(buf, buflen);
  431. unsigned char* p = buf;
  432. int headlen = mqtt_head_pack(&head, p);
  433. p += headlen;
  434. unsigned short mid = mqtt_next_mid();
  435. PUSH16(p, mid);
  436. PUSH16(p, topic_len);
  437. PUSH_N(p, topic, topic_len);
  438. PUSH8(p, qos & 3);
  439. // send head + mid + topic + qos
  440. int nwrite = hio_write(cli->io, buf, p - buf);
  441. HV_STACK_FREE(buf);
  442. return nwrite < 0 ? nwrite : mid;
  443. }
  444. int mqtt_client_unsubscribe(mqtt_client_t* cli, const char* topic) {
  445. if (!cli || !cli->io || !topic) return -1;
  446. int topic_len = strlen(topic);
  447. int len = 2 + 2 + topic_len;
  448. mqtt_head_t head;
  449. memset(&head, 0, sizeof(head));
  450. head.type = MQTT_TYPE_UNSUBSCRIBE;
  451. head.qos = 1;
  452. head.length = len;
  453. int buflen = mqtt_estimate_length(&head);
  454. unsigned char* buf = NULL;
  455. HV_STACK_ALLOC(buf, buflen);
  456. unsigned char* p = buf;
  457. int headlen = mqtt_head_pack(&head, p);
  458. p += headlen;
  459. unsigned short mid = mqtt_next_mid();
  460. PUSH16(p, mid);
  461. PUSH16(p, topic_len);
  462. PUSH_N(p, topic, topic_len);
  463. // send head + mid + topic
  464. int nwrite = hio_write(cli->io, buf, p - buf);
  465. HV_STACK_FREE(buf);
  466. return nwrite < 0 ? nwrite : mid;
  467. }