先决条件
本教程假定RabbitMQ已经安装,并运行在localhost
标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。从哪里获得帮助
如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们。
工作队列
(使用.NET Client)
在第一篇教程中,我们编写了两个程序,用于从一个指定的队列发送和接收消息。在本文中,我们将创建一个工作队列,用于在多个工作线程间分发耗时的任务。
工作队列(又名:任务队列)背后的主要想法是避免立即执行资源密集型、且必须等待其完成的任务。相反的,我们把这些任务安排在稍后完成。我们可以将任务封装为消息并把它发送到队列中,在后台运行的工作进程将从队列中取出任务并最终执行。当您运行多个工作线程,这些任务将在这些工作线程之间共享。
这个概念在Web应用程序中特别有用,因为在一个HTTP请求窗口中无法处理复杂的任务。
准备
我们将略微修改上一个示例中的Send程序,以其可以在命令行发送任意消息。
这个程序将调度任务到我们的工作队列中,所以让我们把它命名为NewTask
:
像 教程[1]一样,我们需要生成两个项目:
dotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs
dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs
cd NewTask
dotnet add package RabbitMQ.Client
dotnet restore
cd ../Worker
dotnet add package RabbitMQ.Client
dotnet restore
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
从命令行参数获取消息的帮助方法:
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
我们旧的Receive.cs
脚本也需要进行一些更改:它需要为消息体中的每个点模拟一秒种的时间消耗。它将处理由RabbitMQ发布的消息,并执行任务,因此我们把它复制到Worker
项目并修改:
// 构建消费者实例。
var consumer = new EventingBasicConsumer(channel);
// 绑定消息接收事件。
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
// 模拟耗时操作。
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);
模拟虚拟任务的执行时间:
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
循环调度
使用任务队列的优点之一是能够轻松地并行工作。如果我们正在积累积压的工作,我们仅要增加更多的工作者,并以此方式可以轻松扩展。
首先,我们尝试同时运行两个Worker
实例。他们都会从队列中获取消息,但究竟如何?让我们来看看。
您需要打开三个控制台,两个运行Worker
程序,这些控制台作为我们的两个消费者 - C1和C2。
# shell 1
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C
在第三个控制台中,我们将发布一些新的任务。一旦你已经运行了消费者,你可以尝试发布几条消息:
# shell 3
cd NewTask
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."
让我们看看有什么发送到了我们的Worker
程序:
# shell 1
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ会按顺序将每条消息发送给下一个消费者。消费者数量平均的情况下,每个消费者将会获得相同数量的消息。这种分配消息的方式称为循环(Round-Robin)。请尝试开启三个或更多的Worker
程序来验证。
消息确认
处理一项任务可能会需要几秒钟的时间。如果其中一个消费者开启了一项长期的任务并且只完成了部分就挂掉了,您可能想知道会发生什么?在我们当前的代码中,一旦RabbitMQ把消息分发给了消费者,它会立即将这条消息标记为删除。在这种情况下,如果您停掉某一个Worker,我们将会丢失这条正在处理的消息,也将丢失所有分发到该Worker但尚未处理的消息。
但是我们不想丢失任何一个任务。如果一个Worker挂掉了,我们希望这个任务能被重新分发给其他Worker。
为了确保消息永远不会丢失,RabbitMQ支持 消息确认 机制。消费者回发一个确认信号 Ack(nowledgement)给RabbitMQ,告诉它某个消息已经被接收、处理并且可以*删除它。
如果一个消费者在还没有回发确认信号之前就挂了(其通道关闭,连接关闭或者TCP连接丢失),RabbitMQ会认为该消息未被完全处理,并将其重新排队。如果有其他消费者同时在线,该消息将会被会迅速重新分发给其他消费者。这样,即便Worker意外挂掉,也可以确保消息不会丢失。
没有任何消息会超时;当消费者死亡时,RabbitMQ将会重新分发消息。即使处理消息需要非常非常长的时间也没关系。
默认情况下,手动消息确认 模式是开启的。在前面的例子中,我们通过将autoAck
(“自动确认模式”)参数设置为true
来明确地关闭手动消息确认模式。一旦完成任务,是时候删除这个标志并且从Worker手动发送一个恰当的确认信号给RabbitMQ。
// 构建消费者实例。
var consumer = new EventingBasicConsumer(channel);
// 绑定消息接收事件。
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
// 模拟耗时操作。
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
// 手动发送消息确认信号。
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
// autoAck:false - 关闭自动消息确认,调用`BasicAck`方法进行手动消息确认。
// autoAck:true - 开启自动消息确认,当消费者接收到消息后就自动发送ack信号,无论消息是否正确处理完毕。
channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
使用上面这段代码,我们可以确定的是,即使一个Worker在处理消息时,我们通过使用CTRL + C
来终止它,也不会丢失任何消息。Worker挂掉不久,所有未确认的消息将会被重新分发。
忘记确认
遗漏BasicAck
是一个常见的错误。这是一个很简单的错误,但导致的后果却是严重的。当客户端退出时(看起来像是随机分发的),消息将会被重新分发,但是RabbitMQ会吃掉越来越多的内存,因为它不能释放未确认的消息。
为了调试这种错误,您可以使用rabbitmqctl
来打印messages_unacknowledged
字段:sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows上,删除
sudo
:rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久化
我们已经学习了如何确保即使消费者挂掉,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务还是会丢失。
当RabbitMQ退出或崩溃时,它会忘记已存在的队列和消息,除非告诉它不要这样做。为了确保消息不会丢失,有两件事是必须的:我们需要将队列和消息标记为持久。
首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了做到这一点,我们需要把队列声明是持久的(Durable):
// 声明队列,通过指定durable参数为true,对消息进行持久化处理。
channel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
虽然这个命令本身是正确的,但是它在当前设置中不会起作用。那是因为我们已经定义过一个名为hello
的队列,并且这个队列不是持久化的。RabbitMQ不允许使用不同的参数重新定义已经存在的队列,并会向尝试执行该操作的程序返回一个错误。但有一个快速的解决办法 - 让我们用不同的名称声明一个队列,例如task_queue
:
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
注意,该声明队列QueueDeclare
方法的更改需要同时应用于生产者和消费者代码。
此时,我们可以确定的是,即使RabbitMQ重新启动,task_queue
队列也不会丢失。现在我们需要将我们的消息标记为持久的(Persistent) - 通过将IBasicProperties.Persistent
设置为true
。
// 将消息标记为持久性。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
关于消息持久性的说明
将消息标记为Persistent
并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘,但当RabbitMQ接收到消息并且尚未保存消息时仍有一段时间间隔。此外,RabbitMQ不会为每条消息执行fsync(2)
- 它可能只是保存到缓存中,并没有真正写入磁盘。消息的持久化保证并不健壮,但对于简单的任务队列来说已经足够了。如果您需要一个更加健壮的保证,可以使用 发布者确认。
公平调度
您可能已经注意到调度仍然无法完全按照我们期望的方式工作。例如,在有两个Worker的情况下,假设所有奇数消息都很庞大、偶数消息都很轻量,那么一个Worker将会一直忙碌,而另一个Worker几乎不做任何工作。是的,RabbitMQ并不知道存在这种情况,它仍然会平均地分发消息。
发生这种情况是因为RabbitMQ只是在消息进入队列后就将其分发。它不会去检查每个消费者所拥有的未确认消息的数量。它只是盲目地将第n条消息分发给第n位消费者。
为了改变上述这种行为,我们可以使用参数设置prefetchCount = 1
的basicQos
方法。
这就告诉RabbitMQ同一时间不要给一个Worker发送多条消息。或者换句话说,不要向一个Worker发送新的消息,直到它处理并确认了前一个消息。
相反,它会这个消息调度给下一个不忙碌的Worker。
channel.BasicQos(0, 1, false);
关于队列大小的说明
如果所有的Worker都很忙,您的队列可能会被填满。请留意这一点,可以尝试添加更多的Worker,或者使用其他策略。
组合在一起
我们NewTask.cs
类的最终代码:
using System;
using RabbitMQ.Client;
using System.Text;
class NewTask
{
public static void Main(string[] args)
{
// 实例化连接工厂。
var factory = new ConnectionFactory() { HostName = "localhost" };
// 创建连接、信道。
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
// 声明队列,标记为持久性。
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
// 获取发送消息。
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
// 将消息标记为持久性。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// 发送数据包。
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
}
还有我们的Worker.cs
:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
class Worker
{
public static void Main()
{
// 实例化连接工厂。
var factory = new ConnectionFactory() { HostName = "localhost" };
// 创建连接、信道。
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
// 声明队列,标记为持久性。
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
// 告知RabbitMQ,在未收到当前Worker的消息确认信号时,不再分发给消息,确保公平调度。
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" [*] Waiting for messages.");
// 构建消费者实例。
var consumer = new EventingBasicConsumer(channel);
// 绑定消息接收事件。
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
// 模拟耗时操作。
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
// 手动发送消息确认信号。
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",
autoAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
使用消息确认机制和BasicQ
您可以创建一个工作队列。即使RabbitMQ重新启动,通过持久性选项也可让任务继续存在。
有关IModel
方法和IBasicProperties
的更多信息,您可以在线浏览 RabbitMQ .NET客户端API参考。
现在,我们可以继续阅读 教程[3],学习如何向多个消费者发送相同的消息。
写在最后
本文翻译自RabbitMQ官方教程C#版本。如本文介绍内容与官方有所出入,请以官方最新内容为准。水平有限,翻译的不好请见谅,如有翻译错误还请指正。
- 原文链接:RabbitMQ tutorial - Work Queues
- 实验环境:RabbitMQ 3.7.4 、.NET Core 2.1.2、Visual Studio Code
- 最后更新:2018-04-03