RabbitMQ 发布确认

一、概述

在实际生产环境中由于一些不明原因,生产者发送的消息在传递到队列的过程中发生消息丢失

1、网络波动,Producer 发送消息的没有到达交换机,消息发生丢失

2、Producer 发送的消息到达了交换机,而交换机发生了故障,没有将消息路由给队列,消息发生丢失

3、Producer 发送的消息到达了交换机,交换机将消息路由到队列的时候,队列发生故障,消息发生丢失

如果消息发生了丢失,就需要我们手动处理和恢复,那么怎么样才能保证 Producer 生产的消息成功的投递到队列中呢,这就需要使用 RabbitMQ 的发布确认机制了

整个发布确认机制包括两个部分

1、交换机确认: 保证 Producer 生产的消息成功投递到交换机上而不发生丢失

2、消息的回退: 保证交换机成功将消息路由到队列上而不发生消息

 

二、交换机确认

当生产者发送消息给 RabbitMQ 时,如果消息到达了交换机,那么交换机应该告知生产者消息是否已经成功到达交换机,这个时候我们就需要通过 ConfirmCallback

1、原理图

RabbitMQ 发布确认

2、application.yml

# 该配置项取值方式有三种 none、correlated、simple

# none: 禁用发布确认模式(默认值)
# correlated: 消息成功发布到交换机后,会触发 RabbitTemplate.ConfirmCallback 实现类的 confirm() 方法
# simple: 
经测试有两种效果,其一效果和 correlated 值一样会触发回调方法,
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,
根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
spring.rabbitmq.publisher-confirm-type=correlated

3、自定义 MyConfirmCallback 实现 RabbitTemplate.confirmCallback

