一、引言
在具体业务中可能会遇到一些要提前处理的消息,比如普通客户的消息按先进先出的顺序处理,Vip客户的消息要提前处理。在RabbitMQ中,消息优先级的实现方式是:在声明queue时设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级即可。
RabbitMQ优先级队列注意事项:
1)RabbitMQ3.5以后才支持优先级队列。
2)只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效。
3)优先级取值范围在0~9之间,数值越大则优先级越高。
二、示例
2.1、发送端(生产端)
新建一个控制台项目Send,并添加一个类RabbitMQConfig。
class RabbitMQConfig { public static string Host { get; set; } public static string VirtualHost { get; set; } public static string UserName { get; set; } public static string Password { get; set; } public static int Port { get; set; } static RabbitMQConfig() { Host = "192.168.2.242"; VirtualHost = "/"; UserName = "hello"; Password = "world"; Port = 5672; } }
class Program { static void Main(string[] args) { Console.WriteLine("按任意键开始生产。"); Console.ReadLine(); PriorityMessagePublish(); Console.ReadLine(); } private static void PriorityMessagePublish() { const string MessagePrefix = "message_"; const int PublishMessageCount = 6; byte messagePriority = 0; var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, Port = RabbitMQConfig.Port, VirtualHost = RabbitMQConfig.VirtualHost, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password, Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //设置队列优先级,取值范围在0~255之间。 Dictionary<string, object> dict = new Dictionary<string, object> { { "x-max-priority", 255 } }; //声明队列 channel.QueueDeclare(queue: "priority", durable: true, exclusive: false, autoDelete: false, arguments: dict); //向该消息队列发送消息message Random random = new Random(); for (int i = 0; i < PublishMessageCount; i++) { var properties = channel.CreateBasicProperties(); messagePriority = (byte)random.Next(0, 9); properties.Priority = messagePriority;//设置消息优先级,取值范围在0~9之间。 var message = MessagePrefix + i.ToString(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "priority", basicProperties: properties, body: body); Console.WriteLine($"{DateTime.Now.ToString()} Send {message} , Priority {messagePriority}"); } } } } }
2.2、接收端(消费端)
新建一个控制台项目Receive,按住Alt键,将发送端RabbitMQConfig类拖一个快捷方式到Receive项目中。
class Program { static void Main(string[] args) { Console.WriteLine("按任意键开始消费。"); Console.ReadLine(); PriorityMessageSubscribe(); } public static void PriorityMessageSubscribe() { var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new EventingBasicConsumer(channel); consumer.Received += async (model, ea) => { await Task.Run(() => { var message = Encoding.UTF8.GetString(ea.Body); Thread.Sleep(1000 * 2); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手动消息确认 Console.WriteLine($"{DateTime.Now.ToString()} Received {message}"); }); }; channel.BasicConsume(queue: "priority", noAck: false, consumer: consumer);//需要启用消息响应,否则priority无效。 Console.ReadKey(); } } } }
2.3、运行结果
从消费情况可以看出,message_2及message_3由于priority优先级最高都是7,所以它们会被最早消费,而message_5的priority是0,所以最后才被消费。