网上有很多Socket框架,但是我想,C#既然有Socket类,难道不是给人用的吗?
写了一个SocketServerHelper和SocketClientHelper,分别只有5、6百行代码,比不上大神写的,和业务代码耦合也比较重,但对新手非常友好,容易看懂。
支持返回值或回调,支持不定长度的数据包。客户端和服务端均支持断线重连。
自己本机测试,5000个客户端并发发送消息正常,CPU压力有点大。由于局域网机子性能差,局域网只测试了500个客户端并发发送消息正常。
短短1000多行代码,花了好多天心血,改了无数BUG,越写代码,越觉得自己资质平平,逻辑思维不够用。写Socket代码不像写一般的代码,实在不行加个try catch完事,这个东西既要稳定,又要性能,真的是每一个逻辑分支,每一个异常分支,都要想清楚,都要处理好,代码里我还是Exception用习惯了,没细分。
有时候为了解决一个BUG,找了一整天,也找不出BUG在哪,现在终于测试通过了,达到了自己的预期。
通过这几天的踩坑,测试,得出结论:
1、Socket TCP 不会丢包,TCP是可靠的:只要连接没有断开,已发送的数据不会在中途丢失。(本机测试、局域网测试,可能没有遇到更恶劣的网络环境;但如果连接异常断开,已经调用Send而对方尚未收到的数据仍可能丢失,所以后文才需要用数据库保存发送失败的消息并定时重发)
2、Socket TCP 能够保证顺序,接收到的顺序和发送的顺序一致
3、代码里有数据校验,但是错误的分支永远都不会走,校验是一定能通过的,不存在数据校验不通过,把错误的数据包简单丢弃的情况,否则说明代码写的还是有BUG
以下是主要代码:
SocketServerHelper代码:
using Models;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// TCP socket server helper: accepts clients, parses framed packets, keeps a
    /// registry of clients by room number / device number, and sends messages to
    /// a client either synchronously (waiting for a result) or with a callback.
    ///
    /// Frame layout as implemented in this class: 4 ASCII bytes "0XFF", 1 type
    /// byte, and (for types carrying a body) a 4-byte length produced by
    /// BitConverter followed by a UTF-8 JSON body.
    /// Types: 0 heartbeat (in), 1 heartbeat reply (out), 2 register (in),
    /// 3 message (out), 4 result (in), 5 register acknowledgement (out).
    /// </summary>
    public class SocketServerHelper
    {
        #region Fields
        private int _serverPort;
        private Socket serverSocket;
        private ConcurrentDictionary<ClientSocket, string> clientSocketList = new ConcurrentDictionary<ClientSocket, string>();
        private ConcurrentDictionary<string, ClientSocket> _dictRoomNoClientSocket = new ConcurrentDictionary<string, ClientSocket>();
        private ConcurrentDictionary<string, ClientSocket> _dictDevNoClientSocket = new ConcurrentDictionary<string, ClientSocket>();

        public int _CallbackTimeout = 20;
        /// <summary>
        /// Timeout, in seconds, used when waiting for a result before invoking a callback.
        /// </summary>
        public int CallbackTimeout
        {
            get { return _CallbackTimeout; }
            // Fix: the original wrote "value = _CallbackTimeout", which discarded the new value.
            set { _CallbackTimeout = value; }
        }

        public int _WaitResultTimeout = 20;
        /// <summary>
        /// Timeout, in seconds, used when blocking for a result in the synchronous Send.
        /// </summary>
        public int WaitResultTimeout
        {
            get { return _WaitResultTimeout; }
            // Fix: same inverted assignment as CallbackTimeout.
            set { _WaitResultTimeout = value; }
        }

        private object _lockSend = new object();
        public event EventHandler<ReceivedSocketResultEventArgs> ReceivedSocketResultEvent;
        private System.Timers.Timer _checkClientTimer;

        // Set by StartServer, cleared by StopServer; lets the accept thread exit
        // instead of logging an exception in a tight loop once the listener is closed.
        private volatile bool _running;
        #endregion

        #region Constructor
        /// <summary>
        /// Creates a helper that will listen on the given TCP port once StartServer is called.
        /// </summary>
        public SocketServerHelper(int serverPort)
        {
            _serverPort = serverPort;
        }
        #endregion

        #region Start server
        /// <summary>
        /// Binds and listens on the configured port, starts the background accept
        /// thread and the once-per-second client check timer.
        /// </summary>
        /// <returns>true when the server started; false when start-up threw (the error is logged).</returns>
        public bool StartServer()
        {
            try
            {
                IPEndPoint ipEndPoint = new IPEndPoint(IPAddress.Any, _serverPort);
                serverSocket = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                serverSocket.Bind(ipEndPoint);
                serverSocket.Listen(5000);
                _running = true;

                Thread thread = new Thread(new ThreadStart(AcceptLoop));
                thread.IsBackground = true;
                thread.Start();

                // Client liveness check; AutoReset is off and CheckClient re-arms the timer itself.
                _checkClientTimer = new System.Timers.Timer();
                _checkClientTimer.AutoReset = false;
                _checkClientTimer.Interval = 1000;
                _checkClientTimer.Elapsed += CheckClient;
                _checkClientTimer.Start();

                LogUtil.Log("服务已启动");
                return true;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "启动服务出错");
                return false;
            }
        }

        /// <summary>
        /// Accept loop run on the background thread: accepts a client, configures it,
        /// tracks it, and starts its asynchronous receive chain on the thread pool.
        /// </summary>
        private void AcceptLoop()
        {
            Socket listener = serverSocket;
            while (_running)
            {
                Socket client = null;
                ClientSocket clientSocket = null;
                try
                {
                    client = listener.Accept();
                    client.SendTimeout = 20000;
                    client.ReceiveTimeout = 20000;
                    client.SendBufferSize = 10240;
                    client.ReceiveBufferSize = 10240;
                    clientSocket = new ClientSocket(client);
                    clientSocketList.TryAdd(clientSocket, null);
                    LogUtil.Log("监听到新的客户端,当前客户端数:" + clientSocketList.Count);
                }
                catch (Exception ex)
                {
                    // Accept throws once StopServer closes the listener; that is the normal exit path.
                    if (!_running) break;
                    LogUtil.Error(ex);
                    Thread.Sleep(1);
                    continue;
                }

                if (client == null) continue;

                ClientSocket accepted = clientSocket;
                Task.Run(() => { StartReceive(accepted); });
            }
        }

        /// <summary>
        /// Creates the receive buffer and SocketAsyncEventArgs for a client and issues the first receive.
        /// </summary>
        private void StartReceive(ClientSocket clientSocket)
        {
            try
            {
                byte[] buffer = new byte[10240];
                SocketAsyncEventArgs args = new SocketAsyncEventArgs();
                clientSocket.SocketAsyncArgs = args;
                clientSocket.SocketAsyncCompleted = (s, e) => { ReceiveData(clientSocket, e); };
                args.SetBuffer(buffer, 0, buffer.Length);
                args.Completed += clientSocket.SocketAsyncCompleted;

                // ReceiveAsync returns false when it completed synchronously; Completed is
                // NOT raised in that case, so the result has to be processed here.
                if (!clientSocket.Socket.ReceiveAsync(args))
                {
                    ReceiveData(clientSocket, args);
                }
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex);
            }
        }
        #endregion

        #region Check clients
        /// <summary>
        /// Timer callback: drops every client whose last heartbeat is older than 60 seconds,
        /// then re-arms the timer.
        /// </summary>
        private void CheckClient(object sender, System.Timers.ElapsedEventArgs e)
        {
            Task.Run(() =>
            {
                try
                {
                    foreach (ClientSocket clientSkt in clientSocketList.Keys.ToArray())
                    {
                        DateTime now = DateTime.Now;
                        if (now.Subtract(clientSkt.LastHeartbeat).TotalSeconds > 60)
                        {
                            DropClient(clientSkt);
                        }
                    }
                }
                catch (Exception ex)
                {
                    LogUtil.Error(ex, "检测客户端出错");
                }
                finally
                {
                    try
                    {
                        _checkClientTimer.Start();
                    }
                    catch (ObjectDisposedException)
                    {
                        // Dispose() closed the timer while this check was running; nothing to re-arm.
                    }
                }
            });
        }

        /// <summary>
        /// Removes a client from the client list and from the room/device registries,
        /// then closes its socket and releases its SocketAsyncEventArgs.
        /// </summary>
        private void DropClient(ClientSocket clientSkt)
        {
            Socket skt = clientSkt.Socket;
            string strTemp;
            clientSocketList.TryRemove(clientSkt, out strTemp);

            // Fix: the original left the dead client in the lookup dictionaries, so Send
            // kept targeting a closed socket instead of reporting "客户端不存在".
            RemoveRegistration(_dictRoomNoClientSocket, clientSkt.RoomNo, clientSkt);
            RemoveRegistration(_dictDevNoClientSocket, clientSkt.DevNo, clientSkt);

            LogUtil.Log("客户端已失去连接,当前客户端数:" + clientSocketList.Count);

            ActionUtil.TryDoAction(() => { if (skt.Connected) skt.Disconnect(false); });
            ActionUtil.TryDoAction(() =>
            {
                skt.Close();
                skt.Dispose();
                if (clientSkt.SocketAsyncArgs != null)
                {
                    if (clientSkt.SocketAsyncCompleted != null)
                    {
                        clientSkt.SocketAsyncArgs.Completed -= clientSkt.SocketAsyncCompleted;
                    }
                    clientSkt.SocketAsyncArgs.Dispose();
                }
                clientSkt.SocketAsyncCompleted = null;
                clientSkt.SocketAsyncArgs = null;
            });
        }

        /// <summary>
        /// Removes key -> owner from a registry only if the key is still mapped to that
        /// exact client, so a newer client registered under the same key is left alone.
        /// </summary>
        private static void RemoveRegistration(ConcurrentDictionary<string, ClientSocket> registry, string key, ClientSocket owner)
        {
            if (key == null) return;

            // The explicit ICollection<KeyValuePair>.Remove compares key AND value in one atomic step.
            ((ICollection<KeyValuePair<string, ClientSocket>>)registry).Remove(new KeyValuePair<string, ClientSocket>(key, owner));
        }
        #endregion

        #region Receive data
        /// <summary>
        /// Completion handler for a client's receive. Processes the received bytes and
        /// re-issues the receive; loops while receives complete synchronously.
        /// </summary>
        private void ReceiveData(ClientSocket clientSkt, SocketAsyncEventArgs e)
        {
            if (clientSkt == null) return;

            Socket skt = clientSkt.Socket;
            try
            {
                do
                {
                    // Zero bytes or a socket error means the peer closed or the receive failed.
                    // Stop receiving; CheckClient releases the client once its heartbeat times out.
                    if (e.SocketError != SocketError.Success || e.BytesTransferred <= 0)
                    {
                        return;
                    }

                    HandleReceivedBytes(clientSkt, e);
                }
                while (skt.Connected && !skt.ReceiveAsync(e)); // false => completed synchronously, handle inline
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "处理接收的数据包 异常");
            }
        }

        /// <summary>
        /// Appends the received bytes to the client's buffer, validates the frame at the
        /// head of the buffer, and processes every complete frame currently buffered.
        /// Returns without processing when the head frame is still incomplete; clears the
        /// buffer when the head frame is malformed.
        /// </summary>
        private void HandleReceivedBytes(ClientSocket clientSkt, SocketAsyncEventArgs e)
        {
            CopyTo(e.Buffer, clientSkt.Buffer, e.Offset, e.BytesTransferred);
            List<byte> pending = clientSkt.Buffer;

            if (pending.Count < 4) return; // header not complete yet

            byte[] bArrHeader = new byte[4];
            CopyTo(pending, bArrHeader, 0, 0, bArrHeader.Length);
            if (!IsFrameHeader(bArrHeader))
            {
                LogUtil.Error("不是0XFF,丢掉错误数据,重新接收");
                pending.Clear();
                return;
            }

            if (pending.Count < 5) return; // type byte not received yet

            byte type = pending[4];
            if (type == 2 || type == 4) // register / result: both carry a length-prefixed body
            {
                if (pending.Count < 9) return;

                byte[] bArrLength = new byte[4];
                CopyTo(pending, bArrLength, 5, 0, bArrLength.Length);
                int dataLength = BitConverter.ToInt32(bArrLength, 0);
                if (dataLength <= 0)
                {
                    // Fix: the original waited forever on a zero length and threw on a negative
                    // one, leaving the bad frame at the head of the buffer and stalling the stream.
                    LogUtil.Error("type错误,丢掉错误数据,重新接收");
                    pending.Clear();
                    return;
                }

                // long arithmetic so a huge length cannot overflow into a negative number
                if (pending.Count < (long)dataLength + 9) return;
            }
            else if (type != 0) // 0 = heartbeat, no body
            {
                LogUtil.Error("type错误,丢掉错误数据,重新接收");
                pending.Clear();
                return;
            }

            SocketData data;
            do
            {
                data = ProcessSocketData(clientSkt);
            }
            while (data != null);
        }

        /// <summary>
        /// True when the four bytes decode (ASCII, case-insensitive) to the frame marker "0XFF".
        /// </summary>
        private static bool IsFrameHeader(byte[] header)
        {
            return string.Equals(Encoding.ASCII.GetString(header), "0XFF", StringComparison.OrdinalIgnoreCase);
        }
        #endregion

        #region Process a received packet
        /// <summary>
        /// Parses one frame from the head of the client's buffer, removes it from the
        /// buffer and acts on it (heartbeat reply, registration, result dispatch).
        /// </summary>
        /// <returns>The parsed frame, or null when no complete frame is available.</returns>
        private SocketData ProcessSocketData(ClientSocket clientSkt)
        {
            int readLength;
            SocketData data = ResolveBuffer(clientSkt.Buffer, out readLength);
            if (data == null) return null;

            if (readLength > 0) clientSkt.RemoveBufferData(readLength);

            if (data.Type == 0) // heartbeat
            {
                clientSkt.LastHeartbeat = DateTime.Now;

                // Only registered clients get a heartbeat reply.
                if (clientSkt.RoomNo != null || clientSkt.DevNo != null)
                {
                    ThreadHelper.Run(() => { SendControlPacket(clientSkt, 0x01); });
                }
                else
                {
                    LogUtil.Log("没有注册信息");
                }
                LogUtil.Log("收到心跳包,客户端连接正常,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo);
            }

            if (data.Type == 2) // register
            {
                if (data.SocketRegisterData != null && clientSkt != null)
                {
                    ClientSocket temp;
                    if (data.SocketRegisterData.RoomNo != null) _dictRoomNoClientSocket.TryRemove(data.SocketRegisterData.RoomNo, out temp);
                    if (data.SocketRegisterData.DevNo != null) _dictDevNoClientSocket.TryRemove(data.SocketRegisterData.DevNo, out temp);
                    clientSkt.RoomNo = data.SocketRegisterData.RoomNo;
                    clientSkt.DevNo = data.SocketRegisterData.DevNo;
                    if (data.SocketRegisterData.RoomNo != null) _dictRoomNoClientSocket.TryAdd(data.SocketRegisterData.RoomNo, clientSkt);
                    if (data.SocketRegisterData.DevNo != null) _dictDevNoClientSocket.TryAdd(data.SocketRegisterData.DevNo, clientSkt);
                    LogUtil.Log("收到注册包,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo);

                    // registration acknowledgement
                    ThreadHelper.Run(() => { SendControlPacket(clientSkt, 0x05); });
                }
            }

            if (data.Type == 4) // result
            {
                ThreadHelper.Run(() =>
                {
                    if (data.SocketResult != null) clientSkt.CallbackDict.TryAdd(data.SocketResult.callbackId, data.SocketResult);

                    // Copy the delegate so an unsubscribe between the check and the call cannot null it.
                    EventHandler<ReceivedSocketResultEventArgs> handler = ReceivedSocketResultEvent;
                    if (handler != null)
                    {
                        handler(null, new Models.ReceivedSocketResultEventArgs(data.SocketResult));
                    }
                });
                LogUtil.Log("收到返回值包,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo);
            }

            return data;
        }

        /// <summary>
        /// Sends a body-less frame ("0XFF" + one type byte) to the client under its send lock.
        /// </summary>
        private static void SendControlPacket(ClientSocket clientSkt, byte type)
        {
            lock (clientSkt.LockSend)
            {
                byte[] bArrHeader = Encoding.ASCII.GetBytes("0XFF");
                SocketHelper.Send(clientSkt.Socket, bArrHeader);
                SocketHelper.Send(clientSkt.Socket, new byte[] { type });
            }
        }
        #endregion

        #region ResolveBuffer
        /// <summary>
        /// Parses the frame at the head of the buffer without modifying the buffer.
        /// </summary>
        /// <param name="buffer">Bytes received so far for one client.</param>
        /// <param name="readLength">Number of bytes the returned frame occupies; 0 when null is returned.</param>
        /// <returns>The parsed frame, or null when the frame is incomplete or the header is invalid.</returns>
        private SocketData ResolveBuffer(List<byte> buffer, out int readLength)
        {
            readLength = 0;
            try
            {
                if (buffer.Count < 4) return null;

                byte[] bArrHeader = new byte[4];
                CopyTo(buffer, bArrHeader, 0, 0, bArrHeader.Length);
                if (!IsFrameHeader(bArrHeader))
                {
                    LogUtil.Error("不是0XFF");
                    return null;
                }

                if (buffer.Count < 5) return null;

                SocketData socketData = new SocketData();
                socketData.Type = buffer[4];
                int consumed = 5;

                if (socketData.Type == 2 || socketData.Type == 4)
                {
                    if (buffer.Count < 9) return null;

                    byte[] bArrLength = new byte[4];
                    CopyTo(buffer, bArrLength, 5, 0, bArrLength.Length);
                    int dataLength = BitConverter.ToInt32(bArrLength, 0);
                    if (dataLength <= 0 || buffer.Count < (long)dataLength + 9) return null;

                    byte[] dataBody = new byte[dataLength];
                    CopyTo(buffer, dataBody, 9, 0, dataBody.Length);
                    consumed = 9 + dataLength;

                    string jsonString = Encoding.UTF8.GetString(dataBody);
                    try
                    {
                        if (socketData.Type == 2)
                        {
                            socketData.SocketRegisterData = JsonConvert.DeserializeObject<SocketRegisterData>(jsonString);
                        }
                        else
                        {
                            socketData.SocketResult = JsonConvert.DeserializeObject<SocketResult>(jsonString);
                        }
                    }
                    catch (JsonException ex)
                    {
                        // Fix: a body that is not valid JSON used to leave the frame in the buffer,
                        // so every later receive failed on the same frame. The frame is now consumed
                        // with a null payload, which ProcessSocketData already tolerates.
                        LogUtil.Error(ex, "解析字节数组 出错");
                    }
                }

                readLength = consumed;
                return socketData;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "解析字节数组 出错");
                readLength = 0;
                return null;
            }
        }
        #endregion

        #region CopyTo
        /// <summary>
        /// Appends up to <paramref name="length"/> bytes of the array, starting at
        /// <paramref name="sourceIndex"/>, to the list; stops at the end of the array.
        /// </summary>
        private void CopyTo(byte[] bArrSource, List<byte> listTarget, int sourceIndex, int length)
        {
            for (int i = 0; i < length; i++)
            {
                if (sourceIndex + i < bArrSource.Length)
                {
                    listTarget.Add(bArrSource[sourceIndex + i]);
                }
            }
        }

        /// <summary>
        /// Copies up to <paramref name="length"/> bytes from the list into the array;
        /// positions outside either collection are skipped.
        /// </summary>
        private void CopyTo(List<byte> listSource, byte[] bArrTarget, int sourceIndex, int targetIndex, int length)
        {
            for (int i = 0; i < length; i++)
            {
                if (targetIndex + i < bArrTarget.Length && sourceIndex + i < listSource.Count)
                {
                    bArrTarget[targetIndex + i] = listSource[sourceIndex + i];
                }
            }
        }
        #endregion

        #region Stop server
        /// <summary>
        /// Closes every client socket, clears the registries and closes the listener.
        /// Errors are logged, not thrown.
        /// </summary>
        public void StopServer()
        {
            try
            {
                // Must be cleared before the listener is closed so the accept thread exits.
                _running = false;

                foreach (ClientSocket clientSocket in clientSocketList.Keys.ToArray())
                {
                    Socket socket = clientSocket.Socket;
                    ActionUtil.TryDoAction(() => { if (socket.Connected) socket.Disconnect(false); });
                    ActionUtil.TryDoAction(() =>
                    {
                        socket.Close();
                        socket.Dispose();
                    });
                }
                clientSocketList.Clear();
                _dictDevNoClientSocket.Clear();
                _dictRoomNoClientSocket.Clear();

                if (serverSocket != null)
                {
                    ActionUtil.TryDoAction(() => { if (serverSocket.Connected) serverSocket.Disconnect(false); });
                    ActionUtil.TryDoAction(() =>
                    {
                        serverSocket.Close();
                        serverSocket.Dispose();
                    });
                }
                LogUtil.Log("服务已停止");
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "停止服务出错");
            }
        }
        #endregion

        #region Release resources
        /// <summary>
        /// Stops and closes the client check timer. Does not close any socket; use StopServer for that.
        /// </summary>
        public void Dispose()
        {
            if (_checkClientTimer != null)
            {
                _checkClientTimer.Stop();
                _checkClientTimer.Close();
            }
        }
        #endregion

        #region Send
        /// <summary>
        /// Sends a message to one client and blocks until its result arrives or
        /// WaitResultTimeout elapses. When both roomNo and devNo are given, the devNo lookup wins.
        /// </summary>
        /// <returns>
        /// The client's result; or a result with success=false when the client is not
        /// registered, the send failed, or the wait timed out.
        /// </returns>
        public SocketResult Send(WebApiMsgContent msgContent, string roomNo, string devNo)
        {
            SocketData data = new SocketData();
            data.Type = 3;
            data.MsgContent = msgContent;

            ClientSocket clientSocket = FindClient(roomNo, devNo);
            if (clientSocket == null)
            {
                return CreateFailure("客户端不存在");
            }

            if (string.IsNullOrWhiteSpace(msgContent.callbackId))
            {
                msgContent.callbackId = Guid.NewGuid().ToString("N");
            }

            // Fix: the send result used to be discarded, so a failed send still blocked
            // for the whole timeout before reporting failure.
            if (!Send(clientSocket, data))
            {
                return CreateFailure("发送失败");
            }

            return WaitSocketResult(clientSocket, msgContent.callbackId);
        }

        /// <summary>
        /// Sends a message to one client without blocking. When a callback is supplied it is
        /// invoked with the client's result, or with a success=false result when the client is
        /// not registered, the send failed, or CallbackTimeout elapsed.
        /// </summary>
        public void Send(WebApiMsgContent msgContent, string roomNo, string devNo, Action<SocketResult> callback = null)
        {
            SocketData data = new SocketData();
            data.Type = 3;
            data.MsgContent = msgContent;

            ClientSocket clientSocket = FindClient(roomNo, devNo);
            if (clientSocket == null)
            {
                if (callback != null) callback(CreateFailure("客户端不存在"));
                return;
            }

            if (string.IsNullOrWhiteSpace(msgContent.callbackId))
            {
                msgContent.callbackId = Guid.NewGuid().ToString("N");
            }

            if (!Send(clientSocket, data))
            {
                // Report the failure immediately instead of after the callback timeout.
                if (callback != null) callback(CreateFailure("发送失败"));
                return;
            }

            // Results are stored in CallbackDict as they arrive, so polling may start after the send.
            if (callback != null)
            {
                WaitCallback(clientSocket, msgContent.callbackId, callback);
            }
        }

        /// <summary>
        /// Looks a client up by room number, then by device number (the later lookup overrides).
        /// </summary>
        private ClientSocket FindClient(string roomNo, string devNo)
        {
            ClientSocket clientSocket = null;
            if (roomNo != null) _dictRoomNoClientSocket.TryGetValue(roomNo, out clientSocket);
            if (devNo != null) _dictDevNoClientSocket.TryGetValue(devNo, out clientSocket);
            return clientSocket;
        }

        /// <summary>
        /// Builds a failed SocketResult carrying the given error message.
        /// </summary>
        private static SocketResult CreateFailure(string errorMsg)
        {
            SocketResult socketResult = new SocketResult();
            socketResult.success = false;
            socketResult.errorMsg = errorMsg;
            return socketResult;
        }

        /// <summary>
        /// Polls the client's CallbackDict every 100 ms until the result for callbackId
        /// arrives or CallbackTimeout elapses, then invokes the callback once.
        /// </summary>
        private void WaitCallback(ClientSocket clientSocket, string callbackId, Action<SocketResult> callback = null)
        {
            DateTime dt = DateTime.Now.AddSeconds(_CallbackTimeout);
            System.Timers.Timer timer = new System.Timers.Timer();
            timer.AutoReset = false;
            timer.Interval = 100;
            timer.Elapsed += (s, e) =>
            {
                try
                {
                    SocketResult socketResult;
                    if (!clientSocket.CallbackDict.TryGetValue(callbackId, out socketResult) && DateTime.Now < dt)
                    {
                        timer.Start(); // not there yet and not timed out: poll again
                        return;
                    }

                    SocketResult sktResult;
                    clientSocket.CallbackDict.TryRemove(callbackId, out sktResult);
                    if (socketResult == null)
                    {
                        socketResult = CreateFailure("超时");
                    }

                    // Close first so a throwing callback cannot leak the timer.
                    timer.Close();
                    if (callback != null) callback(socketResult);
                }
                catch (Exception ex)
                {
                    LogUtil.Error("WaitCallback error" + ex);
                }
            };
            timer.Start();
        }

        /// <summary>
        /// Blocks, polling every 10 ms, until the result for callbackId arrives or
        /// WaitResultTimeout elapses.
        /// </summary>
        private SocketResult WaitSocketResult(ClientSocket clientSocket, string callbackId)
        {
            SocketResult socketResult;
            DateTime dt = DateTime.Now.AddSeconds(_WaitResultTimeout);
            while (!clientSocket.CallbackDict.TryGetValue(callbackId, out socketResult) && DateTime.Now < dt)
            {
                Thread.Sleep(10);
            }

            SocketResult sktResult;
            clientSocket.CallbackDict.TryRemove(callbackId, out sktResult);
            if (socketResult == null)
            {
                socketResult = CreateFailure("超时");
            }
            return socketResult;
        }

        /// <summary>
        /// Writes one frame to the client under its send lock.
        /// </summary>
        /// <returns>false when any part of the frame failed to send; true means sent, not necessarily received.</returns>
        private bool Send(ClientSocket clientSocket, SocketData data)
        {
            bool bl;
            Socket socket = clientSocket.Socket;
            lock (clientSocket.LockSend)
            {
                byte[] bArrHeader = Encoding.ASCII.GetBytes("0XFF");
                bl = SocketHelper.Send(socket, bArrHeader); // header

                if (data.Type == 1)
                {
                    if (bl) bl = SocketHelper.Send(socket, new byte[] { 0x01 }); // type
                }

                if (data.Type == 3)
                {
                    if (bl) bl = SocketHelper.Send(socket, new byte[] { 0x03 }); // type

                    if (bl && data.MsgContent != null)
                    {
                        byte[] bArrData = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data.MsgContent));
                        bl = SocketHelper.Send(socket, BitConverter.GetBytes(bArrData.Length)); // length
                        if (bl) bl = SocketHelper.Send(socket, bArrData); // body
                    }
                }
            }
            return bl;
        }
        #endregion
    }
}
SocketClientHelper代码:
using Models;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// TCP socket client helper: connects to the server, registers a room/device number,
    /// sends heartbeats, receives framed messages and reconnects when the server stops
    /// answering heartbeats.
    ///
    /// Frame layout as implemented in this class: 4 ASCII bytes "0XFF", 1 type byte, and
    /// (for types carrying a body) a 4-byte length produced by BitConverter followed by a
    /// UTF-8 JSON body.
    /// Types: 0 heartbeat (out), 1 heartbeat reply (in), 2 register (out),
    /// 3 message (in), 4 result (out), 5 register acknowledgement (in).
    /// </summary>
    public class SocketClientHelper
    {
        #region Fields
        private string _serverIP;
        private int _serverPort;
        private object _lockSend = new object();
        private Socket clientSocket;
        private SocketAsyncEventArgs _socketAsyncArgs;
        public EventHandler<SocketAsyncEventArgs> _socketAsyncCompleted { get; set; }
        private System.Timers.Timer heartbeatTimer;
        public event EventHandler<SocketReceivedEventArgs> SocketReceivedEvent;
        private System.Timers.Timer _checkServerTimer;
        private DateTime _lastHeartbeat;
        private List<byte> _buffer = new List<byte>();
        private string _roomNo;
        private string _devNo;

        // Written by the receive callback and polled by RegisterToServer on another thread.
        private volatile bool _registerSuccess = false;

        public string RoomNo
        {
            get { return _roomNo; }
        }

        public string DevNo
        {
            get { return _devNo; }
        }

        /// <summary>
        /// Removes the first <paramref name="count"/> bytes (one parsed frame) from the receive buffer.
        /// </summary>
        private void RemoveBufferData(int count)
        {
            // One RemoveRange instead of count x RemoveAt(0), which shifted the whole list each time.
            int removable = Math.Min(count, _buffer.Count);
            if (removable > 0)
            {
                _buffer.RemoveRange(0, removable);
            }
        }
        #endregion

        #region Constructor
        /// <summary>
        /// Creates a client helper. The given address and port are used when the
        /// "ServerIP" / "ServerPort" app settings are missing.
        /// </summary>
        public SocketClientHelper(string serverIP, int serverPort)
        {
            _serverIP = serverIP;
            _serverPort = serverPort;
        }
        #endregion

        #region Connect
        /// <summary>
        /// Connects to the server if not already connected, starts the asynchronous
        /// receive chain and the once-per-second server check timer.
        /// </summary>
        /// <returns>true when connected (or already connected); false on failure (the error is logged).</returns>
        public bool ConnectServer()
        {
            try
            {
                if (clientSocket != null && clientSocket.Connected) return true;

                if (clientSocket != null)
                {
                    clientSocket.Close();
                    clientSocket.Dispose();
                }

                IPEndPoint ipep = ResolveServerEndPoint();

                Socket socket = new Socket(ipep.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                clientSocket = socket;
                socket.SendTimeout = 20000;
                socket.ReceiveTimeout = 20000;
                socket.SendBufferSize = 10240;
                socket.ReceiveBufferSize = 10240;

                try
                {
                    socket.Connect(ipep);
                }
                catch (Exception ex)
                {
                    LogUtil.Error(ex);
                    return false;
                }

                if (!socket.Connected) return false;

                _lastHeartbeat = DateTime.Now;

                // Bytes left over from a previous connection would be parsed as the start
                // of the new stream and corrupt framing.
                _buffer.Clear();

                Task.Run(() => { StartReceive(socket); });

                // Fix: the original created a new timer on every reconnect and leaked the old one.
                if (_checkServerTimer == null)
                {
                    System.Timers.Timer timer = new System.Timers.Timer();
                    timer.AutoReset = false;
                    timer.Interval = 1000;
                    timer.Elapsed += CheckServer;
                    _checkServerTimer = timer;
                    timer.Start();
                }

                LogUtil.Log("已连接服务器");
                return true;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "连接服务器失败");
                return false;
            }
        }

        /// <summary>
        /// Builds the server end point from the "HostName" / "ServerIP" / "ServerPort" app
        /// settings, falling back to the constructor arguments for a missing IP or port.
        /// </summary>
        private IPEndPoint ResolveServerEndPoint()
        {
            string ip = ConfigurationManager.AppSettings["ServerIP"];
            string hostName = ConfigurationManager.AppSettings["HostName"];
            string portSetting = ConfigurationManager.AppSettings["ServerPort"];

            // Fix: the constructor arguments were stored but never used.
            if (string.IsNullOrWhiteSpace(ip)) ip = _serverIP;
            int port = string.IsNullOrWhiteSpace(portSetting) ? _serverPort : Convert.ToInt32(portSetting);

            if (hostName != null)
            {
                IPHostEntry host = Dns.GetHostEntry(hostName);
                return new IPEndPoint(host.AddressList[0], port);
            }

            return new IPEndPoint(IPAddress.Parse(ip), port);
        }

        /// <summary>
        /// Creates the receive buffer and SocketAsyncEventArgs for the socket and issues the first receive.
        /// </summary>
        private void StartReceive(Socket socket)
        {
            try
            {
                byte[] buffer = new byte[10240];
                SocketAsyncEventArgs args = new SocketAsyncEventArgs();
                args.SetBuffer(buffer, 0, buffer.Length);

                // Capture the socket this handler belongs to. The original read the clientSocket
                // field at callback time, so a late callback from an old connection could issue
                // a receive on the new socket.
                EventHandler<SocketAsyncEventArgs> completed = (s, e) => { ReceiveData(socket, e); };
                args.Completed += completed;
                _socketAsyncArgs = args;
                _socketAsyncCompleted = completed;

                // false => completed synchronously; Completed is not raised, so handle it here.
                if (!socket.ReceiveAsync(args))
                {
                    ReceiveData(socket, args);
                }
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex);
            }
        }
        #endregion

        #region Check server
        /// <summary>
        /// Timer callback: when no heartbeat reply has arrived for more than 60 seconds,
        /// closes the connection, reconnects (retrying every 3 seconds) and registers again;
        /// then re-arms the timer.
        /// </summary>
        private void CheckServer(object sender, System.Timers.ElapsedEventArgs e)
        {
            Task.Run(() =>
            {
                try
                {
                    DateTime now = DateTime.Now;
                    if (now.Subtract(_lastHeartbeat).TotalSeconds > 60)
                    {
                        LogUtil.Log("服务端已失去连接");
                        CloseConnection();

                        Thread.Sleep(3000);
                        int tryCount = 0;
                        bool connected = ConnectServer();
                        while (!connected && tryCount++ < 10000) // reconnect
                        {
                            Thread.Sleep(3000);
                            connected = ConnectServer();
                        }

                        // Registering only makes sense on a live connection.
                        if (connected)
                        {
                            RegisterToServer(_roomNo, _devNo);
                        }
                    }
                }
                catch (Exception ex)
                {
                    LogUtil.Error(ex, "检测服务端出错");
                }
                finally
                {
                    System.Timers.Timer timer = _checkServerTimer;
                    if (timer != null)
                    {
                        try
                        {
                            timer.Start();
                        }
                        catch (ObjectDisposedException)
                        {
                            // Dispose() closed the timer while this check was running; nothing to re-arm.
                        }
                    }
                }
            });
        }

        /// <summary>
        /// Closes the current socket and releases its SocketAsyncEventArgs; errors are logged.
        /// </summary>
        private void CloseConnection()
        {
            try
            {
                if (clientSocket != null)
                {
                    if (clientSocket.Connected) clientSocket.Disconnect(false);
                    clientSocket.Close();
                    clientSocket.Dispose();
                }

                if (_socketAsyncArgs != null)
                {
                    if (_socketAsyncCompleted != null)
                    {
                        _socketAsyncArgs.Completed -= _socketAsyncCompleted;
                    }
                    _socketAsyncArgs.Dispose();
                }
                _socketAsyncCompleted = null;
                _socketAsyncArgs = null;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex);
            }
        }
        #endregion

        #region Disconnect
        /// <summary>
        /// Disconnects and closes the socket. Errors are logged, not thrown.
        /// </summary>
        public void DisconnectServer()
        {
            try
            {
                if (clientSocket != null)
                {
                    if (clientSocket.Connected) clientSocket.Disconnect(false);
                    clientSocket.Close();
                    clientSocket.Dispose();
                }
                LogUtil.Log("已断开服务器");
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "断开服务器失败");
            }
        }
        #endregion

        #region Release resources
        /// <summary>
        /// Stops and closes the heartbeat timer and the server check timer.
        /// Does not close the socket; use DisconnectServer for that.
        /// </summary>
        public void Dispose()
        {
            if (heartbeatTimer != null)
            {
                heartbeatTimer.Stop();
                heartbeatTimer.Close();
            }

            if (_checkServerTimer != null)
            {
                _checkServerTimer.Stop();
                _checkServerTimer.Close();
                // Cleared so a later ConnectServer creates a fresh timer instead of using a closed one.
                _checkServerTimer = null;
            }
        }
        #endregion

        #region Heartbeat
        /// <summary>
        /// Starts sending a heartbeat frame every 10 seconds. Calling it again replaces the previous timer.
        /// </summary>
        public void StartHeartbeat()
        {
            if (heartbeatTimer != null)
            {
                heartbeatTimer.Stop();
                heartbeatTimer.Close();
            }

            System.Timers.Timer timer = new System.Timers.Timer();
            timer.AutoReset = false;
            timer.Interval = 10000;
            timer.Elapsed += new System.Timers.ElapsedEventHandler((obj, eea) =>
            {
                lock (_lockSend)
                {
                    try
                    {
                        byte[] bArrHeader = Encoding.ASCII.GetBytes("0XFF");
                        SocketHelper.Send(clientSocket, bArrHeader);
                        SocketHelper.Send(clientSocket, new byte[] { 0x00 });
                    }
                    catch (Exception ex)
                    {
                        LogUtil.Error("向服务器发送心跳包出错:" + ex.Message);
                    }
                    finally
                    {
                        timer.Start();
                    }
                }
            });
            heartbeatTimer = timer;
            timer.Start();
        }

        /// <summary>
        /// Stops the heartbeat timer; does nothing when the heartbeat was never started.
        /// </summary>
        public void StopHeartbeat()
        {
            // Fix: the original threw NullReferenceException when called before StartHeartbeat.
            if (heartbeatTimer != null)
            {
                heartbeatTimer.Stop();
            }
        }
        #endregion

        #region Register
        /// <summary>
        /// Sends a register frame with the given room/device number and waits up to
        /// 5 seconds for the server's acknowledgement.
        /// </summary>
        /// <returns>true when the acknowledgement arrived in time.</returns>
        public bool RegisterToServer(string roomNo, string devNo)
        {
            _registerSuccess = false;

            SocketData data = new SocketData();
            data.Type = 2;
            data.SocketRegisterData = new SocketRegisterData();
            data.SocketRegisterData.RoomNo = roomNo;
            data.SocketRegisterData.DevNo = devNo;
            _roomNo = roomNo;
            _devNo = devNo;
            Send(data);

            DateTime dt = DateTime.Now;
            while (!_registerSuccess && DateTime.Now.Subtract(dt).TotalMilliseconds < 5000)
            {
                Thread.Sleep(100);
            }
            return _registerSuccess;
        }
        #endregion

        #region Receive data
        /// <summary>
        /// Completion handler for a receive. Processes the received bytes and re-issues the
        /// receive; loops while receives complete synchronously.
        /// </summary>
        private void ReceiveData(Socket socket, SocketAsyncEventArgs e)
        {
            try
            {
                do
                {
                    // Zero bytes or a socket error means the server closed or the receive failed.
                    // Stop receiving; CheckServer reconnects once the heartbeat times out.
                    if (e.SocketError != SocketError.Success || e.BytesTransferred <= 0)
                    {
                        return;
                    }

                    HandleReceivedBytes(socket, e);
                }
                while (socket.Connected && !socket.ReceiveAsync(e)); // false => completed synchronously
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "处理接收的数据包 异常");
            }
        }

        /// <summary>
        /// Appends the received bytes to the buffer, validates the frame at the head of the
        /// buffer, and processes every complete frame currently buffered. Returns without
        /// processing when the head frame is incomplete; clears the buffer when it is malformed.
        /// </summary>
        private void HandleReceivedBytes(Socket socket, SocketAsyncEventArgs e)
        {
            CopyTo(e.Buffer, _buffer, e.Offset, e.BytesTransferred);

            if (_buffer.Count < 4) return; // header not complete yet

            byte[] bArrHeader = new byte[4];
            CopyTo(_buffer, bArrHeader, 0, 0, bArrHeader.Length);
            if (!IsFrameHeader(bArrHeader))
            {
                LogUtil.Error("不是0XFF,丢掉错误数据,重新接收");
                _buffer.Clear();
                return;
            }

            if (_buffer.Count < 5) return; // type byte not received yet

            byte type = _buffer[4];
            if (type == 3) // message: carries a length-prefixed body
            {
                if (_buffer.Count < 9) return;

                byte[] bArrLength = new byte[4];
                CopyTo(_buffer, bArrLength, 5, 0, bArrLength.Length);
                int dataLength = BitConverter.ToInt32(bArrLength, 0);
                if (dataLength <= 0)
                {
                    // Fix: the original waited forever on a zero length and threw on a negative
                    // one, leaving the bad frame at the head of the buffer and stalling the stream.
                    LogUtil.Error("type错误,丢掉错误数据,重新接收");
                    _buffer.Clear();
                    return;
                }

                // long arithmetic so a huge length cannot overflow into a negative number
                if (_buffer.Count < (long)dataLength + 9) return;
            }
            else if (type != 1 && type != 5) // 1 = heartbeat reply, 5 = register ack, no body
            {
                LogUtil.Error("type错误,丢掉错误数据,重新接收");
                _buffer.Clear();
                return;
            }

            SocketData data;
            do
            {
                data = ProcessSocketData(socket);
            }
            while (data != null);
        }

        /// <summary>
        /// True when the four bytes decode (ASCII, case-insensitive) to the frame marker "0XFF".
        /// </summary>
        private static bool IsFrameHeader(byte[] header)
        {
            return string.Equals(Encoding.ASCII.GetString(header), "0XFF", StringComparison.OrdinalIgnoreCase);
        }
        #endregion

        #region Process a received packet
        /// <summary>
        /// Parses one frame from the head of the buffer, removes it from the buffer and acts
        /// on it (heartbeat reply, message event, registration acknowledgement).
        /// </summary>
        /// <returns>The parsed frame, or null when no complete frame is available.</returns>
        private SocketData ProcessSocketData(Socket socket)
        {
            int readLength;
            SocketData data = ResolveBuffer(_buffer, out readLength);
            if (data == null) return null;

            if (readLength > 0) RemoveBufferData(readLength);

            if (data.Type == 1) // heartbeat reply
            {
                _lastHeartbeat = DateTime.Now;
                LogUtil.Log("收到心跳应答包,服务端正常");
            }

            if (data.Type == 3) // message
            {
                // Copy the delegate so an unsubscribe between the check and the call cannot null it.
                EventHandler<SocketReceivedEventArgs> handler = SocketReceivedEvent;

                // MsgContent is null when the body could not be deserialized; nothing to deliver then.
                if (handler != null && data.MsgContent != null)
                {
                    SocketReceivedEventArgs args = new SocketReceivedEventArgs(data.MsgContent);
                    args.Callback = new CallbackSocket(socket);
                    ThreadHelper.Run((obj) =>
                    {
                        try
                        {
                            handler(this, obj as SocketReceivedEventArgs);
                        }
                        catch (Exception ex)
                        {
                            LogUtil.Error(ex);
                        }
                    }, args);
                }
            }

            if (data.Type == 5) // register acknowledgement
            {
                _registerSuccess = true;
                LogUtil.Log("收到注册反馈包,注册成功");
            }

            return data;
        }
        #endregion

        #region ResolveBuffer
        /// <summary>
        /// Parses the frame at the head of the buffer without modifying the buffer.
        /// </summary>
        /// <param name="buffer">Bytes received so far.</param>
        /// <param name="readLength">Number of bytes the returned frame occupies; 0 when null is returned.</param>
        /// <returns>The parsed frame, or null when the frame is incomplete or the header is invalid.</returns>
        private SocketData ResolveBuffer(List<byte> buffer, out int readLength)
        {
            readLength = 0;
            try
            {
                if (buffer.Count < 4) return null;

                byte[] bArrHeader = new byte[4];
                CopyTo(buffer, bArrHeader, 0, 0, bArrHeader.Length);
                if (!IsFrameHeader(bArrHeader))
                {
                    LogUtil.Error("不是0XFF");
                    return null;
                }

                if (buffer.Count < 5) return null;

                SocketData socketData = new SocketData();
                socketData.Type = buffer[4];
                int consumed = 5;

                if (socketData.Type == 3)
                {
                    if (buffer.Count < 9) return null;

                    byte[] bArrLength = new byte[4];
                    CopyTo(buffer, bArrLength, 5, 0, bArrLength.Length);
                    int dataLength = BitConverter.ToInt32(bArrLength, 0);
                    if (dataLength <= 0 || buffer.Count < (long)dataLength + 9) return null;

                    byte[] dataBody = new byte[dataLength];
                    CopyTo(buffer, dataBody, 9, 0, dataBody.Length);
                    consumed = 9 + dataLength;

                    string jsonString = Encoding.UTF8.GetString(dataBody);
                    try
                    {
                        socketData.MsgContent = JsonConvert.DeserializeObject<WebApiMsgContent>(jsonString);
                    }
                    catch (JsonException ex)
                    {
                        // Fix: a body that is not valid JSON used to leave the frame in the buffer,
                        // so every later receive failed on the same frame. The frame is now consumed
                        // with a null payload.
                        LogUtil.Error(ex, "解析字节数组 出错");
                    }
                }

                readLength = consumed;
                return socketData;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "解析字节数组 出错");
                readLength = 0;
                return null;
            }
        }
        #endregion

        #region CopyTo
        /// <summary>
        /// Appends up to <paramref name="length"/> bytes of the array, starting at
        /// <paramref name="sourceIndex"/>, to the list; stops at the end of the array.
        /// </summary>
        private void CopyTo(byte[] bArrSource, List<byte> listTarget, int sourceIndex, int length)
        {
            for (int i = 0; i < length; i++)
            {
                if (sourceIndex + i < bArrSource.Length)
                {
                    listTarget.Add(bArrSource[sourceIndex + i]);
                }
            }
        }

        /// <summary>
        /// Copies up to <paramref name="length"/> bytes from the list into the array;
        /// positions outside either collection are skipped.
        /// </summary>
        private void CopyTo(List<byte> listSource, byte[] bArrTarget, int sourceIndex, int targetIndex, int length)
        {
            for (int i = 0; i < length; i++)
            {
                if (targetIndex + i < bArrTarget.Length && sourceIndex + i < listSource.Count)
                {
                    bArrTarget[targetIndex + i] = listSource[sourceIndex + i];
                }
            }
        }
        #endregion

        #region Send
        /// <summary>
        /// Sends a frame over the current connection.
        /// </summary>
        public void Send(SocketData data)
        {
            Send(clientSocket, data);
        }

        /// <summary>
        /// Sends a frame over the given socket under the send lock. Type 0 sends a heartbeat,
        /// type 2 a register frame, type 4 a result frame; other types send only the header.
        /// Sending stops at the first part that fails.
        /// </summary>
        public void Send(Socket socket, SocketData data)
        {
            lock (_lockSend)
            {
                byte[] bArrHeader = Encoding.ASCII.GetBytes("0XFF");
                bool bl = SocketHelper.Send(socket, bArrHeader); // header

                if (data.Type == 0)
                {
                    if (bl) bl = SocketHelper.Send(socket, new byte[] { 0x00 }); // type
                }
                else if (data.Type == 2)
                {
                    if (bl) bl = SocketHelper.Send(socket, new byte[] { 0x02 }); // type
                    if (bl && data.SocketRegisterData != null)
                    {
                        bl = SendJsonBody(socket, data.SocketRegisterData);
                    }
                }
                else if (data.Type == 4)
                {
                    if (bl) bl = SocketHelper.Send(socket, new byte[] { 0x04 }); // type
                    if (bl && data.SocketResult != null)
                    {
                        bl = SendJsonBody(socket, data.SocketResult);
                    }
                }
            }
        }

        /// <summary>
        /// Serializes the payload to UTF-8 JSON and sends its 4-byte length followed by the bytes.
        /// </summary>
        private static bool SendJsonBody(Socket socket, object payload)
        {
            byte[] bArrData = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(payload));
            bool bl = SocketHelper.Send(socket, BitConverter.GetBytes(bArrData.Length)); // length
            if (bl) bl = SocketHelper.Send(socket, bArrData); // body
            return bl;
        }
        #endregion
    }
}
SocketHelper代码(里面同步接收的方法Receive和ReceiveByte没有用到):
using Models;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// Thin wrappers around blocking Socket send/receive calls that report failure
    /// through the return value instead of throwing.
    /// </summary>
    public static class SocketHelper
    {
        #region Send
        /// <summary>
        /// Sends the whole array, in chunks of at most 1024 bytes.
        /// </summary>
        /// <returns>
        /// true when every byte was handed to the socket; false when the socket is null or
        /// not connected, or when sending threw (the exception is logged).
        /// </returns>
        public static bool Send(Socket socket, byte[] data)
        {
            try
            {
                if (socket == null || !socket.Connected) return false;

                int sendTotal = 0;
                while (sendTotal < data.Length)
                {
                    int sendLength = Math.Min(data.Length - sendTotal, 1024);
                    sendTotal += socket.Send(data, sendTotal, sendLength, SocketFlags.None);
                }
                return true;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex);
                return false;
            }
        }
        #endregion

        #region Receive
        /// <summary>
        /// Blocks until exactly <paramref name="length"/> bytes have been received.
        /// </summary>
        /// <returns>
        /// The received bytes; or null when the remote side closed the connection before
        /// all bytes arrived or when receiving threw (the exception is logged).
        /// </returns>
        public static byte[] Receive(Socket socket, int length)
        {
            try
            {
                byte[] buffer = new byte[length];
                int receiveCount = 0;
                while (receiveCount < length)
                {
                    int revCount = socket.Receive(buffer, receiveCount, length - receiveCount, SocketFlags.None);

                    // Fix: a blocking Receive returns 0 only when the remote side has shut the
                    // connection down. The original retried in that case and never returned.
                    if (revCount == 0) return null;

                    receiveCount += revCount;
                }
                return buffer;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex);
                return null;
            }
        }

        /// <summary>
        /// Blocks until one byte has been received.
        /// </summary>
        /// <returns>
        /// The received byte; or null when the remote side closed the connection or when
        /// receiving threw (the exception is logged).
        /// </returns>
        public static byte? ReceiveByte(Socket socket)
        {
            try
            {
                byte[] buffer = new byte[1];

                // 0 bytes means the remote side closed the connection; retrying would loop forever.
                if (socket.Receive(buffer, 0, 1, SocketFlags.None) == 0) return null;

                return buffer[0];
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex);
                return null;
            }
        }
        #endregion

        #region IsZero
        /// <summary>
        /// Returns true when every byte in the array is 0 (also for an empty array), and
        /// logs an error in that case.
        /// </summary>
        public static bool IsZero(byte[] data)
        {
            foreach (byte b in data)
            {
                if (b != 0)
                {
                    return false;
                }
            }

            LogUtil.Error("接收的字节数组内容全是0");
            return true;
        }
        #endregion
    }
}
代码中接收消息是异步接收,提高性能,发送消息是同步发送,主要是为了和Android端对接方便,Android端按我这种方式发就可以了。
下面是模拟500个客户端的程序代码下载链接:
由于网络、客户端可能不在线等原因,消息不一定能送达,所以为了保证消息送达,需要使用数据库,将发送失败的消息存入数据库,定时重发,发送成功或者超时2天则删除失败记录,下面是自己画的时序图,可能画的不太专业:
业务相关代码:
MsgUtil代码:
using Models;
using Newtonsoft.Json;
using *WebApi.Controllers.Common;
using *WebApi.DAL;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Configuration;
using System.Linq;
using System.Threading;
using System.Timers;
using System.Web;

