作者:俏巴
概述
使用物联网平台规则引擎的数据流转功能,可将Topic中的数据消息转发至其他Topic或其他阿里云产品进行存储或处理。本文主要演示通过规则引擎将设备上行消息流转到函数计算,并通过函数计算发送消息到钉钉机器人。
Step By Step
产品及设备准备
1、创建产品
2、定义物模型
3、添加设备
4、使用SDK 上行消息,参考链接:基于开源JAVA MQTT Client连接阿里云IoT
import com.alibaba.taro.AliyunIoTSignUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class IoTDemoPubSubDemo {
<span class="hljs-comment">// 设备三元组信息</span>
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> productKey = <span class="hljs-string">"a16MX********"</span>;
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> deviceName = <span class="hljs-string">"device1"</span>;
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> deviceSecret = <span class="hljs-string">"YGLHxUr40E1JaWhk3IVAm0uk********"</span>;
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> regionId = <span class="hljs-string">"cn-shanghai"</span>;
<span class="hljs-comment">// 物模型-属性上报topic</span>
private <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> pubTopic = <span class="hljs-string">"/sys/"</span> + productKey + <span class="hljs-string">"/"</span> + deviceName + <span class="hljs-string">"/thing/event/property/post"</span>;
<span class="hljs-comment">// 自定义topic,在产品Topic列表位置定义</span>
private <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> subTopic = <span class="hljs-string">"/sys/"</span> + productKey + <span class="hljs-string">"/"</span> + deviceName + <span class="hljs-string">"/thing/event/property/post_reply"</span>;
private <span class="hljs-keyword">static</span> MqttClient mqttClient;
public <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> main(<span class="hljs-built_in">String</span> [] args){
initAliyunIoTClient();
ScheduledExecutorService scheduledThreadPool = <span class="hljs-keyword">new</span> ScheduledThreadPoolExecutor(<span class="hljs-number">1</span>,
<span class="hljs-keyword">new</span> ThreadFactoryBuilder().setNameFormat(<span class="hljs-string">"thread-runner-%d"</span>).build());
scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), <span class="hljs-number">10</span>,<span class="hljs-number">5</span>, TimeUnit.SECONDS);
<span class="hljs-keyword">try</span> {
mqttClient.subscribe(subTopic); <span class="hljs-comment">// 订阅Topic</span>
} <span class="hljs-keyword">catch</span> (MqttException e) {
System.out.println(<span class="hljs-string">"error:"</span> + e.getMessage());
e.printStackTrace();
}
<span class="hljs-comment">// 设置订阅监听</span>
mqttClient.setCallback(<span class="hljs-keyword">new</span> MqttCallback() {
@Override
public <span class="hljs-keyword">void</span> connectionLost(Throwable throwable) {
System.out.println(<span class="hljs-string">"connection Lost"</span>);
}
@Override
public <span class="hljs-keyword">void</span> messageArrived(<span class="hljs-built_in">String</span> s, MqttMessage mqttMessage) throws Exception {
System.out.println(<span class="hljs-string">"Sub message"</span>);
System.out.println(<span class="hljs-string">"Topic : "</span> + s);
System.out.println(<span class="hljs-keyword">new</span> <span class="hljs-built_in">String</span>(mqttMessage.getPayload())); <span class="hljs-comment">//打印输出消息payLoad</span>
}
@Override
public <span class="hljs-keyword">void</span> deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
}
<span class="hljs-comment">/**
* 初始化 Client 对象
*/</span>
private <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> initAliyunIoTClient() {
<span class="hljs-keyword">try</span> {
<span class="hljs-comment">// 构造连接需要的参数</span>
<span class="hljs-built_in">String</span> clientId = <span class="hljs-string">"java"</span> + System.currentTimeMillis();
<span class="hljs-built_in">Map</span><<span class="hljs-built_in">String</span>, <span class="hljs-built_in">String</span>> params = <span class="hljs-keyword">new</span> HashMap<>(<span class="hljs-number">16</span>);
params.put(<span class="hljs-string">"productKey"</span>, productKey);
params.put(<span class="hljs-string">"deviceName"</span>, deviceName);
params.put(<span class="hljs-string">"clientId"</span>, clientId);
<span class="hljs-built_in">String</span> timestamp = <span class="hljs-built_in">String</span>.valueOf(System.currentTimeMillis());
params.put(<span class="hljs-string">"timestamp"</span>, timestamp);
<span class="hljs-comment">// cn-shanghai</span>
<span class="hljs-built_in">String</span> targetServer = <span class="hljs-string">"tcp://"</span> + productKey + <span class="hljs-string">".iot-as-mqtt."</span>+regionId+<span class="hljs-string">".aliyuncs.com:1883"</span>;
<span class="hljs-built_in">String</span> mqttclientId = clientId + <span class="hljs-string">"|securemode=3,signmethod=hmacsha1,timestamp="</span> + timestamp + <span class="hljs-string">"|"</span>;
<span class="hljs-built_in">String</span> mqttUsername = deviceName + <span class="hljs-string">"&"</span> + productKey;
<span class="hljs-built_in">String</span> mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, <span class="hljs-string">"hmacsha1"</span>);
connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);
} <span class="hljs-keyword">catch</span> (Exception e) {
System.out.println(<span class="hljs-string">"initAliyunIoTClient error "</span> + e.getMessage());
}
}
public <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> connectMqtt(<span class="hljs-built_in">String</span> url, <span class="hljs-built_in">String</span> clientId, <span class="hljs-built_in">String</span> mqttUsername, <span class="hljs-built_in">String</span> mqttPassword) throws Exception {
MemoryPersistence persistence = <span class="hljs-keyword">new</span> MemoryPersistence();
mqttClient = <span class="hljs-keyword">new</span> MqttClient(url, clientId, persistence);
MqttConnectOptions connOpts = <span class="hljs-keyword">new</span> MqttConnectOptions();
<span class="hljs-comment">// MQTT 3.1.1</span>
connOpts.setMqttVersion(<span class="hljs-number">4</span>);
connOpts.setAutomaticReconnect(<span class="hljs-literal">false</span>);
// connOpts.setCleanSession(true);
connOpts.setCleanSession(<span class="hljs-literal">false</span>);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
connOpts.setKeepAliveInterval(<span class="hljs-number">60</span>);
mqttClient.connect(connOpts);
}
<span class="hljs-comment">/**
* 汇报属性
*/</span>
private <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> postDeviceProperties() {
<span class="hljs-keyword">try</span> {
<span class="hljs-comment">//上报数据</span>
<span class="hljs-comment">//高级版 物模型-属性上报payload</span>
System.out.println(<span class="hljs-string">"上报属性值"</span>);
<span class="hljs-built_in">String</span> payloadJson = <span class="hljs-string">"{\"params\":{\"CurrentTemperature\":13,\"Humidity\":10}}"</span>;
MqttMessage message = <span class="hljs-keyword">new</span> MqttMessage(payloadJson.getBytes(<span class="hljs-string">"utf-8"</span>));
message.setQos(<span class="hljs-number">1</span>);
mqttClient.publish(pubTopic, message);
} <span class="hljs-keyword">catch</span> (Exception e) {
System.out.println(e.getMessage());
}
}
}
5、运行状态查看
函数计算创建与配置
1、创建应用
2、应用下面添加函数
3、编辑脚本
const https = require('https');
// Access token of the DingTalk robot webhook (the access_token query parameter).
const accessToken = '填写accessToken,即钉钉机器人webhook的accessToken';

