C# RabbitMq 连接池封装

设计思路,基于前人的杰作,略作改造。

首先我们要知道:

1.创建Connection代价是巨大的(Rabbitmq没有实现连接池机制)。

2.基于Connection创建Channel代价小的多,理论上,一个connection创建channel次数是没有限制的。

(说得再多,还是图片具体点。)流程如下图所示:

C# RabbitMq 连接池封装

 

这里做了个小小改造,就是根据系统自身的需要创建自己所需要的连接。优先使用空闲连接,而不是还没达到最大的连接限制时,优先进行创建新连接。

代码改造实现如下:

 public class MQHelper
    {
        private const string CacheKey_MQConnectionSetting = "MQConnectionSetting";
        private const string CacheKey_MQMaxConnectionCount = "MQMaxConnectionCount";

        // 空闲连接对象队列
        private readonly static ConcurrentQueue<IConnection> FreeConnectionQueue;
        //使用中(忙)连接对象字典
        private readonly static ConcurrentDictionary<IConnection, bool> BusyConnectionDic;
        //连接池使用率
        private readonly static ConcurrentDictionary<IConnection, int> MQConnectionPoolUsingDicNew;

        private readonly static Semaphore MQConnectionPoolSemaphore;
        //释放和添加连接时的锁对象
        private readonly static object freeConnLock = new object(), addConnLock = new object(), getConnLock = new object();

        // 连接总数
        private static int connCount = 0;
        //默认最大保持可用连接数
        public const int DefaultMaxConnectionCount = 50;

        //默认最大连接数可访问次数
        public const int DefaultMaxConnectionUsingCount = 10000;

        public const int DefaultRetryConnectionCount = 1;//默认重试连接次数

        /// <summary>
        /// 初始化最大连接数
        /// </summary>
        private static int MaxConnectionCount
        {
            get
            {
                //if (HttpRuntime.Cache[CacheKey_MQMaxConnectionCount] != null)
                //{
                //    return Convert.ToInt32(HttpRuntime.Cache[CacheKey_MQMaxConnectionCount]);
                //}
                //else
                //{
                //    int mqMaxConnectionCount = 0;
                //    string mqMaxConnectionCountStr = ConfigurationManager.AppSettings[CacheKey_MQMaxConnectionCount];
                //    if (!int.TryParse(mqMaxConnectionCountStr, out mqMaxConnectionCount) || mqMaxConnectionCount <= 0)
                //    {
                //        mqMaxConnectionCount = DefaultMaxConnectionCount;
                //    }

                //    string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config");
                //    HttpRuntime.Cache.Insert(CacheKey_MQMaxConnectionCount, mqMaxConnectionCount, new CacheDependency(appConfigPath));
                    return 50;
                //}

            }

        }

        /// <summary>
        /// 建立连接
        /// </summary>
        /// <param name="hostName">服务器地址</param>
        /// <param name="userName">登录账号</param>
        /// <param name="passWord">登录密码</param>
        /// <returns></returns>
        private ConnectionFactory CrateFactory()
        {
            var mqConfigDom = MqConfigDomFactory.CreateConfigDomInstance(); //获取MQ的配置
            var connectionfactory = new ConnectionFactory();
            connectionfactory.HostName = mqConfigDom.MqHost;
            connectionfactory.UserName = mqConfigDom.MqUserName;
            connectionfactory.Password = mqConfigDom.MqPassword;
            connectionfactory.Port = mqConfigDom.MqPort;
            connectionfactory.VirtualHost = mqConfigDom.MqVirtualHost;
        
            return connectionfactory;
        }

        /// <summary>
        /// 创建connection连接
        /// </summary>
        /// <returns></returns>
        public IConnection CreateMQConnection()
        {
            var factory = CrateFactory();
            factory.AutomaticRecoveryEnabled = true;//自动重连
            var connection = factory.CreateConnection();
            connection.AutoClose = false;
            return connection;
        }

        /// <summary>
        /// 初始化
        /// </summary>
        static MQHelper()
        {
            FreeConnectionQueue = new ConcurrentQueue<IConnection>();
            BusyConnectionDic = new ConcurrentDictionary<IConnection, bool>();
            MQConnectionPoolUsingDicNew = new ConcurrentDictionary<IConnection, int>();//连接池使用率
            string semaphoreName = "MQConnectionPoolSemaphore";
            try
            {
                if (null == MQConnectionPoolSemaphore)
                {
                    bool semaphoreWasCreated;
                    SemaphoreSecurity sems = new SemaphoreSecurity();
                    MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, "MQConnectionPoolSemaphore", out semaphoreWasCreated);//信号量,控制同时并发可用线程数

                    if (semaphoreWasCreated)
                    {
                        MQConnectionPoolSemaphore = Semaphore.OpenExisting("MQConnectionPoolSemaphore", SemaphoreRights.FullControl);
                    }

                }
            }
            catch (WaitHandleCannotBeOpenedException)
            {
                bool semaphoreWasCreated;


                string user = Environment.UserDomainName + "\\" + Environment.UserName;
                SemaphoreSecurity semSec = new SemaphoreSecurity();

                SemaphoreAccessRule rule = new SemaphoreAccessRule(user, SemaphoreRights.Synchronize | SemaphoreRights.Modify, AccessControlType.Deny);
                semSec.AddAccessRule(rule);

                rule = new SemaphoreAccessRule(user, SemaphoreRights.ReadPermissions | SemaphoreRights.ChangePermissions, AccessControlType.Allow);
                semSec.AddAccessRule(rule);

                // Create a Semaphore object that represents the system
                // semaphore named by the constant ‘semaphoreName‘, with
                // maximum count three, initial count three, and the
                // specified security access. The Boolean value that 
                // indicates creation of the underlying system object is
                // placed in semaphoreWasCreated.
                //
                MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, semaphoreName, out semaphoreWasCreated, semSec);
            }
            catch (UnauthorizedAccessException ex)
            {
                MQConnectionPoolSemaphore = Semaphore.OpenExisting(semaphoreName, SemaphoreRights.ReadPermissions | SemaphoreRights.ChangePermissions);
                // Get the current ACL. This requires 
                // SemaphoreRights.ReadPermissions.
                SemaphoreSecurity semSec = MQConnectionPoolSemaphore.GetAccessControl();
                string user = Environment.UserDomainName + "\\" + Environment.UserName;
                SemaphoreAccessRule rule = new SemaphoreAccessRule(user, SemaphoreRights.Synchronize | SemaphoreRights.Modify, AccessControlType.Deny);
                semSec.RemoveAccessRule(rule);//移除
                // Now grant the user the correct rights.
                rule = new SemaphoreAccessRule(user, SemaphoreRights.Synchronize | SemaphoreRights.Modify, AccessControlType.Allow);
                semSec.AddAccessRule(rule); //重新授权
                MQConnectionPoolSemaphore.SetAccessControl(semSec);
                MQConnectionPoolSemaphore = Semaphore.OpenExisting(semaphoreName);
            }
        }

