分布式中间件消息队列(3)---.Net Core 使用rabbitmq

一、通过install-package rabbitmq.client命令或nuget安装rabbitmq.client

二、rabbitmq操作

 

#region 1、生产者
                {
                    //1、创建rabbitmq连接
                    var rabbitmqFactory = new ConnectionFactory()
                    {
                        HostName = "localhost",//IP地址
                        Port = 5672,//端口
                        UserName = "admin",//用户账号
                        Password = "admin@123",//密码
                        VirtualHost = "/"
                    };
                    //创建连接对象
                    using (var connection = rabbitmqFactory.CreateConnection())
                    {
                        var channel = connection.CreateModel();//创建连接会话对象
                        string name = "product-create";

                        //2、声明一个队列
                        channel.QueueDeclare(
                            queue: name,//消息队列名称
                            durable: false,//是否持久化,true持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息。
                            exclusive: false,//是否排他,true排他的,如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除.
                            autoDelete: false,//是否自动删除。true是自动删除。自动删除的前提是:致少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除.
                          arguments: null//设置队列的一些其它参数
                            );
                        string productJson = JsonConvert.SerializeObject(productCreateDto);
                        var body = Encoding.UTF8.GetBytes(productJson);

                        //3、发送消息
                        var properties = channel.CreateBasicProperties();
                        properties.Persistent = true; // 设置消息持久化(个性化控制)
                        channel.BasicPublish(exchange: "",
                                             routingKey: name,
                                             basicProperties: properties,
                                             body: body);
                        connection.Close();
                        channel.Close();
                        _logger.LogInformation("发送消息到rabbitMQ成功");
                    }
                }
                #endregion

 

2.2 消费者

 

 #region 1、工作队列(单消费者)
            {
                var channel = connection.CreateModel();

                // 2、定义队列
                channel.QueueDeclare(queue: "product-create",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {

                    Console.WriteLine($"model:{model}");
                    var body = ea.Body;
                    // 1、逻辑代码,添加商品到数据库
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    Console.WriteLine(" [x] 创建商品 {0}", message);
                };

                channel.BasicConsume(queue: "product-create",
                                     autoAck: true, // 消息确认(防止消息重新消费)
                                     consumer: consumer);
            }
            #endregion

 

三、防止消息丢失和消息重复消费

3.1 防止消息丢失

使用消息确认机制

设置消息持久化

channel.QueueDeclare(
queue: name,//消息队列名称
durable: true,//是否持久化,true持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息。
exclusive: false,//是否排他,true排他的,如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除.
autoDelete: false,//是否自动删除。true是自动删除。自动删除的前提是:致少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除.
arguments: null//设置队列的一些其它参数
);

3.2 防止消息重复消费

autoAck设置为true,设置消息消费应答机制,不会导致消息堆积;
如果autoAck为false就会产生消息堆积,导致重复消费
 channel.BasicConsume(queue: "product-create",
                                     autoAck: true, // 自动消息确认
                                     consumer: consumer);*/

 

上一篇:SpringCloud-Eureka


下一篇:C#递归法便利某个目录下的所有子文件夹和文件