RabbitMQ-ttl+dlx实现延迟队列

文章目录

什么是延迟队列?

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

若有如下需求:

1.下单后,30分钟未支付,取消订单,回滚库存。

2.新用户注册成功7天后,发送短信问候。

其实现方式:

1.定时器

2.延迟队列

RabbitMQ-ttl+dlx实现延迟队列
但,在RabbitMQ中没有直接提供延迟队列。
不过我们可以使用 TTL + DLX 组合来实现延迟队列
RabbitMQ-ttl+dlx实现延迟队列

延迟队列的实现测试

工程结构:
RabbitMQ-ttl+dlx实现延迟队列

spring-rabbitmq-producer.xml中相关配置:

    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
                               publisher-confirms="true"
                               publisher-returns="true"
    />
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
    <!--
        延迟队列的实现:ttl + dlx
        1.定义正常的队列(test_queue_dq)与正常的交换机(test_exchange_dq),同时设置正常队列的存活时间,并两者相绑定
        2.定义死信队列(queue_dq)与死信交换机(exchange_dq),并两者相绑定
        3.将正常的队列与死信交换机相绑定
    -->
    <!--1.1创建正常的队列(test_queue_dq)-->
    <rabbit:queue id="test_queue_dq" name="test_queue_dq">

        <rabbit:queue-arguments>
            <!--1.1.1设置正常队列的ttl-->
            <entry key="x-message-ttl" value="20000" value-type="java.lang.Integer"/>
            <!--3.绑定正常队列与死信交换机-->
            <entry key="x-dead-letter-exchange" value="exchange_dq"/>
            <entry key="x-dead-letter-routing-key" value="dq.oh"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <!--1.2创建正常的交换机(test_exchange_dq)-->
    <rabbit:topic-exchange id="test_exchange_dq" name="test_exchange_dq">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_dq" pattern="test.dq.#"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--2.1创建死信队列(queue_dq)-->
    <rabbit:queue id="queue_dq" name="queue_dq"/>
    <!--2.2创建死信交换机(exchange_dq)-->
    <rabbit:topic-exchange id="exchange_dq" name="exchange_dq">
        <rabbit:bindings>
            <rabbit:binding pattern="dq.#" queue="queue_dq"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

给延迟队列发送一条消息测试一下:

    /**
     * 延迟队列的实现:ttl + dlx
     */
    @Test
    public void testDq(){
        rabbitTemplate.convertAndSend("test_exchange_dq","test.dq.hh","ttl + dlx 实现了延迟队列");
    }

消费端:

    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
    />
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

    <!--包扫描-->
    <context:component-scan base-package="com.itheima.listener"/>

    <!--定义监听器bean-->                                                <!--手动签收      一次从mq中拉取的消息数-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!--        <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>-->
<!--        <rabbit:listener ref="qosListener" queue-names="test_queue_confirm"/>-->
<!--        <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/>-->
        <rabbit:listener ref="dqListener" queue-names="queue_dq"/><!--延迟队列的实现,记得要监听的是死信队列-->
    </rabbit:listener-container>

消费端监听器:

@Component
public class DqListener implements ChannelAwareMessageListener {


    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println(new String(message.getBody()));
    }


    public void onMessage(Message message) {

    }
}

生产端发送消息后,在过了设定的延迟时间后,监听延迟队列的消费者才消费了消息。

上一篇:迅为IMX6ULL开发板NFS服务器的搭建


下一篇:基础入门-系统及数据库等