RabbitMQ Topic交换机代码实现

Topic交换机和队列绑定是允许使用通配符

*匹配一个单词

#匹配多个单词

 

发布

public void TopicPublish()
        {
            MQHelper mh = new MQHelper();
            using (var conn = mh.GetConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    // 声明队列
                    channel.QueueDeclare(queue: "ChinaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: "NewsQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //声明交换机
                    channel.ExchangeDeclare(exchange: "TopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);

                    //绑定
                    channel.QueueBind(queue: "ChinaQueue", exchange: "TopicExchange", routingKey: "China.#", arguments: null);
                    channel.QueueBind(queue: "NewsQueue", exchange: "TopicExchange", routingKey: "#.News", arguments: null);

                    string message = "来自中国的新闻消息";
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "TopicExchange", routingKey:"China.News", basicProperties: null, body: body);
                    Console.WriteLine($"[{message}]已发送到队列");

                    string message2 = "来自中国的天气消息";
                    var body2 = Encoding.UTF8.GetBytes(message2);
                    channel.BasicPublish(exchange: "TopicExchange", routingKey: "China.Weather", basicProperties: null, body: body2);
                    Console.WriteLine($"[{message2}]已发送到队列");

                    string message3 = "来自美国的新闻消息";
                    var body3 = Encoding.UTF8.GetBytes(message2);
                    channel.BasicPublish(exchange: "TopicExchange", routingKey: "US.News", basicProperties: null, body: body3);
                    Console.WriteLine($"[{message3}]已发送到队列");
                }
            }
        }

上面发布了三条消息。China.News和China.Weather会匹配到ChinaQueue,而China.News和US.News会匹配到NewsQueue

 

消费:

        public void TopicConsume()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "guest";
            factory.Password = "guest";

            using (var conn = factory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    // 声明队列
                    // channel.QueueDeclare(queue: "ChinaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: "NewsQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //声明交换机
                    channel.ExchangeDeclare(exchange: "TopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);

                    //绑定
                    // channel.QueueBind(queue: "ChinaQueue", exchange: "TopicExchange", routingKey: "China.#", arguments: null);
                    channel.QueueBind(queue: "NewsQueue", exchange: "TopicExchange", routingKey: "#.News", arguments: null);


                    //消费消息
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"接收成功,[{message}]");
                    };
                    //处理消息
                    channel.BasicConsume(queue: "NewsQueue", autoAck: true, consumer: consumer);
                }
            }
        }

 

RabbitMQ Topic交换机代码实现

上一篇:远程分支删除后, git 后续的处理的情况;


下一篇:并发编程中一种经典的分而治之的思想!!