namespace Utils
{
    /// <summary>
    /// Web API message utility: owns the socket server, forwards messages to clients and
    /// periodically resends the messages stored in the database as failed.
    /// </summary>
    public static class MsgUtil
    {
        #region Fields
        private static WebApiMsgDal m_WebApiMsgDal = null;
        private static System.Timers.Timer _timer;
        private static SocketServerHelper _socketServerHelper;
        #endregion

        #region Init
        /// <summary>
        /// Starts the socket server on the "SocketServerPort" app setting and starts the
        /// resend timer. Runs through ThreadHelper; a failure is logged.
        /// </summary>
        public static void Init()
        {
            ThreadHelper.Run(() =>
            {
                m_WebApiMsgDal = ServiceHelper.Get<WebApiMsgDal>();

                int port = int.Parse(ConfigurationManager.AppSettings["SocketServerPort"]);
                _socketServerHelper = new SocketServerHelper(port);
                _socketServerHelper.ReceivedSocketResultEvent += _socketServerHelper_ReceivedSocketResultEvent;
                _socketServerHelper.StartServer();

                _timer = new System.Timers.Timer();
                _timer.AutoReset = false;
                _timer.Interval = 40000; // must be larger than the socket CallbackTimeout
                _timer.Elapsed += MsgTask;
                _timer.Start();

                LogUtil.Log("Web API 消息工具类 初始化成功");
            }, (ex) =>
            {
                LogUtil.Error("Web API 消息工具类 初始化失败");
            });
        }
        #endregion

