kawaiimqtt:
1 /* 5. wifi connect */ 2 rt_wlan_connect(WIFI_SSID, WIFI_KEY); 3 4 /* 6. startup mqtt client */ 5 mqtt_log_init(); 6 rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get()); 7 /* check network connection status */ 8 9 net_dev = netdev_get_by_name("w0"); 10 while(!(netdev_is_internet_up(net_dev))) 11 { 12 rt_thread_mdelay(100); 13 timeout++; 14 if(timeout == 200) 15 { 16 rt_kprintf("wifi connect failed!\r\n"); 17 return -RT_ERROR; 18 } 19 } 20 21 client = mqtt_lease(); 22 23 mqtt_set_host(client, MQTT_URL); 24 mqtt_set_port(client, MQTT_PORT); 25 mqtt_set_user_name(client, "rt-thread"); 26 mqtt_set_password(client, "rt-thread"); 27 mqtt_set_client_id(client, cid); 28 mqtt_set_clean_session(client, 1); 29 30 if(mqtt_connect(client)) 31 { 32 KAWAII_MQTT_LOG_E("%s:%d %s()... mqtt connect failed...", __FILE__, __LINE__, __FUNCTION__); 33 is_started = 0; 34 return -RT_ERROR; 35 } 36 37 is_started = 1; 38 mqtt_subscribe(client, SUB1_NAME, QOS0, sub_topic_handle_led); 39 mqtt_subscribe(client, SUB2_NAME, QOS1, sub_topic_handle_num); 40 mqtt_subscribe(client, SUB_OTA, QOS2, sub_topic_handle_ota); 41 42 tid3 = rt_thread_create("mq_pub", mqtt_t_publish, RT_NULL, 2048, 13, 10); 43 if (tid3 != RT_NULL) 44 { 45 rt_thread_startup(tid3); 46 }View Code
1保证网络连接
2设置mqtt客户端:远端URL,端口,客户端ID,用户名,密码等
3连接远程mqtt服务器
4订阅主题:主题,QoS,收到主题时的响应(将主题名和主题内容分离出来,然后匹配响应)
1 typedef struct { 2 char **str; //the PChar of string array 3 size_t num; //the number of string 4 }IString; 5 6 /* 拆分字符串 */ 7 static int Split(char *src, char *delim, IString* istr)//split buf 8 { 9 int i; 10 char *str = NULL, *p = NULL; 11 12 (*istr).num = 1; 13 str = (char*)rt_calloc(strlen(src)+1,sizeof(char)); 14 if (str == NULL) return 0; 15 (*istr).str = (char**)rt_calloc(1,sizeof(char *)); 16 if ((*istr).str == NULL) return 0; 17 strcpy(str,src); 18 19 p = strtok(str, delim); 20 (*istr).str[0] = (char*)rt_calloc(strlen(p)+1,sizeof(char)); 21 if ((*istr).str[0] == NULL) return 0; 22 strcpy((*istr).str[0],p); 23 for(i = 1; (p = strtok(NULL, delim)); i++) 24 { 25 (*istr).num++; 26 (*istr).str = (char**)rt_realloc((*istr).str,(i+1)*sizeof(char *)); 27 if ((*istr).str == NULL) return 0; 28 (*istr).str[i] = (char*)rt_calloc(strlen(p)+1,sizeof(char)); 29 if ((*istr).str[0] == NULL) return 0; 30 strcpy((*istr).str[i],p); 31 } 32 rt_free(str); 33 str = p = NULL; 34 35 return 1; 36 } 37 static void sub_topic_handle_led(void* client, message_data_t* msg) 38 { 39 (void) client; 40 KAWAII_MQTT_LOG_I("-----------------------------------------------------------------------------------\r\n"); 41 KAWAII_MQTT_LOG_I("%s:%d %s()...\ntopic: %s\nmessage:%s\r\n", __FILE__, __LINE__, __FUNCTION__, msg->topic_name, (char*)msg->message->payload); 42 KAWAII_MQTT_LOG_I("-----------------------------------------------------------------------------------\r\n"); 43 44 int i; 45 IString istr; 46 struct file_msg file_msg; 47 char *tick_num; 48 49 if (Split(msg->message->payload," ",&istr)) 50 { 51 for (i = 0; i < istr.num; i++) 52 rt_kprintf("%s\n",istr.str[i]); 53 54 if(i == 2) 55 { 56 if(rt_strncmp(istr.str[0], "led", 3)) 57 { 58 KAWAII_MQTT_LOG_E("command error!\r\n"); 59 } 60 else 61 { 62 if(!(rt_strncmp(istr.str[1], "off", 3))) 63 { 64 rt_pin_write(LED_PIN_RED, PIN_HIGH); 65 } 66 else if(!(rt_strncmp(istr.str[1], "on", 2))) 67 { 68 rt_pin_write(LED_PIN_RED, PIN_LOW); 69 } 70 else 71 
{ 72 KAWAII_MQTT_LOG_E("state command error!\r\n"); 73 for (i = 0; i < istr.num; i++) 74 rt_free(istr.str[i]); 75 rt_free(istr.str); 76 return; 77 } 78 79 tick_num = rt_calloc(1, 10); 80 if (tick_num == RT_NULL) { 81 rt_kprintf("memory is not enough \r\n"); 82 } 83 else { 84 itoa(rt_tick_get(), tick_num, 10); 85 file_msg.timestamp = tick_num; 86 file_msg.cmd = istr.str[0]; 87 file_msg.state = istr.str[1]; 88 file_msg.str = istr.str; 89 90 rt_mq_send(&mq_demo, &file_msg, sizeof(file_msg)); 91 } 92 } 93 } 94 else 95 { 96 KAWAII_MQTT_LOG_E("command error! \r\n"); 97 for (i = 0; i < istr.num; i++) 98 rt_free(istr.str[i]); 99 rt_free(istr.str); 100 } 101 102 } 103 else 104 { 105 KAWAII_MQTT_LOG_E("Split failure!\r\n"); 106 } 107 }View Code
5 建立发布线程用于发布主题(设置主题QoS和负载内容,然后向指定主题发送;若是数据采集,可以用消息队列等待发送)。
1 /* 板子发布消息的线程 */ 2 static void mqtt_t_publish(void *parameter) 3 { 4 char pub_buf[64]; 5 mqtt_message_t msg; 6 memset(&msg, 0, sizeof(msg)); 7 8 rt_thread_mdelay(2 * RT_TICK_PER_SECOND); 9 10 while(1) 11 { 12 if(is_started) 13 { 14 memset(pub_buf, 0, 64); 15 16 sprintf(pub_buf, "%d", usMRegHoldBuf[SLAVE_ADDR - 1][MB_RECV_REG_NUM]); 17 18 msg.qos = QOS0; 19 msg.payload = pub_buf; 20 21 mqtt_publish(client, PUB_NAME, &msg); 22 } 23 rt_thread_mdelay(5 * RT_TICK_PER_SECOND); 24 } 25 }View Code