前言
RabbitMQ环境环境搭建及基本配置,在此不讨论。网上一大堆。
NET环境下,Rabbit库可以在官网或NUGET上查找得到。
生产者
static void Main(string[] args) { var factory = new ConnectionFactory();//连接工厂 factory.HostName = "127.0.0.1";//地址 factory.UserName = "Test";//登录名 factory.Password = "t123456!";//密码 using (var connection = factory.CreateConnection())//创建连接 { using (var channel = connection.CreateModel())//创建通道 { //创建队列,第二个参数bool:是否队列持久化 channel.QueueDeclare( queue: "test", //消息队列名称 durable: false,//消息队列是否持久化 exclusive: false,//消息队列是否被本次连接connection独享。(本次连接 //connection创建的信道可以共用).排外的queue在当前连接被断开的时候会 //自动消失(清除)无论是否设置了持久化. autoDelete: false,//消息队列是否自动删除。也就是说queue会清理自己,但 是是在最后一个connection断开的时候。 arguments: null);//参数对 var properties = channel.CreateBasicProperties();//可以为null properties.DeliveryMode = 2;//多个消费工作队列时,设置此属性 properties.SetPersistent(true);//消息持久化 string message = "Hello World"; var body = Encoding.UTF8.GetBytes(message); //发布消息 //第一个参数:交换器,空默认为direct //第二个参数:direct时,为队列名 //第三个参数:通道属性,可以是BasicProperties,也可以是属性接口 //第四个参数:消息正文 channel.BasicPublish("", "hello", properties, body); Console.WriteLine(" set {0}", message); } } }
消费者
static void Main(string[] args) { var factory = new ConnectionFactory();//连接工厂 factory.HostName = "127.0.0.1";//地址 factory.UserName = "Test";//登录名 factory.Password = "t123456!";//密码 using (var connection = factory.CreateConnection())//创建连接 { using (var channel = connection.CreateModel())//创建通道 { //创建队列,第二个参数bool:是否队列持久化 channel.QueueDeclare( queue: "test", //消息队列名称 durable: false,//消息队列是否持久化 exclusive: false,//消息队列是否被本次连接connection独享。(本次连接 //connection创建的信道可以共用).排外的queue在当前连接被断开的时候会 //自动消失(清除)无论是否设置了持久化. autoDelete: false,//消息队列是否自动删除。也就是说queue会清理自己,但 是是在最后一个connection断开的时候。 arguments: null);//参数对 //第一种接收方式:循环接收方式 var consumer = new QueueingBasicConsumer(channel);//消费者实例 channel.BasicConsume( queue: "Test", //队列名称 autoAck: false, //是否开启收到消息自动回复 consumer: consumer//消费者 ); //多个消费者确宝公平,设置同一时间一个消费只能接收一个消息;此方法慎用 channel.BasicQos(0, 1, false); while (true) { //队列消息对象 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body; var message = Encoding.UTF8.GetString(body); //设置睡眠时间,可模拟工作队列多个消息者模式,自行决定 Thread.Sleep(5* 1000); //假如,前面未设置自动回复,则可以手动; //响应给RabbitMQ服务:收到并处理了消息。 channel.BasicAck(ea.DeliveryTag, false); //遇到无法处理的消息,拒绝且此消息是否放回队列中,发送给其他消费者 channel.BasicReject(ea.DeliveryTag, false); Console.WriteLine("Received {0}", message); Console.WriteLine("Done"); } //第二种接收方式:事件方式 //实例化一个事件型消费者 var consumer = new EventingBasicConsumer(channel); //订阅消费者接收消息的事件 consumer.Received += (model, ea) => { //获取并解析数据 var body = ea.Body; var message = Encoding.UTF8.GetString(body); //响应给RabbitMQ服务:收到并处理了消息。 channel.BasicAck(ea.DeliveryTag, false); //遇到无法处理的消息,拒绝且此消息是否放回队列中,发送给其他消费者 channel.BasicReject(ea.DeliveryTag, false); Console.WriteLine($"收到: {message}"); }; } } }