背景
最近对接一个TCP协议,这个协议定义的非常好,有头标识和校验位!但是,接口提供方定的通信协议还是欠缺考虑...正常情况下,这个协议是没有问题的,但是在高并发的情况下,客户端方就需要点真功夫了。
分析
该通信协议中,没有使用事务号,也就是说,用同一条连接连续发送两次请求,你不知道返回的响应数据是哪个请求的。你可能会说,第一个响应是第一个请求的,第二个响应是第二个请求的!这是绝对的理想情况,服务器处理所有请求的耗时一样,网络没有抖动。如果耗时和网络抖动都无法确定的情况下,响应顺序与请求顺序就有可能不一致!结论就是,如果只有一个连接,那么所有请求只能排队,一个请求处理完才能发送下一个。那如果高并发请求比较多怎么办?
如何用没有事务号的通信协议实现高并发?Http?bingo!Http的请求方式就是解决这个问题的榜样!Http就是在一个连接中一次只发送一个请求,请求没有收到响应或者超时断开的情况下,是不会发送第二个请求的。那么Http是怎么并发的?通过同时发送多个Http请求来实现并发!以Http为榜样,那么就可以通过一个TCP连接一次发送一个请求,这个请求没有结束之前不再发送新的请求,如此来保证请求与响应的匹配,通过多个连接同时发送多个请求来实现高并发。
解决思路有了,还得考虑下性能。肯定不能每次发送请求就新建一个连接,请求结束就断开连接,这样很不TCP!那就考虑复用TCP连接,请求未结束之前,这个连接不可用,请求结束后不断开连接,可供新请求使用。并发量肯定有低峰和高峰,低峰的时候,不需要保留太多的连接;高峰的时候,如果不加以控制,TCP连接数量会飙升,也需要加以控制!
实现
基于以上分析,需要写一个TCP连接池,该连接池可配置最少、最多连接数量,以及连接可空闲时间。当需要发送请求时,如果没有可用连接,并且池内连接数量不超过最大数量,就创建新的连接;当池内数量达到上限,并且需要发送请求时,需要阻塞,直至有可用的连接;当连接空闲时间达到设置的可空闲时间,并且池内连接数量大于最小值时,清理掉多余的连接。
以下是连接池类,ConnectionInfo类和接口IConnection没有发,但是不影响看连接池代码。欢迎拍砖!
1 /// <summary> 2 /// 连接池 3 /// 对于不能单连并发的,用此连接池实现多连接并发,可控制最少最大连接数量 4 /// 连接保活 5 /// </summary> 6 public class ConnectionPool : IDisposable 7 { 8 private ConcurrentQueue<ConnectionInfo> m_IdleConnections; 9 private ConcurrentDictionary<long, ConnectionInfo> m_Connections; 10 private object m_ConnectionLock = new object(); 11 12 private int m_Max; 13 private int m_Min; 14 private TimeSpan m_Expired; 15 private Func<IConnection> m_ConnectionFactory; 16 17 private bool m_IsConnected = false; 18 /// <summary> 19 /// 是否连接 20 /// </summary> 21 public bool IsConnected 22 { 23 get 24 { 25 return m_IsConnected; 26 } 27 set 28 { 29 if (m_IsConnected != value) 30 { 31 m_IsConnected = value; 32 if (m_IsConnected) 33 { 34 Connected?.Invoke(this, EventArgs.Empty); 35 } 36 else 37 { 38 Disconnected?.Invoke(this, EventArgs.Empty); 39 } 40 } 41 } 42 } 43 44 /// <summary> 45 /// 构造 46 /// </summary> 47 /// <param name="minConnection">最小连接数</param> 48 /// <param name="maxConnection">最大连接数</param> 49 /// <param name="expired">过期时间 空闲时间超过此值则会被清理掉</param> 50 /// <param name="connectionFactory">创建连接的回调</param> 51 public ConnectionPool(int minConnection, int maxConnection, TimeSpan expired, Func<IConnection> connectionFactory) 52 { 53 m_Min = minConnection; 54 m_Max = maxConnection; 55 m_Expired = expired; 56 m_ConnectionFactory = connectionFactory; 57 m_Connections = new ConcurrentDictionary<long, ConnectionInfo>(); 58 m_IdleConnections = new ConcurrentQueue<ConnectionInfo>(); 59 StartClear(); 60 } 61 62 private long m_No = 0; 63 private object m_NoLock = new object(); 64 65 /// <summary> 66 /// 生成新的编号 67 /// </summary> 68 /// <returns></returns> 69 private long NewNo() 70 { 71 long no; 72 lock (m_NoLock) 73 { 74 no = ++m_No; 75 } 76 return no; 77 } 78 79 /// <summary> 80 /// 抓取连接 81 /// </summary> 82 /// <returns></returns> 83 private ConnectionInfo GrabConnection() 84 { 85 //判断空闲队列是否有 86 ConnectionInfo connectionInfo = null; 87 //开始抓取连接 88 Begin: 89 while (!m_IsDisposed && !m_IdleConnections.IsEmpty) 90 { 91 if (m_IdleConnections.TryDequeue(out connectionInfo)) 92 { 93 if (m_Connections.ContainsKey(connectionInfo.No)) 94 {//取到的连接没有被销毁 95 break; 96 } 97 else 98 {//不可用则销毁此连接,继续寻找 99 DestoryConnection(connectionInfo); 100 connectionInfo = null; 101 } 102 } 103 else 104 { 105 connectionInfo = null; 106 } 107 } 108 if (!m_IsDisposed && connectionInfo == null) 109 {//没有取到连接 110 if (!CreateOrAddConnection(null)) 111 {//创建连接失败,睡眠10ms 112 Thread.Sleep(10); 113 } 114 //继续抓取连接 115 goto Begin; 116 } 117 return connectionInfo; 118 } 119 120 /// <summary> 121 /// 创建或者添加连接 122 /// </summary> 123 public bool CreateOrAddConnection(IConnection connection) 124 { 125 bool rst = false; 126 if (m_Connections.Count < m_Max) 127 { 128 lock (m_ConnectionLock) 129 { 130 if (m_Connections.Count < m_Max) 131 { 132 if (connection == null) 133 { 134 connection = m_ConnectionFactory.Invoke(); 135 } 136 var conInfo = new ConnectionInfo() 137 { 138 No = NewNo(), 139 Connection = connection, 140 CreateTime = DateTime.Now 141 }; 142 if (m_Connections.TryAdd(conInfo.No, conInfo)) 143 { 144 connection.Connect(); 145 IsConnected = connection.IsConnected; 146 m_IdleConnections.Enqueue(conInfo); 147 rst = true; 148 OutputDebugInfo(string.Format("创建{0}", conInfo.No)); 149 } 150 } 151 } 152 } 153 return rst; 154 } 155 156 /// <summary> 157 /// 销毁连接 158 /// </summary> 159 /// <param name="connectionInfo"></param> 160 private void DestoryConnection(ConnectionInfo connectionInfo) 161 { 162 ConnectionInfo temp; 163 while (m_Connections.ContainsKey(connectionInfo.No)) 164 { 165 if (m_Connections.TryRemove(connectionInfo.No, out temp)) 166 { 167 break; 168 } 169 else 170 { 171 Thread.Sleep(10); 172 } 173 } 174 try 175 { 176 connectionInfo.Connection.Disconnect(); 177 } 178 catch (Exception ex) 179 { 180 OutputDebugInfo(string.Format("断开连接失败:{0}", ex.ToString())); 181 } 182 OutputDebugInfo(string.Format("销毁{0}", connectionInfo.No)); 183 } 184 185 /// <summary> 186 /// 发送 187 /// </summary> 188 /// <typeparam name="T1"></typeparam> 189 /// <typeparam name="T2"></typeparam> 190 /// <param name="rstData"></param> 191 /// <param name="callback"></param> 192 public void Send(byte[] rstData, Action<string, byte[]> callback) 193 { 194 var connInfo = GrabConnection(); 195 if (connInfo == null) 196 { 197 OutputDebugInfo("未获取到可用连接"); 198 callback?.Invoke("无可用连接", null); 199 return; 200 } 201 OutputDebugInfo(string.Format("获取到连接{0}", connInfo.No)); 202 if (connInfo.Connection.IsConnected) 203 { 204 connInfo.LastUsedTime = DateTime.Now; 205 try 206 { 207 connInfo.Connection.Send(rstData, 208 (error, rndData) => 209 { 210 callback?.BeginInvoke(error, rndData, null, null); 211 //重新加入空闲队列 212 if (string.IsNullOrEmpty(error) && m_Connections.ContainsKey(connInfo.No)) 213 { 214 m_IdleConnections.Enqueue(connInfo); 215 } 216 }); 217 } 218 catch (Exception ex) 219 { 220 OutputDebugInfo(string.Format("发送失败:{0}", ex.ToString())); 221 } 222 } 223 else 224 { 225 callback?.BeginInvoke("断开连接", null, null, null); 226 DestoryConnection(connInfo); 227 } 228 } 229 230 /// <summary> 231 /// 是否已清理 232 /// </summary> 233 private bool m_IsDisposed = false; 234 /// <summary> 235 /// 清理 236 /// </summary> 237 public void Dispose() 238 { 239 if (m_IsDisposed) 240 { 241 return; 242 } 243 m_IsDisposed = true; 244 245 Clear(true); 246 OutputDebugInfo("释放完成"); 247 } 248 249 private object m_ClearLock = new object(); 250 251 /// <summary> 252 /// 开始清理 253 /// </summary> 254 private void StartClear() 255 { 256 ThreadPool.QueueUserWorkItem( 257 (obj) => 258 { 259 Clear(false); 260 Thread.Sleep(1000); 261 if (!m_IsDisposed) 262 { 263 StartClear(); 264 } 265 }); 266 } 267 268 /// <summary> 269 /// 清理 270 /// </summary> 271 private void Clear(bool isForece) 272 { 273 lock (m_ClearLock) 274 { 275 var nos = m_Connections.Keys.ToList(); 276 ConnectionInfo connInfo = null; 277 foreach (var no in nos) 278 { 279 if (m_Connections.TryGetValue(no, out connInfo)) 280 { 281 try 282 { 283 connInfo.Connection.Send(new byte[0], null); 284 IsConnected = true; 285 } 286 catch 287 { 288 IsConnected = false; 289 } 290 if ((m_Connections.Count > m_Min && DateTime.Now - connInfo.LastUsedTime > m_Expired) || isForece) 291 { 292 DestoryConnection(connInfo); 293 } 294 } 295 } 296 } 297 } 298 299 /// <summary> 300 /// 301 /// </summary> 302 /// <param name="debugInfo"></param> 303 private void OutputDebugInfo(string debugInfo) 304 { 305 System.Diagnostics.Debug.WriteLine(string.Format("{0}-ConnectionPool-{1}", m_ConnectionFactory == null ? "Server" : "Client", debugInfo)); 306 } 307 308 public event EventHandler Connected; 309 public event EventHandler Disconnected; 310 }