RabbitMQ Fanout交换机代码实现

一般情况下,生产者发送消息,先到先得,一个消费者消费之后,该条消息便消失不会再被消费,抢完即止。

那能否生产者发送的消息每个消费者都能接收到,都能消费呢?

 

Fanout交换机就可以实现。

代码实现:

生产者:

public class FanoutExchange
    {
        public void FanoutPublish()
        {
            MQHelper mh = new MQHelper();
            using (var conn = mh.GetConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    //声明队列
                    channel.QueueDeclare(queue: "FanoutAdu001", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: "FanoutAdu002", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //声明交换机
                    channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);

                    //绑定
                    channel.QueueBind(queue: "FanoutAdu001", exchange: "FanoutExchange", routingKey: string.Empty,arguments:null);
                    channel.QueueBind(queue: "FanoutAdu002", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);

                    //发布
                    int i = 0;
                    while (true)
                    {
                        string message = $"通知{i}";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "FanoutExchange", routingKey: string.Empty, basicProperties: null, body: body);
                        Console.WriteLine($"通知{i}已发送到队列");
                        Thread.Sleep(2000);
                        i++;
                    }

                }
            }
        }
    }

可以看到,这里和Direct交换机代码相比,类型发生了变化,同时路由键变成了Empty

消费者:

public class FanoutExchangeConsumer
    {
        public void FanoutConsume()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "guest";
            factory.Password = "guest";

            using (var conn = factory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    //声明队列
                    channel.QueueDeclare(queue: "FanoutAdu002", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //声明交换机
                    channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);

                    //绑定
                    channel.QueueBind(queue: "FanoutAdu002", exchange: "FanoutExchange", routingKey: string.Empty);


                    //消费消息
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"接收成功,[{message}]");
                    };
                    //处理消息
                    channel.BasicConsume(queue: "FanoutAdu002", autoAck: true, consumer: consumer);
                }
            }
        }
    }

这里每个消费者一条路由,都能够接收生产者发送的所有消息

上一篇:阿里acp认证考试相关答疑 阿里acp认证费用等相关资讯介绍


下一篇:spring boot和RabbitMQ整合实现