? 简介
在之前的 C# 消息队列之 RabbitMQ 基础入门 中介绍了 RabbitMQ 的基本用法,其实要更全面的掌握 RabbitMQ 这个消息队列服务,我们还需要掌握以下内容:
1. 轮询分发
2. 消息响应
3. 公平分发
4. 消息持久化
1. 轮询分发
默认情况下,RabbitMQ 会按照消息顺序依次分发给每个消费者,也就是每个消费者接收到的消息基本是平均的,这种分发方式称之为轮询分发。话不多说看示例:
1) 生产者代码(其他代码省略)
//随机一个“生产者”名称
string pname = $"[P{(new Random()).Next(1, 1000)}]";
Console.WriteLine($"生产者{pname}已启动:");
for (int i = 0; i < 6; i++)
{
string message;
if (i == 1) //第二条消息,需要耗时10秒
message = $"{pname}, task{i + 1}, time of 10 seconds";
else
message = $"{pname}, task{i + 1}, time of 1 seconds";
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "myQueue1", properties, body);
Console.WriteLine($"生产者{message}\t{DateTime.Now.ToString("HH:mm:ss fff")}");
}
2) 消费者代码(其他代码省略)
//随机一个“消费者”名称
string cname = $"[C{(new Random()).Next(1, 1000)}]";
Console.WriteLine($"消费者{cname}已开启");
consumer.Received += (sender, e) =>
{
byte[] body = e.Body; //消息字节数组
string message = Encoding.UTF8.GetString(body); //消息内容
Console.WriteLine($"消费者{cname}接收到消息:{message}\t{DateTime.Now.ToString("HH:mm:ss fff")},开始处理...");
//模拟处理耗时操作
string second = Regex.Replace(message, ".+time of ", "");
second = Regex.Replace(second, " seconds", "");
System.Threading.Thread.Sleep(1000 * int.Parse(second));
};
3) 运行代码
首先,开启两个消费者,再打开一个生产者发送6条消息,运行结果如下:
从以上结果中可以得出以下结论:
1. 一共6条消息,2个消费者接收的消息数量是一致的(各3条);
2. 尽管 Task2 消息处理时间较长,也会等待该消息处理完成之后,再处理被依次分发的消息,所有导致了 Task4 的处理时间在 Task5 之后;
3. 同一时间段一个消费者只会处理一条消息,只有当该消息处理完成后,才会处理下一条消息(或者说接收下一条消息),并不会同时处理多条消息。