C# 消息队列之 RabbitMQ 进阶篇

?  简介

在之前的 C# 消息队列之 RabbitMQ 基础入门 中介绍了 RabbitMQ 的基本用法,其实要更全面的掌握 RabbitMQ 这个消息队列服务,我们还需要掌握以下内容:

1.   轮询分发

2.   消息响应

3.   公平分发

4.   消息持久化

 

1.   轮询分发

默认情况下,RabbitMQ 会按照消息顺序依次分发给每个消费者,也就是每个消费者接收到的消息基本是平均的,这种分发方式称之为轮询分发。话不多说看示例:

1)   生产者代码(其他代码省略)

//随机一个“生产者”名称

string pname = $"[P{(new Random()).Next(1, 1000)}]";

Console.WriteLine($"生产者{pname}已启动:");

for (int i = 0; i < 6; i++)

{

    string message;

    if (i == 1) //第二条消息,需要耗时10

        message = $"{pname}, task{i + 1}, time of 10 seconds";

    else

        message = $"{pname}, task{i + 1}, time of 1 seconds";

    byte[] body = Encoding.UTF8.GetBytes(message);

    channel.BasicPublish("", "myQueue1", properties, body);

    Console.WriteLine($"生产者{message}\t{DateTime.Now.ToString("HH:mm:ss fff")}");

}

 

2)   消费者代码(其他代码省略)

//随机一个“消费者”名称

string cname = $"[C{(new Random()).Next(1, 1000)}]";

Console.WriteLine($"消费者{cname}已开启");

consumer.Received += (sender, e) =>

{

    byte[] body = e.Body;   //消息字节数组

    string message = Encoding.UTF8.GetString(body); //消息内容

    Console.WriteLine($"消费者{cname}接收到消息:{message}\t{DateTime.Now.ToString("HH:mm:ss fff")},开始处理...");

 

    //模拟处理耗时操作

    string second = Regex.Replace(message, ".+time of ", "");

    second = Regex.Replace(second, " seconds", "");

    System.Threading.Thread.Sleep(1000 * int.Parse(second));

};

 

3)   运行代码

首先,开启两个消费者,再打开一个生产者发送6条消息,运行结果如下:

C# 消息队列之 RabbitMQ 进阶篇

从以上结果中可以得出以下结论:

1.   一共6条消息,2个消费者接收的消息数量是一致的(各3条);

2.   尽管 Task2 消息处理时间较长,也会等待该消息处理完成之后,再处理被依次分发的消息,所有导致了 Task4 的处理时间在 Task5 之后;

3.   同一时间段一个消费者只会处理一条消息,只有当该消息处理完成后,才会处理下一条消息(或者说接收下一条消息),并不会同时处理多条消息。

 

C# 消息队列之 RabbitMQ 进阶篇

上一篇:Flink入门 - API


下一篇:Radware Alteon Radius认证+Windows Radius server