2.6.4 RabbitMQ -- 工作队列和交换机
- WorkQueue
- Publish/Subscribe
- Routing
- EmitLog
WorkQueue
WorkQueue:https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html
- 一个消息生产者,多个消息消费者
- exchange 交换机自动恢复
- 对消息进行持久化
- 手动确认消息
对消息进行持久化
var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);
手动确认消息
autoAck: false
channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
手动调用 BasicAck
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
修改接收端为手动确认消息
autoAck: false
channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
BasicAck
consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Thread.Sleep(2000);// 演示多个接收端 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine(" [x] Received {0}", message); };
启动多个接收端
Publish/Subscribe
Publish/Subscribe:https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html
Fanout 交换机,每个队列都会收到
channel.ExchangeDeclare("logs", ExchangeType.Fanout);
Routing
Routing:https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html
Bindings
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
Direct exchange
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
EmitLog
新建控制台项目 EmitLogDirect,ReceiveLogsDirect
发送端
namespace EmitLogDirect { class EmitLogDirect { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);// 声明交换机 var severity = (args.Length > 0) ? args[0] : "info"; var message = (args.Length > 1) ? string.Join(" ", args.Skip( 1 ).ToArray()) : "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity,// 路由 Key 自动带上严重级别 basicProperties: null, body: body); Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
error 级别单独发送到一个队列
接收端
namespace ReceiveLogsDirect { class ReceiveLogsDirect { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);// 声明交换机 var queueName = channel.QueueDeclare().QueueName; if (args.Length < 1) { Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); Environment.ExitCode = 1; return; } foreach (var severity in args) { channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity);// 路由 Key 自动带上严重级别 } Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } }
替换发送端,接收端的 localhost 为服务器地址
接收端控制台启动
dotnet run info waring error
发送端控制台启动
dotnet run info dotnet run error dotnet run waring test
接收端输出
[x] Received 'info':'Hello World!' [x] Received 'error':'Hello World!' [x] Received 'waring':'test'
GitHub源码链接:
https://github.com/MINGSON666/Personal-Learning-Library/tree/main/ArchitectTrainingCamp