//释放连接
public void CreateNewConnection2FreeQueue() { IConnection mqConnection = null; if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//如果已有连接数小于最大可用连接数,则直接创建新连接 { lock (addConnLock) { if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount) { mqConnection = CreateMQConnection(); FreeConnectionQueue.Enqueue(mqConnection);//加入到空闲队列连接集合中 MQConnectionPoolUsingDicNew[mqConnection] = 0; } } } } public string MqConnectionInfo() { int scount = 0; try { scount=MQConnectionPoolSemaphore.Release(); MQConnectionPoolSemaphore.WaitOne(1); scount -= 1; } catch(SemaphoreFullException ex) { scount = MaxConnectionCount; } return $"当前信号量计数={scount},当前空闲连接长度 ={ FreeConnectionQueue.Count},当前忙连接长度 ={ BusyConnectionDic.Count},连接使用频率信息如下:已达最大使用次数的有:{MQConnectionPoolUsingDicNew.Where(l=>l.Value>= DefaultMaxConnectionUsingCount).Count()},剩余{MQConnectionPoolUsingDicNew.Where(l => l.Value < DefaultMaxConnectionUsingCount).Count()}\r\n {JsonConvert.SerializeObject(MQConnectionPoolUsingDicNew)}"; } /// <summary> /// 在mq连接池中创建新连接 /// </summary> /// <returns></returns> public IConnection CreateMQConnectionInPoolNew(ref StringBuilder spanMsg) { Stopwatch sw = new Stopwatch(); long spanSum = 0; sw.Start(); // IConnection mqConnection = null; bool waitFree = false; int tryTimeCount = 0; try { TryEnter: waitFree = MQConnectionPoolSemaphore.WaitOne(10);//当<MaxConnectionCount时,会直接进入,否则会等待10ms继续监测直到空闲连接信号出现 if(!waitFree) { tryTimeCount++; spanMsg.AppendLine($"阻塞10ms,空闲信号=[{waitFree}],进入第[{tryTimeCount}]次尝试,"); if (tryTimeCount <= 99) { goto TryEnter; } } spanMsg.Append($"空闲信号=[{waitFree}],"); if (!FreeConnectionQueue.TryDequeue(out mqConnection)) //没有可用的 { sw.Stop(); spanSum += sw.ElapsedMilliseconds; spanMsg.Append($"尝试获取可用空闲连接,没有可用空闲连接,span:{sw.ElapsedMilliseconds}ms,"); sw.Restart(); if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//如果已有连接数小于最大可用连接数,则直接创建新连接 { lock (addConnLock) { if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount) { mqConnection = CreateMQConnection(); BusyConnectionDic[mqConnection] = true;//加入到忙连接集合中 MQConnectionPoolUsingDicNew[mqConnection] = 1; sw.Stop(); spanSum += sw.ElapsedMilliseconds; spanMsg.Append($"创建一个新连接,并加到使用中连接集合中,span:{sw.ElapsedMilliseconds}ms,"); return mqConnection; } } } sw.Stop(); spanSum += sw.ElapsedMilliseconds; spanMsg.Append($"没有空闲连接,已到最大连接数{FreeConnectionQueue.Count + BusyConnectionDic.Count},等待连接释放,span:{sw.ElapsedMilliseconds}ms,"); if(waitFree)//重试需要释放之前占用的信号量 { int scount=MQConnectionPoolSemaphore.Release(); waitFree = false; spanMsg.Append($"释放空闲信号,当前信号计数={scount},"); } return CreateMQConnectionInPoolNew(); } else if (MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen) //如果取到空闲连接,判断是否使用次数是否超过最大限制,超过则释放连接并重新创建 { if (mqConnection.IsOpen) { mqConnection.Close(); } mqConnection.Dispose(); mqConnection = CreateMQConnection(); MQConnectionPoolUsingDicNew[mqConnection] = 0; sw.Stop(); spanSum += sw.ElapsedMilliseconds; spanMsg.Append($"获取到的可用空闲连接可能因累计使用通道次数{MQConnectionPoolUsingDicNew[mqConnection] + 1 }>最大可用通道次数{DefaultMaxConnectionUsingCount},或连接状态处于不是开启状态={mqConnection.IsOpen},释放当前连接并重建一个连接,span:{sw.ElapsedMilliseconds}ms,"); } sw.Restart(); BusyConnectionDic[mqConnection] = true;//加入到忙连接集合中 MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1;//使用次数加1 sw.Stop(); spanSum += sw.ElapsedMilliseconds; spanMsg.AppendLine($"将获取到得空闲连接放入到忙集合中,并累加使用次数+1={MQConnectionPoolUsingDicNew[mqConnection] + 1},span:{sw.ElapsedMilliseconds}ms,"); return mqConnection; } catch(UnauthorizedAccessException ex) { throw ex; } catch (Exception ex) { if (null != mqConnection) { ResetMQConnectionToFree(mqConnection); } else if(waitFree)//信号量没释放,则进行释放 { MQConnectionPoolSemaphore.Release(); } return null; } finally { spanMsg.AppendLine( $"获取一个可用连接过程耗费{spanSum}ms,当前空闲连接长度={FreeConnectionQueue.Count},当前忙连接长度={BusyConnectionDic.Count}"); } } /// <summary> /// 在mq连接池中创建新连接 /// </summary> /// <returns></returns> public IConnection CreateMQConnectionInPoolNew() { //string spanMsg = string.Empty; StringBuilder spanMsg = new StringBuilder(); return CreateMQConnectionInPoolNew(ref spanMsg); } /// <summary> /// 释放连接池中的连接 /// </summary> /// <param name="connection"></param> private void ResetMQConnectionToFree(IConnection connection) { try { lock (freeConnLock) { bool result = false; if (BusyConnectionDic.TryRemove(connection, out result)) //从忙队列中取出 { } else { //if(!BusyConnectionDic.TryRemove(connection,out result)) } if (FreeConnectionQueue.Count + BusyConnectionDic.Count > MaxConnectionCount)//如果因为高并发出现极少概率的>MaxConnectionCount,则直接释放该连接 { connection.Close(); connection.Dispose(); } else if (connection.IsOpen) //如果OPEN状态才加入空闲链接队列 { FreeConnectionQueue.Enqueue(connection);//加入到空闲队列,以便持续提供连接服务 } } } catch { throw; } finally { MQConnectionPoolSemaphore.Release();//释放一个空闲连接信号 } } /// <summary> /// 发送消息 /// </summary> /// <param name="connection">消息队列连接对象</param> /// <typeparam name="T">消息类型</typeparam> /// <param name="queueName">队列名称</param> /// <param name="durable">是否持久化</param> /// <param name="msg">消息</param> /// <returns></returns> public string SendMsg(IConnection connection, string queueName, string msg, bool durable = true, string exchange = "", string type = "fanout") { bool reTry = false; int reTryCount = 0; string sendErrMsg = string.Empty; do { reTry = false; try { using (var channel = connection.CreateModel())//建立通讯信道 { // 参数从前面开始分别意思为:队列名称,是否持久化,独占的队列,不使用时是否自动删除,其他参数 channel.QueueDeclare(queueName, durable, false, false, null); if (!exchange.IsNullOrEmpty()) { channel.ExchangeDeclare(exchange: exchange, type: type, durable: durable); } channel.QueueBind(queueName, exchange, ""); //ExchangeDeclare(model, exchange, RabbitMqProxyConfig.ExchangeType.Fanout, isProperties); //QueueDeclare(model, queue, isProperties); //model.QueueBind(queue, exchange, routingKey); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2;//1表示不持久,2.表示持久化 ////properties.Type = ""; ////properties.CorrelationId if (!durable) properties = null; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange, queueName, properties, body); } sendErrMsg = string.Empty; } catch (Exception ex) { if ((++reTryCount) <= DefaultRetryConnectionCount) { ResetMQConnectionToFree(connection); connection = CreateMQConnectionInPoolNew(); reTry = true; } return ex.ToString(); } finally { if (!reTry) { ResetMQConnectionToFree(connection); } } } while (reTry); return sendErrMsg; } /// <summary> /// 消费消息 /// </summary> /// <param name="connection">消息队列连接对象</param> /// <param name="queueName">队列名称</param> /// <param name="durable">是否持久化</param> /// <param name="dealMessage">消息处理函数</param> /// <param name="saveLog">保存日志方法,可选</param> public void ConsumeMsg(IConnection connection, string queueName, bool durable, Func<string, ConsumeAction> dealMessage, Action<string, Exception> saveLog = null) { try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queueName, durable, false, false, null); //获取队列 channel.BasicQos(0, 1, false); //分发机制为触发式 var consumer = new QueueingBasicConsumer(channel); //建立消费者 // 从左到右参数意思分别是:队列名称、是否读取消息后直接删除消息,消费者 channel.BasicConsume(queueName, false, consumer); while (true) //如果队列中有消息 { ConsumeAction consumeResult = ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //获取消息 string message = null; try { var body = ea.Body; message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } catch (Exception ex) { if (saveLog != null) { saveLog(message, ex); } } if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //消息从队列中删除 } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //消息重回队列 } else { channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丢弃 } } } } catch (Exception ex) { if (saveLog != null) { saveLog("QueueName:" + queueName, ex); } throw ex; } finally { ResetMQConnectionToFree(connection); } } /// <summary> /// 依次获取单个消息 /// </summary> /// <param name="connection">消息队列连接对象</param> /// <param name="QueueName">队列名称</param> /// <param name="durable">持久化</param> /// <param name="dealMessage">处理消息委托</param> public void ConsumeMsgSingle(IConnection connection, string QueueName, bool durable, Func<string, ConsumeAction> dealMessage) { try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, durable, false, false, null); //获取队列 channel.BasicQos(0, 1, false); //分发机制为触发式 uint msgCount = channel.MessageCount(QueueName); if (msgCount > 0) { var consumer = new QueueingBasicConsumer(channel); //建立消费者 // 从左到右参数意思分别是:队列名称、是否读取消息后直接删除消息,消费者 channel.BasicConsume(QueueName, false, consumer); ConsumeAction consumeResult = ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //获取消息 try { var body = ea.Body; var message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } catch (Exception ex) { throw ex; } finally { if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //消息从队列中删除 } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //消息重回队列 } else { channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丢弃 } } } else { dealMessage(string.Empty); } } } catch (Exception ex) { throw ex; } finally { ResetMQConnectionToFree(connection); } } /// <summary> /// 获取队列消息数 /// </summary> /// <param name="connection"></param> /// <param name="QueueName"></param> /// <returns></returns> public int GetMessageCount(IConnection connection, string QueueName) { int msgCount = 0; bool reTry = false; int reTryCount = 0; do { reTry = false; try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, true, false, false, null); //获取队列 msgCount = (int)channel.MessageCount(QueueName); } } catch (Exception ex) { //if (BaseUtil.IsIncludeException<SocketException>(ex)) { if ((++reTryCount) <= DefaultRetryConnectionCount)//可重试1次 { ResetMQConnectionToFree(connection); connection = CreateMQConnectionInPoolNew(); reTry = true; } } throw ex; } finally { if (!reTry) { ResetMQConnectionToFree(connection); } } } while (reTry); return msgCount; } } public enum ConsumeAction { ACCEPT, // 消费成功 RETRY, // 消费失败,可以放回队列重新消费 REJECT, // 消费失败,直接丢弃 }

在站点开启多个工作进程的情况下,信号量的控制显得很重要,它可以有效的控制并发。信号量是针对单机而言的。通过计数可以有效控制使用中的连接个数。从而有效控制连接池的总体数量,防止过度的创建带来的毁灭性打击。

C# RabbitMq 连接池封装

上一篇:Windows API——文件处理函数


下一篇:NGINX 命令 重启 WINDOWS