        #region Scheduled task
        /// <summary>
        /// Timer callback: deletes timed-out messages, resends every stored message using
        /// its database ID as the callback ID, then re-arms the timer.
        /// </summary>
        private static void MsgTask(object sender, ElapsedEventArgs e)
        {
            ThreadHelper.Run(() =>
            {
                try
                {
                    m_WebApiMsgDal.DeleteTimeoutMsg(); // delete timed-out messages

                    List<WEBAPI_MSG> list = m_WebApiMsgDal.GetMsgList();
                    int resent = 0;
                    foreach (WEBAPI_MSG msg in list)
                    {
                        // Fix: one row that fails to deserialize or send used to abort the
                        // whole batch; now it is logged and the remaining rows are still sent.
                        try
                        {
                            WebApiMsgContent msgContent = JsonConvert.DeserializeObject<WebApiMsgContent>(msg.MSGCONTENT);
                            msgContent.callbackId = msg.ID;

                            // RECEIVER may hold a room number or a device number, so it is passed as both.
                            Send(msgContent, msg.RECEIVER, msg.RECEIVER, null);
                            resent++;
                        }
                        catch (Exception ex)
                        {
                            LogUtil.Error(ex);
                        }
                    }

                    if (resent > 0)
                    {
                        LogUtil.Log("已重发" + resent.ToString() + "条消息");
                    }
                }
                catch (Exception ex)
                {
                    LogUtil.Error(ex);
                }
                finally
                {
                    // Dispose() sets _timer to null and closes it; re-arm only a live timer.
                    System.Timers.Timer timer = _timer;
                    if (timer != null)
                    {
                        try
                        {
                            timer.Start();
                        }
                        catch (ObjectDisposedException)
                        {
                            // closed by Dispose() between the null check and Start(); nothing to re-arm
                        }
                    }
                }
            });
        }
        #endregion

