概述
MQTT协议基于PUB/SUB的异步通信模式,不适用于服务端同步控制设备端并等待其返回结果的场景。前面的博客《基于开源Java MQTT Client的阿里云物联网平台RRPC功能测试》介绍了RRPC原理,并使用Open API实现了服务端的调用。本文介绍如何使用 .NET SDK 在本地实现RRPC的服务端调用。
Step By Step
1、创建产品与设备
参考链接中的“准备工作”部分。
2、设备端Code
import com.alibaba.taro.AliyunIoTSignUtil;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Device-side demo for RRPC over the system topic.
 *
 * <p>Connects to the MQTT endpoint with the device triple, subscribes to the RRPC request
 * topic, and answers every incoming request by publishing a fixed payload to the matching
 * response topic.
 */
public class IoTRRPCForSysTopicDemo {
    // Device triple (masked placeholders; replace with real values before running)
    public static String productKey = "a16MX******";
    public static String deviceName = "RRPC******";
    public static String deviceSecret = "Ayq069ifWO7WOmwKyswNLv6E********";
    public static String regionId = "cn-shanghai";

    // RRPC system topic; the trailing "+" wildcard matches the per-request message id
    private static String subTopic = "/sys/" + productKey + "/" + deviceName + "/rrpc/request/+";

    // Path segments that distinguish an RRPC request topic from its response topic
    private static final String RRPC_REQUEST_SEGMENT = "/rrpc/request/";
    private static final String RRPC_RESPONSE_SEGMENT = "/rrpc/response/";

    private static MqttClient mqttClient;

    /**
     * Entry point: connects, installs the RRPC callback, then subscribes.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        initAliyunIoTClient();
        // initAliyunIoTClient() only logs on failure, so verify the connection before using it.
        if (mqttClient == null || !mqttClient.isConnected()) {
            System.err.println("MQTT client is not connected, RRPC demo aborted");
            return;
        }

        // Install the callback BEFORE subscribing so that a request arriving right after
        // the subscription is acknowledged is not dropped.
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("connectionLost:" + cause.getMessage());
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) {
                System.out.println("message: " + new String(message.getPayload(), StandardCharsets.UTF_8));
                System.out.println("topic:" + topic);
                // Derive the RRPC response topic from the request topic
                String responseTopic = buildResponseTopic(topic);
                MqttMessage reply = new MqttMessage("resonse demo".getBytes(StandardCharsets.UTF_8));
                try {
                    mqttClient.publish(responseTopic, reply);
                } catch (MqttException e) {
                    // An exception escaping messageArrived makes Paho shut the client down,
                    // so a single failed reply is reported here instead of being rethrown.
                    System.err.println("RRPC response publish failed: " + e);
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // Nothing to do once the reply has been delivered.
            }
        });

        // Subscribe to the RRPC request topic
        try {
            mqttClient.subscribe(subTopic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * Maps an RRPC request topic to its response topic.
     *
     * <p>Only the {@code /rrpc/request/} path segment is rewritten, so a product key or
     * device name that happens to contain the word "request" is left untouched.
     *
     * @param requestTopic topic on which the RRPC request arrived
     * @return the topic the response is published to
     */
    private static String buildResponseTopic(String requestTopic) {
        return requestTopic.replace(RRPC_REQUEST_SEGMENT, RRPC_RESPONSE_SEGMENT);
    }

    /**
     * Builds the MQTT connection parameters from the device triple and connects.
     *
     * <p>Failures are logged, not thrown; callers must check the client before using it.
     */
    private static void initAliyunIoTClient() {
        try {
            String clientId = "java" + System.currentTimeMillis();
            String timestamp = String.valueOf(System.currentTimeMillis());

            // Parameters that are signed with the device secret to form the MQTT password
            Map<String, String> params = new HashMap<>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            params.put("timestamp", timestamp);

            // e.g. region cn-shanghai
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt." + regionId + ".aliyuncs.com:1883";
            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);
        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    /**
     * Opens the MQTT connection and stores the connected client in {@code mqttClient}.
     *
     * @param url          broker URL
     * @param clientId     MQTT client id
     * @param mqttUsername MQTT user name
     * @param mqttPassword MQTT password
     * @throws Exception if the client cannot be created or the connection fails
     */
    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {
        MqttClient client = new MqttClient(url, clientId, new MemoryPersistence());

        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(true);
        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        client.connect(connOpts);
        // Publish the client to the shared field only once it is actually connected.
        mqttClient = client;
    }
}
3、云端Code
- 3.1 SDK安装:
Install-Package aliyun-net-sdk-iot -Version 7.4.0
- 3.2 .NET Code
using System;
using System.Text;
using Aliyun.Acs.Core;
using Aliyun.Acs.Core.Exceptions;
using Aliyun.Acs.Core.Profile;
using Aliyun.Acs.Iot.Model.V20180120;
namespace IoTRRPCDemo
{
    /// <summary>
    /// Cloud-side demo: sends one RRPC request to a device through the SDK client and
    /// prints the raw HTTP response body, or the exception if the call fails.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            // ak,sk https://yq.aliyun.com/articles/693979
            // Masked placeholders; replace with real credentials and avoid committing them.
            IClientProfile profile = DefaultProfile.GetProfile("cn-shanghai", "LTAIOZZg********", "v7CjUJCMk7j9aKduMAQLjy********");
            DefaultAcsClient client = new DefaultAcsClient(profile);
            // Client connect timeout. The default is 5 s, so it must be raised whenever the
            // RRPC response may take longer than 5 s.
            client.SetConnectTimeoutInMilliSeconds(8000);

            var request = new RRpcRequest();
            request.Timeout = 8000;
            // Target device (product key + device name)
            request.DeviceName = "RRPC******";
            request.RequestBase64Byte = "5raI5oGv5LiL6KGM5rWL6K+V"; // Base64 of the downstream test message "消息下行测试"
            request.ProductKey = "a16MX******";

            try
            {
                var response = client.GetAcsResponse(request);
                // Decode explicitly as UTF-8: Encoding.Default is the ANSI code page on
                // .NET Framework and would garble non-ASCII characters in the body.
                Console.WriteLine(Encoding.UTF8.GetString(response.HttpResponse.Content));
            }
            catch (ServerException e)
            {
                Console.WriteLine("ServerException:");
                Console.WriteLine(e);
            }
            catch (ClientException e)
            {
                Console.WriteLine("ClientException: ");
                Console.WriteLine(e);
            }
            finally
            {
                // Keep the console window open on every path so the output can be read.
                Console.ReadKey();
            }
        }
    }
}
4、测试情况
- 4.1 设备端
- 4.2 云端