- 延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消 费。
- 例如下面的业务场景:在支付宝上面买电影票,锁定了一个座位后系统默认会帮你保留15分钟时
间,如果15分钟后还没付款那么不好意思系统会自动把座位释放掉。怎么实现类似的功能呢?
-
可以用定时任务每分钟扫一次,发现有占座超过15分钟还没付款的就释放掉。但是这样做很
低效,很多时候做的都是些无用功; -
可以用分布式锁、分布式缓存的被动过期时间,15分钟过期后锁也释放了,缓存key也不存在
了; -
还可以用延迟队列,锁座成功后会发送1条延迟消息,这条消息15分钟后才会被消费,消费的
过程就是检查这个座位是否已经是“已付款”状态;
可以使用rabbitmq_delayed_message_exchange插件实现。
这里和TTL方式有个很大的不同就是TTL存放消息在死信队列(delayqueue)里,二基于插件存放消息
在延时交换机里(x-delayed-message exchange)。
1.下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
2.安装插件
将插件拷贝到rabbitmq-server的安装路径:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.4/plugins
3.启用插件
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
4.重启rabbitmq-server
systemctl restart rabbitmq-server
- 编写代码,首先是SpringBootApplication主入口类
package com.demo.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
RabbitMQ的对象配置
package com.demo.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
@Bean
public Queue queue(){
return new Queue("q.delayed",false,false,false);
}
@Bean
public Exchange exchange(){
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-delayed-type",ExchangeTypes.DIRECT);
return new CustomExchange("ex.delayed","x-delayed-message",false,false,arguments);
}
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(exchange()).with("key.delayed").noargs();
}
}
监听延迟队列
package com.demo.demo.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class DlxListener {
@RabbitListener(queues = "q.delayed")
public void onMessage(Message mess, Channel channel) throws IOException {
System.out.println("-----》》》》》》10秒后"+new String(mess.getBody()));
channel.basicAck(mess.getMessageProperties().getDeliveryTag(),false);
}
}
开发RestController,用于向延迟队列发送消息,并指定延迟的时长
package com.demo.demo.controller;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
@RestController
public class PayCotroller {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("pay")
public String pay() throws UnsupportedEncodingException {
int seconds = 20;
// RabbitMQ只会检查队列头部的消息是否过期,如果过期就放到死信队列
// 假如第一个过期时间很长,10s,第二个消息3s,则系统先看第一个消息,等到第一个消息过期,放到DLX
// 此时才会检查第二个消息,但实际上此时第二个消息早已经过期了,但是并没有先于第一个消息放到DLX。
// 插件rabbitmq_delayed_message_exchange帮我们搞定这个。
MessageProperties properties = new MessageProperties();
properties.setHeader("x-delay", (seconds - 10) * 1000);
Message message = new Message(((seconds-10) + "秒后召开销售部门会议。").getBytes("utf-8"), properties);
// 如果不设置message的properties,也可以使用下述方法设置x-delay属性的值
// rabbitTemplate.convertAndSend("ex.delayed",
// "key.delayed", message, msg -> {
// // 使用定制的属性x-delay设置过期时间,也就是提前5s提醒
// // 当消息转换完,设置消息头字段
// msg.getMessageProperties().setHeader("x-delay",
// (seconds - 5) * 1000);
// return msg;
// });
message.getMessageProperties().setDeliveryTag(1);
rabbitTemplate.convertAndSend("ex.delayed", "key.delayed",
message);
return "已经定好闹钟了,到时提前告诉大家";
}
}
application.properties中添加rabbitmq的配置
spring.application.name=rabbitmq_ttl
spring.rabbitmq.host=192.168.1.82
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.port=5672
spring.rabbitmq.listener.simple.acknowledge-mode=manual
pom.xml添加依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>