Rabbitmq死信队列

目录

死信队列定义

消息中间件中的消息被拒绝时,它将被重新publish到另一个交换机上,这个交换机我们就叫做死信交换机,死信交换机将被拒绝的消息投递到一个队列上,该队列就是死信队列。死信队列和普通队列一样,有交换机和路由key。

Rabbitmq死信队列

产生死信队列的几种情况

  • 队列达到最大长度
  • 消息ttl过期
  • 消息被拒绝(basic.reject/ basic.nack)并且requeue=false

应答模式

应答模式分为两种,手动签收和自动签收,自动应答就是消费者消费了一条消息就自动告诉队列删除消息。这样的弊端就是不管消费逻辑有没有成功,都会将消息删除,这样就会造成消息丢失。而使用手动签收后,就是在消费逻辑处理完成后,手动告诉队列消费成功,然后队列移除该条消息。

开启手动应答模式

spring.rabbitmq.listener.simple.acknowledge-mode = manual

代码实现

  1. 新建项目springboot-rabbitmq,并引入pom依赖

      <dependency>
      	  <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
    
  2. 在配置文件中添加rabbitmq相关配置

    spring:
        rabbitmq:
            host: 192.168.0.130
            username: guest
            password: guest
            virtual-host: test
            template:
                retry:
                    enabled: true
            listener:
                simple:
                    acknowledge-mode: manual
                    default-requeue-rejected: false
                type: simple
            publisher-returns: true
            publisher-confirm-type: correlated
    

    配置参数:

     1. rabbitmq.publisher-confirm-type=correlated
        # NONE 值是禁用发布确认模式,是默认值;
        # CORRELATED 值是发布消息成功到交换机后会触发回调方法;
        # SIMPLE 值有两种结果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后
         	使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待节点返回发送结果
       		根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回false 则会关闭channel,则接下来无法发送消息到broker;
     2. rabbitmq.publisher-returns=true
    	Return 消息机制用于处理一个不可路由的消息。在某些情况下,
    	如果我们在发送消息的是否,当前的 exchange 不存在或者指定路由 key 路由找不到,
    	这个时候需要使用 Return 来监听这种不可达的消息。
     3.  rabbitmq.listener.simple.acknowledge-mode=manual
    	开启手动应答
     4. rabbitmq.template.retry.enabled=true
     	是否开启重试
     5. rabbitmq.listener.simple.default-requeue-rejected=false
    	值为false,如果达到重试次数上限,将消息放入到死信队列中
    
  3. 创建rabbitmq配置文件

    import com.lee.constants.RabbitConstants;
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    //配置队列信息
    @Configuration
    public class RabbitQueueConfig {
    
        //队列过期时间
        private int orderQueueTTL = 3000;
    
        // 配置普通队列
        @Bean
        public Queue orderQueue() {
            Map<String,Object> deadParamsMap = new HashMap<String, Object>();
            // 设置死信队列的Exchange
            deadParamsMap.put("x-dead-letter-exchange",RabbitConstants.DEAD_EXCHANGE);
            //设置死信队列的RouteKey
            deadParamsMap.put("x-dead-letter-routing-key",RabbitConstants.DEAD_ROUTE_KEY);
            // 设置对接过期时间"x-message-ttl"
    //        deadParamsMap.put("x-message-ttl",orderQueueTTL);
            // 设置对接可以存储的最大消息数量
            deadParamsMap.put("x-max-length",20);
    //        return QueueBuilder.durable(RabbitConstants.ORDER_QUEUE)
    //                .withArguments(deadParamsMap)
    //                .build();
            return new Queue(RabbitConstants.ORDER_QUEUE,true,false,false,deadParamsMap);
        }
    
        // 普通交换机
        @Bean
        public TopicExchange orderTopicExchange() {
            return new TopicExchange(RabbitConstants.ORDER_EXCHANGE);
        }
    
        // 绑定
        @Bean
        public Binding orderBinding() {
            return BindingBuilder.bind(orderQueue())
                    .to(orderTopicExchange())
                    .with(RabbitConstants.ORDER_ROUTE_KEY);
        }
    
        //配置死信队列
        @Bean
        public Queue deadQueue() {
    //        return QueueBuilder.durable(RabbitConstants.DEAD_QUEUE)
    ////                .withArguments(deadParamsMap)
    //                .build();
            return new Queue(RabbitConstants.DEAD_QUEUE);
        }
    
        // 死信交换机
        @Bean
        public DirectExchange deadExchange() {
            return new DirectExchange(RabbitConstants.DEAD_EXCHANGE);
        }
    
        // 死信  绑定
        @Bean
        public Binding deadBinding() {
            return BindingBuilder.bind(deadQueue())
                    .to(deadExchange())
                    .with(RabbitConstants.DEAD_ROUTE_KEY);
        }
    }
    
  4. 常量类定义

    //定义个常量类
    public class RabbitConstants {
    
        //订单队列
        public static final String ORDER_ROUTE_KEY = "order_route_key2";
        public static final String ORDER_EXCHANGE = "order_exchange2";
        public static final String ORDER_QUEUE = "order_queue_test2";
    
        //死信队列
        public static final String DEAD_QUEUE = "dead_queue2";
        public static final String DEAD_EXCHANGE = "dead_exchange2";
        public static final String DEAD_ROUTE_KEY = "dead_route_key2";
    }
    
  5. 测试Service
    模式发送500个消息,队列大小为20

    @Service
    public class OrderProducerService implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    
        @Autowired
        private RabbitTemplate rabbitTemplate ;
    
    
        @PostConstruct
        public void setCallback() throws InterruptedException {
            // 设置返回回调
            rabbitTemplate.setReturnCallback(this);
            // 设置确认回调
            rabbitTemplate.setConfirmCallback(this);
            // 模拟消息发送
    //        Runnable runnable = new Runnable() {
    //            public void run() {
    //                send("这是我发送的测试消息,测试id="+ UUID.randomUUID().toString());
    //            }
    //        };
    //        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    //        scheduledExecutorService.scheduleAtFixedRate(runnable,20,5, TimeUnit.SECONDS);
            for (int x=0;x<500;x++) {
                Thread.sleep(100);
                send("这是我发送的测试消息,测试id="+ UUID.randomUUID().toString());
            }
        }
    
        private void send(String message) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(RabbitConstants.ORDER_EXCHANGE, RabbitConstants.ORDER_ROUTE_KEY, message, correlationData);
        }
    
        /**
         * 回调确认消息是否发送成功  confirm机制只保证消息到达交换机,不保证消息可以路由到正确的queue,如果交换机错误,就会触发confirm机制
         * @param correlationData
         * @param ack
         * @param s
         */
        public void confirm(CorrelationData correlationData, boolean ack, String s) {
            System.out.println("消息发送成功,发送ack确认,id="+correlationData.getId());
            if (ack){
                System.out.println("发送成功");
            }else {
                System.out.println("发送失败");
            }
        }
    
        /**
         *  Return 消息机制用于处理一个不可路由的消息。在某些情况下,如果我们在发送消息的是否,当前的 exchange 不存在或者指定路由 key 路由找不到,
         *      *  这个时候需要使用 Return 来监听这种不可达的消息
         * @param message
         * @param replyCode
         * @param replyText
         * @param exchange
         * @param routingKey
         */
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("消息丢失, 没有投递成功");
        }
    
        // 监听死信队列
        @RabbitListener(bindings = @QueueBinding(value = @Queue(value = RabbitConstants.DEAD_QUEUE),exchange = @Exchange(value = RabbitConstants.DEAD_EXCHANGE),
                key = RabbitConstants.DEAD_ROUTE_KEY))
        public void deadQueueListener(Message message, Channel channel) throws InterruptedException, IOException {
            System.out.println("死信队列");
            String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
            String msg = new String(message.getBody());
            System.out.println("路由key= [ "+receivedRoutingKey+" ]接收到的消息= [ "+msg +" ]");
            //Thread.sleep(5000);
            // 发送ack给消息队列,收到消息了
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    
        }
    
        // 监听订单队列
        @RabbitListener(queues = RabbitConstants.ORDER_QUEUE)
        public void orderQueueListener(Message message, Channel channel) throws IOException, InterruptedException {
            System.out.println("正常队列");
            String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
            String msg = new String(message.getBody());
            System.out.println("路由key= [ "+receivedRoutingKey+" ]接收到的消息= [ "+msg +" ]");
            // 休眠 当队列消息ttl达到5000时,交由死信队列
            Thread.sleep(1000);
            //发送ack给消息队列,收到消息了
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        }
    }
    
  6. 测试请求

    Rabbitmq控制台
    Rabbitmq死信队列
    业务控制台消费
    Rabbitmq死信队列

参考文章链接:
https://www.freesion.com/article/98691194274/
https://segmentfault.com/a/1190000022109462

项目地址:spring-cloud-nacos

上一篇:C#中MaskedTextBox的电子邮件掩码


下一篇:Linux下Tomcat端口、进程以及防火墙设置