路由模式(Routing)
有选择地(Routing key)接收消息,发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息
Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收消息,路由模式使用的Exchange类型为Direct类型
应用场景: 如在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息
Demo 架构与订阅模式一样
生产者类WeatherDirect.cs
1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4 using RabbitMQ.Client; 5 using RabbitMQ.Common; 6 7 namespace RabbitMQ.Producer.Producer 8 { 9 public class WeatherDirect 10 { 11 public static void SendWeather() 12 { 13 Dictionary<string, string> area = new Dictionary<string, string>(); 14 area.Add("china.hunan.changsha.20211231", "中国湖南长沙20211231天气数据"); 15 area.Add("china.hubei.wuhan.20211231", "中国湖北武汉20211231天气数据"); 16 area.Add("china.hubei.xiangyang.20211231", "中国湖北襄阳20211231天气数据"); 17 area.Add("us.cal.lsj.20211231", "美国加州洛杉矶20211231天气数据"); 18 19 using (var connection = RabbitUtils.GetConnection().CreateConnection()) 20 { 21 using (var channel = connection.CreateModel()) 22 { 23 foreach (var item in area) 24 { 25 channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, item.Key, null, Encoding.UTF8.GetBytes(item.Value)); 26 } 27 Console.WriteLine("气象信息发送成功!"); 28 Console.WriteLine("Press [Enter] to exit"); 29 Console.ReadLine(); 30 } 31 } 32 } 33 } 34 }
消费者1 WeatherDirect.cs
1 using RabbitMQ.Client; 2 using RabbitMQ.Client.Events; 3 using RabbitMQ.Common; 4 using System; 5 using System.Collections.Generic; 6 using System.Text; 7 8 namespace RabbitMQ.Consumer01.Consumer 9 { 10 public class WeatherDirect 11 { 12 public static void ReceiveWeatherInfo() 13 { 14 using (var connection = RabbitUtils.GetConnection().CreateConnection()) 15 { 16 using (var channel = connection.CreateModel()) 17 { 18 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct); 19 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null); 20 /* 21 * QueueBind用于将队列与交换机绑定 22 * queue:队列名 23 * exchange:交换机名 24 * routingKey:路由key 25 */ 26 channel.QueueBind(queue: RabbitConstant.QUEUE_BAIDU, 27 exchange: RabbitConstant.EXCHANGE_WEATHER_ROUTING, 28 routingKey: "china.hunan.changsha.20211231"); 29 channel.QueueBind(queue: RabbitConstant.QUEUE_BAIDU, 30 exchange: RabbitConstant.EXCHANGE_WEATHER_ROUTING, 31 routingKey: "china.hubei.wuhan.20211231"); 32 channel.BasicQos(0, 1, false); 33 var consumer = new EventingBasicConsumer(channel); 34 35 consumer.Received += (model, ea) => 36 { 37 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); 38 Console.WriteLine($"百度收到的气象信息:{message}"); 39 channel.BasicAck(ea.DeliveryTag, false); 40 }; 41 channel.BasicConsume(queue: RabbitConstant.QUEUE_BAIDU, 42 autoAck: false, 43 consumer: consumer); 44 Console.WriteLine(" Press [enter] to exit."); 45 Console.ReadLine(); 46 } 47 } 48 } 49 } 50 }
消费者2 WeatherDirect.cs
1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4 using RabbitMQ.Client; 5 using RabbitMQ.Client.Events; 6 using RabbitMQ.Common; 7 8 namespace RabbitMQ.Consumer02.Consumer 9 { 10 public class WeatherDirect 11 { 12 public static void ReceiveWeatherInfo() 13 { 14 using (var connection = RabbitUtils.GetConnection().CreateConnection()) 15 { 16 using (var channel = connection.CreateModel()) 17 { 18 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct); 19 channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null); 20 /* 21 * QueueBind用于将队列与交换机绑定 22 * queue:队列名 23 * exchange:交换机名 24 * routingKey:路由key 25 */ 26 channel.QueueBind(queue: RabbitConstant.QUEUE_SINA, 27 exchange: RabbitConstant.EXCHANGE_WEATHER_ROUTING, 28 routingKey: "china.hubei.xiangyang.20211231"); 29 channel.QueueBind(queue: RabbitConstant.QUEUE_SINA, 30 exchange: RabbitConstant.EXCHANGE_WEATHER_ROUTING, 31 routingKey: "china.hubei.wuhan.20211231"); 32 channel.QueueBind(queue: RabbitConstant.QUEUE_SINA, 33 exchange: RabbitConstant.EXCHANGE_WEATHER_ROUTING, 34 routingKey: "us.cal.lsj.20211231"); 35 channel.BasicQos(0, 1, false); 36 var consumer = new EventingBasicConsumer(channel); 37 38 consumer.Received += (model, ea) => 39 { 40 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); 41 Console.WriteLine($"新浪收到的气象信息:{message}"); 42 channel.BasicAck(ea.DeliveryTag, false); 43 }; 44 channel.BasicConsume(queue: RabbitConstant.QUEUE_SINA, 45 autoAck: false, 46 consumer: consumer); 47 Console.WriteLine(" Press [enter] to exit."); 48 Console.ReadLine(); 49 } 50 } 51 } 52 } 53 }
执行结果
备注:若执行报错:'PRECONDITION_FAILED - inequivalent arg 'type' for exchange
原因:一旦创建了exchange, RabbitMQ是不允许对其改变的,不然会报错
解决方法:在web上Exchanges模块把之前的exchange删除。
参考连接:https://www.bilibili.com/video/BV1GU4y1w7Yq?p=8
https://mp.weixin.qq.com/s/QG3uXhhpkE_Uo6Me15mxdg
https://blog.csdn.net/adviceuser2014/article/details/102259145