引言
提到 RabbitMq,想必大家都不陌生,我们经常使用它来进行服务之间的异步通信,如短信,日志,邮件发送等业务场景都可以使用 RabbitMq 实现,但在使用的过程中我们会遇到消息丢失的问题,也会有延迟消费消息的需求,相信通过本文你会找到自己想要的答案,下面就让我们一起来看一下吧。
消息可靠性
一条消息从发送到接收,在 RabbitMq 中会经历如下过程,如图:
上图中的每一步都可能存在消息丢失的情况,消息丢失原因包括以下几种:
- 发送时丢失:消息在发往 exchange 的过程中丢失;消息成功到达 exchange,但在发往 queue 的过程中丢失
- mq 服务宕机,queue 中的消息丢失
- 消费者成功接收消息,消息未消费完,服务宕机导致消息丢失
RabbitMq 针对以上问题,提供了自己的解决方案,如下:
- 生产者确认机制
- 消息持久化存储
- 消费者确认机制
- 失败重试机制
通过以上机制我们便可以解决消息丢失的问题,保证消息的可靠性,下面我们通过一个案例来演示上述四种机制(需要提前搭建好一个 RabbitMq 服务,不赘述),实现消息的可靠性传输。
1. 项目搭建
首先创建 pom 父工程 mq-advanced-demo,pom.xml 配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>mq.demo</groupId>
<artifactId>mq-advanced-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>publisher</module>
<module>consumer</module>
</modules>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
</project>
然后分别创建子工程 publisher 和 consumer:
publisher 的 pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>mq-advanced-demo</artifactId>
<groupId>mq.demo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>publisher</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>
consumer 的 pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>mq-advanced-demo</artifactId>
<groupId>mq.demo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>consumer</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>
整体项目结构如下:
至此,项目模型基本搭建完毕(项目完整代码可于文末获取)。
2. 生产者确认机制
RabbitMQ 提供了生产者确认机制来避免消息发送到 MQ 过程中丢失。这种机制必须给每个消息指定一个唯一 ID。消息发送到 MQ 之后,会返回结果给发送者,表示消息是否处理成功。
返回结果有以下两种:
- publisher-confirm:发送者确认:消息成功投递到交换机,返回 ack;消息未投递到交换机,返回 nack
- publisher-return:发送者回执:消息成功投递到交换机,但是没有路由到队列。返回 ack,及路由失败原因
图示如下:
下面我们来进行编码实现,首先,修改 publisher 服务中的 application.yml,添加以下内容:
logging:
pattern:
dateformat: HH:mm:ss
level:
mq: debug
spring:
rabbitmq:
# rabbitMQ的ip地址
host: 127.0.0.1
# 端口
port: 5672
# 集群模式配置
# addresses: 127.0.0.1:8071, 127.0.0.1:8072, 127.0.0.1:8073
username: admin
password: 123456
virtual-host: /
# 消费者确认机制相关配置
# 开启publisher-confirm,
# 这里支持两种类型:simple:同步等待confirm结果,直到超时;
# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-confirm-type: correlated
# publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
publisher-returns: true
# 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
template:
mandatory: true
然后在代码中定义 ReturnCallback 回调,每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置,在 congfig 包下创建 CommonConfig 类:
package mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 记录日志
log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有需要的话,重发消息
});
}
}
接着定义 ConfirmCallback,ConfirmCallback 可以在发送消息时指定,因为每个业务处理 confirm 成功或失败的逻辑不一定相同。提前在 mq 控制台创建好 exchange 和 queue,在 publisher 服务下的 test 包下创建 SpringAmqpTest 类:
package mq.spring;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.准备消息
String message = "hello, spring amqp!";
// 2.准备CorrelationData
// 2.1.消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 2.2.准备ConfirmCallback
correlationData.getFuture().addCallback(result -> {
// 判断结果
if (result.isAck()) {
// ACK
log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
} else {
// NACK
log.error("消息投递到交换机失败!消息ID:{},原因:{}", correlationData.getId(), result.getReason());
// 重发消息
}
}, ex -> {
// 记录日志
log.error("消息发送异常, ID:{}, 原因{}", correlationData.getId(), ex.getMessage());
// 可以重发消息
});
// 3.发送消息 需创建 mqtest 交换机,并绑定一个 routingKey 为 mqtest_queue 的队列,可根据实际情况修改
rabbitTemplate.convertAndSend("mqtest", "mqtest_queue", message, correlationData);
// 休眠一会儿,等待ack回执
Thread.sleep(2000);
}
}
执行测试类,打印 log 如下:
异常情况大家可以自行测试。
3. 消息持久化存储
生产者确认机制可以确保消息投递到 RabbitMQ 的队列中,但是消息发送到 RabbitMQ 以后,如果突然宕机,也可能导致消息丢失。
要想确保消息在 RabbitMQ 中安全保存,必须开启消息持久化机制,即交换机持久化,队列持久化,消息持久化。
默认情况下,springAmqp 声明的交换机,队列,消息都是持久化的,并不需要我们特意指定,即 Durability 属性都为 Durable。
4. 消费者确认机制
RabbitMQ 是阅后即焚机制,RabbitMQ 确认消息被消费者消费后会立刻删除。
RabbitMQ 是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,向 RabbitMQ 发送 ACK 回执,表明自己已经处理消息。
SpringAmqp 则允许配置三种确认模式:
- manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack,虽灵活但会提高编码复杂度。
- auto:自动 ack,没有异常则返回 ack;抛出异常则返回 nack,消息重新入队,一直到没有异常为止,也可以设置最大重试次数,超过次数后发送到专门收集错误消息的队列进一步处理
- none:关闭ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除(消息投递是不可靠的,可能丢失)
一般情况下,我们选择使用 auto 模式即可,接下来进行编码实现。
向 consumer 服务的 application.yml 添加以下内容:
logging:
pattern:
dateformat: HH:mm:ss
level:
mq: debug
spring:
rabbitmq:
#rabbitMQ的ip地址
host: 127.0.0.1
#端口
port: 5672
#集群模式配置
#addresses: 127.0.0.1:8071, 127.0.0.1:8072, 127.0.0.1:8073
username: admin
password: 123456
virtual-host: /
listener:
simple:
prefetch: 1
acknowledge-mode: auto
在 listener 包下添加 SpringRabbitListener 类:
package mq.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "mqtest_queue")
public void listenSimpleQueue(String msg) {
log.debug("消费者接收到mqtest_queue的消息:【" + msg + "】");
//System.out.println(1 / 0);
log.info("消费者处理消息成功!");
}
}
启动服务,消息成功被消费:
但如果处理消息的过程中出现异常,消息会不断重新入队,不断消费,导致 cpu 飙升,这个时候便需要结合失败重试机制来对消息进行处理。
5. 失败重试机制
我们可以利用 Spring 的 retry 机制,在消费者出现异常时进行本地重试,而不是无限制的 requeue 到 mq 队列,修改 consumer 服务的 application.yml 文件:
logging:
pattern:
dateformat: HH:mm:ss
level:
mq: debug
spring:
rabbitmq:
#rabbitMQ的ip地址
host: 127.0.0.1
#端口
port: 5672
#集群模式配置
#addresses: 127.0.0.1:8071, 127.0.0.1:8072, 127.0.0.1:8073
username: admin
password: 123456
virtual-host: /
listener:
simple:
prefetch: 1
acknowledge-mode: auto
# 失败重试
retry:
# 开启消费者失败重试
enabled: true
# 初始的失败等待时长为1秒
initial-interval: 1000
# 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
multiplier: 3
# 最大重试次数
max-attempts: 4
# true无状态;false有状态。如果业务中包含事务,这里改为false
stateless: true
接下来重新向队列中发送消息,故意让监听代码产生异常,启动 consumer 服务,打印 log 如下:
如上图,消息重试了 4 次最终抛出异常,但该消息同样被 mq 删除了,所以我们会得到以下结论:
- 开启本地重试时,消息处理过程中抛出异常,不会 requeue 到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring会返回ack,消息会被丢弃
在开启重试模式后,重试次数耗尽,如果消息依然失败,为了防止消息被直接丢弃,需要有MessageRecovery 接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
很显然,RepublishMessageRecoverer 方式更为友好,失败后可以将消息投递到一个指定的,专门存放异常消息的队列,后续集中处理。
在 consumer 服务中定义处理失败消息的交换机和队列:
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
定义一个 RepublishMessageRecoverer,关联队列和交换机:
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
重启 consumer 服务,可以发现错误消息被成功发到了指定的队列:
6. 总结
通过以上案例分析,我们可以总结出以下几点来确保 RabbitMq 的消息可靠性传输:
- 开启生产者确认机制,确保生产者的消息能到达队列
- 开启持久化功能,确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为 auto,由 spring 确认消息处理成功后完成 ack
- 开启消费者失败重试机制,并设置 MessageRecoverer,多次重试失败后将消息投递到异常交换机,后续集中处理
实现延迟队列
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果这个包含死信的队列配置了 dead-letter-exchange 属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
如果同时配置了 dead-letter-routing-key 参数,则消息会由死信交换机发送到指定的队列,也就是死信队列。
接下来在 consumer 服务中,定义一组死信交换机,死信队列:
@Bean
public DirectExchange dlExchange() {
// 声明死信交换机 dl.direct
return new DirectExchange("dl.direct", true, false);
}
@Bean
public Queue dlQueue() {
// 声明存储死信的队列 dl.queue
return new Queue("dl.queue", true);
}
@Bean
public Binding dlBinding() {
// 将死信队列 与 死信交换机绑定
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl");
}
为了实现消息的延迟消费,我们需要再定义一组接收超时消息的交换机和队列并把两者绑定,设置超时时间为 10s:
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue")
// 设置队列的超时时间,10秒
.ttl(10000)
.deadLetterExchange("dl.direct")
.deadLetterRoutingKey("dl")
.build();
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
监听超时死信队列:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue", durable = "true"),
exchange = @Exchange(name = "dl.direct"),
key = "dl"
))
public void listenDlQueue(String msg) {
log.info("消费者接收到了dl.queue 的延迟消息:{}", msg);
}
在 publisher 服务测试发送消息:
@Test
public void testTTLMessage() throws InterruptedException {
// 1.准备消息
Message message = MessageBuilder
.withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
// 2.发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
// 3.记录日志
log.info("消息已经成功发送!");
}
重新启动 consumer 服务,执行测试代码,查看 log:
查看 consumer 服务 的 log:
通过对比消息发送和消费的时间,发现二者相差 10s,跟我们设置的队列超时时间相同,延迟消息得以实现。
我们也可以在发送消息的时候设置消息过期时间:
@Test
public void testTTLMessage() throws InterruptedException {
// 1.准备消息
Message message = MessageBuilder
.withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
//设置超时时间为5s
.setExpiration("5000")
.build();
// 2.发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
// 3.记录日志
log.info("消息已经成功发送!");
}
执行测试代码,查看 log:
查看 consumer 服务 的 log:
可以看到,发送和消费的时间相差 5s,跟我们设置的消息超时时间相同。
下面列举几种延迟队列的使用场景:
- 延迟发送短信
- 用户下单,如果用户在一小时内未支付,自动取消
- 预约会议,半小时后通知所有参会人员
当然,这些我们也可以使用定时任务来实现,只是会徒增许多无意义的查询,数据量大的话会比较耗费性能。
总结一下,设置消息超时主要有以下两种方式:
- 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
- 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
实现延迟消费的步骤:
- 创建一个交换机作为死信交换机并绑定一个队列作为死信队列
- 给消息的目标队列设置队列超时时间并指定死信交换机和路由 key
- 将消息的目标队列绑定到死信交换机
- 消费者监听死信队列获取超时消息
结语
本文详细讲解了 RabbitMq 的消息可靠性以及延迟队列的解决方案,在平时的开发工作中,我们可以结合具体业务场景来选择使用,希望可以对你有所帮助,我们下次更新,再见!
关注公众号螺旋编程极客
发送mq
可获取本文源码资料。