订阅模式(Publish/Subscribe)
一次向许多消费者发送消息,一个生产者发送的消息会被多个消费者获取,也就是将消息将广播到所有的消费者中。
应用场景: 更新商品库存后需要通知多个缓存和多个数据库,这里的结构应该是:
- 一个fanout类型交换机扇出两个个消息队列,分别为缓存消息队列、数据库消息队列
- 一个缓存消息队列对应着多个缓存消费者
- 一个数据库消息队列对应着多个数据库消费者
在订阅模型中,多了一个Exchange角色,而且过程略有变化:
Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交所有队列、或是将消息丢弃。如何操作,取决于 Exchange的类型。Exchange有常见的以下三种类型
Fanout:广播,将消息交给所有绑定到交换机的队列,订阅模式使用此类型
Direct:定向,将消息交给符合指定routing key的队列 路由模式使用此类型
Topic:通配符,把消息交给符合routing pattern的队列 主题模式使用此类型
Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失
Demo 模拟百度和新浪订阅气象局发布的天气信息
1.RabbitMQ.Common类库代码
(1)RabbitConstant.cs代码
1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4 5 namespace RabbitMQ.Common 6 { 7 public class RabbitConstant 8 { 9 public const string QUEUE_HELLO_WORLD = "helloworld.queue"; 10 public const string QUEUE_SMS = "sms.queue"; 11 public const string EXCHANGE_WEATHER = "weather.exchange"; 12 public const string QUEUE_BAIDU = "baidu.queue"; 13 public const string QUEUE_SINA= "sina.queue"; 14 public const string EXCHANGE_WEATHER_ROUTING = "weather.routing.exchange"; 15 public const string EXCHANGE_WEATHER_TOPIC = "weather.topic.exchange"; 16 } 17 }
(2)RabbitUtils.cs代码
1 using System; 2 using RabbitMQ.Client; 3 4 namespace RabbitMQ.Common 5 { 6 public class RabbitUtils 7 { 8 public static ConnectionFactory GetConnection() 9 { 10 var factory = new ConnectionFactory(); 11 factory.HostName = "127.0.0.1"; 12 factory.Port = 5672;//是服务端的端口号,与页面的端口号15672区分开 13 factory.UserName = "guest"; 14 factory.Password = "guest"; 15 //factory.VirtualHost = "/"; 16 return factory; 17 } 18 } 19 }
2. RabbitMQ.Producer控制台项目代码
(1)WeatherFanout.cs代码
1 using System; 2 using System.Text; 3 using RabbitMQ.Client; 4 using RabbitMQ.Common; 5 6 namespace RabbitMQ.Producer.Producer 7 { 8 public class WeatherFanout 9 { 10 public static void SendWeatherInfo() 11 { 12 using (var connection = RabbitUtils.GetConnection().CreateConnection()) 13 { 14 using (var channel = connection.CreateModel()) 15 { 16 string message = "零下10度"; 17 var body = Encoding.UTF8.GetBytes(message); 18 19 channel.BasicPublish(exchange: RabbitConstant.EXCHANGE_WEATHER, 20 routingKey: "", 21 basicProperties: null, 22 body: body); 23 Console.WriteLine("天气信息发送成功"); 24 Console.WriteLine("Press [Enter] to exit"); 25 Console.ReadLine(); 26 } 27 } 28 } 29 } 30 }
(2)Program.cs代码
1 using RabbitMQ.Producer.Producer; 2 using System; 3 4 namespace RabbitMQ.Producer 5 { 6 class Program 7 { 8 static void Main(string[] args) 9 { 10 WeatherFanout.SendWeatherInfo(); 11 } 12 } 13 }
3.RabbitMQ.Consumer01控制台项目代码
(1)WeatherFanout.cs
1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4 using RabbitMQ.Common; 5 using RabbitMQ.Client; 6 using RabbitMQ.Client.Events; 7 8 namespace RabbitMQ.Consumer01.Consumer 9 { 10 public class WeatherFanout 11 { 12 public static void ReceiveWeatherInfo() 13 { 14 using (var connection = RabbitUtils.GetConnection().CreateConnection()) 15 { 16 using (var channel = connection.CreateModel()) 17 { 18 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout); 19 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null); 20 /* 21 * QueueBind用于将队列与交换机绑定 22 * queue:队列名 23 * exchange:交换机名 24 * routingKey:路由key 25 */ 26 channel.QueueBind(queue: RabbitConstant.QUEUE_BAIDU, 27 exchange:RabbitConstant.EXCHANGE_WEATHER, 28 routingKey:""); 29 channel.BasicQos(0, 1, false); 30 var consumer = new EventingBasicConsumer(channel); 31 32 consumer.Received += (model, ea) => 33 { 34 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); 35 Console.WriteLine($"百度收到的气象信息:{message}"); 36 channel.BasicAck(ea.DeliveryTag, false); 37 }; 38 channel.BasicConsume(queue: RabbitConstant.QUEUE_BAIDU, 39 autoAck: false, 40 consumer: consumer); 41 Console.WriteLine(" Press [enter] to exit."); 42 Console.ReadLine(); 43 } 44 } 45 } 46 } 47 }
(2)Program.cs代码
1 using RabbitMQ.Consumer01.Consumer; 2 using System; 3 4 namespace RabbitMQ.Consumer01 5 { 6 class Program 7 { 8 static void Main(string[] args) 9 { 10 WeatherFanout.ReceiveWeatherInfo(); 11 } 12 } 13 }
4.RabbitMQ.Consumer02控制台项目代码
(1)WeatherFanout.cs
1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4 using RabbitMQ.Common; 5 using RabbitMQ.Client; 6 using RabbitMQ.Client.Events; 7 8 namespace RabbitMQ.Consumer02.Consumer 9 { 10 public class WeatherFanout 11 { 12 public static void ReceiveWeatherInfo() 13 { 14 using (var connection = RabbitUtils.GetConnection().CreateConnection()) 15 { 16 using (var channel = connection.CreateModel()) 17 { 18 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout); 19 channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null); 20 /* 21 * QueueBind用于将队列与交换机绑定 22 * queue:队列名 23 * exchange:交换机名 24 * routingKey:路由key 25 */ 26 channel.QueueBind(queue: RabbitConstant.QUEUE_SINA, 27 exchange: RabbitConstant.EXCHANGE_WEATHER, 28 routingKey: ""); 29 channel.BasicQos(0, 1, false); 30 var consumer = new EventingBasicConsumer(channel); 31 32 consumer.Received += (model, ea) => 33 { 34 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); 35 Console.WriteLine($"新浪收到的气象信息:{message}"); 36 channel.BasicAck(ea.DeliveryTag, false); 37 }; 38 channel.BasicConsume(queue: RabbitConstant.QUEUE_SINA, 39 autoAck: false, 40 consumer: consumer); 41 Console.WriteLine(" Press [enter] to exit."); 42 Console.ReadLine(); 43 } 44 } 45 } 46 } 47 }
(2)Program.cs代码
using RabbitMQ.Consumer02.Consumer; using System; namespace RabbitMQ.Consumer02 { class Program { static void Main(string[] args) { WeatherFanout.ReceiveWeatherInfo(); } } }
5.执行,使用powershell运行
参考连接:https://mp.weixin.qq.com/s/QG3uXhhpkE_Uo6Me15mxdg
https://www.bilibili.com/video/BV1GU4y1w7Yq?p=8