rabbitmq (三) 发布/订阅

rabbitmq的目的并不是让生产者把消息直接发到队列里面去,

这样不能实现解耦的目的,也不利于程序的扩展.

所以就有交换机(exchanges)的概念.

交换机有几种类型:direct, topic, headers 和fanout,

可以为交换机命名,还有一种没有命名的交换机,上几章的消息都是发布到没有命名的交换机.

channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);

感觉交换机主要控制消息的投递方式.

临时队列:

可以通过创建队列的方式对消息的存储等方式进行管理.

var queueName = channel.QueueDeclare().QueueName;

绑定:

最后通过绑定的方式把交换机和队列进行关联.

channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");

publish:

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks; namespace RMQ_Publish
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "your host name", UserName = "wc", Password = "wc" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var message = GetMessage(args);
int i = ;
while (true)
{
var body = Encoding.UTF8.GetBytes(message + ":" + i.ToString());
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message); if (i-- == )
i = ;
}
} Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
} private static string GetMessage(string[] args)
{
return ((args.Length > )
? string.Join(" ", args)
: "info: Hello World!");
}
}
}

subscribe:

var factory = new ConnectionFactory() { HostName = "your host name", UserName = "wc", Password = "wc" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: ""); Console.WriteLine(" [*] Waiting for logs."); var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message);
Thread.Sleep();
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer); Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
上一篇:Java和Android开发IDE---IntelliJ IDEA使用技巧(转)


下一篇:[原]jenkins(六)---jenkins远程部署脚本