通配符模式(Topics)
根据通配符(Topics)来接收消息,将路由key和某模式进行匹配,此时队列需要绑定在一个模式上,#
匹配一个词或多个词,*
只匹配一个词。
使用的Exchange类型为Topic
Topic类型的Exchange:
(1)Topic类型与Direct相比,都是可以根据RoutingKey将消息路由到不同的队列,只不过Topic类型可以让队列在绑定路由时使用通配符
(2)Topic类型的RoutingKey一般是由一个或多个单词构成,多个单词之间用“.”分割,如item.insert
(3)Topic类型通配符规则:#匹配一个或多个词,*恰好匹配一个词,例如item.#,可以匹配item.insert或item.insert.user,item.*只能匹配
item.insert或item.user
Demo
生产者类WeatherTopic.cs
1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4 using RabbitMQ.Client; 5 using RabbitMQ.Client.Events; 6 using RabbitMQ.Common; 7 8 namespace RabbitMQ.Producer.Producer 9 { 10 public class WeatherTopic 11 { 12 public static void SendWeather() 13 { 14 Dictionary<string, string> area = new Dictionary<string, string>(); 15 area.Add("china.hunan.changsha.20211231", "中国湖南长沙20211231天气数据"); 16 area.Add("china.hubei.wuhan.20211231", "中国湖北武汉20211231天气数据"); 17 area.Add("china.hubei.xiangyang.20211231", "中国湖北襄阳20211231天气数据"); 18 area.Add("us.cal.lsj.20211231", "美国加州洛杉矶20211231天气数据"); 19 20 using (var connection = RabbitUtils.GetConnection().CreateConnection()) 21 { 22 using (var channel = connection.CreateModel()) 23 { 24 foreach (var item in area) 25 { 26 channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, item.Key, null, Encoding.UTF8.GetBytes(item.Value)); 27 } 28 Console.WriteLine("气象信息发送成功!"); 29 Console.WriteLine("Press [Enter] to exit"); 30 Console.ReadLine(); 31 } 32 } 33 } 34 } 35 }
消费者01 WeatherTopic.cs
1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4 using RabbitMQ.Client; 5 using RabbitMQ.Client.Events; 6 using RabbitMQ.Common; 7 8 namespace RabbitMQ.Consumer01.Consumer 9 { 10 public class WeatherTopic 11 { 12 public static void ReceiveWeatherInfo() 13 { 14 using (var connection = RabbitUtils.GetConnection().CreateConnection()) 15 { 16 using (var channel = connection.CreateModel()) 17 { 18 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic); 19 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null); 20 /* 21 * QueueBind用于将队列与交换机绑定 22 * queue:队列名 23 * exchange:交换机名 24 * routingKey:路由key 25 */ 26 channel.QueueBind(queue: RabbitConstant.QUEUE_BAIDU, 27 exchange: RabbitConstant.EXCHANGE_WEATHER_TOPIC, 28 routingKey: "china.hubei.*.20211231"); 29 channel.BasicQos(0, 1, false); 30 var consumer = new EventingBasicConsumer(channel); 31 32 consumer.Received += (model, ea) => 33 { 34 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); 35 Console.WriteLine($"百度收到的气象信息:{message}"); 36 channel.BasicAck(ea.DeliveryTag, false); 37 }; 38 channel.BasicConsume(queue: RabbitConstant.QUEUE_BAIDU, 39 autoAck: false, 40 consumer: consumer); 41 Console.WriteLine(" Press [enter] to exit."); 42 Console.ReadLine(); 43 } 44 } 45 } 46 } 47 }
消费者02 WeatherTopic.cs代码
1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4 using RabbitMQ.Client; 5 using RabbitMQ.Client.Events; 6 using RabbitMQ.Common; 7 8 namespace RabbitMQ.Consumer02.Consumer 9 { 10 public class WeatherTopic 11 { 12 public static void ReceiveWeatherInfo() 13 { 14 using (var connection = RabbitUtils.GetConnection().CreateConnection()) 15 { 16 using (var channel = connection.CreateModel()) 17 { 18 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic); 19 channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null); 20 /* 21 * QueueBind用于将队列与交换机绑定 22 * queue:队列名 23 * exchange:交换机名 24 * routingKey:路由key 25 */ 26 channel.QueueBind(queue: RabbitConstant.QUEUE_SINA, 27 exchange: RabbitConstant.EXCHANGE_WEATHER_TOPIC, 28 routingKey: "china.#"); 29 channel.BasicQos(0, 1, false); 30 var consumer = new EventingBasicConsumer(channel); 31 32 consumer.Received += (model, ea) => 33 { 34 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); 35 Console.WriteLine($"新浪收到的气象信息:{message}"); 36 channel.BasicAck(ea.DeliveryTag, false); 37 }; 38 channel.BasicConsume(queue: RabbitConstant.QUEUE_SINA, 39 autoAck: false, 40 consumer: consumer); 41 Console.WriteLine(" Press [enter] to exit."); 42 Console.ReadLine(); 43 } 44 } 45 } 46 47 } 48 }
执行结果
运行前配置:删除原来的Exchange,具体操作如下
参考链接:https://www.bilibili.com/video/BV1GU4y1w7Yq?p=8
https://mp.weixin.qq.com/s/QG3uXhhpkE_Uo6Me15mxdg