        #region Receive result
        /// <summary>
        /// Handles a result received from a client: deletes the stored message whose ID equals
        /// the result's callback ID, retrying up to 10 more times, one second apart, when the
        /// database call throws.
        /// </summary>
        private static void _socketServerHelper_ReceivedSocketResultEvent(object sender, ReceivedSocketResultEventArgs e)
        {
            if (e.SocketResult == null) return;

            Func<string, bool> func = (callbackId) =>
            {
                try
                {
                    if (m_WebApiMsgDal.Exists(callbackId))
                    {
                        m_WebApiMsgDal.DeleteById(callbackId);
                    }
                }
                catch (Exception ex)
                {
                    LogUtil.Error(ex, "删除消息出错");
                    return false;
                }
                return true;
            };

            int tryCount = 0;
            while (!func(e.SocketResult.callbackId) && tryCount++ < 10)
            {
                Thread.Sleep(1000);
            }
        }
        #endregion

        #region Send
        /// <summary>
        /// Sends a message without blocking; the optional callback receives the result.
        /// </summary>
        public static void Send(WebApiMsgContent msgContent, string roomNo, string devNo, Action<SocketResult> callback = null)
        {
            _socketServerHelper.Send(msgContent, roomNo, devNo, callback);
        }

        /// <summary>
        /// Sends a message and waits for the result.
        /// </summary>
        /// <returns>The result, or null when sending threw (the exception is logged).</returns>
        public static SocketResult Send(WebApiMsgContent msgContent, string roomNo, string devNo)
        {
            try
            {
                return _socketServerHelper.Send(msgContent, roomNo, devNo);
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "发送消息失败");
                return null;
            }
        }
        #endregion

