mqtt_sub.c 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  /*
   * MQTT subscribe example
   *
   * @build make examples
   * @sub   bin/mqtt_sub 127.0.0.1 1883 topic
   * @pub   bin/mqtt_pub 127.0.0.1 1883 topic payload
   *
   */
  9. #include "hv.h"
  10. #include "mqtt_client.h"
  /*
   * @test MQTTS
   * #define TEST_SSL 1
   *
   * @build ./configure --with-mqtt --with-openssl && make clean && make
   *
   */
  18. #define TEST_SSL 0
  19. #define TEST_AUTH 0
  20. #define TEST_RECONNECT 1
  /*
   * workflow:
   * mqtt_client_new -> mqtt_client_xxx -> mqtt_client_run
   *
   * mqtt_client_set_xxx ->
   * mqtt_client_connect ->
   * on_connack -> mqtt_client_subscribe ->
   * on_publish -> handle_message
   *
   */
  31. static void handle_message(mqtt_client_t* cli, mqtt_message_t* msg) {
  32. printf("topic: %.*s\n", msg->topic_len, msg->topic);
  33. printf("payload: %.*s\n", msg->payload_len, msg->payload);
  34. }
  35. static void on_mqtt(mqtt_client_t* cli, int type) {
  36. printf("on_mqtt type=%d\n", type);
  37. switch(type) {
  38. case MQTT_TYPE_CONNECT:
  39. printf("mqtt connected!\n");
  40. if (cli->reconn_setting && cli->reconn_setting->cur_retry_cnt > 0) {
  41. printf("mqtt reconnect cnt=%d, delay=%d\n", cli->reconn_setting->cur_retry_cnt, cli->reconn_setting->cur_delay);
  42. }
  43. break;
  44. case MQTT_TYPE_DISCONNECT:
  45. printf("mqtt disconnected!\n");
  46. if (cli->reconn_setting && cli->reconn_setting->cur_retry_cnt > 0) {
  47. printf("mqtt reconnect cnt=%d, delay=%d\n", cli->reconn_setting->cur_retry_cnt, cli->reconn_setting->cur_delay);
  48. }
  49. break;
  50. case MQTT_TYPE_CONNACK:
  51. printf("mqtt connack!\n");
  52. {
  53. const char* topic = (const char*)mqtt_client_get_userdata(cli);
  54. int mid = mqtt_client_subscribe(cli, topic, 0);
  55. printf("mqtt subscribe mid=%d\n", mid);
  56. }
  57. break;
  58. case MQTT_TYPE_SUBACK:
  59. printf("mqtt suback mid=%d\n", cli->mid);
  60. break;
  61. case MQTT_TYPE_PUBLISH:
  62. handle_message(cli, &cli->message);
  63. default:
  64. break;
  65. }
  66. }
  67. static int mqtt_subscribe(const char* host, int port, const char* topic) {
  68. mqtt_client_t* cli = mqtt_client_new(NULL);
  69. if (cli == NULL) return -1;
  70. cli->keepalive = 10;
  71. #if TEST_AUTH
  72. mqtt_client_set_auth(cli, "test", "123456");
  73. #endif
  74. mqtt_client_set_userdata(cli, (void*)topic);
  75. mqtt_client_set_callback(cli, on_mqtt);
  76. #if TEST_RECONNECT
  77. reconn_setting_t reconn;
  78. reconn_setting_init(&reconn);
  79. reconn.min_delay = 1000;
  80. reconn.max_delay = 10000;
  81. reconn.delay_policy = 2;
  82. mqtt_client_set_reconnect(cli, &reconn);
  83. #endif
  84. int ssl = 0;
  85. #if TEST_SSL
  86. ssl = 1;
  87. #endif
  88. mqtt_client_connect(cli, host, port, ssl);
  89. mqtt_client_run(cli);
  90. mqtt_client_free(cli);
  91. return 0;
  92. }
  /*
   * Entry point: mqtt_sub host port topic
   *
   * Validates the command line and then hands over to mqtt_subscribe().
   *
   * @return the result of mqtt_subscribe(), or -10 when the arguments are
   *         missing or the port is not a decimal number in 1..65535
   */
  int main(int argc, char** argv) {
      if (argc < 4) {
          fprintf(stderr, "Usage: %s host port topic\n", argv[0]);
          return -10;
      }

      const char* host = argv[1];
      const char* topic = argv[3];

      /*
       * strtol instead of atoi: atoi cannot signal errors, so "abc" would
       * silently become port 0. An overflowing value is clamped by strtol to
       * LONG_MIN/LONG_MAX and is therefore caught by the range check as well.
       */
      char* end = NULL;
      long port_value = strtol(argv[2], &end, 10);
      if (end == argv[2] || *end != '\0' || port_value < 1 || port_value > 65535) {
          fprintf(stderr, "Invalid port: %s\n", argv[2]);
          fprintf(stderr, "Usage: %s host port topic\n", argv[0]);
          return -10;
      }

      return mqtt_subscribe(host, (int)port_value, topic);
  }