文章目录
什么是延迟队列?
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
若有如下需求:
1.下单后,30分钟未支付,取消订单,回滚库存。
2.新用户注册成功7天后,发送短信问候。
其实现方式:
1.定时器
2.延迟队列
但,在RabbitMQ中没有直接提供延迟队列。
不过我们可以使用 TTL + DLX 组合来实现延迟队列
延迟队列的实现测试
工程结构:
spring-rabbitmq-producer.xml中相关配置:
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
publisher-returns="true"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--
延迟队列的实现:ttl + dlx
1.定义正常的队列(test_queue_dq)与正常的交换机(test_exchange_dq),同时设置正常队列的存活时间,并两者相绑定
2.定义死信队列(queue_dq)与死信交换机(exchange_dq),并两者相绑定
3.将正常的队列与死信交换机相绑定
-->
<!--1.1创建正常的队列(test_queue_dq)-->
<rabbit:queue id="test_queue_dq" name="test_queue_dq">
<rabbit:queue-arguments>
<!--1.1.1设置正常队列的ttl-->
<entry key="x-message-ttl" value="20000" value-type="java.lang.Integer"/>
<!--3.绑定正常队列与死信交换机-->
<entry key="x-dead-letter-exchange" value="exchange_dq"/>
<entry key="x-dead-letter-routing-key" value="dq.oh"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!--1.2创建正常的交换机(test_exchange_dq)-->
<rabbit:topic-exchange id="test_exchange_dq" name="test_exchange_dq">
<rabbit:bindings>
<rabbit:binding queue="test_queue_dq" pattern="test.dq.#"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--2.1创建死信队列(queue_dq)-->
<rabbit:queue id="queue_dq" name="queue_dq"/>
<!--2.2创建死信交换机(exchange_dq)-->
<rabbit:topic-exchange id="exchange_dq" name="exchange_dq">
<rabbit:bindings>
<rabbit:binding pattern="dq.#" queue="queue_dq"/>
</rabbit:bindings>
</rabbit:topic-exchange>
给延迟队列发送一条消息测试一下:
/**
* 延迟队列的实现:ttl + dlx
*/
@Test
public void testDq(){
rabbitTemplate.convertAndSend("test_exchange_dq","test.dq.hh","ttl + dlx 实现了延迟队列");
}
消费端:
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--包扫描-->
<context:component-scan base-package="com.itheima.listener"/>
<!--定义监听器bean--> <!--手动签收 一次从mq中拉取的消息数-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!-- <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>-->
<!-- <rabbit:listener ref="qosListener" queue-names="test_queue_confirm"/>-->
<!-- <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/>-->
<rabbit:listener ref="dqListener" queue-names="queue_dq"/><!--延迟队列的实现,记得要监听的是死信队列-->
</rabbit:listener-container>
消费端监听器:
@Component
public class DqListener implements ChannelAwareMessageListener {
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println(new String(message.getBody()));
}
public void onMessage(Message message) {
}
}
生产端发送消息后,在过了设定的延迟时间后,监听延迟队列的消费者才消费了消息。