rabbitmq延迟队列

  • 延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消 费。
  • 例如下面的业务场景:在支付宝上面买电影票,锁定了一个座位后系统默认会帮你保留15分钟时
    间,如果15分钟后还没付款那么不好意思系统会自动把座位释放掉。怎么实现类似的功能呢?
  1. 可以用定时任务每分钟扫一次,发现有占座超过15分钟还没付款的就释放掉。但是这样做很
    低效,很多时候做的都是些无用功;

  2. 可以用分布式锁、分布式缓存的被动过期时间,15分钟过期后锁也释放了,缓存key也不存在
    了;

  3. 还可以用延迟队列,锁座成功后会发送1条延迟消息,这条消息15分钟后才会被消费,消费的
    过程就是检查这个座位是否已经是“已付款”状态;

可以使用rabbitmq_delayed_message_exchange插件实现。

这里和TTL方式有个很大的不同就是TTL存放消息在死信队列(delayqueue)里,二基于插件存放消息
在延时交换机里(x-delayed-message exchange)。

1.下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

2.安装插件
将插件拷贝到rabbitmq-server的安装路径:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.4/plugins
3.启用插件

rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4.重启rabbitmq-server

systemctl restart rabbitmq-server
  1. 编写代码,首先是SpringBootApplication主入口类
package com.demo.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

RabbitMQ的对象配置

package com.demo.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue queue(){
        return new Queue("q.delayed",false,false,false);
    }

    @Bean
    public Exchange exchange(){
        Map<String,Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type",ExchangeTypes.DIRECT);
       return    new CustomExchange("ex.delayed","x-delayed-message",false,false,arguments);
    }

    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(exchange()).with("key.delayed").noargs();
    }
}

监听延迟队列

package com.demo.demo.listener;

 import com.rabbitmq.client.Channel;
 import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

 import java.io.IOException;

@Component
public class DlxListener {

    @RabbitListener(queues = "q.delayed")
    public void onMessage(Message mess, Channel channel) throws IOException {
        System.out.println("-----》》》》》》10秒后"+new String(mess.getBody()));
        channel.basicAck(mess.getMessageProperties().getDeliveryTag(),false);

    }

}

开发RestController,用于向延迟队列发送消息,并指定延迟的时长

package com.demo.demo.controller;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

@RestController
public class PayCotroller {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    @GetMapping("pay")
    public String pay() throws UnsupportedEncodingException {

        int seconds = 20;


        // RabbitMQ只会检查队列头部的消息是否过期,如果过期就放到死信队列
// 假如第一个过期时间很长,10s,第二个消息3s,则系统先看第一个消息,等到第一个消息过期,放到DLX
// 此时才会检查第二个消息,但实际上此时第二个消息早已经过期了,但是并没有先于第一个消息放到DLX。
// 插件rabbitmq_delayed_message_exchange帮我们搞定这个。
        MessageProperties properties = new MessageProperties();
        properties.setHeader("x-delay", (seconds - 10) * 1000);
        Message message = new Message(((seconds-10) + "秒后召开销售部门会议。").getBytes("utf-8"), properties);
// 如果不设置message的properties,也可以使用下述方法设置x-delay属性的值
// rabbitTemplate.convertAndSend("ex.delayed",
  //              "key.delayed", message, msg -> {
// // 使用定制的属性x-delay设置过期时间,也就是提前5s提醒
// // 当消息转换完,设置消息头字段
// msg.getMessageProperties().setHeader("x-delay",
//            (seconds - 5) * 1000);
// return msg;
// });
        message.getMessageProperties().setDeliveryTag(1);
            rabbitTemplate.convertAndSend("ex.delayed", "key.delayed",
                    message);
            return "已经定好闹钟了,到时提前告诉大家";
    }
}

application.properties中添加rabbitmq的配置

spring.application.name=rabbitmq_ttl

spring.rabbitmq.host=192.168.1.82
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.port=5672
spring.rabbitmq.listener.simple.acknowledge-mode=manual

pom.xml添加依赖:

  <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


    </dependencies>
上一篇:【linux驱动基础】linux工作队列work_struct,delayed_work的使用


下一篇:构建rabbitmq docker镜像,并添加延时消息插件