作者:俏巴
概述
之前物联网平台自定义Topic均做消息的直接透传,不做类似物模型模式的数据脚本解析。平台最新推出的自定义Topic脚本解析功能,设备通过携带解析标记(?_sn=default)的自定义Topic上报数据,物联网平台收到数据后,调用您在控制台提交的数据解析脚本,将自定义格式数据转换为JSON结构体,再流转给后续业务系统。本文主要演示该功能的具体功能实现。
Step By Step
1、创建产品和设备
2、添加脚本
3、设备端通过自定义Topic模拟上行数据
import com.alibaba.taro.AliyunIoTSignUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// 透传类设备测试
public class IoTDemoPubSubDemoForPersonalTopic {
<span class="hljs-comment">// 设备三元组信息</span>
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> productKey = <span class="hljs-string">"a1wJG******"</span>;
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> deviceName = <span class="hljs-string">"Device1"</span>;
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> deviceSecret = <span class="hljs-string">"40YEyiGzXmvhDdpvbUVFCHjC********"</span>;
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> regionId = <span class="hljs-string">"cn-shanghai"</span>;
private <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> pubTopic = <span class="hljs-string">"/"</span>+ productKey + <span class="hljs-string">"/"</span> + deviceName + <span class="hljs-string">"/user/update?_sn=default"</span>;<span class="hljs-comment">//自定义Topic做脚本解析</span>
private <span class="hljs-keyword">static</span> MqttClient mqttClient;
public <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> main(<span class="hljs-built_in">String</span> [] args){
initAliyunIoTClient(); <span class="hljs-comment">// 初始化Client</span>
ScheduledExecutorService scheduledThreadPool = <span class="hljs-keyword">new</span> ScheduledThreadPoolExecutor(<span class="hljs-number">1</span>,
<span class="hljs-keyword">new</span> ThreadFactoryBuilder().setNameFormat(<span class="hljs-string">"thread-runner-%d"</span>).build());
scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), <span class="hljs-number">10</span>,<span class="hljs-number">10</span>, TimeUnit.SECONDS);
}
<span class="hljs-comment">/**
* 初始化 Client 对象
*/</span>
private <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> initAliyunIoTClient() {
<span class="hljs-keyword">try</span> {
<span class="hljs-comment">// 构造连接需要的参数</span>
<span class="hljs-built_in">String</span> clientId = <span class="hljs-string">"java"</span> + System.currentTimeMillis();
<span class="hljs-built_in">Map</span><<span class="hljs-built_in">String</span>, <span class="hljs-built_in">String</span>> params = <span class="hljs-keyword">new</span> HashMap<>(<span class="hljs-number">16</span>);
params.put(<span class="hljs-string">"productKey"</span>, productKey);
params.put(<span class="hljs-string">"deviceName"</span>, deviceName);
params.put(<span class="hljs-string">"clientId"</span>, clientId);
<span class="hljs-built_in">String</span> timestamp = <span class="hljs-built_in">String</span>.valueOf(System.currentTimeMillis());
params.put(<span class="hljs-string">"timestamp"</span>, timestamp);
<span class="hljs-comment">// cn-shanghai</span>
<span class="hljs-built_in">String</span> targetServer = <span class="hljs-string">"tcp://"</span> + productKey + <span class="hljs-string">".iot-as-mqtt."</span>+regionId+<span class="hljs-string">".aliyuncs.com:1883"</span>;
<span class="hljs-built_in">String</span> mqttclientId = clientId + <span class="hljs-string">"|securemode=3,signmethod=hmacsha1,timestamp="</span> + timestamp + <span class="hljs-string">"|"</span>;
<span class="hljs-built_in">String</span> mqttUsername = deviceName + <span class="hljs-string">"&"</span> + productKey;
<span class="hljs-built_in">String</span> mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, <span class="hljs-string">"hmacsha1"</span>);
connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);
} <span class="hljs-keyword">catch</span> (Exception e) {
System.out.println(<span class="hljs-string">"initAliyunIoTClient error "</span> + e.getMessage());
}
}
public <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> connectMqtt(<span class="hljs-built_in">String</span> url, <span class="hljs-built_in">String</span> clientId, <span class="hljs-built_in">String</span> mqttUsername, <span class="hljs-built_in">String</span> mqttPassword) throws Exception {
MemoryPersistence persistence = <span class="hljs-keyword">new</span> MemoryPersistence();
mqttClient = <span class="hljs-keyword">new</span> MqttClient(url, clientId, persistence);
MqttConnectOptions connOpts = <span class="hljs-keyword">new</span> MqttConnectOptions();
<span class="hljs-comment">// MQTT 3.1.1</span>
connOpts.setMqttVersion(<span class="hljs-number">4</span>);
connOpts.setAutomaticReconnect(<span class="hljs-literal">false</span>);
connOpts.setCleanSession(<span class="hljs-literal">true</span>);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
connOpts.setKeepAliveInterval(<span class="hljs-number">60</span>);
mqttClient.connect(connOpts);
}
<span class="hljs-comment">/**
* 汇报属性
*/</span>
private <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> postDeviceProperties() {
<span class="hljs-keyword">try</span> {
<span class="hljs-comment">//上报数据</span>
System.out.println(<span class="hljs-string">"自定义Topic上报属性值"</span>);
<span class="hljs-comment">//0x000000000100320100000000</span>
<span class="hljs-built_in">String</span> hexString = <span class="hljs-string">"000000000100320100000000"</span>;
byte[] payLoad = hexToByteArray(hexString);
MqttMessage message = <span class="hljs-keyword">new</span> MqttMessage(payLoad);
message.setQos(<span class="hljs-number">0</span>);
mqttClient.publish(pubTopic, message);
} <span class="hljs-keyword">catch</span> (Exception e) {
System.out.println(e.getMessage());
}
}
<span class="hljs-comment">/**
* hex字符串转byte数组
* @param inHex 待转换的Hex字符串
* @return 转换后的byte数组结果
*/</span>
public <span class="hljs-keyword">static</span> byte[] hexToByteArray(<span class="hljs-built_in">String</span> inHex){
int hexlen = inHex.length();
byte[] result;
<span class="hljs-keyword">if</span> (hexlen % <span class="hljs-number">2</span> == <span class="hljs-number">1</span>){
<span class="hljs-comment">//奇数</span>
hexlen++;
result = <span class="hljs-keyword">new</span> byte[(hexlen/<span class="hljs-number">2</span>)];
inHex=<span class="hljs-string">"0"</span>+inHex;
}<span class="hljs-keyword">else</span> {
<span class="hljs-comment">//偶数</span>
result = <span class="hljs-keyword">new</span> byte[(hexlen/<span class="hljs-number">2</span>)];
}
int j=<span class="hljs-number">0</span>;
<span class="hljs-keyword">for</span> (int i = <span class="hljs-number">0</span>; i < hexlen; i+=<span class="hljs-number">2</span>){
result[j]=hexToByte(inHex.substring(i,i+<span class="hljs-number">2</span>));
j++;
}
<span class="hljs-keyword">return</span> result;
}
<span class="hljs-comment">/**
* Hex字符串转byte
* @param inHex 待转换的Hex字符串
* @return 转换后的byte
*/</span>
public <span class="hljs-keyword">static</span> byte hexToByte(<span class="hljs-built_in">String</span> inHex) {
<span class="hljs-keyword">return</span> (byte) Integer.parseInt(inHex, <span class="hljs-number">16</span>);
}
}
4、上行消息查看
5、注意事项
1、在物联网平台创建自定义Topic时按正常Topic定义,不添加该解析标记;
2、仅解析设备上报云端的数据,不解析云端下行数据;
3、仅解析上报数据的Payload,并返回解析后的Payload;
4、解析前后,数据所在Topic不变。例如,设备发送到/${productKey}/${deviceName}/user/update的数据,解析后仍在该Topic中。