阿里云物联网平台如何订阅异步服务调用的返回结果?

业务场景

阿里云物联网平台如何订阅异步服务调用的返回结果?

1、云端向设备发送的下行消息或者异步服务调用到平台也算结束,平台再向设备进行一个透传
2、设备端向平台发送的上行消息到平台就算结束
3、云端通过服务端订阅来获取设备上行的消息

原理
1、要想获取异步服务调用的返回结果,首先设备得有返回
(1) 设备接收平台端透传过来的异步服务调用
阿里云物联网平台如何订阅异步服务调用的返回结果?

(2)设备端收到数据后进行响应
阿里云物联网平台如何订阅异步服务调用的返回结果?
2、这里的设备响应结果发送给平台之后,平台可通过
云产品流传或服务端订阅,再将消息发送给云端
方式一:云平台流转,注意选择Topic:
/${productKey}/${deviceName}/thing/downlink/reply/message
阿里云物联网平台如何订阅异步服务调用的返回结果?
这个为什么可行?可参考官方文档说明
阿里云物联网平台如何订阅异步服务调用的返回结果?
然后添加操作为发布到AMQP消费组
阿里云物联网平台如何订阅异步服务调用的返回结果?

方式二:AMQP服务端订阅
订阅什么呢?勾选设备上报消息即可(前提是设备端有返回,即满足原理1的前提)
阿里云物联网平台如何订阅异步服务调用的返回结果?

操作步骤
不多说了,直接上步骤
1、准备测试用的产品和设备
主要是定义一个异步服务:这里不详细阐述
阿里云物联网平台如何订阅异步服务调用的返回结果?

2、准备测试用的云端调试工具
可以是集成云端SDK的Demo,可以是业务逻辑应用调用云端API,最简单的直接使用云端API在线调试工具
具体参数填写规范,这里也不做详细阐述
阿里云物联网平台如何订阅异步服务调用的返回结果?

3、物联网平台控制台上配置好规则引擎
(1)云平台流转
阿里云物联网平台如何订阅异步服务调用的返回结果?
选择好产品设备和topic

阿里云物联网平台如何订阅异步服务调用的返回结果?
注意SQL语句的编写,这里的字段就是要发送给AMQP客户端的消息内容,可以事先进行调试。
阿里云物联网平台如何订阅异步服务调用的返回结果?
阿里云物联网平台如何订阅异步服务调用的返回结果?

这里要注意AMQP客户端都是按照既定的协议格式进行过滤数据的,所以这里的消息内容需要按照协议进行配置
阿里云物联网平台如何订阅异步服务调用的返回结果?

确定好消息内容后

SQL语句:

SELECT timestamp('yyyy-MM-dd\'T\'HH:mm:ss\'Z\'') as 云平台流转至AMQP测试,deviceName() as deviceName, code as code,data as data,topic() as topic,messageId() as requestId,id as id,topic(1) as productKey,iotId as iotId FROM "/a16hDZJpRCl/IoTDeviceDemo1thing/downlink/reply/message" WHERE

(2)服务端订阅
阿里云物联网平台如何订阅异步服务调用的返回结果?
勾选设备上报消息即可,具体消费组怎么创建就不详细阐述

实测效果:
阿里云物联网平台如何订阅异步服务调用的返回结果?

4、设备端接收消息+响应reply
阿里云物联网平台如何订阅异步服务调用的返回结果?
阿里云物联网平台如何订阅异步服务调用的返回结果?

代码示例:
pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>MQTTClient</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>23.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>3.5.1</version>
        </dependency>
    </dependencies>

</project>

AliyunIoTSignUtil:

package com.alibaba.taro;

import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Arrays;
import java.util.Map;

/**
 * AliyunIoTSignUtil
 */

