用C#实现TCP连接池

背景

   最近对接一个TCP协议,这个协议定义的非常好,有头标识和校验位!但是,接口提供方定的通信协议还是欠缺考虑...正常情况下,这个协议是没有问题的,但是在高并发的情况下,客户端方就需要点真功夫了。

分析

    该通信协议中,没有使用事务号,也就是说,用同一条连接连续发送两次请求,你不知道返回的响应数据是哪个请求的。你可能会说,第一个响应是第一个请求的,第二个响应是第二个请求的!这是绝对的理想情况,服务器处理所有请求的耗时一样,网络没有抖动。如果耗时和网络抖动都无法确定的情况下,响应顺序与请求顺序就有可能不一致!结论就是,如果只有一个连接,那么所有请求只能排队,一个请求处理完才能发送下一个。那如果高并发请求比较多怎么办?

    如何用没有事务号的通信协议实现高并发?Http?bingo!Http的请求方式就是解决这个问题的榜样!Http就是在一个连接中一次只发送一个请求,请求没有收到响应或者超时断开的情况下,是不会发送第二个请求的。那么Http是怎么并发的?通过同时发送多个Http请求来实现并发!以Http为榜样,那么就可以通过一个TCP连接一次发送一个请求,这个请求没有结束之前不再发送新的请求,如此来保证请求与响应的匹配,通过多个连接同时发送多个请求来实现高并发。

    解决思路有了,还得考虑下性能。肯定不能每次发送请求就新建一个连接,请求结束就断开连接,这样很不TCP!那就考虑复用TCP连接,请求未结束之前,这个连接不可用,请求结束后不断开连接,可供新请求使用。并发量肯定有低峰和高峰,低峰的时候,不需要保留太多的连接;高峰的时候,如果不加以控制,TCP连接数量会飙升,也需要加以控制!

