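RabbitMQ is not designed for producers to send messages directly to a queue.
Doing so would couple producers to consumers and make the program harder to extend.
That is why the concept of an exchange exists.
There are several exchange types: direct, topic, headers, and fanout.
An exchange can be given a name. There is also an unnamed (default) exchange, and the messages in the previous chapters were all published to it.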
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
                     routingKey: "",
                     basicProperties: null,
                     body: body);
As I understand it, the exchange mainly controls how messages are routed to queues.
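Temporary queues:
How messages are stored is managed through the way the queue is created. Calling QueueDeclare() with no arguments creates a non-durable, exclusive, auto-delete queue with a server-generated name.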
var queueName = channel.QueueDeclare().QueueName;
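Bindings:
Finally, a binding associates the exchange with the queue. A fanout exchange ignores the routing key, so it is left empty here.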
channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");
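To make the roles of exchange, binding, and queue concrete, here is a minimal in-memory sketch of the routing rule. It is a toy model, not the RabbitMQ client API: ToyExchange, Bind, and Publish are hypothetical names invented for illustration, and only the fanout and direct types are modelled.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model (hypothetical, NOT the RabbitMQ client API).
// It only illustrates how an exchange decides which bound queues get a copy.
class ToyExchange
{
    private readonly string _type; // "fanout" or "direct"
    private readonly List<(string RoutingKey, Queue<string> Target)> _bindings =
        new List<(string RoutingKey, Queue<string> Target)>();

    public ToyExchange(string type) { _type = type; }

    // A binding links this exchange to a queue under a routing key.
    public void Bind(Queue<string> queue, string routingKey)
    {
        _bindings.Add((routingKey, queue));
    }

    public void Publish(string routingKey, string message)
    {
        foreach (var binding in _bindings)
        {
            // fanout: every bound queue gets a copy, the routing key is ignored.
            // direct: only queues bound with exactly the same key get a copy.
            if (_type == "fanout" || binding.RoutingKey == routingKey)
                binding.Target.Enqueue(message);
        }
    }
}

class Demo
{
    static void Main()
    {
        var logs = new ToyExchange("fanout");
        var q1 = new Queue<string>();
        var q2 = new Queue<string>();
        logs.Bind(q1, "");
        logs.Bind(q2, "anything");
        logs.Publish("", "info: Hello World!");
        Console.WriteLine("{0} {1}", q1.Count, q2.Count); // prints "1 1"

        var direct = new ToyExchange("direct");
        var errors = new Queue<string>();
        direct.Bind(errors, "error");
        direct.Publish("info", "ignored");
        direct.Publish("error", "disk full");
        Console.WriteLine(errors.Count); // prints "1"
    }
}
```

With the fanout type both queues receive the message even though their binding keys differ, which is why every subscriber in the example that follows sees every log line.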
publish:
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RMQ_Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "your host name", UserName = "wc", Password = "wc" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Declare the fanout exchange that every subscriber binds to.
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");

                var message = GetMessage(args);
                // The numeric literals for the counter were lost from the original post;
                // 0 and 100 below are assumed values.
                int i = 0;
                while (true)
                {
                    // Append the counter so individual messages can be told apart.
                    var body = Encoding.UTF8.GetBytes(message + ":" + i.ToString());
                    channel.BasicPublish(exchange: "logs",
                                         routingKey: "",
                                         basicProperties: null,
                                         body: body);
                    Console.WriteLine(" [x] Sent {0}", message);

                    // Reset the counter once it has counted down to zero.
                    if (i-- == 0)
                        i = 100;
                }
            }

            // Not reached while the loop above runs forever.
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }

        private static string GetMessage(string[] args)
        {
            return ((args.Length > 0)
                   ? string.Join(" ", args)
                   : "info: Hello World!");
        }
    }
}
subscribe:
// Requires: using RabbitMQ.Client; using RabbitMQ.Client.Events;
//           using System; using System.Text; using System.Threading;
var factory = new ConnectionFactory() { HostName = "your host name", UserName = "wc", Password = "wc" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare(exchange: "logs", type: "fanout");

    // Temporary queue with a server-generated name, bound to the "logs" exchange.
    var queueName = channel.QueueDeclare().QueueName;
    channel.QueueBind(queue: queueName,
                      exchange: "logs",
                      routingKey: "");

    Console.WriteLine(" [*] Waiting for logs.");

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        // In client 6.x ea.Body is ReadOnlyMemory<byte>; use ea.Body.ToArray() there.
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine(" [x] {0}", message);
        // The sleep duration was lost from the original post; 1000 ms is an assumed value.
        Thread.Sleep(1000);
    };
    channel.BasicConsume(queue: queueName,
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
}