mqtt -2. 消息生产者

消费者环境

1. 引入maven依赖

  <!-- MQTT-related dependencies: start -->
        <!-- All of the dependencies below must be present -->
        <!-- NOTE(review): no <version> elements are given here; presumably versions are managed by a parent POM/BOM - confirm -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <!-- MQTT-related dependencies: end -->
    </dependencies>

定义配置类,连接mqtt服务器,并定义channel

package com.mengmeng.mqtt.emqtt.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Spring configuration for the publishing side of the MQTT integration.
 *
 * <p>Declares the Paho connection options, the client factory built from them,
 * the outbound message channel, and the message handler subscribed to that channel.
 * All settings are injected from {@code spring.mqtt.*} properties.
 */
@Configuration
public class MqttConfig {

    /** Bean name of the channel that outgoing messages are sent to. */
    public static final String OUTBOUND_CHANNEL = "outboundChannel";
    /** Channel name constant for inbound messages; not referenced inside this class. */
    public static final String INBOUND_CHANNEL = "inboundChannel";

    /** Header key constant for the received topic; not referenced inside this class. */
    public static final String RECEIVED_TOPIC_KEY = "mqtt_receivedTopic";

    // ---- connection settings (spring.mqtt.client.*) ----
    @Value("${spring.mqtt.client.username}")
    private String username;
    @Value("${spring.mqtt.client.password}")
    private String password;
    @Value("${spring.mqtt.client.serverURIs}")
    private String[] serverURIs;
    @Value("${spring.mqtt.client.clientId}")
    private String clientId;
    @Value("${spring.mqtt.client.keepAliveInterval}")
    private int keepAliveInterval;
    @Value("${spring.mqtt.client.connectionTimeout}")
    private int connectionTimeout;

    // ---- publisher defaults (spring.mqtt.producer.*) ----
    @Value("${spring.mqtt.producer.defaultQos}")
    private int defaultProducerQos;
    @Value("${spring.mqtt.producer.defaultRetained}")
    private boolean defaultRetained;
    @Value("${spring.mqtt.producer.defaultTopic}")
    private String defaultTopic;

    /**
     * Builds the connection options from the injected client properties.
     *
     * @return options carrying user name, password, server URIs, keep-alive interval
     *         and connection timeout
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        final MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setServerURIs(serverURIs);
        options.setKeepAliveInterval(keepAliveInterval);
        options.setConnectionTimeout(connectionTimeout);
        return options;
    }

    /**
     * Creates the client factory and hands it the connection options bean.
     *
     * @return a {@link DefaultMqttPahoClientFactory} configured with {@link #getMqttConnectOptions()}
     */
    @Bean
    public MqttPahoClientFactory getMqttClientFactory() {
        final DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        clientFactory.setConnectionOptions(getMqttConnectOptions());
        return clientFactory;
    }

    /**
     * Channel for outgoing messages; its bean name equals {@link #OUTBOUND_CHANNEL}.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel outboundChannel() {
        final MessageChannel channel = new DirectChannel();
        return channel;
    }

    /**
     * Message handler attached to {@link #OUTBOUND_CHANNEL} that uses the client factory
     * and the configured publisher defaults.
     *
     * @return the handler, created with client id {@code clientId + "_producer"}
     */
    @Bean
    @ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
    public MessageHandler getMqttProducer() {
        // The handler gets its own client id, derived from the configured one.
        final String producerClientId = clientId + "_producer";
        final MqttPahoMessageHandler producer =
                new MqttPahoMessageHandler(producerClientId, getMqttClientFactory());
        producer.setAsync(true);
        producer.setDefaultTopic(defaultTopic);
        producer.setDefaultRetained(defaultRetained);
        producer.setDefaultQos(defaultProducerQos);
        return producer;
    }

}

定义通用的消息推送接口

package com.mengmeng.mqtt.emqtt.config;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

/**
 * Gateway interface for pushing messages.
 *
 * <p>Requests are sent to the channel named by {@code MqttConfig.OUTBOUND_CHANNEL}.
 */
@MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL)
public interface MqttSender {

    /**
     * Sends a message using the default mechanism: no topic or QoS header is supplied
     * by this method.
     *
     * @param data the text to send
     */
    void sendToMqtt(String data);

    /**
     * Sends a message with an explicit topic.
     *
     * @param topic   value passed as the {@code MqttHeaders.TOPIC} header
     * @param payload the text to send
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Sends a message with an explicit topic and QoS.
     *
     * @param topic   value passed as the {@code MqttHeaders.TOPIC} header
     * @param qos     value passed as the {@code MqttHeaders.QOS} header
     * @param payload the text to send
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

使用的yml配置

spring:
  mqtt:
    client:
      username: admin
      password: public
      serverURIs: tcp://10.3.9.23:1883  # the URL must not end with "/", otherwise startup fails
      clientId: client0001 # presumably an alternative value: ${random.value} (confirm)
      keepAliveInterval: 30
      connectionTimeout: 30
    producer:
      defaultQos: 1
      defaultRetained: true
      defaultTopic: defaultTopicName
    consumer:
      defaultQos: 1
      completionTimeout: 30000
      consumerTopics: topic1,topic2 # topics to listen to; separate multiple topics with commas

controller里面发送消息

/**
 * Demo REST controller that publishes request data through {@link MqttSender}.
 */
@RestController
public class TestController {

    @Autowired
    private MqttSender mqttSender;

    /**
     * Publishes {@code data} through the single-argument gateway method, which
     * supplies no topic header.
     *
     * <p>Declared {@code public} on purpose: a private handler method cannot be
     * overridden by a class-based (CGLIB) proxy, so if this controller is ever proxied
     * the call would run on the proxy instance, where {@code mqttSender} is not injected.
     *
     * @param data the text to publish
     */
    @RequestMapping("/send")
    public void send(String data) {
        mqttSender.sendToMqtt(data);
    }

    /**
     * Publishes {@code data} to a topic chosen per request.
     *
     * <p>Declared {@code public} for the same proxying reason as {@link #send(String)}.
     *
     * @param topic the topic to publish to
     * @param data  the text to publish
     */
    @RequestMapping("/sendToTopic")
    public void send(String topic, String data) {
        mqttSender.sendToMqtt(topic, data);
    }
}

上一篇:响应式web mqtt应用(siot、mqtt、掌控板)


下一篇:SIoT:App Inventor控制掌控板