实现

    基于以上分析,需要写一个TCP连接池,该连接池可配置最少、最多连接数量,以及连接可空闲时间。当需要发送请求时,如果没有可用连接,并且池内连接数量不超过最大数量,就创建新的连接;当池内数量达到上限,并且需要发送请求时,需要阻塞,直至有可用的连接;当连接空闲时间达到设置的可空闲时间,并且池内连接数量大于最小值时,清理掉多余的连接。

    以下是连接池类,ConnectionInfo类和接口IConnection没有发,但是不影响看连接池代码。欢迎拍砖!

  1     /// <summary>
  2     /// 连接池
  3     /// 对于不能单连并发的,用此连接池实现多连接并发,可控制最少最大连接数量
  4     /// 连接保活
  5     /// </summary>
  6     public class ConnectionPool : IDisposable
  7     {
  8         private ConcurrentQueue<ConnectionInfo> m_IdleConnections;
  9         private ConcurrentDictionary<long, ConnectionInfo> m_Connections;
 10         private object m_ConnectionLock = new object();
 11 
 12         private int m_Max;
 13         private int m_Min;
 14         private TimeSpan m_Expired;
 15         private Func<IConnection> m_ConnectionFactory;
 16 
 17         private bool m_IsConnected = false;
 18         /// <summary>
 19         /// 是否连接
 20         /// </summary>
 21         public bool IsConnected
 22         {
 23             get
 24             {
 25                 return m_IsConnected;
 26             }
 27             set
 28             {
 29                 if (m_IsConnected != value)
 30                 {
 31                     m_IsConnected = value;
 32                     if (m_IsConnected)
 33                     {
 34                         Connected?.Invoke(this, EventArgs.Empty);
 35                     }
 36                     else
 37                     {
 38                         Disconnected?.Invoke(this, EventArgs.Empty);
 39                     }
 40                 }
 41             }
 42         }
 43 
 44         /// <summary>
 45         /// 构造
 46         /// </summary>
 47         /// <param name="minConnection">最小连接数</param>
 48         /// <param name="maxConnection">最大连接数</param>
 49         /// <param name="expired">过期时间 空闲时间超过此值则会被清理掉</param>
 50         /// <param name="connectionFactory">创建连接的回调</param>
 51         public ConnectionPool(int minConnection, int maxConnection, TimeSpan expired, Func<IConnection> connectionFactory)
 52         {
 53             m_Min = minConnection;
 54             m_Max = maxConnection;
 55             m_Expired = expired;
 56             m_ConnectionFactory = connectionFactory;
 57             m_Connections = new ConcurrentDictionary<long, ConnectionInfo>();
 58             m_IdleConnections = new ConcurrentQueue<ConnectionInfo>();
 59             StartClear();
 60         }
 61 
 62         private long m_No = 0;
 63         private object m_NoLock = new object();
 64 
 65         /// <summary>
 66         /// 生成新的编号
 67         /// </summary>
 68         /// <returns></returns>
 69         private long NewNo()
 70         {
 71             long no;
 72             lock (m_NoLock)
 73             {
 74                 no = ++m_No;
 75             }
 76             return no;
 77         }
 78 
 79         /// <summary>
 80         /// 抓取连接
 81         /// </summary>
 82         /// <returns></returns>
 83         private ConnectionInfo GrabConnection()
 84         {
 85             //判断空闲队列是否有
 86             ConnectionInfo connectionInfo = null;
 87         //开始抓取连接
 88         Begin:
 89             while (!m_IsDisposed && !m_IdleConnections.IsEmpty)
 90             {
 91                 if (m_IdleConnections.TryDequeue(out connectionInfo))
 92                 {
 93                     if (m_Connections.ContainsKey(connectionInfo.No))
 94                     {//取到的连接没有被销毁
 95                         break;
 96                     }
 97                     else
 98                     {//不可用则销毁此连接,继续寻找
 99                         DestoryConnection(connectionInfo);
100                         connectionInfo = null;
101                     }
102                 }
103                 else
104                 {
105                     connectionInfo = null;
106                 }
107             }
108             if (!m_IsDisposed && connectionInfo == null)
109             {//没有取到连接
110                 if (!CreateOrAddConnection(null))
111                 {//创建连接失败,睡眠10ms
112                     Thread.Sleep(10);
113                 }
114                 //继续抓取连接
115                 goto Begin;
116             }
117             return connectionInfo;
118         }
119 
120         /// <summary>
121         /// 创建或者添加连接
122         /// </summary>
123         public bool CreateOrAddConnection(IConnection connection)
124         {
125             bool rst = false;
126             if (m_Connections.Count < m_Max)
127             {
128                 lock (m_ConnectionLock)
129                 {
130                     if (m_Connections.Count < m_Max)
131                     {
132                         if (connection == null)
133                         {
134                             connection = m_ConnectionFactory.Invoke();
135                         }
136                         var conInfo = new ConnectionInfo()
137                         {
138                             No = NewNo(),
139                             Connection = connection,
140                             CreateTime = DateTime.Now
141                         };
142                         if (m_Connections.TryAdd(conInfo.No, conInfo))
143                         {
144                             connection.Connect();
145                             IsConnected = connection.IsConnected;
146                             m_IdleConnections.Enqueue(conInfo);
147                             rst = true;
148                             OutputDebugInfo(string.Format("创建{0}", conInfo.No));
149                         }
150                     }
151                 }
152             }
153             return rst;
154         }
155 
156         /// <summary>
157         /// 销毁连接
158         /// </summary>
159         /// <param name="connectionInfo"></param>
160         private void DestoryConnection(ConnectionInfo connectionInfo)
161         {
162             ConnectionInfo temp;
163             while (m_Connections.ContainsKey(connectionInfo.No))
164             {
165                 if (m_Connections.TryRemove(connectionInfo.No, out temp))
166                 {
167                     break;
168                 }
169                 else
170                 {
171                     Thread.Sleep(10);
172                 }
173             }
174             try
175             {
176                 connectionInfo.Connection.Disconnect();
177             }
178             catch (Exception ex)
179             {
180                 OutputDebugInfo(string.Format("断开连接失败:{0}", ex.ToString()));
181             }
182             OutputDebugInfo(string.Format("销毁{0}", connectionInfo.No));
183         }
184 
185         /// <summary>
186         /// 发送
187         /// </summary>
188         /// <typeparam name="T1"></typeparam>
189         /// <typeparam name="T2"></typeparam>
190         /// <param name="rstData"></param>
191         /// <param name="callback"></param>
192         public void Send(byte[] rstData, Action<string, byte[]> callback)
193         {
194             var connInfo = GrabConnection();
195             if (connInfo == null)
196             {
197                 OutputDebugInfo("未获取到可用连接");
198                 callback?.Invoke("无可用连接", null);
199                 return;
200             }
201             OutputDebugInfo(string.Format("获取到连接{0}", connInfo.No));
202             if (connInfo.Connection.IsConnected)
203             {
204                 connInfo.LastUsedTime = DateTime.Now;
205                 try
206                 {
207                     connInfo.Connection.Send(rstData,
208                     (error, rndData) =>
209                     {
210                         callback?.BeginInvoke(error, rndData, null, null);
211                         //重新加入空闲队列
212                         if (string.IsNullOrEmpty(error) && m_Connections.ContainsKey(connInfo.No))
213                         {
214                             m_IdleConnections.Enqueue(connInfo);
215                         }
216                     });
217                 }
218                 catch (Exception ex)
219                 {
220                     OutputDebugInfo(string.Format("发送失败:{0}", ex.ToString()));
221                 }
222             }
223             else
224             {
225                 callback?.BeginInvoke("断开连接", null, null, null);
226                 DestoryConnection(connInfo);
227             }
228         }
229 
230         /// <summary>
231         /// 是否已清理
232         /// </summary>
233         private bool m_IsDisposed = false;
234         /// <summary>
235         /// 清理
236         /// </summary>
237         public void Dispose()
238         {
239             if (m_IsDisposed)
240             {
241                 return;
242             }
243             m_IsDisposed = true;
244 
245             Clear(true);
246             OutputDebugInfo("释放完成");
247         }
248 
249         private object m_ClearLock = new object();
250 
251         /// <summary>
252         /// 开始清理
253         /// </summary>
254         private void StartClear()
255         {
256             ThreadPool.QueueUserWorkItem(
257                 (obj) =>
258                 {
259                     Clear(false);
260                     Thread.Sleep(1000);
261                     if (!m_IsDisposed)
262                     {
263                         StartClear();
264                     }
265                 });
266         }
267 
268         /// <summary>
269         /// 清理
270         /// </summary>
271         private void Clear(bool isForece)
272         {
273             lock (m_ClearLock)
274             {
275                 var nos = m_Connections.Keys.ToList();
276                 ConnectionInfo connInfo = null;
277                 foreach (var no in nos)
278                 {
279                     if (m_Connections.TryGetValue(no, out connInfo))
280                     {
281                         try
282                         {
283                             connInfo.Connection.Send(new byte[0], null);
284                             IsConnected = true;
285                         }
286                         catch
287                         {
288                             IsConnected = false;
289                         }
290                         if ((m_Connections.Count > m_Min && DateTime.Now - connInfo.LastUsedTime > m_Expired) || isForece)
291                         {
292                             DestoryConnection(connInfo);
293                         }
294                     }
295                 }
296             }
297         }
298 
299         /// <summary>
300         /// 
301         /// </summary>
302         /// <param name="debugInfo"></param>
303         private void OutputDebugInfo(string debugInfo)
304         {
305             System.Diagnostics.Debug.WriteLine(string.Format("{0}-ConnectionPool-{1}", m_ConnectionFactory == null ? "Server" : "Client", debugInfo));
306         }
307 
308         public event EventHandler Connected;
309         public event EventHandler Disconnected;
310     }

 

用C#实现TCP连接池

上一篇:APICloud推出“云定制”服务,在“无接触”下帮助企业应对数字化需求


下一篇:Lucene全文搜索之分词器:使用IK Analyzer中文分词器(修改IK Analyzer源码使其支持lucene5.5.x)