消费者环境
1. 引入maven依赖
<dependencies>
<!--mqtt 相关依赖 start -->
<!--下面几个都必须存在 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--mqtt 相关依赖 end -->
</dependencies>
定义配置类,连接mqtt服务器,并定义channel
package com.mengmeng.mqtt.emqtt.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
 * Spring configuration for the MQTT client and the publishing side of the flow.
 *
 * <p>Connection settings and producer defaults are injected from the
 * {@code spring.mqtt.client.*} and {@code spring.mqtt.producer.*} properties.
 */
@Configuration
public class MqttConfig {

    /** Name of the channel whose messages are handed to the MQTT producer. */
    public static final String OUTBOUND_CHANNEL = "outboundChannel";

    /** Channel-name constant for inbound messages; not referenced inside this class. */
    public static final String INBOUND_CHANNEL = "inboundChannel";

    /** Header-key constant; not referenced inside this class. */
    public static final String RECEIVED_TOPIC_KEY = "mqtt_receivedTopic";

    // ---- connection settings --------------------------------------------

    @Value("${spring.mqtt.client.username}")
    private String username;

    @Value("${spring.mqtt.client.password}")
    private String password;

    @Value("${spring.mqtt.client.serverURIs}")
    private String[] serverURIs;

    @Value("${spring.mqtt.client.clientId}")
    private String clientId;

    @Value("${spring.mqtt.client.keepAliveInterval}")
    private int keepAliveInterval;

    @Value("${spring.mqtt.client.connectionTimeout}")
    private int connectionTimeout;

    // ---- producer defaults ----------------------------------------------

    @Value("${spring.mqtt.producer.defaultQos}")
    private int defaultProducerQos;

    @Value("${spring.mqtt.producer.defaultRetained}")
    private boolean defaultRetained;

    @Value("${spring.mqtt.producer.defaultTopic}")
    private String defaultTopic;

    // ---- client ---------------------------------------------------------

    /**
     * Builds the Paho connect options from the injected client properties.
     *
     * @return options carrying user name, password, server URIs, keep-alive
     *         interval and connection timeout
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        final MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setServerURIs(serverURIs);
        options.setKeepAliveInterval(keepAliveInterval);
        options.setConnectionTimeout(connectionTimeout);
        return options;
    }

    /**
     * Creates the client factory and applies the connect options to it.
     *
     * @return a factory configured with {@link #getMqttConnectOptions()}
     */
    @Bean
    public MqttPahoClientFactory getMqttClientFactory() {
        final DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        final MqttConnectOptions options = getMqttConnectOptions();
        clientFactory.setConnectionOptions(options);
        return clientFactory;
    }

    // ---- publisher ------------------------------------------------------

    /**
     * Declares the channel used for outgoing messages.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel outboundChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the MQTT message handler that consumes from
     * {@link #OUTBOUND_CHANNEL}.
     *
     * <p>The handler uses the client id suffixed with {@code "_producer"},
     * has async set to {@code true}, and falls back to the configured default
     * topic, retained flag and QoS.
     *
     * @return the configured handler
     */
    @Bean
    @ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
    public MessageHandler getMqttProducer() {
        final String producerClientId = clientId + "_producer";
        final MqttPahoMessageHandler producer =
                new MqttPahoMessageHandler(producerClientId, getMqttClientFactory());
        producer.setAsync(true);
        producer.setDefaultTopic(defaultTopic);
        producer.setDefaultRetained(defaultRetained);
        producer.setDefaultQos(defaultProducerQos);
        return producer;
    }
}
定义通用的消息推送接口
package com.mengmeng.mqtt.emqtt.config;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
/**
 * Messaging gateway for pushing messages to MQTT.
 *
 * <p>Every method sends its message to the channel named by
 * {@code MqttConfig.OUTBOUND_CHANNEL}. {@code MqttConfig} registers a
 * producer on that channel with a default topic, QoS and retained flag.
 */
@MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL)
public interface MqttSender {
/**
 * Sends a message without specifying a topic or QoS header.
 * NOTE(review): presumably the producer's configured defaults apply here —
 * confirm against the handler configuration.
 *
 * @param data the message content to send
 */
void sendToMqtt(String data);
/**
 * Sends a message with an explicit topic.
 *
 * @param topic   value passed as the {@code MqttHeaders.TOPIC} header
 * @param payload the message content to send
 */
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
 * Sends a message with an explicit topic and QoS.
 *
 * @param topic   value passed as the {@code MqttHeaders.TOPIC} header
 * @param qos     value passed as the {@code MqttHeaders.QOS} header
 * @param payload the message content to send
 */
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
使用的yml配置
# MQTT settings read by the application; key nesting must match the
# ${spring.mqtt.client.*} / ${spring.mqtt.producer.*} placeholders.
spring:
  mqtt:
    client:
      username: admin
      password: public
      serverURIs: tcp://10.3.9.23:1883 # no trailing "/" on the URL, otherwise startup fails
      clientId: client0001 # alternative: ${random.value}
      keepAliveInterval: 30
      connectionTimeout: 30
    producer:
      defaultQos: 1
      defaultRetained: true
      defaultTopic: defaultTopicName
    consumer:
      defaultQos: 1
      completionTimeout: 30000
      consumerTopics: topic1,topic2 # topics to listen on; separate multiple topics with commas
controller里面发送消息
/**
 * Demo REST controller that publishes request data to MQTT through
 * {@code MqttSender}.
 */
@RestController
public class TestController {

    @Autowired
    private MqttSender mqttSender;

    /**
     * Sends {@code data} using the gateway's single-argument method, i.e.
     * without specifying a topic.
     *
     * <p>Handler methods are public on purpose: if this controller is ever
     * wrapped in a class-based (CGLIB) proxy, a private method cannot be
     * overridden and would run on the proxy instance, where the injected
     * {@code mqttSender} field is null.
     *
     * @param data the message content taken from the request
     */
    @RequestMapping("/send")
    public void send(String data) {
        mqttSender.sendToMqtt(data);
    }

    /**
     * Sends {@code data} to a topic chosen per request.
     *
     * @param topic the topic to publish to
     * @param data  the message content taken from the request
     */
    @RequestMapping("/sendToTopic")
    public void send(String topic, String data) {
        mqttSender.sendToMqtt(topic, data);
    }
}