快速阅读
利用Exchange的Direct类型,实现对队列的过滤,消费者启动以后,输入相应的key值,攻取该key值对应的在队列中的消息 。
从一节知道Exchange有四种类型
Direct,Topic,headers,fanout
前面我们说了fanout类型,可以把消息发送给所有的消费者,
在用Fanout类型的时候,我们绑定的时候是没有指定Routing key的【空值】
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
这次我们说一下Direct类型
Exchange的Direct类型将与队列中的routing key进行精确的匹配。
生产者代码
- 创建连接和信道
- 声明交换器名字和指定类型为direct
- 发送routingkey=rk1 和rk2的消息各五次
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "directType1", type: "direct");
for (var i = 0; i < 5; i++)
{
string message = "Hello World!this rk1 message " + i;
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "directType1",
routingKey: "rk1",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0},id={1}", message,i);
Thread.Sleep(1000);
}
for (var i = 0; i < 5; i++)
{
string message = "Hello World!this rk2 message " + i;
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "directType1",
routingKey: "rk2",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0},id={1}", message, i);
Thread.Sleep(1000);
}
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
消费者代码
- 输入要查看的消息类型,支持rk1 和rk2
- 创建连接和信道
- 声明交换器名字和指定类型为direct
- 指定队列名称,并且把routingkey的值赋值给控制台手动需要输入的rk1或者rk2
- 接收消息并回馈,和fanout类型一样的代码了。
static void Main(string[] args)
{
bool flag = true;
string level = "";
while (flag)
{
Console.WriteLine("请选择要查看的消息类型");
level = Console.ReadLine();
if (level == "rk1" || level == "rk2" )
flag = false;
else
Console.Write("仅支持rk1与rk2");
}
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "directType1", type: "direct");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "directType1", routingKey: level);
//以下是区别生产者的
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
var body = e.Body;
var message = Encoding.UTF8.GetString(body);
var rk = e.RoutingKey;
Console.WriteLine("Received {0},routingKey:{1}", message, rk);
Thread.Sleep(3000);//模拟耗时任务 ,
Console.WriteLine("Received over");
channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.WriteLine("");
Console.ReadLine();
}
}
查看结果
我们看到生产者分别生产了五条rk1和五条rk2的消息
消费者1输入只查看rk1的消息,成功获得了rk1的消息
同样的
消费者2输入只查看rk2的消息,成功获得了rk2的消息
要注意的是先把先消费者启动起来