TTL 全称 Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
过期是在生产者设置,主要有2种方式:
- 指定一条消息的过期时间。
- 给队列设置消息过期时间,队列中的所有消息都有同样的过期时间。
application.properties文件
server.port=8081 # ip spring.rabbitmq.host=127.0.0.1 #默认5672 spring.rabbitmq.port=5672 #用户名 spring.rabbitmq.username=guest #密码 spring.rabbitmq.password=guest #连接到代理时用的虚拟主机 spring.rabbitmq.virtual-host=/ #是否启用【发布确认】,默认false #spring.rabbitmq.publisher-confirm-type=correlated替换spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-confirm-type=correlated #是否启用【发布返回】,默认false spring.rabbitmq.publisher-returns=true #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto spring.rabbitmq.listener.simple.acknowledge-mode=manual #rabbitmq限流,必须在ack确认才能使用 #消费者最小数量 spring.rabbitmq.listener.simple.concurrency=1 #最大的消费者数量 spring.rabbitmq.listener.simple.max-concurrency=10 #在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量) spring.rabbitmq.listener.simple.prefetch=2
1.指定一条消息的过期时间
TTLController 类
package com.qingfeng.rabbitmqhighproducer.ttl; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; @RestController public class TTLController { @Autowired private RabbitTemplate rabbitTemplate; // 指定消息的过期时间 //消息推送到队列后,如果指定时间内没有被消费,则会自动过期。 //http://127.0.0.1:8081/testTTL @GetMapping("/testTTL") public String testTTL() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("60000"); // 设置过期时间,单位:毫秒 byte[] msgBytes = "测试消息自动过期".getBytes(); Message message = new Message(msgBytes, messageProperties); rabbitTemplate.convertAndSend("ttl_exchange", "ttl", message); return "ok"; } }
测试:http://127.0.0.1:8081/testTTL
2.给队列设置消息过期时间,队列中的所有消息都有同样的过期时间。
TTLQueueRabbitConfig 类
package com.qingfeng.rabbitmqhighproducer.ttl.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class TTLQueueRabbitConfig { @Bean public Queue TTLQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-message-ttl", 30000); // 队列中的消息未被消费则30秒后过期 return new Queue("ttl_queue", true, false, false, map); } @Bean public DirectExchange TTLExchange() { return new DirectExchange("ttl_exchange", true, false); } @Bean public Binding bindingDirect() { return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("ttl"); } }
TTLController 类
package com.qingfeng.rabbitmqhighproducer.ttl; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; @RestController public class TTLController { @Autowired private RabbitTemplate rabbitTemplate; //给队列设置消息过期时间,队列中的所有消息都有同样的过期时间 //http://127.0.0.1:8081/testQueueTTL @GetMapping("/testQueueTTL") public String testQueueTTL() { String messageId = String.valueOf(UUID.randomUUID()); rabbitTemplate.convertAndSend("ttl_exchange", "ttl", messageId); return "ok"; } }
}
测试:http://127.0.0.1:8081/testQueueTTL
如果同时指定了Message TTL
和Queue TTL
,则优先较小的那一个