一、概述
在实际应用场景中,我们推送消息,希望给消息设置优先级,比如说京东双 11 活动,它希望将消息优先推送给京东的 vip,而对于非 vip 用户消息推送的优先级就低一些,那么怎么实现呢?
其实很简单,通过优先级队列就可以完美解决上述应用场景
二、原理图
三、编码
1、applicaiton.yml
spring: rabbitmq: host: 192.168.59.135 port: 5672 username: admin password: admin123 publisher-confirm-type: correlated publisher-returns: true # 开启 ack listener: direct: acknowledge-mode: manual simple: acknowledge-mode: manual #采取手动应答 #concurrency: 1 # 指定最小的消费者数量 #max-concurrency: 1 #指定最大的消费者数量 retry: enabled: true # 是否支持重试
2、自定义配置类 PriorityConfig
@Configuration public class PriorityConfig { private static final String PRIORITY_EXCHANGE = "priority_exchange"; private static final String PRIORITY_QUEUE = "priority_queue"; private static final String PRIORITY_KEY = "priority"; // 声明优先级交换机(type = direct) @Bean(PRIORITY_EXCHANGE) public DirectExchange priorityExchange() { return ExchangeBuilder.directExchange(PRIORITY_EXCHANGE).durable(true).build(); } // 声明优先级队列 @Bean(PRIORITY_QUEUE) public Queue priorityQueue() { /** * maxPriority(int maxPriority):设置队列支持的最大优先级数量,如果没有设置,则队列将不支持消息优先级 * 官方支持的优先级范围是 0 ~ 255,超过 255 就会发生报错,但是一般企业使用的优先级是 0 ~ 10,如果 maxPriority 设置 * 的太大,会浪费 cpu 和 内存,因为消息是要在队列中排队的,队列长度太大,排序的过程中会损耗性能 */ return QueueBuilder.durable(PRIORITY_QUEUE).maxPriority(10).build(); } // 优先级队列绑定优先级交换机 @Bean public Binding priorityQueueBindingPriorityExchange(@Qualifier(PRIORITY_QUEUE) Queue queue, @Qualifier(PRIORITY_EXCHANGE) DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(PRIORITY_KEY); } }
3、发布确认自定义类 MyConfirmCallback
@Slf4j @Component public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { /** * 交换机确认回调方法 * 1、Producer 发送的消息,交换机确认收到 * correlationData:保存消息回调 ID 及其它相关的信息 * ack:true * cause:null * <p> * 2、Producer 发送的消息,交换机没有收到 * correlationData:保存消息回调 ID 及其它相关的信息 * ack:false * cause:交换机没有收到消息的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("交换机已经收到 id 为:{}的消息", id); } else { log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause); } } /** * 如果交换机没有将消息路由到队列,会触发该回调方法 */ @Override public void returnedMessage(ReturnedMessage returned) { log.info("消息: {} 被服务器退回--->退回原因: {},交换机是: {},路由key是:{},退回编号是:{}", new String(returned.getMessage().getBody()), returned.getReplyText(), returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode()); } }
4、Producer
@Slf4j @RestController public class Producer { private static final String PRIORITY_EXCHANGE = "priority_exchange"; private static final String PRIORITY_QUEUE = "priority_queue"; private static final String PRIORITY_KEY = "priority"; // 1、依赖注入 rabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; // 2、依赖注入 myConfirmCallback @Autowired private MyConfirmCallback myConfirmCallback; // 3、完成了 1、2 的注入之后再设置 rabbitTemplate 的回调对象 @PostConstruct public void init() { // 消息成功传递给交换机时会触发 MyConfirmCallback 中的回调方法 confirm() rabbitTemplate.setConfirmCallback(myConfirmCallback); // 消息回退时会触发 MyConfirmCallback 中的回调方法 returnedMessage() rabbitTemplate.setReturnsCallback(myConfirmCallback); } @GetMapping("/priority/sendMessage/{msg}") public void sendMessage(@PathVariable("msg") String msg) { // 设置唯一 ID CorrelationData correlationData = new CorrelationData(); correlationData.setId(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg, correlationData); log.info("发送一条未设置优先级的消息", msg); String msg1 = msg + 0; rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg1, (message -> { message.getMessageProperties().setPriority(0); return message; }), correlationData); log.info("发送一条优先级为 0 的消息", msg1); String msg2 = msg + 2; rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg2, (message -> { message.getMessageProperties().setPriority(2); return message; }), correlationData); log.info("发送一条优先级为 2 的消息", msg2); String msg3 = msg + 5; rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg3, (message -> { message.getMessageProperties().setPriority(5); return message; }), correlationData); log.info("发送一条优先级为 5 的消息", msg3); String msg4 = msg + 10; rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg4, (message -> { message.getMessageProperties().setPriority(10); return message; }), correlationData); log.info("发送一条优先级为 10 的消息", msg4); } }
5、Consumer
@Slf4j @Component public class Consumer { private static final String PRIORITY_QUEUE = "priority_queue"; @RabbitListener(queues = {PRIORITY_QUEUE}) public void receivedMessage(Message message, Channel channel, CorrelationData correlationData) throws IOException { try { String msg = new String(message.getBody()); log.info("消费者成功接收到消息:{}", msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { log.info("消息消费错误"); // 出现异常之后拒绝消息,并且消息重新入队 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } }
四、测试
要让队列实现优先级需要做的事情如下
1、队列需要设置为优先级队列
2、消息需要设置消息的优先级
3、生产者必须先将消息发送到队列中,让队列对设置了优先级的消息进行排队
4、1、2、3 完成之后再启动消费者进行消费即可
要想实现上述功能,我们先将 Consumer 的 @RabbitListener 注解注释掉,然后启动 Springboot 项目
浏览器发送请求: http://localhost:8080/priority/sendMessage/小毛毛是最可爱的
消息发送完成之后,然后打开 Consumer 的 @RabbitListener 注解,再次启动 Springboot 项目
从消费者的消费结果可以看出,优先级越高的消息越早被消费,如果未设置消息的优先级,那么该默认的优先级看起来是 1