        #region Release resources
        /// <summary>
        /// Stops the resend timer and the socket server. Safe to call when Init has not
        /// completed. Runs through ThreadHelper; a failure is logged.
        /// </summary>
        public static void Dispose()
        {
            ThreadHelper.Run(() =>
            {
                // Fix: the original dereferenced _timer and _socketServerHelper unconditionally,
                // which threw when Init had not finished.
                System.Timers.Timer timer = _timer;
                _timer = null;
                if (timer != null)
                {
                    timer.Stop();
                    timer.Elapsed -= MsgTask;
                    timer.Close();
                    timer.Dispose();
                }

                SocketServerHelper server = _socketServerHelper;
                if (server != null)
                {
                    server.StopServer();
                    // Fix: the server's own check timer was never stopped.
                    server.Dispose();
                    server.ReceivedSocketResultEvent -= _socketServerHelper_ReceivedSocketResultEvent;
                }

                LogUtil.Log("Web API 消息工具类 释放资源成功");
            }, (ex) =>
            {
                LogUtil.Error("Web API 消息工具类 释放资源失败");
            });
        }
        #endregion
    }
}
Web API 接口 MsgController 代码:
using DBUtil;
using Models;
using Newtonsoft.Json;
// NOTE(review): the leading '*' in this namespace (and in the namespace declaration
// below) looks like a redacted project name - restore the real root namespace.
using *WebApi.DAL;
using Swashbuckle.Swagger.Annotations;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Web;
using System.Web.Http;
using Utils;

