springboot simple_6 springboot mqtt

1 MQTT简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议。
MQTT官网: http://mqtt.org

2 mqtt工作原理

springboot simple_6 springboot mqtt
mqtt分为3种角色:
1 发布者(Publish)、
2 代理(Broker)(服务器)、
3 订阅者(Subscribe)
就像以前我们订阅天气预报的短信,移动营业厅就是服务器,

  1. 当有新的天气预报时,气象局就会通知移动营业厅;
  2. 客户到移动营业厅办理天气预报的短信业务,当有新的天气预报时,就会收到移动营业厅推动的信息。

3 springboot集成mqtt

这里mqtt Broker用的mosquitto, mqtt客户端测试工具mqttBox。

第1步:POM文件引入:

<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-mqtt</artifactId>
</dependency>

查看spring-integration-mqt的POM文件。
springboot simple_6 springboot mqtt
从下面可以看出,spring-integration-mqtt已经引入了org.eclipse.paho.client.mqttv3,所以我们没有必要再引入org.eclipse.paho.client.mqttv3了。
重要的事情说三遍:
*** 没有必要再引入org.eclipse.paho.client.mqttv3
*** 没有必要再引入org.eclipse.paho.client.mqttv3
*** 没有必要再引入org.eclipse.paho.client.mqttv3

第2步:配置application.properties:

#自定义应用开关
mqtt.enabled=true
# 连接地址:
mqtt.serverURIs=tcp://127.0.0.1:1883
# 用户名
mqtt.username=admin
# 密码
mqtt.password=123456
# 心跳
mqtt.connectTimeout=20
# 心跳
mqtt.keep-alive-interval=20
#  MQTT 生产者
# 连接服务器默认客户端ID
mqtt.outChannel.clientId=mqttPublish
# 默认的推送主题,实际可在调用接口时指定
mqtt.outChannel.defaultTopic=air
#  MQTT 消费者
# 连接服务器默认客户端ID
mqtt.inChannel.clientId=mqttSubscribe
# 默认的接收主题,可以订阅多个Topic,逗号分隔
mqtt.inChannel.defaultTopic=weather

第3步:mqtt的注解生成。主要包括以下4个类:
1) MqttBaseConfig
配置MQTT客户端工厂类DefaultMqttPahoClientFactory
2) MqttInConfig
配置Outbound入站,包括:消息通道MessageChannel、消息适配器
MqttPahoMessageDrivenChannelAdapter和消息处理器MessageHandler
3) MqttOutConfig
配置Outbound出站,包括:出站通道 适配器
4) MqttPublisher
mqtt消息发布器。

MqttBaseConfig的代码如下:

@Configuration
public class MqttBaseConfig {
    @Value("${mqtt.serverURIs}")
    private String[] serverURIs;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private  char[] password;

    @Value("${mqtt.connectTimeout}")
    private int connectTimeout;

    @Value("${mqtt.keep-alive-interval}")
    private int keepAliveInterval;


    @Bean
    public MqttPahoClientFactory mqttClientFactory(){
        DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
        // connection参数
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setServerURIs(serverURIs);
        connectOptions.setUserName(username);
        connectOptions.setPassword(password);
        connectOptions.setCleanSession(true);
        connectOptions.setConnectionTimeout(connectTimeout);
        connectOptions.setKeepAliveInterval(keepAliveInterval);
        defaultMqttPahoClientFactory.setConnectionOptions(connectOptions);

        return defaultMqttPahoClientFactory;
    }
}

MqttInConfig的代码如下:

@Configuration
@ConditionalOnProperty(value = "mqtt.enabled", havingValue = "true")
@Slf4j
public class MqttInConfig {
    @Value("${mqtt.inChannel.clientId}")
    private String inChannelClientId;

    @Value("${mqtt.inChannel.defaultTopic}")
    private String topic;

    /**
     * mqtt消息入站通道
     *
     * @return
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();

    }

    @Bean
    public MessageProducer mqttSubscribe(MqttPahoClientFactory factory) {
        String clientId = inChannelClientId + System.currentTimeMillis();
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, factory, topic);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setOutputChannel(mqttInputChannel());
        adapter.setCompletionTimeout(5000);
        adapter.setQos(2);
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler mqttMessageHandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
                String payload = String.valueOf(message.getPayload());
                log.info("接收到 mqtt消息,主题:{} 消息:{}", topic, payload);
            }
        };
    }
}

MqttOutConfig的代码如下:

@Configuration
@ConditionalOnProperty(value = "mqtt.enabled", havingValue = "true")
public class MqttOutConfig {
    @Value("${mqtt.outChannel.clientId}")
    private String outChannelClientId;


    /**
     * mqtt出站消息,用于发送出站消息
     * @return
     */
    @Bean
    public MessageChannel mqttPublishChannel(){
        return new DirectChannel();
    }

    /**
     * mqtt消息出站通道的设置
     * @param mqttMessageConverter
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory) {
        String clientId = outChannelClientId + System.currentTimeMillis();;
        MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(clientId, factory);
        mqttPahoMessageHandler.setConverter(new DefaultPahoMessageConverter());
        mqttPahoMessageHandler.setCompletionTimeout(50);
        mqttPahoMessageHandler.setAsync(true);
        return mqttPahoMessageHandler;
    }
}

MqttPublisher的代码如下:

@Component
@ConditionalOnProperty(value = "mqtt.enabled", havingValue = "true")
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttPublisher {
    /**
     * 向默认的topic发送mqtt消息
     * @param payload
     */
    void sendMessage(String payload);

    /**
     * 向指定的topic发送mqtt消息
     * @param topic
     * @param payload
     */
    void sendMessage(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * 向指定的topic发送mqtt消息,并指定服务质量参数
     * @param topic
     * @param qos
     * @param payload
     */
    void sendMessage(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

}

4 测试验证

4.1 订阅验证

MqttInConfig类会定义MessageHandler的类,实现handleMessage接口,会收到消息:

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler mqttMessageHandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
                String payload = String.valueOf(message.getPayload());
                log.info("接收到 mqtt消息,主题:{} 消息:{}", topic, payload);
            }
        };
    }

MqttBox发布一个topic:weather;内容为{‘temprature’:‘7℃’}
springboot simple_6 springboot mqtt
可以看到springboot应用程序控制台 输出一条订阅的消息。
springboot simple_6 springboot mqtt

4.2 发布验证

我们定义一个http api来触发发布一个触pringboot发布topic:air。

    @RequestMapping(value = "/publish", method = RequestMethod.GET)
    public void publish(@RequestParam(value = "topic") String topic,
                      @RequestParam(value = "message") String message) {
        mqttService.sendMqttMessage(topic, message);
    }

Postman发送http get请求来
http://127.0.0.1:8080/mqtt/publish?topic=air&message=PM=2.5
springboot simple_6 springboot mqtt

MqttBox订阅一个topic:air,可以看到收到订阅的消息。
springboot simple_6 springboot mqtt

上一篇:a simple note


下一篇:python 快速万能同步转异步语法