什么是延时队列
指消息进入队列后不会立即被消费,可以被延迟一定的时间,再进行消费.RabbitMQ没有提供延迟队列功能,但是可以使用TTL+DLX来实现延迟队列效果
使用场景
电商平台下单后,30分钟未支付,取消订单回滚库存;新用户注册成功一周后,发送问候短信等等.
延时队列实现
模拟电商平台下单后,30分钟后未支付,取消订单回滚库存
创建配置类
@Configuration
public class DelayConfig {
/**
* 创建一个正常的队列
*
* @return
*/
@Bean
public Queue createNormalQueue() {
return QueueBuilder.durable("order_queue").build();
}
/**
* 创建一个死信队列
*
* @return
*/
@Bean
public Queue createDeadQueue() {
return QueueBuilder.durable("order_dead_queue")
.withArgument("x-dead-letter-exchange", "order_dead_exchange") //设置死信交换机
.withArgument("x-dead-letter-routing-key", "order_dead")//设置死信路由key
.withArgument("x-message-ttl", 30000)// 队列中消息30秒过期
.build();
}
/**
* 创建一个正常的交换机
*
* @return
*/
@Bean
public DirectExchange createNormalExchange() {
return new DirectExchange("order_exchange");
}
/**
* 创建死信交换机
*
* @return
*/
@Bean
public DirectExchange createDeadExchange() {
return new DirectExchange("order_dead_exchange");
}
/**
* 创建绑定:将正常队列绑定到死信交换机上面
*
* @return
*/
@Bean
public Binding createDeadBinding(@Qualifier(value = "createNormalQueue") Queue queue,
@Qualifier(value = "createDeadExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order_dead");
}
/**
* 创建绑定:将死信队列绑定到正常的交换机上面
*
* @return
*/
@Bean
public Binding binding(@Qualifier(value = "createDeadQueue")Queue queue,
@Qualifier(value = "createNormalExchange")DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order");
}
}
创建监听类
@Component
public class DelayListener {
@RabbitListener(queues = "order_queue")
public void listener(Message message, Channel channel, String msg) throws IOException {
// 模拟业务代码执行
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(simpleDateFormat.format(new Date()) + "收到消息:" + msg);
System.out.println("检查订单是否付款操作开始::没有支付就取消订单,回滚库存");
// 签收消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
创建controller用于测试
@RestController
public class DelayController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping(value = "/send")
public void send(){
// 模拟业务代码执行
String orderId = UUID.randomUUID().toString().replace("-","");
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(simpleDateFormat.format(new Date())+"创建订单:"+orderId);
// 通过正常的交换机和routingKey把orderId发送到死信队列
rabbitTemplate.convertAndSend("order_exchange","order",orderId);
}
}
注意
- 为了方便测试,我在配置类中的死信队列消息过期时间设置的是30秒,再真实的场景中根据自己的需求来就好了.
- 发送消息要发送给order_dead_queue(死信队列),监听要监听order_queue(正常队列)
测试
http://localhost:18081/send:再发出信息后,延迟了30秒后,消费到了信息