1 MQTT简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议。
MQTT官网: http://mqtt.org
2 mqtt工作原理
mqtt分为3种角色:
1 发布者(Publish)、
2 代理(Broker)(服务器)、
3 订阅者(Subscribe)
就像以前我们订阅天气预报的短信,移动营业厅就是服务器,
- 当有新的天气预报时,气象局就会通知移动营业厅;
- 客户到移动营业厅办理天气预报的短信业务,当有新的天气预报时,就会收到移动营业厅推动的信息。
3 springboot集成mqtt
这里mqtt Broker用的mosquitto, mqtt客户端测试工具mqttBox。
第1步:POM文件引入:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
查看spring-integration-mqt的POM文件。
从下面可以看出,spring-integration-mqtt已经引入了org.eclipse.paho.client.mqttv3,所以我们没有必要再引入org.eclipse.paho.client.mqttv3了。
重要的事情说三遍:
*** 没有必要再引入org.eclipse.paho.client.mqttv3
*** 没有必要再引入org.eclipse.paho.client.mqttv3
*** 没有必要再引入org.eclipse.paho.client.mqttv3
第2步:配置application.properties:
#自定义应用开关
mqtt.enabled=true
# 连接地址:
mqtt.serverURIs=tcp://127.0.0.1:1883
# 用户名
mqtt.username=admin
# 密码
mqtt.password=123456
# 心跳
mqtt.connectTimeout=20
# 心跳
mqtt.keep-alive-interval=20
# MQTT 生产者
# 连接服务器默认客户端ID
mqtt.outChannel.clientId=mqttPublish
# 默认的推送主题,实际可在调用接口时指定
mqtt.outChannel.defaultTopic=air
# MQTT 消费者
# 连接服务器默认客户端ID
mqtt.inChannel.clientId=mqttSubscribe
# 默认的接收主题,可以订阅多个Topic,逗号分隔
mqtt.inChannel.defaultTopic=weather
第3步:mqtt的注解生成。主要包括以下4个类:
1) MqttBaseConfig
配置MQTT客户端工厂类DefaultMqttPahoClientFactory
2) MqttInConfig
配置Outbound入站,包括:消息通道MessageChannel、消息适配器
MqttPahoMessageDrivenChannelAdapter和消息处理器MessageHandler
3) MqttOutConfig
配置Outbound出站,包括:出站通道 适配器
4) MqttPublisher
mqtt消息发布器。
MqttBaseConfig的代码如下:
@Configuration
public class MqttBaseConfig {
@Value("${mqtt.serverURIs}")
private String[] serverURIs;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private char[] password;
@Value("${mqtt.connectTimeout}")
private int connectTimeout;
@Value("${mqtt.keep-alive-interval}")
private int keepAliveInterval;
@Bean
public MqttPahoClientFactory mqttClientFactory(){
DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
// connection参数
MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setServerURIs(serverURIs);
connectOptions.setUserName(username);
connectOptions.setPassword(password);
connectOptions.setCleanSession(true);
connectOptions.setConnectionTimeout(connectTimeout);
connectOptions.setKeepAliveInterval(keepAliveInterval);
defaultMqttPahoClientFactory.setConnectionOptions(connectOptions);
return defaultMqttPahoClientFactory;
}
}
MqttInConfig的代码如下:
@Configuration
@ConditionalOnProperty(value = "mqtt.enabled", havingValue = "true")
@Slf4j
public class MqttInConfig {
@Value("${mqtt.inChannel.clientId}")
private String inChannelClientId;
@Value("${mqtt.inChannel.defaultTopic}")
private String topic;
/**
* mqtt消息入站通道
*
* @return
*/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer mqttSubscribe(MqttPahoClientFactory factory) {
String clientId = inChannelClientId + System.currentTimeMillis();
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, factory, topic);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setOutputChannel(mqttInputChannel());
adapter.setCompletionTimeout(5000);
adapter.setQos(2);
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler mqttMessageHandler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
String payload = String.valueOf(message.getPayload());
log.info("接收到 mqtt消息,主题:{} 消息:{}", topic, payload);
}
};
}
}
MqttOutConfig的代码如下:
@Configuration
@ConditionalOnProperty(value = "mqtt.enabled", havingValue = "true")
public class MqttOutConfig {
@Value("${mqtt.outChannel.clientId}")
private String outChannelClientId;
/**
* mqtt出站消息,用于发送出站消息
* @return
*/
@Bean
public MessageChannel mqttPublishChannel(){
return new DirectChannel();
}
/**
* mqtt消息出站通道的设置
* @param mqttMessageConverter
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutputChannel")
public MessageHandler mqttOutbound(MqttPahoClientFactory factory) {
String clientId = outChannelClientId + System.currentTimeMillis();;
MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(clientId, factory);
mqttPahoMessageHandler.setConverter(new DefaultPahoMessageConverter());
mqttPahoMessageHandler.setCompletionTimeout(50);
mqttPahoMessageHandler.setAsync(true);
return mqttPahoMessageHandler;
}
}
MqttPublisher的代码如下:
@Component
@ConditionalOnProperty(value = "mqtt.enabled", havingValue = "true")
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttPublisher {
/**
* 向默认的topic发送mqtt消息
* @param payload
*/
void sendMessage(String payload);
/**
* 向指定的topic发送mqtt消息
* @param topic
* @param payload
*/
void sendMessage(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
* 向指定的topic发送mqtt消息,并指定服务质量参数
* @param topic
* @param qos
* @param payload
*/
void sendMessage(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
4 测试验证
4.1 订阅验证
MqttInConfig类会定义MessageHandler的类,实现handleMessage接口,会收到消息:
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler mqttMessageHandler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
String payload = String.valueOf(message.getPayload());
log.info("接收到 mqtt消息,主题:{} 消息:{}", topic, payload);
}
};
}
MqttBox发布一个topic:weather;内容为{‘temprature’:‘7℃’}
可以看到springboot应用程序控制台 输出一条订阅的消息。
4.2 发布验证
我们定义一个http api来触发发布一个触pringboot发布topic:air。
@RequestMapping(value = "/publish", method = RequestMethod.GET)
public void publish(@RequestParam(value = "topic") String topic,
@RequestParam(value = "message") String message) {
mqttService.sendMqttMessage(topic, message);
}
Postman发送http get请求来
http://127.0.0.1:8080/mqtt/publish?topic=air&message=PM=2.5
MqttBox订阅一个topic:air,可以看到收到订阅的消息。