rabbitmq的生产者的TTL模式(三)

TTL 全称 Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

 

rabbitmq的生产者的TTL模式(三)

 

 

 过期是在生产者设置,主要有2种方式:

  1. 指定一条消息的过期时间。
  2. 给队列设置消息过期时间,队列中的所有消息都有同样的过期时间。

 application.properties文件

server.port=8081
# ip
spring.rabbitmq.host=127.0.0.1
#默认5672
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#连接到代理时用的虚拟主机
spring.rabbitmq.virtual-host=/
#是否启用【发布确认】,默认false
#spring.rabbitmq.publisher-confirm-type=correlated替换spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
#是否启用【发布返回】,默认false
spring.rabbitmq.publisher-returns=true
#表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#rabbitmq限流,必须在ack确认才能使用
#消费者最小数量
spring.rabbitmq.listener.simple.concurrency=1
#最大的消费者数量
spring.rabbitmq.listener.simple.max-concurrency=10
#在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
spring.rabbitmq.listener.simple.prefetch=2

 

1.指定一条消息的过期时间

TTLController 类
package com.qingfeng.rabbitmqhighproducer.ttl;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
public class TTLController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 指定消息的过期时间
    //消息推送到队列后,如果指定时间内没有被消费,则会自动过期。
    //http://127.0.0.1:8081/testTTL
    @GetMapping("/testTTL")
    public String testTTL() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("60000"); // 设置过期时间,单位:毫秒
        byte[] msgBytes = "测试消息自动过期".getBytes();
        Message message = new Message(msgBytes, messageProperties);
        rabbitTemplate.convertAndSend("ttl_exchange", "ttl", message);
        return "ok";
    }

}

测试:http://127.0.0.1:8081/testTTL

rabbitmq的生产者的TTL模式(三)

 

 

 

 

 2.给队列设置消息过期时间,队列中的所有消息都有同样的过期时间。

TTLQueueRabbitConfig 类
package com.qingfeng.rabbitmqhighproducer.ttl.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TTLQueueRabbitConfig {
    @Bean
    public Queue TTLQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 30000); // 队列中的消息未被消费则30秒后过期
        return new Queue("ttl_queue", true, false, false, map);
    }

    @Bean
    public DirectExchange TTLExchange() {
        return new DirectExchange("ttl_exchange", true, false);
    }

    @Bean
    public Binding bindingDirect() {
        return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("ttl");
    }
}

 

TTLController 类
package com.qingfeng.rabbitmqhighproducer.ttl;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
public class TTLController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //给队列设置消息过期时间,队列中的所有消息都有同样的过期时间
    //http://127.0.0.1:8081/testQueueTTL
    @GetMapping("/testQueueTTL")
    public String testQueueTTL() {
        String messageId = String.valueOf(UUID.randomUUID());
        rabbitTemplate.convertAndSend("ttl_exchange", "ttl", messageId);
        return "ok";
    }
}
}

测试:http://127.0.0.1:8081/testQueueTTL

rabbitmq的生产者的TTL模式(三)

 

 如果同时指定了Message TTLQueue TTL,则优先较小的那一个

 

上一篇:【MQ中间件】RabbitMQ -- RabbitMQ死信队列及内存监控(4)


下一篇:树莓派和电脑之间串口通信