1:引言
如果保证消息的可靠性?需要解决如下问题
问题1:生产者能百分之百将消息发送给消息队列!
两种意外情况:
第一,消费者发送消息给MQ失败,消息丢失;
第二,交换机路由到队列失败,路由键写错;
问题2:消费者能百分百接收到请求,且业务执行过程中还不能出错!
2:生产者确认
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
confirm 确认模式
return 退回模式
rabbitmq 整个消息投递的路径为:
消息从生产者(producer)发送消息到交换机(exchange),不论是否成功,都会执行一个确认回调方法confirmCallback 。
消息从交换机(exchange)到消息队列( queue )投递失败则会执行一个返回回调方法 returnCallback。
我们将利用这两个 callback 控制消息的可靠性投递
1:confirm 确认模式
目标
演示消息确认模式效果
生产者发布消息确认模式特点,不论消息是否进入交换机均执行回调方法
实现步骤
1. 在配置文件中,开启生产者发布消息确认模式
2. 编写生产者确认回调方法
3. 在RabbitTemplate中,设置消息发布确认回调方法
4. 请求测试:
测试成功回调:
测试失败回调:
实现过程
1. 在配置文件中,开启生产者发布消息确认模式
# 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调
spring.rabbitmq.publisher-confirms=true
2. 编写生产者确认回调方法
//发送消息回调确认类,实现回调接口ConfirmCallback,重写其中confirm()方法
@Component
public class MessageConfirmCallback implements
RabbitTemplate.ConfirmCallback {
/**
* 投递到交换机,不论投递成功还是失败都回调次方法
* @param correlationData 投递相关数据
* @param ack 是否投递到交换机
* @param cause 投递失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
if (ack){
System.out.println("消息进入交换机成功{}");
} else {
System.out.println("消息进入交换机失败{} , 失败原因:" + cause);
}
}
}
3. 在RabbitTemplate中,设置消息发布确认回调方法
@Component
public class MessageConfirmCallback implements
RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法
* 设置消息确认回调方法
* 设置消息回退回调方法
*/
@PostConstruct
public void initRabbitTemplate(){
//设置消息确认回调方法
rabbitTemplate.setConfirmCallback(this::confirm);
}
/**
* 投递到交换机,不论投递成功还是失败都回调次方法
* @param correlationData 投递相关数据
* @param ack 是否投递到交换机
* @param cause 投递失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
if (ack){
System.out.println("消息进入交换机成功{}");
} else {
System.out.println("消息进入交换机失败{} , 失败原因:" + cause);
}
}
}
4. 请求测试:
测试成功回调: http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingkey=order.A&msg=购买苹果手机
测试失败回调: http://localhost:8080/direct/sendMsg?
exchange=order_xxxxxxx&routingkey=order.A&msg=购买苹果手机
2:return 退回模式
目标
演示消息回退模式效果
消息回退模式特点:消息进入交换机,路由到队列过程中出现异常则执行回调方法
实现步骤
1. 在配置文件中,开启生产者发布消息回退模式
2. 在MessageConfirmCallback类中,实现接口RabbitTemplate.ReturnCallback
3. 并重写RabbitTemplate.ReturnCallback接口中returnedMessage()方法
4. 在RabbitTemplate中,设置消息发布回退回调方法
5. 请求测试:
测试成功回调:
测试失败回调:
实现过程
1. 在配置文件中,开启生产者发布消息回退模式
# 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调
spring.rabbitmq.publisher-returns=true
2. 在MessageConfirmCallback类中,实现接口RabbitTemplate.ReturnCallback
@Component
public class RabbitConfirm implements RabbitTemplate.ConfirmCallback
,RabbitTemplate.ReturnCallback {
//..省略
}
3. 并重写RabbitTemplate.ReturnCallback接口中returnedMessage()方法
/**
* 当消息投递到交换机,交换机路由到消息队列中出现异常,执行returnedMessaged方
法
* @param message 投递消息内容
* @param replyCode 返回错误状态码
* @param replyText 返回错误内容
* @param exchange 交换机名称
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String
replyText, String exchange, String routingKey) {
System.out.println("交换机路由至消息队列出错:>>>>>>>");
System.out.println("交换机:"+exchange);
System.out.println("路由键:"+routingKey);
System.out.println("错误状态码:"+replyCode);
System.out.println("错误原因:"+replyText);
System.out.println("发送消息内容:"+message.toString());
System.out.println("<<<<<<<<");
}
4. 在RabbitTemplate中,设置消息发布回退回调方法
@PostConstruct
public void initRabbitTemplate(){
//设置消息确认回调方法
rabbitTemplate.setConfirmCallback(this::confirm);
//设置消息回退回调方法
rabbitTemplate.setReturnCallback(this::returnedMessage);
}
5. 请求测试失败执行returnedMessage方法: http://localhost:8080/direct/sendMsg?
exchange=order_exchange&routingkey=xxxxx&msg=购买苹果手机
3:小结
确认模式
设置publisher-confirms="true" 开启 确认模式。
实现RabbitTemplate.ConfirmCallback接口,重写confirm方法
特点:不论消息是否成功投递至交换机,都回调confirm方法,只有在发送失败时需要写业务代码进行处理。
退回模式
设置publisher-returns="true" 开启 退回模式。
实现RabbitTemplate.ReturnCallback接口,重写returnedMessage方法
特点:消息进入交换机后,只有当从exchange路由到queue失败,才去回调returnedMessage方法;
3:消费者确认(ACK)
ack指 Acknowledge,拥有确认的含义,是消费端收到消息的一种确认机制;
1:消息确认的三种类型
自动确认:acknowledge="none"
手动确认:acknowledge="manual"
根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从
RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认方式,则需要在业务处理成功后,调用 channel.basicAck() ,手动签收,如果出现异常,则调用 channel.basicNack() 方法,让其自动重新发送消息。
自己实现消费者签收代码:自定义监听器涉及三个对象:三个对象必须注入Spring容器
1. 自定义监听器对象
2. 自定义监听器的适配器Adaptor对象
3. 监听器的容器对象
可以使用RabbitTemplate中提供的签收方式:
2:代码实现
目标
演示消费者手动确认效果
自定义消费者接收消息监听器,监听收到消息的内容,手动进行签收;当业务系统抛出异常则拒绝签收,重回队列
实现步骤
1. 搭建新的案例工程consumer-received-ack,用于演示ack消费者签收
2. 在消费者工程中,创建自定义监听器类CustomAckConsumerListener,实现
ChannelAwareMessageListener接口
3. 编写监听器配置类ListenerConfiguration,配置自定义监听器绑定消息队列 order.A
注入消息队列监听器适配器对象到ioc容器
注入消息队列监听器容器对象到ioc容器:
配置连接工厂
配置自定义监听器适配器对象
配置消息队列
开启手动签收
4. 启动消费者服务,观察控制台,消费者监听器是否与RabbitMQ建立Connection
5. 测试发送消息手动签收
6. 模拟业务逻辑出现异常情况
7. 测试异常情况,演示拒绝签收消息,消息重回队列
实现过程
1. 搭建新的案例工程consumer-received-ack,搭建过程类似于生产者确认
2. 在消费者工程中,创建自定义监听器类CustomAckConsumerListener,实现
ChannelAwareMessageListener接口
/**
* 自定义监听器,监听到消息之后,立即执行onMessage方法
*/
@Component
public class CustomAckConsumerListener implements
ChannelAwareMessageListener {
/**
* 监听到消息之后执行的方法
* @param message 消息内容
* @param channel 消息所在频道
*/
@Override
public void onMessage(Message message, Channel channel) throws
Exception {
//获取消息内容
byte[] messageBody = message.getBody();
String msg = new String(messageBody, "utf-8");
System.out.println("接收到消息,执行具体业务逻辑{} 消息内容:"+msg);
//获取投递标签
MessageProperties messageProperties =
message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
/**
* 签收消息,前提条件,必须在监听器的配置中,开启手动签收模式
* 参数1:消息投递标签
* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
*/
channel.basicAck(deliveryTag,false);
System.out.println("手动签收完成:{}");
}
}
3. 编写监听器配置类ListenerConfiguration,配置自定义监听器绑定消息队列 order.A
注入消息队列监听器适配器对象到ioc容器
注入消息队列监听器容器对象到ioc容器:
配置连接工厂
配置自定义监听器
配置消息队列
开启手动签收
/**
* 消费者监听器配置,将监听器绑定到消息队列上
*/
@Configuration
public class ListenerConfiguration {
/**
* 注入消息监听器适配器
* @param customAckConsumerListener 自定义监听器对象
*/
@Bean
public MessageListenerAdapter
messageListenerAdapter(CustomAckConsumerListener
customAckConsumerListener){
//创建自定义监听器适配器对象
return new
MessageListenerAdapter(customAckConsumerListener);
}
/**
* 注入消息监听器容器
* @param connectionFactory 连接工厂
* @param messageListenerAdapter 自定义的消息监听器适配器
*/
@Bean
public SimpleMessageListenerContainer
simpleMessageListenerContainer(
ConnectionFactory connectionFactory,
MessageListenerAdapter messageListenerAdapter){
//简单的消息监听器容器对象
SimpleMessageListenerContainer container = new
SimpleMessageListenerContainer();
//绑定消息队列
container.setQueueNames("order.A");
//设置连接工厂对象
container.setConnectionFactory(connectionFactory);
//设置消息监听器适配器
container.setMessageListener(messageListenerAdapter);
//设置手动确认消息:NONE(不确认消息),MANUAL(手动确认消息),AUTO(自
动确认消息)
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
}
4. 启动消费者控制,观察控制台,消费者监听器是否与RabbitMQ建立Connection
5. 测试发送消息手动签收,请求地址http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingkey=order.A&msg=购买苹果手机
6. 模拟业务逻辑出现异常情况,修改自定义监听器
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
//获取消息内容
byte[] messageBody = message.getBody();
String msg = new String(messageBody, "utf-8");
System.out.println("接收到消息,执行具体业务逻辑{} 消息内容:"+msg);
//获取投递标签
MessageProperties messageProperties =
message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
try {
if (msg.contains("苹果")){
throw new RuntimeException("不允许卖苹果手机!!!");
}
/**
* 手动签收消息
* 参数1:消息投递标签
* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
*/
channel.basicAck(deliveryTag,false);
System.out.println("手动签收完成:{}");
} catch (Exception ex){
/**
* 手动拒绝签收
* 参数1:当前消息的投递标签
* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
* 参数3:是否重回队列,true为重回队列,false为不重回
*/
channel.basicNack(deliveryTag,false,true);
System.out.println("拒绝签收,重回队列:{}"+ex);
}
}
7. 测试异常情况,演示拒绝签收消息,消息重回队列
请求地址包含苹果,抛出异常:http://localhost:8080/direct/sendMsg?exchange=order_ex
change&routingkey=order.A&msg=购买苹果手机
控制台打印结果
2:小结
如果想手动签收消息,那么需要自定义实现消息接收监听器,实现
ChannelAwareMessageListener接口
设置AcknowledgeMode模式
none:自动
auto:异常模式
manual:手动
调用channel.basicAck方法签收消息
调用channel.basicNAck方法拒签消息