RabbitMQ实现延时队列

什么是延时队列

指消息进入队列后不会立即被消费,可以被延迟一定的时间,再进行消费.RabbitMQ没有提供延迟队列功能,但是可以使用TTL+DLX来实现延迟队列效果

使用场景

电商平台下单后,30分钟未支付,取消订单回滚库存;新用户注册成功一周后,发送问候短信等等.

延时队列实现

模拟电商平台下单后,30分钟后未支付,取消订单回滚库存
RabbitMQ实现延时队列

创建配置类

@Configuration
public class DelayConfig {

    /**
     * 创建一个正常的队列
     *
     * @return
     */
    @Bean
    public Queue createNormalQueue() {
        return QueueBuilder.durable("order_queue").build();
    }

    /**
     * 创建一个死信队列
     *
     * @return
     */
    @Bean
    public Queue createDeadQueue() {
        return QueueBuilder.durable("order_dead_queue")
                .withArgument("x-dead-letter-exchange", "order_dead_exchange") //设置死信交换机
                .withArgument("x-dead-letter-routing-key", "order_dead")//设置死信路由key
                .withArgument("x-message-ttl", 30000)// 队列中消息30秒过期
                .build();
    }

    /**
     * 创建一个正常的交换机
     *
     * @return
     */
    @Bean
    public DirectExchange createNormalExchange() {
        return new DirectExchange("order_exchange");
    }

    /**
     * 创建死信交换机
     *
     * @return
     */
    @Bean
    public DirectExchange createDeadExchange() {
        return new DirectExchange("order_dead_exchange");
    }

    /**
     * 创建绑定:将正常队列绑定到死信交换机上面
     *
     * @return
     */
    @Bean
    public Binding createDeadBinding(@Qualifier(value = "createNormalQueue") Queue queue,
                                     @Qualifier(value = "createDeadExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order_dead");
    }

    /**
     * 创建绑定:将死信队列绑定到正常的交换机上面
     *
     * @return
     */
    @Bean
    public Binding binding(@Qualifier(value = "createDeadQueue")Queue queue,
                           @Qualifier(value = "createNormalExchange")DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order");
    }
}

创建监听类

@Component
public class DelayListener {


    @RabbitListener(queues = "order_queue")
    public void listener(Message message, Channel channel, String msg) throws IOException {
        // 模拟业务代码执行
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(simpleDateFormat.format(new Date()) + "收到消息:" + msg);
        System.out.println("检查订单是否付款操作开始::没有支付就取消订单,回滚库存");
        // 签收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

创建controller用于测试

@RestController
public class DelayController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/send")
    public void send(){
        // 模拟业务代码执行
        String orderId = UUID.randomUUID().toString().replace("-","");
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(simpleDateFormat.format(new Date())+"创建订单:"+orderId);
        // 通过正常的交换机和routingKey把orderId发送到死信队列
        rabbitTemplate.convertAndSend("order_exchange","order",orderId);
    }
}

注意

  • 为了方便测试,我在配置类中的死信队列消息过期时间设置的是30秒,再真实的场景中根据自己的需求来就好了.
  • 发送消息要发送给order_dead_queue(死信队列),监听要监听order_queue(正常队列)

测试

http://localhost:18081/send:再发出信息后,延迟了30秒后,消费到了信息
RabbitMQ实现延时队列

上一篇:Spring Cloud Gateway 自定义Filter


下一篇:RabbitMQ -高级特性