/**
 * Function entry point: parses the incoming event as JSON and posts a markdown
 * message built from its deviceName, Temperature, Humidity and time fields to
 * the DingTalk robot webhook.
 *
 * @param {Buffer|string} event - raw event payload; must contain a JSON document
 * @param {object} context - invocation context (unused)
 * @param {function} callback - called with (error) on failure or (null, 'success')
 */
module.exports.handler = function(event, context, callback) {
    // Log the raw payload first so malformed input is still visible in the logs.
    console.log(event.toString());

    let eventJson;
    try {
        eventJson = JSON.parse(event.toString());
    } catch (e) {
        // Report a malformed event through the callback instead of throwing.
        callback(e);
        return;
    }

    // DingTalk markdown message. Line breaks must be real "\n" escapes; a blank
    // line ("\n\n") separates the quoted lines.
    const postData = JSON.stringify({
        "msgtype": "markdown",
        "markdown": {
            "title": "设备温湿度传感器",
            "text": "#### 温湿度传感器上报\n" +
                "> 设备名称:" + eventJson.deviceName + "\n\n" +
                "> 实时温度:" + eventJson.Temperature + "℃\n\n" +
                "> 相对湿度:" + eventJson.Humidity + "%\n\n" +
                "> ###### " + eventJson.time + " 发布 by 物联网平台 \n"
        },
        "at": {
            "isAtAll": false
        }
    });

    const options = {
        hostname: 'oapi.dingtalk.com',
        port: 443,
        path: '/robot/send?access_token=' + encodeURIComponent(accessToken),
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
            // Byte length, not character count: the body contains multi-byte characters.
            'Content-Length': Buffer.byteLength(postData)
        }
    };

    const req = https.request(options, (res) => {
        let body = '';
        res.setEncoding('utf8');
        res.on('data', (chunk) => {
            body += chunk;
        });
        res.on('end', () => {
            if (res.statusCode < 200 || res.statusCode >= 300) {
                callback(new Error('DingTalk webhook returned HTTP ' + res.statusCode + ': ' + body));
                return;
            }
            // NOTE(review): the robot API is expected to answer with a JSON body whose
            // non-zero "errcode" signals a rejected message — confirm against the docs.
            let result = null;
            try {
                result = JSON.parse(body);
            } catch (ignored) {
                // A non-JSON body on a 2xx response is treated as success.
            }
            if (result && result.errcode) {
                callback(new Error('DingTalk webhook error ' + result.errcode + ': ' + result.errmsg));
                return;
            }
            callback(null, 'success');
        });
    });

    // Network-level failure.
    req.on('error', (e) => {
        callback(e);
    });

    // Send the request body.
    req.write(postData);
    req.end();
};
钉钉机器人webhook的accessToken获取参考链接:阿里云IoT Studio服务开发定时关灯功能示例Demo: 2.3 钉钉机器人Webhook获取 部分。
4、快速测试
规则引擎配置
1、创建规则引擎
2、配置处理数据
SQL字段
deviceName() as deviceName, items.Humidity.value as Humidity, items.CurrentTemperature.value as Temperature, timestamp('yyyy-MM-dd HH:mm:ss') as time
3、配置转发数据
4、启动设备端SDK,周期性上行消息,钉钉群查看通知
5、上行日志查看