业务场景
1、云端向设备发送的下行消息或者异步服务调用到平台也算结束,平台再向设备进行一个透传
2、设备端向平台发送的上行消息到平台就算结束
3、云端通过服务端订阅来获取设备上行的消息
原理
1、要想获取异步服务调用的返回结果,首先设备得有返回
(1) 设备接收平台端透传过来的异步服务调用
(2)设备端收到数据后进行响应
2、这里的设备响应结果发送给平台之后,平台可通过
云产品流传或服务端订阅,再将消息发送给云端
方式一:云平台流转,注意选择Topic:
/${productKey}/${deviceName}/thing/downlink/reply/message
这个为什么可行?可参考官方文档说明
然后添加操作为发布到AMQP消费组
方式二:AMQP服务端订阅
订阅什么呢?勾选设备上报消息即可(前提是设备端有返回,即满足原理1的前提)
操作步骤
不多说了,直接上步骤
1、准备测试用的产品和设备
主要是定义一个异步服务:这里不详细阐述
2、准备测试用的云端调试工具
可以是集成云端SDK的Demo,可以是业务逻辑应用调用云端API,最简单的直接使用云端API在线调试工具
具体参数填写规范,这里也不做详细阐述
3、物联网平台控制台上配置好规则引擎
(1)云平台流转
选择好产品设备和topic
注意SQL语句的编写,这里的字段就是要发送给AMQP客户端的消息内容,可以事先进行调试。
这里要注意AMQP客户端都是按照既定的协议格式进行过滤数据的,所以这里的消息内容需要按照协议进行配置
确定好消息内容后
SQL语句:
SELECT timestamp('yyyy-MM-dd\'T\'HH:mm:ss\'Z\'') as 云平台流转至AMQP测试,deviceName() as deviceName, code as code,data as data,topic() as topic,messageId() as requestId,id as id,topic(1) as productKey,iotId as iotId FROM "/a16hDZJpRCl/IoTDeviceDemo1thing/downlink/reply/message" WHERE
(2)服务端订阅
勾选设备上报消息即可,具体消费组怎么创建就不详细阐述
实测效果:
4、设备端接收消息+响应reply
代码示例:
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>MQTTClient</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>6</source>
<target>6</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>3.5.1</version>
</dependency>
</dependencies>
</project>
AliyunIoTSignUtil:
package com.alibaba.taro;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Arrays;
import java.util.Map;
/**
* AliyunIoTSignUtil
*/
public class AliyunIoTSignUtil {
public static String sign(Map<String, String> params, String deviceSecret, String signMethod) {
//将参数Key按字典顺序排序
String[] sortedKeys = params.keySet().toArray(new String[] {});
Arrays.sort(sortedKeys);
//生成规范化请求字符串
StringBuilder canonicalizedQueryString = new StringBuilder();
for (String key : sortedKeys) {
if ("sign".equalsIgnoreCase(key)) {
continue;
}
canonicalizedQueryString.append(key).append(params.get(key));
}
try {
String key = deviceSecret;
return encryptHMAC(signMethod,canonicalizedQueryString.toString(), key);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* HMACSHA1加密
*
*/
public static String encryptHMAC(String signMethod,String content, String key) throws Exception {
SecretKey secretKey = new SecretKeySpec(key.getBytes("utf-8"), signMethod);
Mac mac = Mac.getInstance(secretKey.getAlgorithm());
mac.init(secretKey);
byte[] data = mac.doFinal(content.getBytes("utf-8"));
return bytesToHexString(data);
}
public static final String bytesToHexString(byte[] bArray) {
StringBuffer sb = new StringBuffer(bArray.length);
String sTemp;
for (int i = 0; i < bArray.length; i++) {
sTemp = Integer.toHexString(0xFF & bArray[i]);
if (sTemp.length() < 2) {
sb.append(0);
}
sb.append(sTemp.toUpperCase());
}
return sb.toString();
}
}
Demo:
package com.alibaba;
import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;
import java.util.HashMap;
import java.util.Map;
public class CustomTopicMessageDemo2 {
public static String productKey = "a16hD*****";
public static String deviceName = "IoTDevice****";
public static String deviceSecret = "0895205d*********";
public static String regionId = "cn-shanghai";
// 物模型-属性上报topic
//private static String pubTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/property/post";
//private static String subTopic = "/sys/" + productKey + "/" + deviceName + "/thing/service/property/set";
// 自定义topic,在产品Topic列表位置定义
//private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/DemoTest";
//private static String subTopic = "/"+productKey + "/" + deviceName+"/user/DemoTest";
private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/get";
private static String subTopic = "/"+productKey + "/" + deviceName+"/user/get";
private static MqttClient mqttClient;
public static void main(String [] args){
initAliyunIoTClient();
// ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
// new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
// scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
// 汇报属性
//String payloadJson = "{\"params\":{\"MasterLightSwitch\":0,\"LivingLightSwitch\":0,\"SecondaryLightSwotch\":0,\"MasterCurtainSwitch\":1,\"SecondaryCurtainSwitch\":1,\"LivingCurtainSwitch\":1}}";
//String payloadJson = "{\"params\":{\"Temp\":77,\"yyy\":{\"tttt\":\"123\"}}}";
String payloadJson = "{\"params\":{\"Temp\":77,\"yyy\":\"8888\"}}";
//String payloadJson = "{\"tts\":\"ss\"}";
//String payloadJson = "34454545";
postDeviceProperties(payloadJson);
try {
mqttClient.subscribe(subTopic); // 订阅Topic
} catch (MqttException e) {
System.out.println("error:" + e.getMessage());
e.printStackTrace();
}
// 设置订阅监听
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("connection Lost");
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
String payload = new String(mqttMessage.getPayload());
System.out.println(" 接收消息:");
System.out.println("Topic : " + s);
System.out.println(payload); //打印输出消息payLoad
System.out.println("=================================================================");
// String subTopic = "/sys/" + productKey + "/" + deviceName + "/thing/service/property/set";
// if(s.equals(subTopic)) {
// JSONObject jsonProperty = new JSONObject(payload);
// if(jsonProperty.has("params"))
// {
// String paramsJson = jsonProperty.get("params").toString();
// System.out.println("test paramsJson is:\n" + paramsJson);
// String params = "{\"params\": " + paramsJson + "}";
// System.out.println("test params is:\n" + params);
// System.out.println("收到属性设置后,再上报一次属性:");
// postDeviceProperties(params);
// }
// }
//收到服务调用,给予返回reply
// 下行(Alink JSON):
// 请求Topic:/sys/{productKey}/{deviceName}/thing/service/{tsl.service.identifier}
// 响应Topic:/sys/{productKey}/{deviceName}/thing/service/{tsl.service.identifier}_reply
String subTopic = "/sys/" + productKey + "/" + deviceName + "/thing/service/StartP2PStreaming";
String replyTopic = "/sys/" + productKey + "/" + deviceName + "/thing/service/StartP2PStreaming_reply";
if(s.equals(subTopic)) {
JSONObject jsonProperty = new JSONObject(payload);
if(jsonProperty.has("id"))
{
String id = jsonProperty.get("id").toString();
String replyJson = "{\"data\":{},\"code\":200,\"id\":\""+ id +"\"}";
//System.out.println("test replyJson is:\n" + replyJson);
//String replys = "{\"params\": " + replyJson + "}";
//System.out.println("test reply is:\n" + replys);
System.out.println("收到服务调用后,给予返回");
postServiceReply(replyJson,replyTopic);
}
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
}
/**
* 初始化 Client 对象
*/
private static void initAliyunIoTClient() {
try {
// 构造连接需要的参数
String clientId = "java" + System.currentTimeMillis();
Map<String, String> params = new HashMap<String, String>(16);
params.put("productKey", productKey);
params.put("deviceName", deviceName);
params.put("clientId", clientId);
String timestamp = String.valueOf(System.currentTimeMillis());
params.put("timestamp", timestamp);
// cn-shanghai
String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";
String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
String mqttUsername = deviceName + "&" + productKey;
String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");
connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);
} catch (Exception e) {
System.out.println("initAliyunIoTClient error " + e.getMessage());
}
}
public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {
MemoryPersistence persistence = new MemoryPersistence();
mqttClient = new MqttClient(url, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
// MQTT 3.1.1
connOpts.setMqttVersion(4);
connOpts.setAutomaticReconnect(false);
connOpts.setCleanSession(false);
//connOpts.setCleanSession(true);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
connOpts.setKeepAliveInterval(60);
mqttClient.connect(connOpts);
}
/**
* 汇报属性
*/
private static void postDeviceProperties(String payloadJson) {
try {
//上报数据
//高级版 物模型-属性上报payload
System.out.println("上报属性值:");
//String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
//String payloadJson = "{\"GeoLocation\":{\"Longitude\":120.99,\"Latitude\":30.13,\"Altitude\":39.01},\"BatteryPercentage\":40.703533, \"Temperature\":2.233362}";
//String payloadJson = "{\"id\":\"3\",\"version\":\"1.0\",\"params\":{\"GeoLocation\":{\"Longitude\":120.999,\"Latitude\":30.13,\"Altitude\":39.01},\"BatteryPercentage\":42.99999, \"Temperature\":2.233362}}";
//String payloadJson = "{\"params\":{\"MasterLightSwitch\":0,\"LivingLightSwitch\":0,\"SecondaryLightSwotch\":0,\"MasterCurtainSwitch\":1,\"SecondaryCurtainSwitch\":1,\"LivingCurtainSwitch\":1}}";
System.out.println(payloadJson);
MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
message.setQos(0);
mqttClient.publish(pubTopic, message);
System.out.println("=================================================================");
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
/**
* 服务返回
*/
private static void postServiceReply(String payloadJson,String relpyTopic) {
try {
//上报数据
//高级版 物模型-属性上报payload
System.out.println("服务调用返回:");
//String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
System.out.println("Topic:");
System.out.println(relpyTopic);
System.out.println(payloadJson);
MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
message.setQos(0);
mqttClient.publish(relpyTopic, message);
System.out.println("=================================================================");
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
实测效果:
5、云端使用AMQP客户端登录,并接收消息
参考官方文档,这里就不作详细阐述。
https://help.aliyun.com/document_detail/143601.html?spm=a2c4g.11186623.6.624.304e354e2OEGFh
代码示例:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>Test</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- amqp 1.0 qpid client -->
<!-- <dependency>-->
<!-- <groupId>org.apache.qpid</groupId>-->
<!-- <artifactId>qpid-jms-client</artifactId>-->
<!-- <version>0.47.0</version>-->
<!-- </dependency>-->
<!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.qpid/qpid-jms-client -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.47.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>maven-surefire-common</artifactId>
<version>2.12.4</version>
</dependency>
<!-- util for base64-->
<!-- <dependency>-->
<!-- <groupId>commons-codec</groupId>-->
<!-- <artifactId>commons-codec</artifactId>-->
<!-- <version>1.3</version>-->
<!-- </dependency>-->
</dependencies>
</project>
Demo
package com.alibaba;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.URI;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class AmqpJavaClientDemo {
private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);
//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
private final static ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(50000));
public static void main(String[] args) throws Exception {
//参数说明,请参见AMQP客户端接入说明文档。
String accessKey = "LTAI4G2*****";
String accessSecret = "Mp2f4qopmULI6*****";
String consumerGroupId = "e0oRIYMSOYwQ*****";
//iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。
String iotInstanceId = "";
long timeStamp = System.currentTimeMillis();
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
String signMethod = "hmacsha1";
//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。
//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
String clientId = "yangboClientId";
//userName组装方法,请参见AMQP客户端接入说明文档。
String userName = clientId + "|authMode=aksign"
+ ",signMethod=" + signMethod
+ ",timestamp=" + timeStamp
+ ",authId=" + accessKey
+ ",iotInstanceId=" + iotInstanceId
+ ",consumerGroupId=" + consumerGroupId
+ "|";
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
String signContent = "authId=" + accessKey + "×tamp=" + timeStamp;
String password = doSign(signContent,accessSecret, signMethod);
//接入域名,请参见AMQP客户端接入说明文档。
String connectionUrl = "failover:(amqps://1875496626634053.iot-amqp.cn-shanghai.aliyuncs.com:5671?amqp.idleTimeout=80000)"
+ "?failover.reconnectDelay=30";
Hashtable<String, String> hashtable = new Hashtable<String, String>();
hashtable.put("connectionfactory.SBCF",connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");
// Create Connection
Connection connection = cf.createConnection(userName, password);
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
System.out.println("connection success");
// Create Session
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// Create Receiver Link
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
}
private static MessageListener messageListener = new MessageListener() {
@Override
public void onMessage(Message message) {
try {
//1.收到消息之后一定要ACK。
// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
// message.acknowledge();
//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
executorService.submit(new Runnable() {
public void run() {
processMessage(message);
}
});
} catch (Exception e) {
logger.error("submit task occurs exception ", e);
}
}
};
/**
* 在这里处理您收到消息后的具体业务逻辑。
*/
private static void processMessage(Message message) {
try {
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
System.out.println("receive message"
+ ", topic = " + topic
+ ", messageId = " + messageId
+ ", content = " + content);
logger.info("receive message"
+ ", topic = " + topic
+ ", messageId = " + messageId
+ ", content = " + content);
} catch (Exception e) {
logger.error("processMessage occurs error ", e);
}
}
private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
/**
* 连接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
}
/**
* 尝试过最大重试次数之后,最终连接失败。
*/
@Override
public void onConnectionFailure(Throwable error) {
logger.error("onConnectionFailure, {}", error.getMessage());
}
/**
* 连接中断。
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
}
/**
* 连接中断后又自动重连上。
*/
@Override
public void onConnectionRestored(URI remoteURI) {
logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
}
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
@Override
public void onSessionClosed(Session session, Throwable cause) {}
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {}
};
/**
* 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
*/
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Base64.encodeBase64String(rawHmac);
// return Arrays.toString(Base64.encodeBase64(rawHmac));
}
}
实测效果:
云平台流转方式的返回结果:
AMQP订阅方式返回结果: