阿里云微服务消息队列(MQTT For IoT)使用Demo

Step By Step

服务使用大图

阿里云微服务消息队列(MQTT For IoT)使用Demo

  • 1、不同设备通过SDK和平台侧建立连接,实现设备与平台侧的交互通信;
  • 2、通过规则流转功能,将设备上报的消息流转到MQ Topic,也可以通过MQ Topic向MQTT Topic下发消息;
  • 3、基于Server端管控API,实现消息的直接下发、设备在线状态查询以及Group的创建。
一、MQTT Server创建(公网区域)

1、创建实例
阿里云微服务消息队列(MQTT For IoT)使用Demo

阿里云微服务消息队列(MQTT For IoT)使用Demo

2、Topic和Group创建
阿里云微服务消息队列(MQTT For IoT)使用Demo

阿里云微服务消息队列(MQTT For IoT)使用Demo

阿里云微服务消息队列(MQTT For IoT)使用Demo

二、设备端Java Code Sample

1、pom.xml

 <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.48</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
        </dependencies>

2、Device Code Sample

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {

    public static void main(String[] args) throws Exception {
        /**
         * MQ4IOT 实例 ID,购买后控制台获取
         */
        String instanceId = "post-cn-n6w*********";
        /**
         * 接入点地址,购买 MQ4IOT 实例,且配置完成后即可获取,接入点地址必须填写分配的域名,不得使用 IP 地址直接连接,否则可能会导致客户端异常。
         */
        String endPoint = "post-cn-n6w********.mqtt.aliyuncs.com";
        /**
         * 账号 accesskey,从账号系统控制台获取
         */
        String accessKey = "LTAIOZZg********";
        /**
         * 账号 secretKey,从账号系统控制台获取,仅在Signature鉴权模式下需要设置
         */
        String secretKey = "v7CjUJCMk7j9aK****************";
        /**
         * MQ4IOT clientId,由业务系统分配,需要保证每个 tcp 连接都不一样,保证全局唯一,如果不同的客户端对象(tcp 连接)使用了相同的 clientId 会导致连接异常断开。
         * clientId 由两部分组成,格式为 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申请,DeviceId 由业务方自己设置,clientId 总长度不得超过64个字符。
         */
        String clientId = "GID_MQTT_Client1@@@device1";
        /**
         * MQ4IOT 消息的一级 topic,需要在控制台申请才能使用。
         * 如果使用了没有申请或者没有被授权的 topic 会导致鉴权失败,服务端会断开客户端连接。
         */
        final String parentTopic = "MQTT_Topic";
        /**
         * MQ4IOT支持子级 topic,用来做自定义的过滤,此处为示意,可以填写任何字符串,具体参考https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
         * 需要注意的是,完整的 topic 长度不得超过128个字符。
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        /**
         * QoS参数代表传输质量,可选0,1,2,根据实际需求合理设置,具体参考 https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 客户端使用的协议和端口必须匹配,具体参考文档 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
         * 如果是 SSL 加密则设置ssl://endpoint:8883
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 客户端设置好发送超时时间,防止无限阻塞
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 客户端连接成功后就需要尽快订阅需要的 topic
                 */
                System.out.println("connect success");
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            final String topicFilter[] = {mq4IotTopic};
                            final int[] qos = {qosLevel};
                            mqttClient.subscribe(topicFilter, qos);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
                 * 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。超时时间约定参考限制
                 * https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.546.229f1f6ago55Fj
                 */
                System.out.println(
                        "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 1; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             *  发送普通消息时,topic 必须和接收方订阅的 topic 一致,或者符合通配符匹配规则
             */
            mqttClient.publish(mq4IotTopic, message);
            /**
             * MQ4IoT支持点对点消息,即如果发送方明确知道该消息只需要给特定的一个设备接收,且知道对端的 clientId,则可以直接发送点对点消息。
             * 点对点消息不需要经过订阅关系匹配,可以简化订阅方的逻辑。点对点消息的 topic 格式规范是  {{parentTopic}}/p2p/{{targetClientId}}
             */
            final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
            message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(p2pSendTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}

工具类:util

3、测试效果
阿里云微服务消息队列(MQTT For IoT)使用Demo

4、消息流转轨迹查询
阿里云微服务消息队列(MQTT For IoT)使用Demo

三、规则流转测试

1、MQ创建三个不同类型的Topic
阿里云微服务消息队列(MQTT For IoT)使用Demo

2、创建Group,用于MQ侧消费消息
阿里云微服务消息队列(MQTT For IoT)使用Demo

3、MQTT侧配置流转规则
阿里云微服务消息队列(MQTT For IoT)使用Demo

阿里云微服务消息队列(MQTT For IoT)使用Demo

阿里云微服务消息队列(MQTT For IoT)使用Demo

阿里云微服务消息队列(MQTT For IoT)使用Demo

4、MQ侧代码测试

4.1 pom.xml

        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.7.1.Final</version>
        </dependency>

4.2 Code Sample

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 您在控制台创建的 Group ID。
        properties.put(PropertyKeyConst.GROUP_ID, "GID_MessageConsumer");
        // AccessKey ID 阿里云身份验证,在阿里云 RAM 控制台创建。
        properties.put(PropertyKeyConst.AccessKey, "LTAIOZZg********");
        // Accesskey Secret 阿里云身份验证,在阿里云服 RAM 控制台创建。
        properties.put(PropertyKeyConst.SecretKey, "v7CjUJCMk7j9aK****************");
        // 设置 TCP 接入域名,进入控制台的实例详情页面的 TCP 协议客户端接入点区域查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_***************_BcLPQ2p0.mq-internet-access.mq-internet.aliyuncs.com:80");
        // 集群订阅方式 (默认)。
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        // 广播订阅方式。
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

        Consumer consumer = ONSFactory.createConsumer(properties);
        //1、订阅设备上行消息
        consumer.subscribe("MessageFromMQTT", "*", new MessageListener() { //订阅多个 Tag。
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        //2、订阅设备上下线消息
        consumer.subscribe("DevcieOnlineAndOffline", "*", new MessageListener() { //订阅全部 Tag。
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        consumer.start();
        System.out.println("Consumer Started");
    }
}

4.3 测试效果(先启动消费端,然后设备端上行消息)

阿里云微服务消息队列(MQTT For IoT)使用Demo

4.4 通过MQ发送消息到MQTT

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Date;
import java.util.Properties;

public class SendMQMessageToMQTT {

    public static void main(String[] args) {

            Properties properties = new Properties();
            // AccessKeyId 阿里云身份验证,在阿里云用户信息管理控制台获取。
            properties.put(PropertyKeyConst.AccessKey,"LTAIOZZg**********");
            // AccessKeySecret 阿里云身份验证,在阿里云用户信息管理控制台获取。
            properties.put(PropertyKeyConst.SecretKey, "v7CjUJCMk7j9aK****************");
            //设置发送超时时间,单位毫秒。
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // 设置 TCP 接入域名,进入控制台的实例详情页面的 TCP 协议客户端接入点区域查看。
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_********_BcLPQ2p0.mq-internet-access.mq-internet.aliyuncs.com:80");
            Producer producer = ONSFactory.createProducer(properties);

            // mqttSecondTopic:https://help.aliyun.com/document_detail/112971.html?spm=a2c4g.11186623.6.579.403242ca4pOcpC
            properties.put("mqttSecondTopic","testMq4Iot");

            // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可。
            producer.start();

            //循环发送消息。
            for (int i = 0; i < 1; i++){
                Message msg = new Message("MessageToMQTT","","MQ Message To MQTT".getBytes());
                msg.setKey("ORDERID_" + i);
                msg.setUserProperties(properties);

                try {
                    SendResult sendResult = producer.send(msg);
                    // 同步发送消息,只要不抛异常就是成功。
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                    }
                }
                catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                    e.printStackTrace();
                }
            }

            // 在应用退出前,销毁 Producer 对象。
            // 注意:如果不销毁也没有问题。
            producer.shutdown();
        }

    }

4.5 The Result

阿里云微服务消息队列(MQTT For IoT)使用Demo

阿里云微服务消息队列(MQTT For IoT)使用Demo

4.6 消息轨迹查询

阿里云微服务消息队列(MQTT For IoT)使用Demo

四、MQTT云端API测试

1、pom.xml

        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.6</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.4</version>
        </dependency>

2、发送消息Code Sample

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import com.aliyuncs.onsmqtt.model.v20200420.*;

public class SendMessage {

    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("mq-internet-access", "LTAIOZZg********", "v7CjUJCMk7j9aK****************");
        IAcsClient client = new DefaultAcsClient(profile);

        SendMessageRequest request = new SendMessageRequest();
        request.setRegionId("mq-internet-access");
        request.setInstanceId("post-cn-n6w********");
        request.setPayload("message from manager api!");
        request.setMqttTopic("MQTT_Topic/testMq4Iot");

        try {
            SendMessageResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }
    }
}

3、测试效果
阿里云微服务消息队列(MQTT For IoT)使用Demo
阿里云微服务消息队列(MQTT For IoT)使用Demo

4、查询设备状态Code Sample

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import com.aliyuncs.onsmqtt.model.v20200420.*;

public class QuerySessionByClientId {

    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("mq-internet-access", "LTAIOZZg********", "v7CjUJCMk7j9aK****************");
        IAcsClient client = new DefaultAcsClient(profile);

        QuerySessionByClientIdRequest request = new QuerySessionByClientIdRequest();
        request.setRegionId("mq-internet-access");
        request.setInstanceId("post-cn-n6w********");
        request.setClientId("GID_MQTT_Client1@@@device1");

        try {
            QuerySessionByClientIdResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }

    }
}

5、测试效果
阿里云微服务消息队列(MQTT For IoT)使用Demo

更多参考

QuerySessionByClientId
快速使用 MQTT 的 Java SDK 收发消息(跨产品数据流入)
MQ发送普通消息
MQ订阅消息
阿里云微服务消息队列Token C# 设备端示例Demo
阿里云微服务消息队列Token Java Code Sample
阿里云微服务消息队列Token C# Code Sample

上一篇:基于Android MPAndroidChart实现腾讯QQ群数据统计报表核心功能


下一篇:阿里云机器学习PAI-ModelHub公共模型部署与Python调用示例