????博客首页:崇尚学技术的科班人
????小肖来了
????今天给大家带来的文章是《万字 +图片解析死信队列和死信实战演练》????
????有的小伙伴可能会问死信队列有啥用?你看了这篇文章就知道了????
????希望各位小伙伴们能够耐心的读完这篇文章????
????博主也在学习阶段,如若发现问题,请告知,非常感谢????
????同时也非常感谢各位小伙伴们的支持????
1、死信队列
1.1、概念
-
死信:就是
无法被消费的消息
。由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。 - 应用场景:保证订单业务的消息数据不丢失,当消息发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
1.2、死信来源
- 消息
TTL
过期 - 队列达到最大长度(队列满了,无法再添加数据到队列中)。
- 消息被拒绝并且
requeue = false
1.3、死信实战
1.3.1、代码架构图
1.3.2、TTL过期情况
1. 消费者01
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class Consumer01 {
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_QUEUE = "dead_queue";
public static final String NORMAL_QUEUE = "normal_queue";
/**
* 死信实战
* 消费者01
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
// 死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key","lisi");
map.put("x-message-ttl",10000);
// 普通队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
// 死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
// 队列绑定
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("Consumer01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{});
}
}
- 最为复杂的就是
消费者01
,它需要进行 死信交换机绑定死信队列、普通交换机绑定普通队列、普通队列绑定死信交换机。 - 我们为了让消息不被消费,我们需要制造假死现象,也就是
关闭消费者01
。
2. 消费者02
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class Consumer02 {
/**
* 消费者02
*/
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
};
// 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});
}
}
3. 生产者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.nio.charset.StandardCharsets;
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 单位是毫秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for(int i = 1; i < 11; i ++){
String message = "info" + i;
// 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
}
}
}
4. 测试结果
- 所有的消息在超过过期时间之后,全部转移到了死信队列中。
1.3.3、队列达到最大长度情况
1. 消费者01
public class Consumer01 {
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_QUEUE = "dead_queue";
public static final String NORMAL_QUEUE = "normal_queue";
/**
* 死信实战
* 消费者01
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
// 死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key","lisi");
map.put("x-max-length",6);
//map.put("x-message-ttl",10000);
// 普通队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
// 死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
// 队列绑定
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("Consumer01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{});
}
}
- 这里我们将过期时间参数改为了
队列最大长度
- 我们为了让消息不被消费和观察到明显现象,我们需要制造假死现象,也就是
关闭消费者01
。
2. 消费者02
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class Consumer02 {
/**
* 消费者02
*/
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
};
// 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});
}
}
- 消费者02和TTL过期情况下的一模一样
3. 生产者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.nio.charset.StandardCharsets;
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 单位是毫秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for(int i = 1; i < 11; i ++){
String message = "info" + i;
// 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
}
}
}
- 我们将对应的设置过期时间注释掉
4. 测试结果
- 如果我们启动消费者01会报错,那是因为我们所创建的队列已经存在,我们需要把普通队列删除,因为只有它的参数发生了改变。
- 因为我们设置了普通队列的最大长度6,所以当超过了最大长度的消息都会被作为死信。
1.3.4、消息被拒情况
1. 消费者01
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class Consumer01 {
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_QUEUE = "dead_queue";
public static final String NORMAL_QUEUE = "normal_queue";
/**
* 死信实战
* 消费者01
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
// 死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key","lisi");
//map.put("x-max-length",6);
//map.put("x-message-ttl",10000);
// 普通队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
// 死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
// 队列绑定
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (var1, var2)->{
String msg = new String(var2.getBody(),"UTF-8");
if(msg.equals("info5")){
System.out.println("Consumer01控制台接收到的消息是:" + msg + ": 此消息被拒" );
channel.basicReject(var2.getEnvelope().getDeliveryTag(),false);
}else{
System.out.println("Consumer01控制台接收到的消息是:" + msg);
channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);
}
};
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,var1->{});
}
}
- 这里我们将
队列最大长度
注释掉 - 我们
还需要开启手动应答
,因为不开启就不会存在消息被拒 的问题。
2. 消费者02
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;
import java.util.HashMap;
import java.util.Map;
public class Consumer02 {
/**
* 消费者02
*/
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitmqUtil.getChannel();
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (var1, var2)->{
System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8"));
};
// 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});
}
}
- 消费者02和队列达到最大长度情况下的一模一样
3. 生产者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;
import java.nio.charset.StandardCharsets;
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtil.getChannel();
// 单位是毫秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for(int i = 1; i < 11; i ++){
String message = "info" + i;
// 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
}
}
}
- 生产者和队列达到最大长度情况下的一模一样
4. 测试结果
- 测试之前我们需要将队列中的消息消费掉,并且需要将普通队列删除。
- 可见只有
"info5"
被作为死信。