RabbitMQ学习09--死信队列(TTL过期)

1、死信的概念:

死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

 

2、死信的来源:

消息 TTL 过期

队列达到最大长度(队列满了,无法再添加数据到 mq 中)

消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

 

3、代码示例:

RabbitMQ学习09--死信队列(TTL过期)

 

 

工厂类:

 1 package com.yas.config;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9 
10 public class RabbitMQClient {
11     public static Connection getConnection(){
12         //创建Connection工厂
13         ConnectionFactory factory = new ConnectionFactory();
14         factory.setHost("106.12.17.17");
15         factory.setPort(5672);
16         factory.setUsername("admin");
17         factory.setPassword("cs1986@0312");
18         factory.setVirtualHost("/");
19 
20         //创建Connection
21         Connection connection = null;
22         try {
23             connection = factory.newConnection();
24         } catch (IOException e) {
25             e.printStackTrace();
26         } catch (TimeoutException e) {
27             e.printStackTrace();
28         }
29         return connection;
30     }
31 
32     public static Channel getChannel(){
33         Connection connection = getConnection();
34         try {
35             return connection.createChannel();
36         } catch (IOException e) {
37             e.printStackTrace();
38         }
39         return null;
40     }
41 }

 

消息生产者代码:

 1 package com.yas.deadexchange;
 2 
 3 import com.rabbitmq.client.AMQP;
 4 import com.rabbitmq.client.BuiltinExchangeType;
 5 import com.rabbitmq.client.Channel;
 6 import com.yas.config.RabbitMQClient;
 7 import org.junit.Test;
 8 
 9 public class Publisher {
10     private static final String NORMAL_EXCHANGE = "normal_exchange";
11 
12     @Test
13     public void publish() throws Exception {
14         try (Channel channel = RabbitMQClient.getChannel()) {
15             channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
16             //设置消息的 TTL 时间
17             AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
18             //该信息是用作演示队列个数限制
19             for (int i = 1; i <11 ; i++) {
20                 String message="info"+i;
21                 //channel.basicPublish(NORMAL_EXCHANGE, message.getBytes());
22                 channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
23                 System.out.println("生产者发送消息:"+message);
24             }
25         }
26     }
27 }

 

正常队列(normal_queue)消费者:

 1 package com.yas.deadexchange;
 2 
 3 import com.rabbitmq.client.*;
 4 import com.yas.config.RabbitMQClient;
 5 import org.junit.Test;
 6 
 7 import java.util.HashMap;
 8 import java.util.Map;
 9 
10 public class Consumer01 {
11     public static final String NORMAL_EXCHANGE = "normal_exchange";
12     public static final String DEAD_EXCHANGE = "dead_exchange";
13     public static final String NORMAL_QUEUE = "normal_queue";
14     public static final String DEAD_QUEUE = "dead_queue";
15 
16     @Test
17     public void consume() throws Exception {
18         //1.获取连接对象
19         Connection connection = RabbitMQClient.getConnection();
20         //2.创建channel
21         Channel channel = connection.createChannel();
22 
23         //声明死信交换机 类型为 direct
24         channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
25         //声明死信队列
26         channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
27         //死信队列绑定死信交换机与 routingkey
28         channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
29 
30 
31         //声明普通交换机 类型为 direct
32         channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
33         //正常队列绑定死信队列信息
34         Map<String, Object> params = new HashMap<>();
35         //正常队列设置死信交换机 参数 key 是固定值
36         params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
37         //正常队列设置死信 routing-key 参数 key 是固定值
38         params.put("x-dead-letter-routing-key", "lisi");
39         //声明普通队列
40         channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
41         //普通队列绑定普通交换机routing-key
42         channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
43         System.out.println("等待接收消息........... ");
44 
45         //消费者回调
46         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
47             String message = new String(delivery.getBody(), "UTF-8");
48             System.out.println("Consumer01 接收到消息"+message);
49             };
50         channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {
51         });
52 
53         System.in.read();
54         //5/释放资源
55         channel.close();
56         connection.close();
57     }
58 }

测试方式:

1.正常消费,先执行消费者,开启消费监听。再开启生产者,则消息从normal_queue被消费。

2.死信队列,不开启消费者,只开启生产者,消息先发送到normal_queue,等10秒超时后,会从normal_queue转发给dead_queue。消息进入死信队列。

 

死信队列消费者:

 1 package com.yas.deadexchange;
 2 
 3 import com.rabbitmq.client.BuiltinExchangeType;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.DeliverCallback;
 7 import com.yas.config.RabbitMQClient;
 8 import org.junit.Test;
 9 
10 //消费死信队列中的消息
11 public class Consumer02 {
12 
13     private static final String DEAD_EXCHANGE = "dead_exchange";
14 
15     @Test
16     public void consume() throws Exception {
17         //1.获取连接对象
18         Connection connection = RabbitMQClient.getConnection();
19         //2.创建channel
20         Channel channel = connection.createChannel();
21 
22         channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
23         String deadQueue = "dead_queue";
24         channel.queueDeclare(deadQueue, false, false, false, null);
25         channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
26         System.out.println("等待接收死信队列消息........... ");
27         DeliverCallback deliverCallback = (consumerTag, delivery) ->
28         {String message = new String(delivery.getBody(), "UTF-8");
29             System.out.println("Consumer02 接收死信队列的消息" + message);
30         };
31         channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
32         });
33 
34         System.in.read();
35         //5/释放资源
36         channel.close();
37         connection.close();
38     }
39 }

测试方式:

对于进入死信队列的信息,可以通过启动私信队列的消费者完成消费。

 

上一篇:(转)pika详解 (一)


下一篇:(转)pika详解(四) channel 通道