一、规则链配置
原本的规则链需要增加转换脚本(这里需要增加RPC发给设备的规则,否则仪表库的控制设备,发数据过去没效果)
- 先过滤一个字段
- 转换脚本,将RPC数据给到属性
- 保存属性
msgType = "POST_ATTRIBUTES_REQUEST";
msg = {"CaptureNow":msg.params};
return {msg: msg, metadata: metadata, msgType: msgType};
二、源码
这里实现的逻辑
- 周期性获取一次 共享属性(共享属性可以通过平台进行建立和修改和删除)
- 若这个共享属性为1时,触发一下RPC命令,命令带上false
- 服务器收到RPC命令后,因为设置了规则链,会将这个共享属性变成false
#include <stdio.h>
#include <stdint.h>
#include <stddef.h>
#include <string.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "esp_system.h"
#include "esp_log.h"
#include "esp_netif.h"
#include "esp_event.h"
#include "protocol_examples_common.h"
#include "nvs.h"
#include "nvs_flash.h"
#include "lwip/sockets.h"
#include "lwip/dns.h"
#include "lwip/netdb.h"
#include "mqtt_client.h"
#include "cJSON.h"
static const char *TAG = "MQTT_EXAMPLE";
uint8_t mqtt_connect_flag = 0;
esp_mqtt_client_handle_t g_client;
static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
{
esp_mqtt_client_handle_t client = event->client;
int msg_id;
// your_context_t *context = event->context;
switch (event->event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
// msg_id = esp_mqtt_client_publish(client, "v1/devices/me/telemetry", "{temperature:25}", 0, 1, 0);
// ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
// msg_id = esp_mqtt_client_publish(client, "v1/devices/me/attributes", "{humtiti:1.0}", 0, 1,0 );
// ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
// msg_id = esp_mqtt_client_subscribe(client, "v1/devices/me/attributes", 1);
// ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
//设备收到的rpc请求
msg_id = esp_mqtt_client_subscribe(client, "v1/devices/me/rpc/request/+", 1);
ESP_LOGI(TAG, "sent subscribe device rpc topic successful, msg_id=%d", msg_id);
//发送rpc后,收到的回复
msg_id = esp_mqtt_client_subscribe(client, "v1/devices/me/rpc/response/+", 1);
ESP_LOGI(TAG, "sent subscribe device rpc topic successful, msg_id=%d", msg_id);
// msg_id = esp_mqtt_client_publish(client, "v1/devices/me/rpc/response/1", "{\"params\": false}", 0, 1, 0);
// ESP_LOGI(TAG, "sent publish device rpc successful, msg_id=%d", msg_id);
// msg_id = esp_mqtt_client_publish(client, "v1/devices/me/rpc/request/1", "{\"method\": \"getTime\",\"params\": {}}", 0, 1, 0);
// ESP_LOGI(TAG, "sent publish device rpc successful, msg_id=%d", msg_id);
//设备自身请求属性回复
msg_id = esp_mqtt_client_subscribe(client, "v1/devices/me/attributes/response/+", 1);
ESP_LOGI(TAG, "sent subscribe attributes successful, msg_id=%d", msg_id);
// msg_id = esp_mqtt_client_publish(client, "v1/devices/me/attributes/request/1", "{ \"sharedKeys\":\"humtiti,shot\"}", 0, 1, 0);
// ESP_LOGI(TAG, "sent publish attributes successful, msg_id=%d", msg_id);
mqtt_connect_flag = 1;
g_client = client;
// msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1");
//ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
mqtt_connect_flag = 0;
break;
case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
msg_id = esp_mqtt_client_publish(client, "v1/devices/me/rpc/request/1", "{\"method\": \"getDevice\",\"params\": {}}", 0, 0, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_UNSUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_PUBLISHED:
ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data);
// msg_id = esp_mqtt_client_publish(client, "v1/devices/me/attributes", "{\"params\": false}", 0, 1, 0);
//ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
//收到请求的数据
if(strstr((const char *)event->topic,"me/attributes/response"))
{
cJSON * pJson = cJSON_Parse(event->data);
if(NULL == pJson)
{
ESP_LOGI("CJSON", "cJSON_Parse error");
break;
}
ESP_LOGI("CJSON", "cJSON_Parse ok");
//{"shared":{"CaptureNow":false}}
cJSON * pSub = cJSON_GetObjectItem(pJson, "shared");//获取 shared 数据
if(pSub != NULL)
{
//获取 CaptureNow
cJSON * pSubSub = cJSON_GetObjectItem(pSub, "CaptureNow");
if(NULL==pSubSub)
{
ESP_LOGI("CJSON", "get CaptureNow error");
}
else
{
ESP_LOGI("CJSON", "CaptureNow:%d",pSubSub->valueint);
if(pSubSub->valueint == 1)//等于1时,rpc将属性设置为false
{
//这里等待1秒,让你看清楚而已
vTaskDelay(1000/portTICK_PERIOD_MS);
int msg_id;
static uint8_t request_id = 0;
char request_topic[50];
char publish_data[100]="{\"method\": \"setCaptureNow\",\"params\": false}";
sprintf(request_topic,"v1/devices/me/rpc/request/%d",request_id++);
msg_id = esp_mqtt_client_publish(g_client, request_topic, publish_data, 0, 1, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
}
// ESP_LOGI("CJSON", "temperature:%d",pSubSub->valueint);
}
}
cJSON_Delete(pJson);
}
//收到RPC数据请求
else if(strstr((const char *)event->topic,"rpc/request"))
{
cJSON * pJson = cJSON_Parse(event->data);
if(NULL == pJson)
{
ESP_LOGI("CJSON", "cJSON_Parse error");
break;
}
cJSON * pSub = cJSON_GetObjectItem(pJson, "method");//获取 method
if(pSub != NULL)
{
if(!strcmp(pSub->valuestring,"setThrust"))
{
//这里设置数据
ESP_LOGI(TAG,"rpc/request ---> setThrust");
}
}
cJSON_Delete(pJson);
}
//收到RPC数据的回复
else if(strstr((const char *)event->topic,"rpc/response"))
{
ESP_LOGI(TAG,"GET rpc/response");
}
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
break;
}
return ESP_OK;
}
static void mqtt_app_start(void)
{
esp_mqtt_client_config_t mqtt_cfg = {
//.uri = CONFIG_BROKER_URL,
.host = "192.168.82.94", //MQTT服务器IP
//.uri = "mqtt://192.168.82.94:1883",
.event_handle = mqtt_event_handler, //MQTT事件
.port=1883, //端口
.username = "U4G6ESZucf0faN6gWaut", //用户名
.client_id = "f10220b0-3707-11ec-a7ab-0d2f8ea1e2e0",
// .password = "public", //密码
// .user_context = (void *)your_context
};
ESP_LOGI(TAG, "uri:%s\r\nusername:%s\r\nclient_id:%s\r\n",mqtt_cfg.uri,mqtt_cfg.username,mqtt_cfg.client_id);
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
esp_mqtt_client_start(client);
}
static void get_attr_task(void *arg)
{
//vTaskDelay(5000 / portTICK_PERIOD_MS);
for(;;)
{
if(mqtt_connect_flag == 1)
{
vTaskDelay(5000 / portTICK_PERIOD_MS);
//{"sharedKeys":"CaptureNow"}
int msg_id;
static uint8_t request_id = 0;
char request_topic[50];
char publish_data[100]="{\"sharedKeys\":\"CaptureNow\"}";
sprintf(request_topic,"v1/devices/me/attributes/request/%d",request_id++);
msg_id = esp_mqtt_client_publish(g_client, request_topic, publish_data, 0, 1, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
}
vTaskDelay(1000 / portTICK_PERIOD_MS);
}
}
void app_main()
{
ESP_ERROR_CHECK(nvs_flash_init());
ESP_ERROR_CHECK(esp_netif_init());
ESP_ERROR_CHECK(esp_event_loop_create_default());
ESP_ERROR_CHECK(example_connect());
ESP_LOGI(TAG, "[APP] Startup..");
ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size());
ESP_LOGI(TAG, "[APP] IDF version: %s", esp_get_idf_version());
esp_log_level_set("*", ESP_LOG_INFO);
esp_log_level_set("MQTT_CLIENT", ESP_LOG_VERBOSE);
esp_log_level_set("TRANSPORT_TCP", ESP_LOG_VERBOSE);
esp_log_level_set("TRANSPORT_SSL", ESP_LOG_VERBOSE);
esp_log_level_set("TRANSPORT", ESP_LOG_VERBOSE);
esp_log_level_set("OUTBOX", ESP_LOG_VERBOSE);
//这个是用来每1秒获取一下数据而已
xTaskCreate(get_attr_task, "get_attr", 2048, NULL, configMAX_PRIORITIES-2, NULL);
mqtt_app_start();
}