namespace *WebApi.Controllers.Common
{
    /// <summary>
    /// Web API endpoints for pushing messages to devices and receiving feedback.
    /// </summary>
    [RoutePrefix("api/msg")]
    public class MsgController : ApiController
    {
        #region Fields
        /// <summary>
        /// Format used for <c>msgContent.msgTime</c>.
        /// </summary>
        private const string MsgTimeFormat = "yyyy-MM-dd HH:mm:ss";

        private WebApiMsgDal m_WebApiMsgDal = ServiceHelper.Get<WebApiMsgDal>();

        private TwoCJsDal m_TwoCJsDal = ServiceHelper.Get<TwoCJsDal>();

        private BackstageAppInstallDal m_BackstageAppInstallDal = ServiceHelper.Get<BackstageAppInstallDal>();
        #endregion

        #region Send message
        /// <summary>
        /// Sends a message to the listed devices, to one room, or - when neither is
        /// given - to every room returned by <c>TwoCJsDal.GetRoomNoListAll</c>.
        /// A message whose send callback reports failure is stored so it can be re-sent.
        /// </summary>
        /// <param name="data">POST data.</param>
        [HttpPost]
        [Route("SendMsg")]
        [SwaggerResponse(HttpStatusCode.OK, "返回JSON", typeof(JsonResult<CommonSubmitResult>))]
        public HttpResponseMessage SendMsg([FromBody] SendMsgData data)
        {
            JsonResult jsonResult = null;

            if (data == null || data.msgContent == null)
            {
                jsonResult = new JsonResult("请检查参数格式", ResultCode.参数不正确);
                return ApiHelper.ToJson(jsonResult);
            }

            // Use the same "has a value" test for validation and for routing, so an
            // empty string is treated as "not supplied" in both places.
            bool hasRoomNo = !string.IsNullOrWhiteSpace(data.roomNo);
            bool hasDevNos = !string.IsNullOrWhiteSpace(data.devNos);

            if (hasRoomNo && hasDevNos)
            {
                jsonResult = new JsonResult("监室号和设备编码(指仓内屏或仓外屏的设备编码)不能都有值,应填写其中一个,或者都不填写", ResultCode.参数不正确);
                return ApiHelper.ToJson(jsonResult);
            }

            if (string.IsNullOrWhiteSpace(data.msgContent.msgTime))
            {
                data.msgContent.msgTime = DateTime.Now.ToString(MsgTimeFormat, CultureInfo.InvariantCulture);
            }

            // Parse the time once, up front. Parsing inside the failure callback would
            // throw there for a non-conforming value and the failed message would never
            // be stored for re-sending.
            DateTime msgTime;
            if (!DateTime.TryParseExact(data.msgContent.msgTime, MsgTimeFormat, CultureInfo.InvariantCulture, DateTimeStyles.None, out msgTime))
            {
                LogUtil.Log("消息时间格式不正确,保存时使用当前时间:" + data.msgContent.msgTime);
                msgTime = DateTime.Now;
            }

            try
            {
                if (hasDevNos)
                {
                    foreach (string rawDevNo in data.devNos.Split(','))
                    {
                        string devNo = rawDevNo.Trim();
                        if (devNo.Length == 0)
                        {
                            continue; // tolerate "a,,b" and trailing commas
                        }

                        SendAndStoreOnFailure(data.msgContent, null, devNo, msgTime);
                    }
                }
                else if (hasRoomNo)
                {
                    SendAndStoreOnFailure(data.msgContent, data.roomNo, null, msgTime);
                }
                else
                {
                    List<string> roomNoList = m_TwoCJsDal.GetRoomNoListAll();
                    foreach (string roomNo in roomNoList)
                    {
                        SendAndStoreOnFailure(data.msgContent, roomNo, null, msgTime);
                    }
                }
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "消息发送失败");
                jsonResult = new JsonResult("消息发送失败", ResultCode.操作失败);
                return ApiHelper.ToJson(jsonResult);
            }

