RabbitMQ死信队列与消息幂等问题

死信队列

1,死信队列产生的背景

        RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。

2,产生死信队列的原因

  • 消息投递到MQ中存放 消息已经过期  消费者没有及时的获取到我们消息,消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。
  • 队列达到最大的长度 (队列容器已经满了)
  • 消费者消费多次消息失败,就会转移存放到死信队列中

RabbitMQ死信队列与消息幂等问题

3,死信队列的架构原理

        死信队列和普通队列区别不是很大,普通与死信队列都有自己独立的交换机和路由key、队列和消费者。

主要区别如下

  • 1.生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到普通队列中缓存起来,普通队列对应有自己独立普通消费者。
  • 2.如果生产者投递消息到普通队列中,普通队列发现该消息一直没有被消费者消费的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。

 4,死信队列应用场景

30分钟订单超时设计

  1. Redis过期key :
  2. 死信延迟队列实现:采用死信队列,创建一个普通队列没有对应的消费者消费消息,在30分钟过后就会将该消息转移到死信备胎消费者实现消费。备胎死信消费者会根据该订单号码查询是否已经支付过,如果没有支付的情况下则会开始回滚库存操作
     /**
         * 声明死信交换机
         *
         * @return DirectExchange
         */
        @Bean
        public DirectExchange dlxExchange() {
            return new DirectExchange(dlxExchange);
        }
        /**
         * 声明死信队列
         *
         * @return Queue
         */
        @Bean
        public Queue dlxQueue() {
            return new Queue(dlxQueue);
        }
        /**
         * 声明订单业务交换机
         *
         * @return DirectExchange
         */
        @Bean
        public DirectExchange orderExchange() {
            return new DirectExchange(orderExchange);
        }
        /**
         * 声明订单队列
         *
         * @return Queue
         */
        @Bean
        public Queue orderQueue() {
            // 订单队列绑定我们的死信交换机
            Map<String, Object> arguments = new HashMap<>(2);
            arguments.put("x-dead-letter-exchange", dlxExchange);
            arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
            return new Queue(orderQueue, true, false, false, arguments);
        }
        /**
         * 绑定死信队列到死信交换机
         *
         * @return Binding
         */
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(dlxQueue())
                    .to(dlxExchange())
                    .with(dlxRoutingKey);
        }
        /**
         * 绑定订单队列到订单交换机
         *
         * @return Binding
         */
        @Bean
        public Binding orderBinding() {
            return BindingBuilder.bind(orderQueue())
                    .to(orderExchange())
                    .with(orderRoutingKey);
        }

 消息幂等问题

1,RabbitMQ消息自动重试机制

        当我们消费者处理执行我们业务代码的时候,如果抛出异常的情况下在这时候mq会自动触发重试机制,默认的情况下rabbitmq是无限次数的重试。需要人为指定重试次数限制问题

2,在什么情况下消费者需要实现重试策略

A.消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试?

        该情况下需要实现重试策略,网络延迟只是暂时调用不通,重试多次有可能会调用通。

B.消费者获取消息后,因为代码问题抛出数据异常,是否需要重试?

        该情况下是不需要实现重试策略,就算重试多次,最终还是失败的。可以将日志存放起来,后期通过定时任务或者人工补偿形式。如果是重试多次还是失败消息,需要重新发布消费者版本实现消费,可以使用死信队列

Mq在重试的过程中,有可能会引发消费者重复消费的问题Mq消费者需要解决 幂等性问题幂等性 保证数据唯一

解决方式

生产者在投递消息的时候,生成一个全局唯一id,放在我们消息中,消费者获取到我们该消息,可以根据该全局唯一id实现查数据库去重复。全局唯一id 根据业务来定的  比如订单号码作为全局的id,然后db层面解决数据防重复,在做insert操作 使用唯一主键约束,在做update操作 使用乐观锁

当消费者业务逻辑代码中,抛出异常自动实现重试 (默认是无数次重试)

应该对RabbitMQ重试次数实现限制,比如最多重试5次,每次间隔3s;重试多次还是失败的情况下,存放到死信队列或者存放到数据库表中记录后期人工补偿

spring:
  rabbitmq:
    ####连接地址
    host: 127.0.0.1
    ####端口号
    port: 5672
    ####账号
    username: aaa
    ####密码
    password: aaa
    ### 地址
    virtual-host: /zhangsan
    listener:
      simple:
        retry:
          ####开启消费者(程序出现异常的情况下会)进行重试
          enabled: true
          ####最大重试次数
          max-attempts: 5
          ####重试间隔时间 3秒
          initial-interval: 3000
        acknowledge-mode: manual #开启消息发送确认
@Slf4j
@Component
@RabbitListener(queues = "fanout_order_queue")
public class FanoutOrderConsumer {

    @Autowired
    private OrderManager orderManager;
    @Autowired
    private OrderMapper orderMapper;

    @RabbitHandler
    public void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
        try {
        log.info(">>orderEntity:{}<<", orderEntity.toString());
        String orderId = orderEntity.getOrderId();
        if (StringUtils.isEmpty(orderId)) {
            return;
        }
        OrderEntity dbOrderEntity = orderMapper.getOrder(orderId);
        if (dbOrderEntity != null) {
            log.info("另外消费者已经处理过该业务逻辑");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            return;
        }
        int result = orderManager.addOrder(orderEntity);
        int i = 1 / 0;
        log.info(">>插入数据库中数据成功<<");
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 记录该消息日志形式  存放数据库db中、后期通过定时任务实现消息补偿、人工实现补偿

            //将该消息存放到死信队列中,单独写一个死信消费者实现消费。
        }
    }
}

看这代码,就是在添加数据库之前通过全局id先查下数据库时候有这条数据,如果有这条数据则直接消息确认,然后在数据库设置唯一约束,很好解决消费者代来的幂等问题

上一篇:P1093 [NOIP2007 普及组] 奖学金


下一篇:事务特征以及隔离级别,rabbitmq实战