1、1 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。
RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
rabbitmq 整个消息投递的路径为:
producer--->rabbitmq broker--->exchange--->queue--->consumer
1、2 confirm 确认模式
消息从 producer 到 exchange 则会返回一个 confirmCallback 。
1、3 return 退回模式
消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。
2、1 确认模式:
2、2 配置文件开启确认模式:
# 开启消息的confirm机制
publisher-confirms: true
2、3 使用rabbitTemplate.setConfirmCallback设置回调函数。
/**
* @author Echo
* 生产者的 confirm 模式
* 生产者的投递路径: producer--->rabbitmq broker--->exchange
* 具体操作:
* 1、配置文件中配置 confirm 模式的开启
* publisher-confirms: true
*
* 2、设置确认回调机制
* 通过rabbitTemplate.setConfirmCallback(RabbitTemplate.ConfirmCallback confirmCallback)
* ConfirmCallback对象对应的参数:
* confirm(@NonNull CorrelationData correlationData, boolean ack, @Nullable String cause)
* CorrelationData:相关配置信息
* ack:exchange交换机是否成功收到了消息。true 成功,false代表失败
* cause:失败原因
*/
@Test
public void testConfirm(){
rabbitTemplate.setConfirmCallback((correlationData, b, s) -> {
System.out.println("confirm 方法被指行了.........");
if(b){
System.out.println("接受消息成功" + s);
}else{
System.out.println("接受失败" + s);
}
});
// 正常情况
rabbitTemplate.convertAndSend("springboot_topic_exchange","t1.topic.test","hello springboot topic");
// 模拟异常
rabbitTemplate.convertAndSend("springboot_topic_exchange1","t1.topic.test","hello springboot topic");
}
2、4
运行测试。
3、1 退回模式:
3、2 配置文件开启此模式:
# 开启 退回模式
publisher-returns: true
3、3 具体代码如下:
/**
* @author Echo
* 回退模式
*/
@Test
public void testReturn(){
// 1、设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true);
// 2、设置ReturnCallBack
/**
* @author Echo
* setReturnCallback(ReturnCallback)
* 需要传入一个ReturnCallback对象
* ReturnCallback:
* returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
* 参数解释:
* message:消息对象
* replyCode:错误码
* replyText:错误信息
* exchange:交换机名称
* routingKey:路由键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("return 执行了....");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
});
// 正常情况
rabbitTemplate.convertAndSend(RmConfig.EXCHANGE_NAME,"t1.topic.test","hello springboot topic");
// 失败情况
// rabbitTemplate.convertAndSend(RmConfig.EXCHANGE_NAME,"t2.topic.test","hello springboot topic");
}
项目代码链接:https://github.com/Mbm7280/rabbitmq_demo