public class AliyunIoTSignUtil {
    public static String sign(Map<String, String> params, String deviceSecret, String signMethod) {
        //将参数Key按字典顺序排序
        String[] sortedKeys = params.keySet().toArray(new String[] {});
        Arrays.sort(sortedKeys);

        //生成规范化请求字符串
        StringBuilder canonicalizedQueryString = new StringBuilder();
        for (String key : sortedKeys) {
            if ("sign".equalsIgnoreCase(key)) {
                continue;
            }
            canonicalizedQueryString.append(key).append(params.get(key));
        }

        try {
            String key = deviceSecret;
            return encryptHMAC(signMethod,canonicalizedQueryString.toString(), key);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * HMACSHA1加密
     *
     */
    public static String encryptHMAC(String signMethod,String content, String key) throws Exception {
        SecretKey secretKey = new SecretKeySpec(key.getBytes("utf-8"), signMethod);
        Mac mac = Mac.getInstance(secretKey.getAlgorithm());
        mac.init(secretKey);
        byte[] data = mac.doFinal(content.getBytes("utf-8"));
        return bytesToHexString(data);
    }

    public static final String bytesToHexString(byte[] bArray) {

        StringBuffer sb = new StringBuffer(bArray.length);
        String sTemp;
        for (int i = 0; i < bArray.length; i++) {
            sTemp = Integer.toHexString(0xFF & bArray[i]);
            if (sTemp.length() < 2) {
                sb.append(0);
            }
            sb.append(sTemp.toUpperCase());
        }
        return sb.toString();
    }
}

Demo:


package com.alibaba;

import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.Map;


public class CustomTopicMessageDemo2 {

    public static String productKey = "a16hD*****";
    public static String deviceName = "IoTDevice****";
    public static String deviceSecret = "0895205d*********";
    public static String regionId = "cn-shanghai";


    // 物模型-属性上报topic
    //private static String pubTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/property/post";
    //private static String subTopic = "/sys/" + productKey + "/" + deviceName + "/thing/service/property/set";
    // 自定义topic,在产品Topic列表位置定义
    //private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/DemoTest";
    //private static String subTopic = "/"+productKey + "/" + deviceName+"/user/DemoTest";
    private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/get";
    private static String subTopic = "/"+productKey + "/" + deviceName+"/user/get";

    private static MqttClient mqttClient;

    public static void main(String [] args){

        initAliyunIoTClient();
//        ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
//                new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
//        scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
        // 汇报属性
        //String payloadJson = "{\"params\":{\"MasterLightSwitch\":0,\"LivingLightSwitch\":0,\"SecondaryLightSwotch\":0,\"MasterCurtainSwitch\":1,\"SecondaryCurtainSwitch\":1,\"LivingCurtainSwitch\":1}}";
        //String payloadJson = "{\"params\":{\"Temp\":77,\"yyy\":{\"tttt\":\"123\"}}}";
        String payloadJson = "{\"params\":{\"Temp\":77,\"yyy\":\"8888\"}}";
        //String payloadJson = "{\"tts\":\"ss\"}";
        //String payloadJson = "34454545";
        postDeviceProperties(payloadJson);

        try {
            mqttClient.subscribe(subTopic); // 订阅Topic
        } catch (MqttException e) {
            System.out.println("error:" + e.getMessage());
            e.printStackTrace();
        }

        // 设置订阅监听
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                System.out.println("connection Lost");

            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                String payload =  new String(mqttMessage.getPayload());
                System.out.println(" 接收消息:");
                System.out.println("Topic : " + s);
                System.out.println(payload); //打印输出消息payLoad
                System.out.println("=================================================================");

//                String subTopic = "/sys/" + productKey + "/" + deviceName + "/thing/service/property/set";
//                if(s.equals(subTopic)) {
//                    JSONObject jsonProperty = new JSONObject(payload);
//                    if(jsonProperty.has("params"))
//                    {
//                        String paramsJson = jsonProperty.get("params").toString();
//                        System.out.println("test paramsJson is:\n" + paramsJson);
//                        String params = "{\"params\": " +  paramsJson + "}";
//                        System.out.println("test params is:\n" + params);
//                        System.out.println("收到属性设置后,再上报一次属性:");
//                        postDeviceProperties(params);
//                    }
//                }

                //收到服务调用,给予返回reply
//                下行(Alink JSON):
//                请求Topic:/sys/{productKey}/{deviceName}/thing/service/{tsl.service.identifier}
//                响应Topic:/sys/{productKey}/{deviceName}/thing/service/{tsl.service.identifier}_reply
                String subTopic = "/sys/" + productKey + "/" + deviceName + "/thing/service/StartP2PStreaming";
                String replyTopic = "/sys/" + productKey + "/" + deviceName + "/thing/service/StartP2PStreaming_reply";
                if(s.equals(subTopic)) {
                    JSONObject jsonProperty = new JSONObject(payload);
                    if(jsonProperty.has("id"))
                    {
                        String id = jsonProperty.get("id").toString();
                        String replyJson = "{\"data\":{},\"code\":200,\"id\":\""+ id +"\"}";
                        //System.out.println("test replyJson is:\n" + replyJson);
                        //String replys = "{\"params\": " +  replyJson + "}";
                        //System.out.println("test reply is:\n" + replys);
                        System.out.println("收到服务调用后,给予返回");
                        postServiceReply(replyJson,replyTopic);
                    }
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });

    }

    /**
     * 初始化 Client 对象
     */
    private static void initAliyunIoTClient() {

        try {
            // 构造连接需要的参数
            String clientId = "java" + System.currentTimeMillis();
            Map<String, String> params = new HashMap<String, String>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);
            // cn-shanghai
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(false);
        //connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }

    /**
     * 汇报属性
     */
    private static void postDeviceProperties(String payloadJson) {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("上报属性值:");
            //String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
            //String payloadJson = "{\"GeoLocation\":{\"Longitude\":120.99,\"Latitude\":30.13,\"Altitude\":39.01},\"BatteryPercentage\":40.703533, \"Temperature\":2.233362}";
            //String payloadJson = "{\"id\":\"3\",\"version\":\"1.0\",\"params\":{\"GeoLocation\":{\"Longitude\":120.999,\"Latitude\":30.13,\"Altitude\":39.01},\"BatteryPercentage\":42.99999, \"Temperature\":2.233362}}";
            //String payloadJson = "{\"params\":{\"MasterLightSwitch\":0,\"LivingLightSwitch\":0,\"SecondaryLightSwotch\":0,\"MasterCurtainSwitch\":1,\"SecondaryCurtainSwitch\":1,\"LivingCurtainSwitch\":1}}";
            System.out.println(payloadJson);
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(pubTopic, message);
            System.out.println("=================================================================");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * 服务返回
     */
    private static void postServiceReply(String payloadJson,String relpyTopic) {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("服务调用返回:");
            //String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
            System.out.println("Topic:");
            System.out.println(relpyTopic);
            System.out.println(payloadJson);
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(relpyTopic, message);
            System.out.println("=================================================================");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

}


实测效果:
阿里云物联网平台如何订阅异步服务调用的返回结果?

5、云端使用AMQP客户端登录,并接收消息

参考官方文档,这里就不作详细阐述。
https://help.aliyun.com/document_detail/143601.html?spm=a2c4g.11186623.6.624.304e354e2OEGFh

代码示例:
pom.xml


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- amqp 1.0 qpid client -->
<!--        <dependency>-->
<!--            <groupId>org.apache.qpid</groupId>-->
<!--            <artifactId>qpid-jms-client</artifactId>-->
<!--            <version>0.47.0</version>-->
<!--        </dependency>-->


        <!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.qpid/qpid-jms-client -->
        <dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-jms-client</artifactId>
            <version>0.47.0</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.maven.surefire</groupId>
            <artifactId>maven-surefire-common</artifactId>
            <version>2.12.4</version>
        </dependency>


        <!-- util for base64-->

<!--        <dependency>-->
<!--            <groupId>commons-codec</groupId>-->
<!--            <artifactId>commons-codec</artifactId>-->
<!--            <version>1.3</version>-->
<!--        </dependency>-->
    </dependencies>


</project>

Demo

package com.alibaba;

import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.URI;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AmqpJavaClientDemo {

    private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);

    //业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
    private final static ExecutorService executorService = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(50000));


    public static void main(String[] args) throws Exception {
        //参数说明,请参见AMQP客户端接入说明文档。
        String accessKey = "LTAI4G2*****";
        String accessSecret = "Mp2f4qopmULI6*****";
        String consumerGroupId = "e0oRIYMSOYwQ*****";
        //iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。
        String iotInstanceId = "";
        long timeStamp = System.currentTimeMillis();
        //签名方法:支持hmacmd5、hmacsha1和hmacsha256。
        String signMethod = "hmacsha1";
        //控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。
        //建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
        String clientId = "yangboClientId";

        //userName组装方法,请参见AMQP客户端接入说明文档。
        String userName = clientId + "|authMode=aksign"
                + ",signMethod=" + signMethod
                + ",timestamp=" + timeStamp
                + ",authId=" + accessKey
                + ",iotInstanceId=" + iotInstanceId
                + ",consumerGroupId=" + consumerGroupId
                + "|";
        //计算签名,password组装方法,请参见AMQP客户端接入说明文档。
        String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
        String password = doSign(signContent,accessSecret, signMethod);
        //接入域名,请参见AMQP客户端接入说明文档。
            String connectionUrl = "failover:(amqps://1875496626634053.iot-amqp.cn-shanghai.aliyuncs.com:5671?amqp.idleTimeout=80000)"
                + "?failover.reconnectDelay=30";

        Hashtable<String, String> hashtable = new Hashtable<String, String>();
        hashtable.put("connectionfactory.SBCF",connectionUrl);
        hashtable.put("queue.QUEUE", "default");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
        Destination queue = (Destination)context.lookup("QUEUE");
        // Create Connection
        Connection connection = cf.createConnection(userName, password);
        ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);

        System.out.println("connection success");
        // Create Session
        // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
        // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        // Create Receiver Link
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(messageListener);
    }

    private static MessageListener messageListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {
                //1.收到消息之后一定要ACK。
                // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
                // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
                // message.acknowledge();
                //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
                // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
                executorService.submit(new Runnable() {
                    public void run() {
                        processMessage(message);
                    }
                });
            } catch (Exception e) {
                logger.error("submit task occurs exception ", e);
            }
        }
    };

    /**
     * 在这里处理您收到消息后的具体业务逻辑。
     */
    private static void processMessage(Message message) {
        try {
            byte[] body = message.getBody(byte[].class);
            String content = new String(body);
            String topic = message.getStringProperty("topic");
            String messageId = message.getStringProperty("messageId");


            System.out.println("receive message"
                    + ", topic = " + topic
                    + ", messageId = " + messageId
                    + ", content = " + content);
            logger.info("receive message"
                    + ", topic = " + topic
                    + ", messageId = " + messageId
                    + ", content = " + content);
        } catch (Exception e) {
            logger.error("processMessage occurs error ", e);
        }
    }

    private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
        /**
         * 连接成功建立。
         */
        @Override
        public void onConnectionEstablished(URI remoteURI) {
            logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
        }

        /**
         * 尝试过最大重试次数之后,最终连接失败。
         */
        @Override
        public void onConnectionFailure(Throwable error) {
            logger.error("onConnectionFailure, {}", error.getMessage());
        }

        /**
         * 连接中断。
         */
        @Override
        public void onConnectionInterrupted(URI remoteURI) {
            logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
        }

        /**
         * 连接中断后又自动重连上。
         */
        @Override
        public void onConnectionRestored(URI remoteURI) {
            logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
        }

        @Override
        public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

        @Override
        public void onSessionClosed(Session session, Throwable cause) {}

        @Override
        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

        @Override
        public void onProducerClosed(MessageProducer producer, Throwable cause) {}
    };

    /**
     * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
     */
    private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
        Mac mac = Mac.getInstance(signMethod);
        mac.init(signingKey);
        byte[] rawHmac = mac.doFinal(toSignString.getBytes());
        return Base64.encodeBase64String(rawHmac);

//        return Arrays.toString(Base64.encodeBase64(rawHmac));
    }
}

实测效果:
云平台流转方式的返回结果:
阿里云物联网平台如何订阅异步服务调用的返回结果?

AMQP订阅方式返回结果:
阿里云物联网平台如何订阅异步服务调用的返回结果?

上一篇:hadoop(2): 安装&使用 sqoop


下一篇:Android 技术周报_W3