RabbitMQ 优先级队列

一、概述

在实际应用场景中,我们推送消息,希望给消息设置优先级,比如说京东双 11 活动,它希望将消息优先推送给京东的 vip,而对于非 vip 用户消息推送的优先级就低一些,那么怎么实现呢?

其实很简单,通过优先级队列就可以完美解决上述应用场景

 

二、原理图

RabbitMQ 优先级队列

 

三、编码

1、applicaiton.yml

spring:
  rabbitmq:
    host: 192.168.59.135
    port: 5672
    username: admin
    password: admin123
    publisher-confirm-type: correlated
    publisher-returns: true
    # 开启 ack
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual #采取手动应答
        #concurrency: 1 # 指定最小的消费者数量
        #max-concurrency: 1 #指定最大的消费者数量
        retry:
          enabled: true # 是否支持重试

2、自定义配置类 PriorityConfig

@Configuration
public class PriorityConfig {
    private static final String PRIORITY_EXCHANGE = "priority_exchange";
    private static final String PRIORITY_QUEUE = "priority_queue";
    private static final String PRIORITY_KEY = "priority";

    // 声明优先级交换机(type = direct)
    @Bean(PRIORITY_EXCHANGE)
    public DirectExchange priorityExchange() {
        return ExchangeBuilder.directExchange(PRIORITY_EXCHANGE).durable(true).build();
    }

    // 声明优先级队列
    @Bean(PRIORITY_QUEUE)
    public Queue priorityQueue() {
        /**
         * maxPriority(int maxPriority):设置队列支持的最大优先级数量,如果没有设置,则队列将不支持消息优先级
         * 官方支持的优先级范围是 0 ~ 255,超过 255 就会发生报错,但是一般企业使用的优先级是 0 ~ 10,如果 maxPriority 设置
         * 的太大,会浪费 cpu 和 内存,因为消息是要在队列中排队的,队列长度太大,排序的过程中会损耗性能
         */
        return QueueBuilder.durable(PRIORITY_QUEUE).maxPriority(10).build();
    }

    // 优先级队列绑定优先级交换机
    @Bean
    public Binding priorityQueueBindingPriorityExchange(@Qualifier(PRIORITY_QUEUE) Queue queue,
                                                        @Qualifier(PRIORITY_EXCHANGE) DirectExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange).with(PRIORITY_KEY);
    }
}

3、发布确认自定义类 MyConfirmCallback

@Slf4j
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    /**
     * 交换机确认回调方法
     * 1、Producer 发送的消息,交换机确认收到
     * correlationData:保存消息回调 ID 及其它相关的信息
     * ack:true
     * cause:null
     * <p>
     * 2、Producer 发送的消息,交换机没有收到
     * correlationData:保存消息回调 ID 及其它相关的信息
     * ack:false
     * cause:交换机没有收到消息的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
        }
    }

    /**
     * 如果交换机没有将消息路由到队列,会触发该回调方法
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("消息: {} 被服务器退回--->退回原因: {},交换机是: {},路由key是:{},退回编号是:{}",
                new String(returned.getMessage().getBody()), returned.getReplyText(), returned.getExchange(),
                returned.getRoutingKey(), returned.getReplyCode());
    }
}

4、Producer

@Slf4j
@RestController
public class Producer {
    private static final String PRIORITY_EXCHANGE = "priority_exchange";
    private static final String PRIORITY_QUEUE = "priority_queue";
    private static final String PRIORITY_KEY = "priority";

    // 1、依赖注入 rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 2、依赖注入 myConfirmCallback
    @Autowired
    private MyConfirmCallback myConfirmCallback;

    // 3、完成了 1、2 的注入之后再设置 rabbitTemplate 的回调对象
    @PostConstruct
    public void init() {
        // 消息成功传递给交换机时会触发 MyConfirmCallback 中的回调方法 confirm()
        rabbitTemplate.setConfirmCallback(myConfirmCallback);
        // 消息回退时会触发 MyConfirmCallback 中的回调方法 returnedMessage()
        rabbitTemplate.setReturnsCallback(myConfirmCallback);
    }

    @GetMapping("/priority/sendMessage/{msg}")
    public void sendMessage(@PathVariable("msg") String msg) {
        // 设置唯一 ID
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg, correlationData);
        log.info("发送一条未设置优先级的消息", msg);

        String msg1 = msg + 0;
        rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg1, (message -> {
            message.getMessageProperties().setPriority(0);
            return message;
        }), correlationData);
        log.info("发送一条优先级为 0 的消息", msg1);

        String msg2 = msg + 2;
        rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg2, (message -> {
            message.getMessageProperties().setPriority(2);
            return message;
        }), correlationData);
        log.info("发送一条优先级为 2 的消息", msg2);

        String msg3 = msg + 5;
        rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg3, (message -> {
            message.getMessageProperties().setPriority(5);
            return message;
        }), correlationData);
        log.info("发送一条优先级为 5 的消息", msg3);

        String msg4 = msg + 10;
        rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg4, (message -> {
            message.getMessageProperties().setPriority(10);
            return message;
        }), correlationData);
        log.info("发送一条优先级为 10 的消息", msg4);
    }
}

5、Consumer

@Slf4j
@Component
public class Consumer {
    private static final String PRIORITY_QUEUE = "priority_queue";

    @RabbitListener(queues = {PRIORITY_QUEUE})
    public void receivedMessage(Message message, Channel channel, CorrelationData correlationData) throws IOException {
        try {
            String msg = new String(message.getBody());
            log.info("消费者成功接收到消息:{}", msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.info("消息消费错误");
            // 出现异常之后拒绝消息,并且消息重新入队
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
}

  

四、测试

要让队列实现优先级需要做的事情如下

1、队列需要设置为优先级队列

2、消息需要设置消息的优先级

3、生产者必须先将消息发送到队列中,让队列对设置了优先级的消息进行排队

4、1、2、3 完成之后再启动消费者进行消费即可

要想实现上述功能,我们先将 Consumer 的 @RabbitListener 注解注释掉,然后启动 Springboot 项目

浏览器发送请求: http://localhost:8080/priority/sendMessage/小毛毛是最可爱的RabbitMQ 优先级队列

RabbitMQ 优先级队列

消息发送完成之后,然后打开 Consumer 的 @RabbitListener 注解,再次启动 Springboot 项目

RabbitMQ 优先级队列

从消费者的消费结果可以看出,优先级越高的消息越早被消费,如果未设置消息的优先级,那么该默认的优先级看起来是 1

 

上一篇:优先级倒挂(priority inversion)


下一篇:JAVA学习线程优先级