            jsonResult = new JsonResult<CommonSubmitResult>(new CommonSubmitResult() { msg = "消息发送成功" });
            return ApiHelper.ToJson(jsonResult);
        }

        /// <summary>
        /// Sends one copy of <paramref name="template"/> to a single recipient and
        /// stores it through <c>WebApiMsgDal.Insert</c> when the send callback reports failure.
        /// </summary>
        /// <param name="template">Message supplied by the caller; it is not modified.</param>
        /// <param name="roomNo">Target room number, or <c>null</c> when sending by device.</param>
        /// <param name="devNo">Target device number, or <c>null</c> when sending by room.</param>
        /// <param name="msgTime">Message time written to the stored row.</param>
        private void SendAndStoreOnFailure(WebApiMsgContent template, string roomNo, string devNo, DateTime msgTime)
        {
            // Every recipient gets its own copy with its own callback id. Sharing one
            // instance would let later loop iterations overwrite callbackId before an
            // earlier recipient's failure callback serializes it.
            WebApiMsgContent content = JsonConvert.DeserializeObject<WebApiMsgContent>(JsonConvert.SerializeObject(template));
            content.callbackId = Guid.NewGuid().ToString("N");

            string receiver = devNo ?? roomNo;

            MsgUtil.Send(content, roomNo, devNo, (socketResult) =>
            {
                if (socketResult != null && socketResult.success)
                {
                    return;
                }

                WEBAPI_MSG info = new WEBAPI_MSG();
                info.ID = Guid.NewGuid().ToString("N");
                info.MSGTIME = msgTime;
                info.RECEIVER = receiver;
                info.MSGCONTENT = JsonConvert.SerializeObject(content);
                m_WebApiMsgDal.Insert(info);
            });
        }
        #endregion

