11 发布确认高级

发布确认 springboot 版本

11 发布确认高级

配置文件

在配置文件当中需要添加

spring.rabbitmq.publisher-confirm-type=correlated

  • NONE

禁用发布确认模式,是默认值

  • CORRELATED

发布消息成功到交换器后会触发回调方法

  • SIMPLE

经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法
等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是
waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker

//配置类  发布确认
@Configuration
public class ConfirmConfig {

    //交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    //队列
    public static final String CONFIRM_QUEUE = "confirm_queue";
    //RoutingKey
    public static final String CONFIRM_ROUTING_KEY = "confirm_routing_key";

    //声明交换机
    @Bean
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    //声明队列
    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }

    //绑定
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange")DirectExchange confirmExchange,
                                        @Qualifier("confirmQueue")Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}

消息生产者

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);

        log.info("发送的消息内容:{}",message);
		
        //改变 routingkey 让他错误
        CorrelationData correlationData2 = new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY+"2",message,correlationData2);

        log.info("发送的消息内容:{}",message);
    }

}

回调接口

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //注入
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     *  交换机确认回调方法
     *  1.发消息  交换机收到了  回调
     *      1.1 CorrelationData  保存了回调消息的ID及相关信息
     *      1.2 交换机收到消息  ack=true
     *      1.3 cause - null
     * 2. 发消息  交换机接受失败  回调
     *      2.1 CorrelationData  保存了回调消息的ID及相关信息
     *      2.2 交换机收到消息  ack = false
     *      2.3  cause  - 失败的原因
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";

        if (ack){
            log.info("交换机收到了id为:{}的消息",id);
        }else {
            log.info("交换机还未收到id:{}的消息,由于原因:{}",id,cause);
        }
    }
}

消息消费者

@Slf4j
@Component
public class Consumer {

    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)
    public void receiveConfirmMessage(Message message){
       String msg = new String(message.getBody());
        log.info("接收到到的队列confirm.queue消息:{}",msg);
    }

}

结果分析

11 发布确认高级

发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为"key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。

回退消息

队列收不到消息,routingKey 出错,队列消失了

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

在配置类加上 spring.rabbitmq.publisher-returns=true

消息生产者代码

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);

        log.info("发送的消息内容:{}",message);

        CorrelationData correlationData2 = new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"123",ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData2);

        log.info("发送的消息内容:{}",message);
    }

}

回调接口

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //注入
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     *  交换机确认回调方法
     *  1.发消息  交换机收到了  回调
     *      1.1 CorrelationData  保存了回调消息的ID及相关信息
     *      1.2 交换机收到消息  ack=true
     *      1.3 cause - null
     * 2. 发消息  交换机接受失败  回调
     *      2.1 CorrelationData  保存了回调消息的ID及相关信息
     *      2.2 交换机收到消息  ack = false
     *      2.3  cause  - 失败的原因
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";

        if (ack){
            log.info("交换机收到了id为:{}的消息",id);
        }else {
            log.info("交换机还未收到id:{}的消息,由于原因:{}",id,cause);
        }
    }

    //当消息传递过程中不可达目的地时将消息返回给生产者
    //只有不可达目的地的时候才进行回退
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息{},被交换机{}退回,退回原因:{},routingKey:{}",
                new String(message.getBody()),exchange,replyText,routingKey);
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {

    }
}

11 发布确认高级

备份交换机

11 发布确认高级

修改配置类

@Configuration
public class ConfirmConfig {

    //交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    //队列
    public static final String CONFIRM_QUEUE = "confirm_exchange";
    //RoutingKey
    public static final String CONFIRM_ROUTING_KEY = "confirm_routing_key";
    //备份交换机
    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
    //备份队列
    public static final String BACKUP_QUEUE_NAME = "backup_queue";
    //报警队列
    public static final String WARNING_QUEUE_NAME = "warning_queue";


    //声明交换机
    @Bean
    public DirectExchange confirmExchange(){
        //确认交换机 无法确认消息 转发到备份交换机
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
                .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
    }

    //声明队列
    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }

    //绑定
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange")DirectExchange confirmExchange,
                                        @Qualifier("confirmQueue")Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

    //备份交换机
    @Bean
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    //备份队列
    @Bean
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    @Bean
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }
    //绑定
    @Bean
    public Binding backupQueueBindingExchange(@Qualifier("backupExchange") FanoutExchange backupExchange,
                                        @Qualifier("backupQueue")Queue backupQueue){
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    //绑定
    @Bean
    public Binding warningQueueBindingExchange(@Qualifier("backupExchange") FanoutExchange backupExchange,
                                              @Qualifier("warningQueue")Queue warningQueue){
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }
}

报警消费者

@Component
@Slf4j
public class WarningConsumer {

    //接受报警信息
    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message) {
        String msg = new String(message.getBody());
        log.error("报警发现不可路由消息:{}",msg);

    }
}

结果分析

11 发布确认高级

mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高。

上一篇:爬虫+数据分析,制作一个世界疫情人数增长动态柱状竞赛图


下一篇:React实现块依次从下到上进入的动画