1、死信的概念:
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
2、死信的来源:
消息 TTL 过期
队列达到最大长度(队列满了,无法再添加数据到 mq 中)
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
3、代码示例:
工厂类:
1 package com.yas.config; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 public class RabbitMQClient { 11 public static Connection getConnection(){ 12 //创建Connection工厂 13 ConnectionFactory factory = new ConnectionFactory(); 14 factory.setHost("106.12.17.17"); 15 factory.setPort(5672); 16 factory.setUsername("admin"); 17 factory.setPassword("cs1986@0312"); 18 factory.setVirtualHost("/"); 19 20 //创建Connection 21 Connection connection = null; 22 try { 23 connection = factory.newConnection(); 24 } catch (IOException e) { 25 e.printStackTrace(); 26 } catch (TimeoutException e) { 27 e.printStackTrace(); 28 } 29 return connection; 30 } 31 32 public static Channel getChannel(){ 33 Connection connection = getConnection(); 34 try { 35 return connection.createChannel(); 36 } catch (IOException e) { 37 e.printStackTrace(); 38 } 39 return null; 40 } 41 }
消息生产者代码:
1 package com.yas.deadexchange; 2 3 import com.rabbitmq.client.AMQP; 4 import com.rabbitmq.client.BuiltinExchangeType; 5 import com.rabbitmq.client.Channel; 6 import com.yas.config.RabbitMQClient; 7 import org.junit.Test; 8 9 public class Publisher { 10 private static final String NORMAL_EXCHANGE = "normal_exchange"; 11 12 @Test 13 public void publish() throws Exception { 14 try (Channel channel = RabbitMQClient.getChannel()) { 15 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); 16 //设置消息的 TTL 时间 17 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); 18 //该信息是用作演示队列个数限制 19 for (int i = 1; i <11 ; i++) { 20 String message="info"+i; 21 //channel.basicPublish(NORMAL_EXCHANGE, message.getBytes()); 22 channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes()); 23 System.out.println("生产者发送消息:"+message); 24 } 25 } 26 } 27 }
正常队列(normal_queue)消费者:
1 package com.yas.deadexchange; 2 3 import com.rabbitmq.client.*; 4 import com.yas.config.RabbitMQClient; 5 import org.junit.Test; 6 7 import java.util.HashMap; 8 import java.util.Map; 9 10 public class Consumer01 { 11 public static final String NORMAL_EXCHANGE = "normal_exchange"; 12 public static final String DEAD_EXCHANGE = "dead_exchange"; 13 public static final String NORMAL_QUEUE = "normal_queue"; 14 public static final String DEAD_QUEUE = "dead_queue"; 15 16 @Test 17 public void consume() throws Exception { 18 //1.获取连接对象 19 Connection connection = RabbitMQClient.getConnection(); 20 //2.创建channel 21 Channel channel = connection.createChannel(); 22 23 //声明死信交换机 类型为 direct 24 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); 25 //声明死信队列 26 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); 27 //死信队列绑定死信交换机与 routingkey 28 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); 29 30 31 //声明普通交换机 类型为 direct 32 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); 33 //正常队列绑定死信队列信息 34 Map<String, Object> params = new HashMap<>(); 35 //正常队列设置死信交换机 参数 key 是固定值 36 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); 37 //正常队列设置死信 routing-key 参数 key 是固定值 38 params.put("x-dead-letter-routing-key", "lisi"); 39 //声明普通队列 40 channel.queueDeclare(NORMAL_QUEUE, false, false, false, params); 41 //普通队列绑定普通交换机routing-key 42 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); 43 System.out.println("等待接收消息........... "); 44 45 //消费者回调 46 DeliverCallback deliverCallback = (consumerTag, delivery) -> { 47 String message = new String(delivery.getBody(), "UTF-8"); 48 System.out.println("Consumer01 接收到消息"+message); 49 }; 50 channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> { 51 }); 52 53 System.in.read(); 54 //5/释放资源 55 channel.close(); 56 connection.close(); 57 } 58 }
测试方式:
1.正常消费,先执行消费者,开启消费监听。再开启生产者,则消息从normal_queue被消费。
2.死信队列,不开启消费者,只开启生产者,消息先发送到normal_queue,等10秒超时后,会从normal_queue转发给dead_queue。消息进入死信队列。
死信队列消费者:
1 package com.yas.deadexchange; 2 3 import com.rabbitmq.client.BuiltinExchangeType; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.DeliverCallback; 7 import com.yas.config.RabbitMQClient; 8 import org.junit.Test; 9 10 //消费死信队列中的消息 11 public class Consumer02 { 12 13 private static final String DEAD_EXCHANGE = "dead_exchange"; 14 15 @Test 16 public void consume() throws Exception { 17 //1.获取连接对象 18 Connection connection = RabbitMQClient.getConnection(); 19 //2.创建channel 20 Channel channel = connection.createChannel(); 21 22 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); 23 String deadQueue = "dead_queue"; 24 channel.queueDeclare(deadQueue, false, false, false, null); 25 channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); 26 System.out.println("等待接收死信队列消息........... "); 27 DeliverCallback deliverCallback = (consumerTag, delivery) -> 28 {String message = new String(delivery.getBody(), "UTF-8"); 29 System.out.println("Consumer02 接收死信队列的消息" + message); 30 }; 31 channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { 32 }); 33 34 System.in.read(); 35 //5/释放资源 36 channel.close(); 37 connection.close(); 38 } 39 }
测试方式:
对于进入死信队列的信息,可以通过启动私信队列的消费者完成消费。