发布确认 springboot 版本
配置文件
在配置文件当中需要添加
spring.rabbitmq.publisher-confirm-type=correlated
- NONE
禁用发布确认模式,是默认值
- CORRELATED
发布消息成功到交换器后会触发回调方法
- SIMPLE
经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法
等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是
waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
//配置类 发布确认
@Configuration
public class ConfirmConfig {
//交换机
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
//队列
public static final String CONFIRM_QUEUE = "confirm_queue";
//RoutingKey
public static final String CONFIRM_ROUTING_KEY = "confirm_routing_key";
//声明交换机
@Bean
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
//声明队列
@Bean
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE).build();
}
//绑定
@Bean
public Binding queueBindingExchange(@Qualifier("confirmExchange")DirectExchange confirmExchange,
@Qualifier("confirmQueue")Queue confirmQueue){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}
消息生产者
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {
@Autowired
RabbitTemplate rabbitTemplate;
//发消息
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);
log.info("发送的消息内容:{}",message);
//改变 routingkey 让他错误
CorrelationData correlationData2 = new CorrelationData("2");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY+"2",message,correlationData2);
log.info("发送的消息内容:{}",message);
}
}
回调接口
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
//注入
rabbitTemplate.setConfirmCallback(this);
}
/**
* 交换机确认回调方法
* 1.发消息 交换机收到了 回调
* 1.1 CorrelationData 保存了回调消息的ID及相关信息
* 1.2 交换机收到消息 ack=true
* 1.3 cause - null
* 2. 发消息 交换机接受失败 回调
* 2.1 CorrelationData 保存了回调消息的ID及相关信息
* 2.2 交换机收到消息 ack = false
* 2.3 cause - 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack){
log.info("交换机收到了id为:{}的消息",id);
}else {
log.info("交换机还未收到id:{}的消息,由于原因:{}",id,cause);
}
}
}
消息消费者
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)
public void receiveConfirmMessage(Message message){
String msg = new String(message.getBody());
log.info("接收到到的队列confirm.queue消息:{}",msg);
}
}
结果分析
发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为"key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。
回退消息
队列收不到消息,routingKey 出错,队列消失了
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。
在配置类加上 spring.rabbitmq.publisher-returns=true
消息生产者代码
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {
@Autowired
RabbitTemplate rabbitTemplate;
//发消息
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);
log.info("发送的消息内容:{}",message);
CorrelationData correlationData2 = new CorrelationData("2");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"123",ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData2);
log.info("发送的消息内容:{}",message);
}
}
回调接口
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
//注入
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
* 交换机确认回调方法
* 1.发消息 交换机收到了 回调
* 1.1 CorrelationData 保存了回调消息的ID及相关信息
* 1.2 交换机收到消息 ack=true
* 1.3 cause - null
* 2. 发消息 交换机接受失败 回调
* 2.1 CorrelationData 保存了回调消息的ID及相关信息
* 2.2 交换机收到消息 ack = false
* 2.3 cause - 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack){
log.info("交换机收到了id为:{}的消息",id);
}else {
log.info("交换机还未收到id:{}的消息,由于原因:{}",id,cause);
}
}
//当消息传递过程中不可达目的地时将消息返回给生产者
//只有不可达目的地的时候才进行回退
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("消息{},被交换机{}退回,退回原因:{},routingKey:{}",
new String(message.getBody()),exchange,replyText,routingKey);
}
@Override
public void returnedMessage(ReturnedMessage returned) {
}
}
备份交换机
修改配置类
@Configuration
public class ConfirmConfig {
//交换机
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
//队列
public static final String CONFIRM_QUEUE = "confirm_exchange";
//RoutingKey
public static final String CONFIRM_ROUTING_KEY = "confirm_routing_key";
//备份交换机
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
//备份队列
public static final String BACKUP_QUEUE_NAME = "backup_queue";
//报警队列
public static final String WARNING_QUEUE_NAME = "warning_queue";
//声明交换机
@Bean
public DirectExchange confirmExchange(){
//确认交换机 无法确认消息 转发到备份交换机
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
.withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
}
//声明队列
@Bean
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE).build();
}
//绑定
@Bean
public Binding queueBindingExchange(@Qualifier("confirmExchange")DirectExchange confirmExchange,
@Qualifier("confirmQueue")Queue confirmQueue){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
//备份交换机
@Bean
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
//备份队列
@Bean
public Queue backupQueue(){
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
@Bean
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
//绑定
@Bean
public Binding backupQueueBindingExchange(@Qualifier("backupExchange") FanoutExchange backupExchange,
@Qualifier("backupQueue")Queue backupQueue){
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
//绑定
@Bean
public Binding warningQueueBindingExchange(@Qualifier("backupExchange") FanoutExchange backupExchange,
@Qualifier("warningQueue")Queue warningQueue){
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
}
报警消费者
@Component
@Slf4j
public class WarningConsumer {
//接受报警信息
@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message) {
String msg = new String(message.getBody());
log.error("报警发现不可路由消息:{}",msg);
}
}
结果分析
mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高。