延迟队列
延迟队列 又被称为 延时队列、死信队列 ,它也是 RabbitMQ 队列中的一种,指进入该队列中的消息会被延迟消费的队列。
顾名思义,延迟队列和普通队列的区别在于:
- 进入普通队列的消息将会立即『走向』下一个环节,而下一个环节就是消费者;而
- 进入延迟队列的消息将会被延迟队列『持有』若干时间,而后才『走向』下一个环节,而且下一个环节是另一个交换机。这个『另一个交换机』也被称为死信交换机。
RabbitMQ 引入延迟队列主要是用于『延迟』一定时间再处理特定的业务逻辑,而这种『延迟』在 RabbitMQ 看来是『自动化』的,无须人为进行干预。
延迟队列的使用价值在于:
-
某些业务需要这种机制。例如,订单 30 分钟内未支付则需要取消订单。
-
在某种程度上,它可以替代定时任务。
#1. 专有词汇
与普通的队列一样,延迟队列也具有消息、交换机、路由和队列等名词。不过,它还增加了 3 个专有名词:
- DLX
- Dead Letter Exchange,死信队列交换机,是一种特殊类型的交换机。
- DLK
- Dead Letter Routing-Key,死信路由,同样也是一种特殊类型的路由。主要是和 DLX 组合在一起构成死信队列。
- TTL
- Time To Live,指进入延迟队列中的消息可以存活的时间。当 TTL 一到,将意味着该消息『死了』,从而进入下一个『中转站』,等待被真正的消息队列监听消费。
普通队列 + 三个特殊设置 = 延迟队列/死信队列
在 http://localhost:15672/#/queues
中创建延迟队列时,在普通队列的基础上需要设定三个『额外』的属性。
-
Dead letter exchange:x-dead-letter-exchange 。指定延迟队列的『下家』交换机。
-
Dead letter routing key:x-dead-letter-routing-key 。延迟队列自动向『下家』交换机投递消息时所使用的消息的 routing-key。
-
Message TTL:x-message-ttl 。延迟队列要持有消息的时长。例如:10000 ,即 10s 。
#2. 延迟队列流程模型
以延迟消息为例来描述延迟队列的流程模型:
表面上看,消息生产者发出消息若干秒(以 5 秒为例)消息的消费者才消费该消息,才触发相应方法的执行。其中,核心问题的关键点在于:延迟队列(在中间环节)持有了该消息 5 秒,从而达到了延迟 5 秒的效果。
因此,简单而言,整体流程分为 3 步:
-
消息生产者将消息发送到延迟队列;
-
延迟队列(持有消息 5秒后)将消息转发给消费者队列;
-
由于消费者正『监听着』消费者队列,一旦消费者队列收到消息,消费者就从中读取消息,消费。
所以,整个环节中有 2 套 交换机 - 路由
:
-
第一套
交换机 - 路由
负责将消息从生产者路由到延迟队列; -
第二套
交换即 - 路由
负责将消息从延迟队列路由到消费者队列;
TIP
第二个交换机也被称为死信交换机,不过它的创建和设置与普通交换机没有区别。
#3. 代码配置
充分理解上图后,下面的代码配置的含义和目的就一目了然了。
@Configuration
@EnableRabbit
public class RabbitMQConfig {
public static final String first_exchange_name = "first-exchange";
public static final String second_exchange_name = "second-exchange";
public static final String first_routing_key = "first-routing-key";
public static final String second_routing_key = "second-routing-key";
public static final String first_binding = "first-binding";
public static final String second_binding = "second-binding";
public static final String dead_queue_name = "dead-queue";
public static final String real_queue_name = "real-queue";
@Bean("dead-queue")
public Queue deadQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", second_exchange_name); // 指定时期后消息投递给哪个交换器。
args.put("x-dead-letter-routing-key", second_routing_key); // 指定到期后投递消息时以哪个路由键进行投递。
args.put("x-message-ttl", 5000); // 指定到期时间。5 秒
return new Queue(dead_queue_name, true, false, false, args);
}
@Bean("real-queue")
public Queue realQueue() {
return new Queue(real_queue_name, true, false, false);
}
/* 问题一:发出的消息凭什么会到死信队列。*/
@Bean(first_exchange_name)
public DirectExchange firstExchange() {
return new DirectExchange(first_exchange_name, true, false);
}
@Bean(first_binding)
public Binding firstBinding(@Qualifier(dead_queue_name) Queue queue,
@Qualifier(first_exchange_name) Exchange exchange) {
return BindingBuilder.bind(queue)
.to(exchange)
.with(first_routing_key)
.noargs();
}
/* 问题二:延迟队列凭什么会把消息再转给 real-queue 。*/
@Bean(second_exchange_name)
public DirectExchange secondExchange() {
return new DirectExchange(second_exchange_name, true, false);
}
@Bean(second_binding)
public Binding secondBiding(@Qualifier(real_queue_name) Queue queue,
@Qualifier(second_exchange_name) Exchange exchange) {
return BindingBuilder.bind(queue)
.to(exchange)
.with(second_routing_key)
.noargs();
}
}