在使用rabbitmq过程中可能会遇到的情况:
1、消费者挂了,导致队列积压,超出最大长度,超时若有新的消息过来,这消息会被丢弃或发送不成功;
2、对于特殊场景的消息,可能有需要处理消息超时的需求;
以上两种情况,应用rabbitmq的死信队列可以比较好的解决。死信队列也是一个正常的exchange,其下绑定了一个或多个queue。这个exchange有dead-letter-exchange的标志。
而消息进入死信队列的情况主要有
1. 消息设置了ttl,并且超时;
2. 消息被拒绝;
3. 队列未被消费的消息数量达到了队列的最大长度。
废话不多说,下面做个简单的demo,直接上代码(.net core3.1 + Rabbitmq.Client 6.1):
1、DMRabbitmqClient.cs, 简单封装的rabbitmqclient。
public class DMRabbitmqClient
{
private ConnectionFactory Factory { get; }
public DMRabbitmqClient(string host, string username = "guest", string password = "guest", int port = 5672)
{
Factory = new ConnectionFactory();
Factory.UserName = username;
Factory.HostName = host;
Factory.Password = password;
Factory.Port = port;
}
public IConnection CreateConnection()
{
return Factory.CreateConnection();
}
public IModel GetModel(IConnection connection)
{
return connection.CreateModel();
}
public void ExchangeDeclare(IModel model, string exchangename, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments)
{
if (arguments == null)
arguments = new Dictionary<string, object>();
model.ExchangeDeclare(exchangename, type, durable, autoDelete, arguments);
}
public void QueueDeclare(IModel model, string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
{
if (arguments == null)
arguments = new Dictionary<string, object>();
model.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
}
public void QueueBind(IModel model, string queue, string exchange, string routeKey, IDictionary<string, object> arguments)
{
if (arguments == null)
arguments = new Dictionary<string, object>();
model.QueueBind(queue, exchange, routeKey, arguments);
}
public void Publish(IModel model, string dto, string exchange, string routekey)
{
IBasicProperties properties = model.CreateBasicProperties();
properties.DeliveryMode = 2;
properties.Expiration = "10000";
properties.ContentEncoding = "UTF-8";
var content = Encoding.UTF8.GetBytes(dto);
model.BasicPublish(exchange, routekey, false, properties, content);
}
public EventingBasicConsumer AddConsumerEvent(IModel model,EventHandler<BasicDeliverEventArgs> action)
{
var eventConsumer = new EventingBasicConsumer(model);
eventConsumer.Received += action;
return eventConsumer;
}
}
2、生产者代码
```
class Program
{
static void Main(string[] args)
{
var client = new DMRabbitmqClient("127.0.0.1", "guest", "guest");
IConnection connection = client.CreateConnection();
var model = client.GetModel(connection);
//定义死信队列
client.ExchangeDeclare(model, "test_exchange_dl", "topic", false, false, null);
client.QueueDeclare(model, "test_queue_dl", false, false, false, null);
//exchange接收到消息都往这个队列里面发
client.QueueBind(model, "test_queue_dl", "test_exchange_dl", "#", null);
//定义死信队列消费者
var dlconsumer = client.AddConsumerEvent(model,DLReceiver);
model.BasicConsume("test_queue_dl", true, "", false, true, null, dlconsumer);
//定义正常的队列
IModel common_model = client.GetModel(connection);
client.ExchangeDeclare(common_model, "test_exchange", "topic", false, false, null);
var arguments = new Dictionary<string, object>();
arguments.Add("x-dead-letter-exchange", "test_exchange_dl");//定义reject或者ttl超时后,发送到死信队列的exchange
arguments.Add("x-max-length", 3);//定义队列最大长度
client.QueueDeclare(common_model, "test_queue", false, false, false, arguments);
client.QueueBind(common_model, "test_queue", "test_exchange", "test_#", null);
Console.WriteLine("输入1、发送消息;2、退出");
string key = Console.ReadLine();
bool isExists = false;
while (!string.IsNullOrWhiteSpace(key))
{
switch (key)
{
case "1":
client.Publish(common_model, $"{DateTime.Now:yyyy-MM-dd HH:mm:ss},您好。", "test_exchange", "test_#");
break;
case "2":
isExists = true;
break;
}
if (isExists)
break;
Console.WriteLine("输入1、发送消息;2、退出");
key = Console.ReadLine();
}
Console.WriteLine("please pass any key to exists");
Console.ReadKey();
}
private static void DLReceiver(object sender, BasicDeliverEventArgs args)
{
string msg = Encoding.UTF8.GetString(args.Body.ToArray());
Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss}死信队列接收到消息:{msg}");
}
}
2、消费者代码
```
class Program
{
static void Main(string[] args)
{
var client = new DMRabbitmqClient("127.0.0.1", "guest", "guest");
IConnection connection = client.CreateConnection();
var model = client.GetModel(connection);
//model.ExchangeDeclare("test_exchange", "topic", false, false, null);
//model.QueueDeclare("test_queue", true, true, false);
Console.WriteLine("请输入:1、拉取消息并自动ack;2、拉取消息但是不ack;3、退出。");
string key = Console.ReadLine();
bool isExists = false;
while (!string.IsNullOrWhiteSpace(key))
{
switch (key)
{
case "1":
PullMessage(model, true);
break;
case "2":
PullMessage(model, false);
break;
case "3":
isExists = true;
break;
}
if (isExists)
break;
Console.WriteLine("请输入:1、拉取消息并自动ack;2、拉取消息但是不ack;3、退出。");
key = Console.ReadLine();
}
Console.WriteLine("please pass any keys to exists");
Console.ReadKey();
}
private static void PullMessage(IModel model ,bool isAck)
{
var result = model.BasicGet("test_queue", isAck);
if (result != null && !result.Body.IsEmpty)
{
string message = Encoding.UTF8.GetString(result.Body.ToArray());
Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss}接收到消息:{message},ack:{isAck}");
if (!isAck)
{
Console.WriteLine("ack为false,拒绝...");
model.BasicReject(1, false);//拒绝ack,并且deliverytag设置为1,requeue设置为false
}
}
}
}
实践
1、生产者发送消息,但是不主动拉取消息,此时消息进入私信队列。如下图
2、生产者发送消息,消费者端接收消息,但是调用basereject方法拒绝。如下图
3、此处生产者代码已经设置了最大队列长度为3,我们尝试连续发送消息,让队列积压的消息达到最大长度,如下图