@Slf4j
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
    /**
     * 交换机确认回调方法
     * 1、Producer 发送的消息,交换机确认收到
     * correlationData:保存消息回调 ID 及其它相关的信息
     * ack:true
     * cause:null
     *
     * 2、Producer 发送的消息,交换机没有收到
     * correlationData:保存消息回调 ID 及其它相关的信息
     * ack:false
     * cause:交换机没有收到消息的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
        }
    }
}

4、Consumer

@Slf4j
@Component
public class Consumer {
    private static final String PUBLISH_CONFIRM_QUEUE = "confirm_queue";

    @RabbitListener(queues = {PUBLISH_CONFIRM_QUEUE})
    public void receivedMessage(Message message, Channel channel) {

        String msg = new String(message.getBody());

        log.info("接受到消息:{}", msg);
    }
}

5、Producer

@Slf4j
@RestController
public class Producer {

    private static final String PUBLISH_CONFIRM_EXCHANGE = "confirm_exchange";
    private static final String PUBLISH_CONFIRM_ROUTING_KEY = "confirm_routing_key";
    private static final String PUBLISH_ERROR_CONFIRM_EXCHANGE = "confirm_error_exchange";
    private static final String PUBLISH_ERROR_CONFIRM_ROUTING_KEY = "confirm_error_routing_key";

    // 1、依赖注入 rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 2、依赖注入 myConfirmCallback
    @Autowired
    private MyConfirmCallback myConfirmCallback;

    // 3、完成了 1、2 的注入之后再设置 rabbitTemplate 的回调对象
    @PostConstruct
    public void init() {
        // 消息成功传递给交换机时会触发 MyConfirmCallback 中的回调方法 confirm()
        rabbitTemplate.setConfirmCallback(myConfirmCallback);
    }

    @GetMapping("/confirm/sendMessage/{message}")
    public void sendMessage(@PathVariable("message") String message) {
        // 设置唯一 ID
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());

        // 使用正确的交换机发送消息(该消息能正确的发布到交换机)
        rabbitTemplate.convertAndSend(PUBLISH_CONFIRM_EXCHANGE, PUBLISH_CONFIRM_ROUTING_KEY, message, correlationData);
        log.info("使用正确的交换机发布消息,发送的消息是: {}", message);

        // 设置唯一 ID
        CorrelationData correlationData2 = new CorrelationData();
        correlationData2.setId(UUID.randomUUID().toString());
        // 使用错误的交换机发送消息(该消息不会发布到交换机)
        rabbitTemplate.convertAndSend(PUBLISH_ERROR_CONFIRM_EXCHANGE, PUBLISH_CONFIRM_ROUTING_KEY, message, correlationData2);

        log.info("使用错误的交换机发布消息,发送的消息是: {}", message);
    }
}

6、测试过程及步骤

我们这里为了模拟消息发送不到交换机,我们分别使用两个不同的交换机(正确的交换机、错误的交换机)发布消息

浏览器发送请求: http://localhost:8080/confirm/sendMessage/小毛毛变身

RabbitMQ 发布确认结果分析:

由于使用了两个不同的交换机,一个交换机是正确的,一个交换机是错误的,所以生产者实际发送了两条消息,显示而消费者只消费掉到一条消息, 如果 Producer 将消息成功发送到了交换机会触发自定义的回调方法 MyConfirmCallback,如果 Producer 未成功将消息发送到交换机,那么它也会触发自定义的回调方法 MyConfirmCallback 方法,我们需要根据具体的 ack 的值(true/false)来判断消息是否正确的发布到了交换机

如果我们更改一下 Producer 的代码,生产者发送消息的时候使用相同的交换机,使用不同的 Routing_key,会发生什么样的效果呢,我们测试一下

@Slf4j
@RestController
public class Producer {

    private static final String PUBLISH_CONFIRM_EXCHANGE = "confirm_exchange";
    private static final String PUBLISH_CONFIRM_ROUTING_KEY = "confirm_routing_key";
    private static final String PUBLISH_ERROR_CONFIRM_EXCHANGE = "confirm_error_exchange";
    private static final String PUBLISH_ERROR_CONFIRM_ROUTING_KEY = "confirm_error_routing_key";

    // 1、依赖注入 rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 2、依赖注入 myConfirmCallback
    @Autowired
    private MyConfirmCallback myConfirmCallback;

    // 3、完成了 1、2 的注入之后再设置 rabbitTemplate 的回调对象
    @PostConstruct
    public void init() {
        // 消息成功传递给交换机时会触发 MyConfirmCallback 中的回调方法 confirm()
        rabbitTemplate.setConfirmCallback(myConfirmCallback);
    }

    @GetMapping("/confirm/sendMessage/{message}")
    public void sendMessage(@PathVariable("message") String message) {
        // 设置唯一 ID
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());

        // 使用相同交换机,正确的 routing_key 发送消息
        rabbitTemplate.convertAndSend(PUBLISH_CONFIRM_EXCHANGE, PUBLISH_CONFIRM_ROUTING_KEY, message, correlationData);
        log.info("使用相同交换机,正确的 routing_key 发送消息,发送的消息是: {}", message);

        // 设置唯一 ID
        CorrelationData correlationData2 = new CorrelationData();
        correlationData2.setId(UUID.randomUUID().toString());
        // 使用相同交换机,错误的 routing_key 发送消息
        rabbitTemplate.convertAndSend(PUBLISH_CONFIRM_EXCHANGE, PUBLISH_ERROR_CONFIRM_ROUTING_KEY, message, correlationData2);

        log.info("使用相同交换机,正确的 routing_key 发送消息,发送的消息是: {}", message);
    }
}

测试结果如下

RabbitMQ 发布确认

发送了两条消息,交换机也确认收到了两条消息,但是为什么消费者只消费了一条消息呢

因为生产者发送消息的时候使用的是正确的交换机,消息成功的发送到了交换机,所以触发了自定义的回调方法 MyConfirmCallback,但是由于生产者发送消息的时候使用的 routing_key 一个是正确的,一个是错误的,真正通过交换机路由到队列里的消息只有一条,所以最终消费者只能消费到一条消息

 

三、消息回退

通过上面的例子可以发现,在仅开启了交换机确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果交换机路由到队列的过程中出现了问题,消息还是会直接丢弃,此时生产者是不知道消息已经被丢弃的,那么如何让无法被路由的消息也有类似于交换机确认一样的通知机制呢,这就是我们要介绍的消息回退机制

1、application.yml 添加配置

# 交换机没有将消息路由到队列中时,消息会回退,消息回退时会调用 RabbitTemplate.ReturnsCallback 的实现类的 returnedMessage() 方法
spring.rabbitmq.publisher-returns=true

2、MyConfirmCallback 实现 RabbitTemplate.ReturnsCallback(注意是 ReturnsCallback,原来的 ReturnCallback 已经过时)

@Slf4j
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    /**
     * 交换机确认回调方法
     * 1、Producer 发送的消息,交换机确认收到
     * correlationData:保存消息回调 ID 及其它相关的信息
     * ack:true
     * cause:null
     * <p>
     * 2、Producer 发送的消息,交换机没有收到
     * correlationData:保存消息回调 ID 及其它相关的信息
     * ack:false
     * cause:交换机没有收到消息的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
        }
    }

    /**
     * 如果交换机没有将消息路由到队列,会触发该回调方法
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("消息: {} 被服务器退回--->退回原因: {},交换机是: {},路由key是:{},退回编号是:{}",
                new String(returned.getMessage().getBody()), returned.getReplyText(), returned.getExchange(),
                returned.getRoutingKey(), returned.getReplyCode());
    }
}

3、Producer

@Slf4j
@RestController
public class Producer {

    private static final String PUBLISH_CONFIRM_EXCHANGE = "confirm_exchange";
    private static final String PUBLISH_CONFIRM_ROUTING_KEY = "confirm_routing_key";
    private static final String PUBLISH_ERROR_CONFIRM_EXCHANGE = "confirm_error_exchange";
    private static final String PUBLISH_ERROR_CONFIRM_ROUTING_KEY = "confirm_error_routing_key";

    // 1、依赖注入 rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 2、依赖注入 myConfirmCallback
    @Autowired
    private MyConfirmCallback myConfirmCallback;

    // 3、完成了 1、2 的注入之后再设置 rabbitTemplate 的回调对象
    @PostConstruct
    public void init() {
        // 消息成功传递给交换机时会触发 MyConfirmCallback 中的回调方法 confirm()
        rabbitTemplate.setConfirmCallback(myConfirmCallback);
        // 消息回退时会触发 MyConfirmCallback 中的回调方法 returnedMessage()
        rabbitTemplate.setReturnsCallback(myConfirmCallback);
    }

    @GetMapping("/confirm/sendMessage/{message}")
    public void sendMessage(@PathVariable("message") String message) {
        // 设置唯一 ID
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());

        // 使用相同交换机,正确的 routing_key 发送消息
        rabbitTemplate.convertAndSend(PUBLISH_CONFIRM_EXCHANGE, PUBLISH_CONFIRM_ROUTING_KEY, message, correlationData);
        log.info("使用相同交换机,正确的 routing_key 发送消息,发送的消息是: {}", message);

        // 设置唯一 ID
        CorrelationData correlationData2 = new CorrelationData();
        correlationData2.setId(UUID.randomUUID().toString());
        // 使用相同交换机,错误的 routing_key 发送消息
        rabbitTemplate.convertAndSend(PUBLISH_CONFIRM_EXCHANGE, PUBLISH_ERROR_CONFIRM_ROUTING_KEY, message, correlationData2);

        log.info("使用相同交换机,正确的 routing_key 发送消息,发送的消息是: {}", message);
    }
}

4、测试及分析

浏览器发送请求: http://localhost:8080/confirm/sendMessage/小毛毛变身

RabbitMQ 发布确认

从上面的图可以很明显看出,发送了两条消息,交换机确认收到了两条消息,但是交换机进行路由的时候有一条消息没有进行正确路由到队列,触发了自定义 MyConfirmCallback 的 returnedMessage()方法,导致消费者最终只消费了一条消息

 

四、备份交换机

有了回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理.但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理.而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错.而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑.如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息.可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息.在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题.什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了.当然我们还可以建立一个报警队列,用独立的消费者来进行监测和报警

1、原理图

RabbitMQ 发布确认

2、application.yml

spring:
  rabbitmq:
    host: 192.168.59.135
    port: 5672
    username: admin
    password: admin123
    publisher-confirm-type: correlated
    publisher-returns: true
    # 开启 ack
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual #采取手动应答
        #concurrency: 1 # 指定最小的消费者数量
        #max-concurrency: 1 #指定最大的消费者数量
        retry:
          enabled: true # 是否支持重试

3、PublicConfirm

@Configuration
public class PublishConfirmConfig {
    private static final String CONFIRM_EXCHANGE = "confirm_exchange";
    private static final String BACKUP_EXCHANGE = "backup_exchange";
    private static final String CONFIRM_QUEUE = "confirm_queue";
    private static final String LOG_QUEUE = "log_queue";
    private static final String WARNING_QUEUE = "warning_queue";
    private static final String CONFIRM_ROUTING_KEY = "confirm";

    // 声明交换机 confirm_exchange,type = direct
    @Bean(CONFIRM_EXCHANGE)
    public DirectExchange directExchange() {
        // 当 confirm_exchange 无法将消息路由到 confirm_queue 时,将消息传递给 backup_exchange (alternate:替代者、代理人)
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).alternate(BACKUP_EXCHANGE).build();
    }

    // 声明交换机 backup_exchange,type = fanout
    @Bean(BACKUP_EXCHANGE)
    public FanoutExchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange(BACKUP_EXCHANGE).durable(true).build();
    }

    // 声明队列 confirm_queue
    @Bean(CONFIRM_QUEUE)
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }

    // 声明 log_queue
    @Bean(LOG_QUEUE)
    public Queue logQueue() {
        return QueueBuilder.durable(LOG_QUEUE).build();
    }

    // 声明 warning_queue
    @Bean(WARNING_QUEUE)
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE).build();
    }

    // confirm_queue 绑定 confirm_exchange
    @Bean
    public Binding confirmQueueBindingConfirmExchange(@Qualifier(CONFIRM_QUEUE) Queue queue,
                                                      @Qualifier(CONFIRM_EXCHANGE) DirectExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
    }

    // log_queue 绑定 backup_exchange
    @Bean
    public Binding logQueueBindingBackupExchange(@Qualifier(LOG_QUEUE) Queue queue,
                                                 @Qualifier(BACKUP_EXCHANGE) FanoutExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange);
    }

    // warning_queue 绑定 backup_exchange
    @Bean
    public Binding warningQueueBindingBackupExchange(@Qualifier(WARNING_QUEUE) Queue queue,
                                                     @Qualifier(BACKUP_EXCHANGE) FanoutExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange);
    }
}

4、MyConfirmCallback

@Slf4j
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    /**
     * 交换机确认回调方法
     * 1、Producer 发送的消息,交换机确认收到
     * correlationData:保存消息回调 ID 及其它相关的信息
     * ack:true
     * cause:null
     * <p>
     * 2、Producer 发送的消息,交换机没有收到
     * correlationData:保存消息回调 ID 及其它相关的信息
     * ack:false
     * cause:交换机没有收到消息的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
        }
    }

    /**
     * 如果交换机没有将消息路由到队列,会触发该回调方法
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("消息: {} 被服务器退回--->退回原因: {},交换机是: {},路由key是:{},退回编号是:{}",
                new String(returned.getMessage().getBody()), returned.getReplyText(), returned.getExchange(),
                returned.getRoutingKey(), returned.getReplyCode());
    }
}

5、ConfirmConsumer

@Slf4j
@Component
public class ConfirmConsumer {
    private static final String CONFIRM_QUEUE = "confirm_queue";

    @RabbitListener(queues = {CONFIRM_QUEUE})
    public void receivedMessage(Message message, Channel channel, CorrelationData correlationData) throws IOException {
        try {
            String msg = new String(message.getBody());
            log.info("接受到消息:{}", msg, correlationData.getId());
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.info("消息消费错误");
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
}

6、LogConsumer

@Slf4j
@Component
public class LogConsumer {
    private static final String LOG_QUEUE = "log_queue";

    @RabbitListener(queues = {LOG_QUEUE})
    public void receivedMessage(Message message, Channel channel, CorrelationData correlationData) throws IOException {
        try {
            String msg = new String(message.getBody());
            log.error("日志记录不可路由消息:{}", msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.info("消息消费错误");
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
}

7、WarningConsumer

@Slf4j
@Component
public class WarningConsumer {
    private static final String WARNING_QUEUE = "warning_queue";

    @RabbitListener(queues = {WARNING_QUEUE})
    public void receivedMessage(Message message, Channel channel, CorrelationData correlationData) throws IOException {
        try {
            String msg = new String(message.getBody());
            log.error("报警发现不可路由消息:{}", msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.info("消息消费错误");
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
}

8、Producer

@Slf4j
@RestController
public class Producer {

    private static final String CONFIRM_EXCHANGE = "confirm_exchange";
    private static final String CONFIRM_ROUTING_KEY = "confirm";
    private static final String CONFIRM_ROUTING_KEY_ERROR = "confirm_error";

    // 1、依赖注入 rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 2、依赖注入 myConfirmCallback
    @Autowired
    private MyConfirmCallback myConfirmCallback;

    // 3、完成了 1、2 的注入之后再设置 rabbitTemplate 的回调对象
    @PostConstruct
    public void init() {
        // 消息成功传递给交换机时会触发 MyConfirmCallback 中的回调方法 confirm()
        rabbitTemplate.setConfirmCallback(myConfirmCallback);
        // 消息回退时会触发 MyConfirmCallback 中的回调方法 returnedMessage()
        rabbitTemplate.setReturnsCallback(myConfirmCallback);
    }

    @GetMapping("/confirm/sendMessage/{message}")
    public void sendMessage(@PathVariable("message") String message) {
        // 设置唯一 ID
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());

        // 使用相同交换机,正确的 routing_key 发送消息
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE, CONFIRM_ROUTING_KEY, message, correlationData);
        log.info("使用相同交换机,正确的 routing_key 发送消息,发送的消息是: {}", message);

        // 设置唯一 ID
        CorrelationData correlationData2 = new CorrelationData();
        correlationData2.setId(UUID.randomUUID().toString());
        message = message + ",也是最机智的";
        // 使用相同交换机,错误的 routing_key 发送消息
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE, CONFIRM_ROUTING_KEY_ERROR, message, correlationData2);

        log.info("使用相同交换机,正确的 routing_key 发送消息,发送的消息是: {}", message);
    }
}

9、测试

浏览器发送请求 http://localhost:8080/confirm/sendMessage/小毛毛是最可爱的

IDEA 控制台

RabbitMQ 发布确认结果分析

生产者发送两条消息,交换机是相同的(正确的交换机),使用的两个 routing_key,一个是正确的 routing_key,另外一个是错误的 routing_key,这样生产者发送的两条消息,一条会被正确的发送到 confirm_queue,被 ConfirmConsumer 消费掉,另外一条由于使用了错误的 routing_key, confirm_exchange 不能正确的路由到 confirm_queue,这种情况下 confirm_exchange 会将不能路由的消息转发到 backup_exchange,然后被 LogConsumer、WarningConsumer 消费掉

注意事项:我们同时开启了备份交换和消息回退,哪个优先级更高呢?

从 IDEA 控制台可以看出,控制台没有消息回退相关的日志信息,所以是备份交换机的优先级更高

 

上一篇:输入框弹框 $prompt


下一篇:爬虫+数据分析,制作一个世界疫情人数增长动态柱状竞赛图