原理
实现上述需求,其核心就是任意俩个字,而解决这种问题的手段一般就是“通配”。虽然是通配,但是每一条消息实际上还是有个“明确”的目的地的
(1)规则引擎配置:
源端:配置通配符 "+"
这一条保证消息源可以是任意的
目的端:配置通配符"${TargetDevice}"
这一条保证目的地是任意的,而真正的目的地可以根据TargetDevice变化而变化。
(2)设备端
源设备上报TargetDevice字段 ,表明该消息去向
Step By Step
前提:测试相关的环境准备好,两个程序(设备)(A作为源,B作为目的地),物联网平台上topic啊,物模型啊之类的都定义好。
1、配置规则引擎
(1)编写SQL语句
这里选择的是自定义topic消息,也可以选其他的。(注意topic需要有发布权限)
SELECT TargetDevice FROM "/a16*JpRCl/+/user/update"
(2)添加操作(配置目的地)
这里选下发到另一个自定义topic(注意,测试设备需要成功订阅这个topic)
/a16*pRCl/${TargetDevice}/user/get
(3)启动规则
2、设备端开发
源设备A:上报消息(消息中一定要包含TargetDevice字段)
这里体现TargetDevice必须要有,你们自己测试的时候可以再加些其他字段
源码:
AliyunIoTSignUtil:
package com.alibaba.taro;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Arrays;
import java.util.Map;
/**
* AliyunIoTSignUtil
*/
public class AliyunIoTSignUtil {
public static String sign(Map<String, String> params, String deviceSecret, String signMethod) {
//将参数Key按字典顺序排序
String[] sortedKeys = params.keySet().toArray(new String[] {});
Arrays.sort(sortedKeys);
//生成规范化请求字符串
StringBuilder canonicalizedQueryString = new StringBuilder();
for (String key : sortedKeys) {
if ("sign".equalsIgnoreCase(key)) {
continue;
}
canonicalizedQueryString.append(key).append(params.get(key));
}
try {
String key = deviceSecret;
return encryptHMAC(signMethod,canonicalizedQueryString.toString(), key);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* HMACSHA1加密
*
*/
public static String encryptHMAC(String signMethod,String content, String key) throws Exception {
SecretKey secretKey = new SecretKeySpec(key.getBytes("utf-8"), signMethod);
Mac mac = Mac.getInstance(secretKey.getAlgorithm());
mac.init(secretKey);
byte[] data = mac.doFinal(content.getBytes("utf-8"));
return bytesToHexString(data);
}
public static final String bytesToHexString(byte[] bArray) {
StringBuffer sb = new StringBuffer(bArray.length);
String sTemp;
for (int i = 0; i < bArray.length; i++) {
sTemp = Integer.toHexString(0xFF & bArray[i]);
if (sTemp.length() < 2) {
sb.append(0);
}
sb.append(sTemp.toUpperCase());
}
return sb.toString();
}
}
设备A:
package com.alibaba;
import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;
import java.util.HashMap;
import java.util.Map;
public class CustomTopicMessageDemo {
public static String productKey = "a16***pRCl";
public static String deviceName = "IoTDeviceDemo1";
public static String deviceSecret = "a3b15a11*****c4952";
public static String regionId = "cn-shanghai";
// 自定义topic,在产品Topic列表位置定义
private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/update";
private static String subTopic = "/"+productKey + "/" + deviceName+"/user/update";
private static MqttClient mqttClient;
public static void main(String [] args){
initAliyunIoTClient();
// ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
// new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
// scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
// 汇报属性
String payloadJson = "{\"TargetDevice\":\"IoTDeviceDemo2\"}";
postDeviceProperties(payloadJson);
try {
mqttClient.subscribe(subTopic); // 订阅Topic
} catch (MqttException e) {
System.out.println("error:" + e.getMessage());
e.printStackTrace();
}
// 设置订阅监听
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("connection Lost");
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
String payload = new String(mqttMessage.getPayload());
System.out.println(" 接收消息:");
System.out.println("Topic : " + s);
System.out.println(payload); //打印输出消息payLoad
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
}
/**
* 初始化 Client 对象
*/
private static void initAliyunIoTClient() {
try {
// 构造连接需要的参数
String clientId = "java" + System.currentTimeMillis();
Map<String, String> params = new HashMap<String, String>(16);
params.put("productKey", productKey);
params.put("deviceName", deviceName);
params.put("clientId", clientId);
String timestamp = String.valueOf(System.currentTimeMillis());
params.put("timestamp", timestamp);
// cn-shanghai
//String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:443";
String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";
String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
String mqttUsername = deviceName + "&" + productKey;
String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");
connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);
} catch (Exception e) {
System.out.println("initAliyunIoTClient error " + e.getMessage());
}
}
public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {
MemoryPersistence persistence = new MemoryPersistence();
mqttClient = new MqttClient(url, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
// MQTT 3.1.1
connOpts.setMqttVersion(4);
connOpts.setAutomaticReconnect(false);
connOpts.setCleanSession(false);
//connOpts.setCleanSession(true);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
connOpts.setKeepAliveInterval(60);
mqttClient.connect(connOpts);
}
/**
* 汇报属性
*/
private static void postDeviceProperties(String payloadJson) {
try {
//上报数据
//高级版 物模型-属性上报payload
System.out.println("上报属性值:");
MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
message.setQos(0);
mqttClient.publish(pubTopic, message);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
/**
* 服务返回
*/
private static void postServiceReply(String payloadJson,String relpyTopic) {
try {
//上报数据
//高级版 物模型-属性上报payload
System.out.println("服务调用返回:");
//String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
System.out.println("Topic:");
System.out.println(relpyTopic);
System.out.println(payloadJson);
MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
message.setQos(0);
mqttClient.publish(relpyTopic, message);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
目的设备B:
订阅规则引擎配置的topic
/a16*pRCl/${TargetDevice}/user/get
源码:
B设备
package com.alibaba;
import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;
import java.util.HashMap;
import java.util.Map;
public class CustomTopicMessageDemo2 {
public static String productKey = "a1*****pRCl";
public static String deviceName = "IoTDeviceDemo2";
public static String deviceSecret = "0895205*****7e2b4bf2";
public static String regionId = "cn-shanghai";
private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/get";
private static String subTopic = "/"+productKey + "/" + deviceName+"/user/get";
private static MqttClient mqttClient;
public static void main(String [] args){
initAliyunIoTClient();
// ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
// new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
// scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
String payloadJson = "{\"tts\":\"ss\"}";
postDeviceProperties(payloadJson);
try {
mqttClient.subscribe(subTopic); // 订阅Topic
} catch (MqttException e) {
System.out.println("error:" + e.getMessage());
e.printStackTrace();
}
// 设置订阅监听
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("connection Lost");
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
String payload = new String(mqttMessage.getPayload());
System.out.println(" 接收消息:");
System.out.println("Topic : " + s);
System.out.println(payload); //打印输出消息payLoad
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
}
/**
* 初始化 Client 对象
*/
private static void initAliyunIoTClient() {
try {
// 构造连接需要的参数
String clientId = "java" + System.currentTimeMillis();
Map<String, String> params = new HashMap<String, String>(16);
params.put("productKey", productKey);
params.put("deviceName", deviceName);
params.put("clientId", clientId);
String timestamp = String.valueOf(System.currentTimeMillis());
params.put("timestamp", timestamp);
// cn-shanghai
String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";
String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
String mqttUsername = deviceName + "&" + productKey;
String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");
connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);
} catch (Exception e) {
System.out.println("initAliyunIoTClient error " + e.getMessage());
}
}
public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {
MemoryPersistence persistence = new MemoryPersistence();
mqttClient = new MqttClient(url, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
// MQTT 3.1.1
connOpts.setMqttVersion(4);
connOpts.setAutomaticReconnect(false);
connOpts.setCleanSession(false);
//connOpts.setCleanSession(true);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
connOpts.setKeepAliveInterval(60);
mqttClient.connect(connOpts);
}
/**
* 汇报属性
*/
private static void postDeviceProperties(String payloadJson) {
try {
//上报数据
//高级版 物模型-属性上报payload
System.out.println("上报属性值:");
System.out.println(payloadJson);
MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
message.setQos(0);
mqttClient.publish(pubTopic, message);
System.out.println("=================================================================");
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
/**
* 服务返回
*/
private static void postServiceReply(String payloadJson,String relpyTopic) {
try {
//上报数据
//高级版 物模型-属性上报payload
System.out.println("服务调用返回:");
//String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
System.out.println("Topic:");
System.out.println(relpyTopic);
System.out.println(payloadJson);
MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
message.setQos(0);
mqttClient.publish(relpyTopic, message);
System.out.println("=================================================================");
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
测试结果
1、运行B程序、再运行A程序
A程序上报消息
B程序收到云平台转发消息
2、控制台日志
2020/12/20 14:16:28.304
设备A上报消息:
2020/12/20 14:16:28.334
平台触发规则引擎转发消息
2020/12/20 14:16:28.336
设备B收到转发的消息
总览: