C#利用RabbitMQ实现消息订阅与发布

在消息队列模型中,如何将消息广播到所有的消费者,这种模式成为“发布/订阅”。本文主要以一个简单的小例子,简述通过fanout交换机,实现消息的发布与订阅,仅供学习分享使用,如有不足之处,还请指正。

Fanout交换机模型

扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

C#利用RabbitMQ实现消息订阅与发布

RabbitMQ控制台操作

新增两个队列

在同一个Virtual host下新增两个队列Q1,Q2,如下图所示:

C#利用RabbitMQ实现消息订阅与发布

绑定fanout交换机

将两个队列绑定到系统默认的fanout交换机,如下所示:

C#利用RabbitMQ实现消息订阅与发布

示例效果图

生产者,采用Fanout类型交换机发布消息,如下图所示:

C#利用RabbitMQ实现消息订阅与发布

 

 当生产者发布 一条消息时,Q1,Q2两个队列均会收到,如下图所示:

C#利用RabbitMQ实现消息订阅与发布

当启动消费者后,两个消费者,均会订阅到相关消息,如下图所示:

C#利用RabbitMQ实现消息订阅与发布

核心代码

消息发布

建立连接后,将通道声明类型为Fanout的交换机,如下所示:

 1     /// <summary>
 2     /// fanout类型交换机,发送消息
 3     /// </summary>
 4     public class RabbitMqFanoutSendHelper : RabbitMqHelper {
 5         /// <summary>
 6         /// 发送消息
 7         /// </summary>
 8         /// <param name="msg"></param>
 9         /// <returns></returns>
10         public bool SendMsg(string msg)
11         {
12             try
13             {
14                 using (var conn = GetConnection("/Alan.hsiang"))
15                 {
16                     using (var channel = conn.CreateModel())
17                     {
18                         channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
19                         
20                         var body = Encoding.UTF8.GetBytes(msg);
21 
22                         channel.BasicPublish(exchange: "amq.fanout",
23                                              routingKey: "",
24                                              basicProperties: null,
25                                              body: body);
26 
27                         //Console.WriteLine(" [x] Sent {0}", message);
28                     };
29                 };
30                 return true;
31             }
32             catch (Exception ex)
33             {
34                 throw ex;
35             }
36         }
37     }

消息订阅

建立连接后,通道声明类型为Fanout的交换机,并绑定队列进行订阅,如下所示:

 1    /// <summary>
 2     /// 扇形交换机接收消息
 3     /// </summary>
 4     public class RabbitMqFanoutReceiveHelper : RabbitMqHelper
 5     {
 6         public RabbitMqReceiveEventHandler OnReceiveEvent;
 7 
 8         private IConnection conn;
 9 
10         private IModel channel;
11 
12         private EventingBasicConsumer consumer;
13 
14         public bool StartReceiveMsg(string queueName)
15         {
16             try
17             {
18                 conn = GetConnection("/Alan.hsiang");
19 
20                 channel = conn.CreateModel();
21                 channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
22                 //此处随机取出交换机下的队列
23                 //var queueName = channel.QueueDeclare().QueueName;
24                 channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");
25                 consumer = new EventingBasicConsumer(channel);
26                 consumer.Received += (model, ea) =>
27                 {
28                     var body = ea.Body.ToArray();
29                     var message = Encoding.UTF8.GetString(body);
30                     //Console.WriteLine(" [x] Received {0}", message);
31                     if (OnReceiveEvent != null)
32                     {
33                         OnReceiveEvent(queueName+"::"+message);
34                     }
35                 };
36                 channel.BasicConsume(queue: queueName,
37                                         autoAck: true,
38                                         consumer: consumer);
39                 return true;
40             }
41             catch (Exception ex)
42             {
43                 throw ex;
44             }
45         }
46     }

关于RabbitMQ的基础知识介绍,可参考前几篇博文。

备注

遣怀

唐代  [杜牧]

落魄江湖载酒行,楚腰纤细掌中轻。
十年一觉扬州梦,赢得青楼薄幸名。 

C#利用RabbitMQ实现消息订阅与发布

上一篇:Delphi中回调函数的简单例子


下一篇:linux中 -daemon、&、nohup指这三者的区别