        #region App install feedback
        /// <summary>
        /// Records the feedback for an app installation: when the install succeeded,
        /// the install record's STATUS is set to "1".
        /// </summary>
        /// <param name="data">POST data.</param>
        [HttpPost]
        [Route("InstallMsgFeedback")]
        [SwaggerResponse(HttpStatusCode.OK, "返回JSON", typeof(JsonResult<CommonSubmitResult>))]
        public HttpResponseMessage InstallMsgFeedback([FromBody] InstallMsgFeedbackData data)
        {
            JsonResult jsonResult = null;

            if (data == null)
            {
                jsonResult = new JsonResult("请检查参数格式", ResultCode.参数不正确);
                return ApiHelper.ToJson(jsonResult);
            }

            BACKSTAGE_APP_INSTALL info = m_BackstageAppInstallDal.Get(data.id);
            if (info == null)
            {
                jsonResult = new JsonResult("反馈失败:安装记录不存在", ResultCode.操作失败);
                return ApiHelper.ToJson(jsonResult);
            }

            if (data.success)
            {
                info.STATUS = "1";
                m_BackstageAppInstallDal.Update(info);
            }

            jsonResult = new JsonResult<CommonSubmitResult>(new CommonSubmitResult() { msg = "反馈成功", id = info.ID });
            return ApiHelper.ToJson(jsonResult);
        }
        #endregion

        #region Roll call feedback
        /// <summary>
        /// Feedback for a started roll call. Currently only validates that a body
        /// was posted and returns success.
        /// </summary>
        /// <param name="data">POST data.</param>
        [HttpPost]
        [Route("RollCallMsgFeedback")]
        [SwaggerResponse(HttpStatusCode.OK, "返回JSON", typeof(JsonResult<CommonSubmitResult>))]
        public HttpResponseMessage RollCallMsgFeedback([FromBody] RollCallMsgFeedbackData data)
        {
            JsonResult jsonResult = null;

            if (data == null)
            {
                jsonResult = new JsonResult("请检查参数格式", ResultCode.参数不正确);
                return ApiHelper.ToJson(jsonResult);
            }

            // TODO: not implemented yet.

            jsonResult = new JsonResult<CommonSubmitResult>(new CommonSubmitResult() { msg = "反馈成功", id = null });
            return ApiHelper.ToJson(jsonResult);
        }
        #endregion
    }

    #region Request data
    /// <summary>
    /// POST data of the SendMsg endpoint.
    /// </summary>
    [MyValidate]
    public class SendMsgData
    {
        /// <summary>
        /// Message content.
        /// </summary>
        [Required]
        public WebApiMsgContent msgContent { get; set; }

        /// <summary>
        /// Room number. When empty and devNos is empty too, the message goes to all
        /// rooms; when empty and devNos is set, the message is sent by devNos.
        /// </summary>
        public string roomNo { get; set; }

        /// <summary>
        /// Comma-separated device codes (codes of the in-cell or out-of-cell screens).
        /// </summary>
        public string devNos { get; set; }
    }

    /// <summary>
    /// POST data of the InstallMsgFeedback endpoint.
    /// </summary>
    [MyValidate]
    public class InstallMsgFeedbackData
    {
        /// <summary>
        /// Install record id.
        /// </summary>
        [Required]
        public string id { get; set; }

        /// <summary>
        /// Whether the installation succeeded.
        /// </summary>
        [Required]
        public bool success { get; set; }

        /// <summary>
        /// Reason the installation failed.
        /// </summary>
        public string errorMsg { get; set; }
    }

    /// <summary>
    /// POST data of the RollCallMsgFeedback endpoint.
    /// </summary>
    [MyValidate]
    public class RollCallMsgFeedbackData
    {
        /// <summary>
        /// Roll call id.
        /// </summary>
        [Required]
        public string id { get; set; }

        /// <summary>
        /// Whether starting the roll call succeeded.
        /// </summary>
        [Required]
        public bool success { get; set; }

        /// <summary>
        /// Reason starting the roll call failed.
        /// </summary>
        public string errorMsg { get; set; }
    }
    #endregion
}
C# Socket,没有人比我的代码更简单明了了