1.topic类型的Exchange
我们之前说过Topic类型的Exchange是direct类型的模糊查询模式,可以通过routkey来实现模糊消费message,topic的模糊匹配有两种模式:
1. 使用*来匹配一个单词
2.使用#来匹配0个或多个单词
我们来看代码
消费端
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading; namespace RabbitMQClient
{
class Program
{
private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
{
HostName = "39.**.**.**",
Port = ,
UserName = "root",
Password = "root",
VirtualHost = "/"
};
static void Main(string[] args)
{
var exchangeAll = "changeAll";
var queueman = "queueman";
var quemankey = "man.#"; using (IConnection conn = rabbitMqFactory.CreateConnection())
using (IModel channel = conn.CreateModel())
{
channel.ExchangeDeclare(exchangeAll, type: "topic", durable: true, autoDelete: false);
channel.QueueDeclare(queueman, durable: true, exclusive: false, autoDelete: false);
channel.QueueBind(queueman, exchangeAll, quemankey); channel.BasicQos(prefetchSize: , prefetchCount: , global: false);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Byte[] body = ea.Body;
String message = Encoding.UTF8.GetString(body);
Console.WriteLine( message);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}; channel.BasicConsume(queue: queueman, autoAck: false, consumer: consumer);
Console.ReadLine();
}
}
}
}
生产者代码
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks; namespace RabbitMQConsole
{
class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "39.**.**.**";
factory.Port = ;
factory.VirtualHost = "/";
factory.UserName = "root";
factory.Password = "root"; var exchangeAll = "changeAll";
//性别.姓氏.头发长度
var keymanA = "man.chen.long";
var keymanB = "man.liu.long";
var keymanC = "woman.liu.long";
var keymanD = "woman.chen.short"; using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchangeAll, type: "topic", durable: true, autoDelete: false); var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//发布消息
channel.BasicPublish(exchange: exchangeAll,
routingKey: keymanA,
basicProperties: properties,
body: Encoding.UTF8.GetBytes(keymanA));
channel.BasicPublish(exchange: exchangeAll,
routingKey: keymanB,
basicProperties: properties,
body: Encoding.UTF8.GetBytes(keymanB));
channel.BasicPublish(exchange: exchangeAll,
routingKey: keymanC,
basicProperties: properties,
body: Encoding.UTF8.GetBytes(keymanC));
channel.BasicPublish(exchange: exchangeAll,
routingKey: keymanD,
basicProperties: properties,
body: Encoding.UTF8.GetBytes(keymanD));
}
}
}
}
}
我们先运行消费端再运行生产段,结果如下
消费端:
2.headers类型的exchange
生成者代码
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks; namespace RabbitMQConsole
{
class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "39.**.**.**";
factory.Port = ;
factory.VirtualHost = "/";
factory.UserName = "root";
factory.Password = "root"; var exchangeAll = "changeHeader"; using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchangeAll, type: "headers", durable: true, autoDelete: false); var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = new Dictionary<string, object> {
{ "sex","man"}
};
//发布消息
channel.BasicPublish(exchange: exchangeAll,
routingKey: "",
basicProperties: properties,
body: Encoding.UTF8.GetBytes("hihihi"));
}
}
}
}
}
消费端代码
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading; namespace RabbitMQClient
{
class Program
{
private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
{
HostName = "39.**.**.**",
Port = ,
UserName = "root",
Password = "root",
VirtualHost = "/"
};
static void Main(string[] args)
{
var exchangeAll = "changeHeader";
var queueman = "queueHeader"; using (IConnection conn = rabbitMqFactory.CreateConnection())
using (IModel channel = conn.CreateModel())
{
channel.ExchangeDeclare(exchangeAll, type: "headers", durable: true, autoDelete: false);
channel.QueueDeclare(queueman, durable: true, exclusive: false, autoDelete: false);
channel.QueueBind(queueman, exchangeAll, "",new Dictionary<string, object> { { "sex","man" } }); channel.BasicQos(prefetchSize: , prefetchCount: , global: false);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Byte[] body = ea.Body;
String message = Encoding.UTF8.GetString(body);
Console.WriteLine( message);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}; channel.BasicConsume(queue: queueman, autoAck: false, consumer: consumer);
Console.ReadLine();
}
}
}
}