C# .NET Socket SocketHelper 高性能 5000客户端 异步接收数据

    网上有很多Socket框架,但是我想,C#既然有Socket类,难道不是给人用的吗?

    写了一个SocketServerHelper和SocketClientHelper,分别只有5、6百行代码,比不上大神写的,和业务代码耦合也比较重,但对新手非常友好,容易看懂。

    支持返回值或回调,支持不定长度的数据包。客户端和服务端均支持断线重连。

    自己本机测试,5000个客户端并发发送消息正常,CPU压力有点大。由于局域网机子性能差,局域网只测试了500个客户端并发发送消息正常。

    短短1000多行代码,花了好多天心血,改了无数BUG,越写代码,越觉得自己资质平平,逻辑思维不够用。写Socket代码不像写一般的代码,实在不行加个try catch完事,这个东西既要稳定,又要性能,真的是每一个逻辑分支,每一个异常分支,都要想清楚,都要处理好,代码里我还是Exception用习惯了,没细分。

    有时候为了解决一个BUG,找了一整天,也找不出BUG在哪,现在终于测试通过了,达到了自己的预想。

    通过这几天的踩坑,测试,得出结论:

    1、Socket TCP 不会丢包,TCP是可靠的。(本机测试、局域网测试,可能没有遇到更恶劣的网络环境)

    2、Socket TCP 能够保证顺序,接收到的顺序和发送的顺序一致

    3、代码里有数据校验,但是错误的分支永远都不会走,校验是一定能通过的,不存在数据校验不通过,把错误的数据包简单丢弃的情况,否则说明代码写的还是有BUG

    以下是主要代码:

    SocketServerHelper代码:

C# .NET Socket SocketHelper 高性能 5000客户端 异步接收数据
using Models;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// Socket服务端帮助类
    /// </summary>
    public class SocketServerHelper
    {
        #region 变量
        // TCP port the listening socket binds to; assigned once by the constructor.
        private int _serverPort;
        // Listening socket; created, bound and put into listen mode by StartServer, closed by StopServer.
        private Socket serverSocket;
        // Every accepted client. Used as a concurrent set: only the keys matter, the value is always added as null.
        private ConcurrentDictionary<ClientSocket, string> clientSocketList = new ConcurrentDictionary<ClientSocket, string>();
        // Room number -> client, written when a registration packet (type 2) carrying a RoomNo is processed.
        private ConcurrentDictionary<string, ClientSocket> _dictRoomNoClientSocket = new ConcurrentDictionary<string, ClientSocket>();
        // Device number -> client, written when a registration packet (type 2) carrying a DevNo is processed.
        private ConcurrentDictionary<string, ClientSocket> _dictDevNoClientSocket = new ConcurrentDictionary<string, ClientSocket>();

        // Backing field; kept public because it was public in the original API.
        public int _CallbackTimeout = 20;
        /// <summary>
        /// Timeout, in seconds, after which a pending callback is completed with a timeout result
        /// (read by WaitCallback through the backing field).
        /// </summary>
        public int CallbackTimeout
        {
            get { return _CallbackTimeout; }
            // Fixed: the setter used to assign the field to `value`, so every write was silently discarded.
            set { _CallbackTimeout = value; }
        }

        // Backing field; kept public because it was public in the original API.
        public int _WaitResultTimeout = 20;
        /// <summary>
        /// Timeout, in seconds, that the blocking Send overload waits for the client's result
        /// (read by WaitSocketResult through the backing field).
        /// </summary>
        public int WaitResultTimeout
        {
            get { return _WaitResultTimeout; }
            // Fixed: the setter used to assign the field to `value`, so every write was silently discarded.
            set { _WaitResultTimeout = value; }
        }

        // Lock object; no method of this class visible here takes it (sends are serialized per client via ClientSocket.LockSend).
        private object _lockSend = new object();

        // Raised, with a null sender, from ProcessSocketData each time a return-value packet (type 4) is received.
        public event EventHandler<ReceivedSocketResultEventArgs> ReceivedSocketResultEvent;

        // One-shot timer that drives CheckClient; created in StartServer and re-armed at the end of every check.
        private System.Timers.Timer _checkClientTimer;
        #endregion

        #region SocketServerHelper 构造函数
        /// <summary>
        /// Creates a server helper for the given port. Nothing is opened here;
        /// the port is only remembered for a later StartServer call.
        /// </summary>
        /// <param name="serverPort">TCP port the listening socket will be bound to.</param>
        public SocketServerHelper(int serverPort)
        {
            this._serverPort = serverPort;
        }
        #endregion

        #region 启动服务
        /// <summary>
        /// Starts the server: binds a TCP listener to the configured port on all interfaces,
        /// starts a background thread that accepts clients and arms their first asynchronous
        /// receive, and starts the one-second timer that drives CheckClient.
        /// </summary>
        /// <returns>
        /// true when the listener, accept thread and timer were started;
        /// false when start-up threw (the exception is logged and the listener is closed again).
        /// </returns>
        public bool StartServer()
        {
            try
            {
                IPEndPoint ipEndPoint = new IPEndPoint(IPAddress.Any, _serverPort);
                serverSocket = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                serverSocket.Bind(ipEndPoint);
                serverSocket.Listen(5000);

                // The thread keeps the listener it was started for, so a later restart
                // cannot make an old accept thread pick up the new listener.
                Socket listener = serverSocket;
                Thread thread = new Thread(new ThreadStart(delegate ()
                {
                    AcceptClients(listener);
                }));
                thread.IsBackground = true;
                thread.Start();

                // Client liveness check: one-shot timer, re-armed by CheckClient itself.
                _checkClientTimer = new System.Timers.Timer();
                _checkClientTimer.AutoReset = false;
                _checkClientTimer.Interval = 1000;
                _checkClientTimer.Elapsed += CheckClient;
                _checkClientTimer.Start();

                LogUtil.Log("服务已启动");
                return true;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "启动服务出错");

                // Do not leave a half-started listener (e.g. after a failed Bind) open.
                Socket failedListener = serverSocket;
                if (failedListener != null)
                {
                    ActionUtil.TryDoAction(() => { failedListener.Close(); });
                }
                return false;
            }
        }

        /// <summary>
        /// Accept loop run on the background thread. Ends when the listener has been disposed.
        /// </summary>
        private void AcceptClients(Socket listener)
        {
            while (true)
            {
                Socket client;
                try
                {
                    client = listener.Accept();
                }
                catch (ObjectDisposedException)
                {
                    // The listener was closed (StopServer). The original loop kept retrying
                    // forever here, logging an error every millisecond.
                    LogUtil.Log("监听Socket已关闭,停止接受新连接");
                    return;
                }
                catch (Exception ex)
                {
                    LogUtil.Error(ex);
                    Thread.Sleep(1);
                    continue;
                }

                ClientSocket clientSocket = null;
                try
                {
                    client.SendTimeout = 20000;
                    client.ReceiveTimeout = 20000;
                    client.SendBufferSize = 10240;
                    client.ReceiveBufferSize = 10240;
                    clientSocket = new ClientSocket(client);
                    clientSocketList.TryAdd(clientSocket, null);
                    LogUtil.Log("监听到新的客户端,当前客户端数:" + clientSocketList.Count);
                }
                catch (Exception ex)
                {
                    LogUtil.Error(ex);

                    // The connection was accepted but could not be set up: release it
                    // instead of leaking the socket.
                    if (clientSocket != null)
                    {
                        string ignored;
                        clientSocketList.TryRemove(clientSocket, out ignored);
                    }
                    Socket failedClient = client;
                    ActionUtil.TryDoAction(() => { failedClient.Close(); });
                    Thread.Sleep(1);
                    continue;
                }

                // Arm the first receive off the accept thread so accepting is never delayed
                // by packet processing.
                Socket acceptedSocket = client;
                ClientSocket acceptedClient = clientSocket;
                Task.Run(() =>
                {
                    StartReceiving(acceptedSocket, acceptedClient);
                });
            }
        }

        /// <summary>
        /// Creates the client's receive buffer and SocketAsyncEventArgs and issues the first receive.
        /// </summary>
        private void StartReceiving(Socket client, ClientSocket clientSocket)
        {
            try
            {
                byte[] buffer = new byte[10240];
                SocketAsyncEventArgs args = new SocketAsyncEventArgs();
                clientSocket.SocketAsyncArgs = args;
                clientSocket.SocketAsyncCompleted = (s, e) =>
                {
                    ReceiveData(clientSocket, e);
                };
                args.SetBuffer(buffer, 0, buffer.Length);
                args.Completed += clientSocket.SocketAsyncCompleted;

                // ReceiveAsync returns false when it completed synchronously; in that case
                // Completed is NOT raised, so the result has to be processed right here.
                if (!client.ReceiveAsync(args))
                {
                    ReceiveData(clientSocket, args);
                }
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex);
            }
        }
        #endregion

        #region 检测客户端
        /// <summary>
        /// Timer callback: drops every client whose last heartbeat is more than 60 seconds old.
        /// A dropped client is removed from the client list and from the room/device lookups,
        /// its socket is closed and its SocketAsyncEventArgs is released. The one-shot timer is
        /// re-armed when the pass is finished.
        /// </summary>
        /// <param name="sender">Timer that raised the event (unused).</param>
        /// <param name="e">Elapsed event data (unused).</param>
        private void CheckClient(object sender, System.Timers.ElapsedEventArgs e)
        {
            Task.Run(() =>
            {
                try
                {
                    foreach (ClientSocket clientSkt in clientSocketList.Keys.ToArray())
                    {
                        if (DateTime.Now.Subtract(clientSkt.LastHeartbeat).TotalSeconds <= 60) continue;

                        Socket skt = clientSkt.Socket;
                        string strTemp;
                        clientSocketList.TryRemove(clientSkt, out strTemp);

                        // Fixed: the lookups used to keep pointing at the dead client, so Send()
                        // kept targeting a disposed socket and the ClientSocket was never released.
                        RemoveRegistration(_dictRoomNoClientSocket, clientSkt.RoomNo, clientSkt);
                        RemoveRegistration(_dictDevNoClientSocket, clientSkt.DevNo, clientSkt);

                        LogUtil.Log("客户端已失去连接,当前客户端数:" + clientSocketList.Count);
                        ActionUtil.TryDoAction(() => { if (skt.Connected) skt.Disconnect(false); });
                        ActionUtil.TryDoAction(() =>
                        {
                            skt.Close();
                            skt.Dispose();
                            if (clientSkt.SocketAsyncArgs != null)
                            {
                                if (clientSkt.SocketAsyncCompleted != null)
                                {
                                    clientSkt.SocketAsyncArgs.Completed -= clientSkt.SocketAsyncCompleted;
                                }
                                clientSkt.SocketAsyncArgs.Dispose();
                            }
                            clientSkt.SocketAsyncCompleted = null;
                            clientSkt.SocketAsyncArgs = null;
                        });
                    }
                }
                catch (Exception ex)
                {
                    LogUtil.Error(ex, "检测客户端出错");
                }
                finally
                {
                    try
                    {
                        _checkClientTimer.Start();
                    }
                    catch (ObjectDisposedException)
                    {
                        // Dispose() closed the timer while this pass was running; nothing to re-arm.
                    }
                }
            });
        }

        /// <summary>
        /// Removes <paramref name="key"/> from a lookup only if it still maps to <paramref name="client"/>,
        /// so a newer registration under the same key is left untouched.
        /// </summary>
        private static void RemoveRegistration(ConcurrentDictionary<string, ClientSocket> registrations, string key, ClientSocket client)
        {
            if (key == null) return;

            // The explicit ICollection<KeyValuePair<,>>.Remove of ConcurrentDictionary removes the
            // entry atomically and only when both key and value match.
            ((ICollection<KeyValuePair<string, ClientSocket>>)registrations)
                .Remove(new KeyValuePair<string, ClientSocket>(key, client));
        }
        #endregion

        #region 接收数据
        /// <summary>
        /// Handles a completed asynchronous receive for one client: appends the received bytes to
        /// the client's buffer, processes every complete packet at the front of it and re-arms the
        /// receive.
        /// </summary>
        /// <remarks>
        /// Changes from the original behaviour:
        /// a failed or zero-byte receive (the peer closed the connection) stops receiving instead of
        /// re-arming the receive in a tight loop; a receive that completes synchronously is processed
        /// here because <c>Completed</c> is not raised for it; a zero or negative length prefix is
        /// treated as corrupt data instead of stalling the stream forever.
        /// The socket itself is not closed here; stale clients are dropped by CheckClient.
        /// </remarks>
        /// <param name="clientSkt">Client the data belongs to; null is ignored.</param>
        /// <param name="e">Event args of the completed receive.</param>
        private void ReceiveData(ClientSocket clientSkt, SocketAsyncEventArgs e)
        {
            if (clientSkt == null) return;
            Socket skt = clientSkt.Socket;

            try
            {
                bool pending;
                do
                {
                    if (e.SocketError != SocketError.Success || e.BytesTransferred <= 0)
                    {
                        LogUtil.Log("客户端已断开或接收失败,停止接收,SocketError=" + e.SocketError);
                        return;
                    }

                    CopyTo(e.Buffer, clientSkt.Buffer, 0, e.BytesTransferred);

                    if (HasCompleteLeadingPacket(clientSkt))
                    {
                        SocketData data;
                        do
                        {
                            data = ProcessSocketData(clientSkt);
                        } while (data != null);
                    }

                    if (!skt.Connected) return;

                    // false means the receive already completed synchronously and Completed
                    // will not fire, so loop and handle the new data right away.
                    pending = skt.ReceiveAsync(e);
                } while (!pending);
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "处理接收的数据包 异常");
            }
        }

        /// <summary>
        /// Validates the packet at the front of the client's buffer.
        /// Returns true when a complete packet is available; returns false when more bytes are
        /// needed or when the data was invalid (in which case the whole buffer is discarded).
        /// </summary>
        private bool HasCompleteLeadingPacket(ClientSocket clientSkt)
        {
            const int headerLength = 4;   // ASCII "0XFF"
            const int prefixLength = 9;   // header + 1 type byte + 4 length bytes

            List<byte> buffered = clientSkt.Buffer;
            if (buffered.Count < headerLength) return false;

            byte[] bArrHeader = new byte[headerLength];
            CopyTo(buffered, bArrHeader, 0, 0, bArrHeader.Length);
            string strHeader = Encoding.ASCII.GetString(bArrHeader);
            if (!string.Equals(strHeader, "0XFF", StringComparison.OrdinalIgnoreCase))
            {
                LogUtil.Error("不是0XFF,丢掉错误数据,重新接收");
                buffered.Clear();
                return false;
            }

            if (buffered.Count < headerLength + 1) return false;

            byte type = buffered[headerLength];
            if (type == 0) return true; // heartbeat: header + type only

            if (type != 2 && type != 4) // only registration and return-value packets carry a body
            {
                LogUtil.Error("type错误,丢掉错误数据,重新接收");
                buffered.Clear();
                return false;
            }

            if (buffered.Count < prefixLength) return false;

            byte[] bArrLength = new byte[4];
            CopyTo(buffered, bArrLength, headerLength + 1, 0, bArrLength.Length);
            int dataLength = BitConverter.ToInt32(bArrLength, 0);
            if (dataLength <= 0)
            {
                // Such a packet can never be consumed by ResolveBuffer; waiting for more
                // bytes (the old behaviour) would block this client's stream forever.
                LogUtil.Error("数据长度无效(" + dataLength + "),丢掉错误数据,重新接收");
                buffered.Clear();
                return false;
            }

            // Written as a subtraction so a huge length prefix cannot overflow the comparison.
            return buffered.Count - prefixLength >= dataLength;
        }
        #endregion

        #region 处理接收的数据包
        /// <summary>
        /// Parses one packet from the front of the client's buffer, removes its bytes from the
        /// buffer and acts on it:
        /// type 0 (heartbeat) refreshes LastHeartbeat and, for registered clients, sends a type 1 reply;
        /// type 2 (registration) stores the room/device numbers, updates the lookups and sends a type 5 reply;
        /// type 4 (return value) stores the result for the waiting sender and raises ReceivedSocketResultEvent.
        /// </summary>
        /// <param name="clientSkt">Client whose buffer is processed.</param>
        /// <returns>The parsed packet, or null when no complete, valid packet is available.</returns>
        private SocketData ProcessSocketData(ClientSocket clientSkt)
        {
            int readLength = 0;
            SocketData data = ResolveBuffer(clientSkt.Buffer, out readLength);
            if (data == null) return null;

            if (readLength > 0) clientSkt.RemoveBufferData(readLength);

            if (data.Type == 0) // heartbeat
            {
                clientSkt.LastHeartbeat = DateTime.Now;

                // Only registered clients get a heartbeat reply.
                if (clientSkt.RoomNo != null || clientSkt.DevNo != null)
                {
                    ThreadHelper.Run(() =>
                    {
                        lock (clientSkt.LockSend)
                        {
                            byte[] bArrHeader = Encoding.ASCII.GetBytes("0XFF");
                            SocketHelper.Send(clientSkt.Socket, bArrHeader);
                            SocketHelper.Send(clientSkt.Socket, new byte[] { 0x01 });
                        }
                    });
                }
                else
                {
                    LogUtil.Log("没有注册信息");
                }

                LogUtil.Log("收到心跳包,客户端连接正常,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo);
            }
            else if (data.Type == 2) // registration
            {
                if (data.SocketRegisterData != null)
                {
                    // Replace whatever client was registered under these numbers before.
                    ClientSocket temp;
                    if (data.SocketRegisterData.RoomNo != null) _dictRoomNoClientSocket.TryRemove(data.SocketRegisterData.RoomNo, out temp);
                    if (data.SocketRegisterData.DevNo != null) _dictDevNoClientSocket.TryRemove(data.SocketRegisterData.DevNo, out temp);
                    clientSkt.RoomNo = data.SocketRegisterData.RoomNo;
                    clientSkt.DevNo = data.SocketRegisterData.DevNo;
                    if (data.SocketRegisterData.RoomNo != null) _dictRoomNoClientSocket.TryAdd(data.SocketRegisterData.RoomNo, clientSkt);
                    if (data.SocketRegisterData.DevNo != null) _dictDevNoClientSocket.TryAdd(data.SocketRegisterData.DevNo, clientSkt);
                    LogUtil.Log("收到注册包,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo);

                    // Registration acknowledgement (type 5).
                    ThreadHelper.Run(() =>
                    {
                        lock (clientSkt.LockSend)
                        {
                            byte[] bArrHeader = Encoding.ASCII.GetBytes("0XFF");
                            SocketHelper.Send(clientSkt.Socket, bArrHeader);
                            SocketHelper.Send(clientSkt.Socket, new byte[] { 0x05 });
                        }
                    });
                }
            }
            else if (data.Type == 4) // return value
            {
                ThreadHelper.Run(() =>
                {
                    SocketResult result = data.SocketResult;

                    // A null key would make TryAdd throw and skip the event below, so a result
                    // without callbackId is reported to subscribers but not stored.
                    if (result != null && result.callbackId != null)
                    {
                        clientSkt.CallbackDict.TryAdd(result.callbackId, result);
                    }

                    // Copy the delegate first: a subscriber may unsubscribe between the
                    // null check and the invocation.
                    EventHandler<ReceivedSocketResultEventArgs> handler = ReceivedSocketResultEvent;
                    if (handler != null)
                    {
                        handler(null, new Models.ReceivedSocketResultEventArgs(result));
                    }
                });
                LogUtil.Log("收到返回值包,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo);
            }

            return data;
        }
        #endregion

        #region ResolveBuffer
        /// <summary>
        /// Tries to parse one packet from the start of <paramref name="buffer"/> without modifying it.
        /// Packet layout: 4 ASCII bytes "0XFF", 1 type byte and, for types 2 and 4 only, a 4-byte
        /// length (BitConverter byte order) followed by that many bytes of UTF-8 JSON.
        /// </summary>
        /// <param name="buffer">Bytes received so far for one client.</param>
        /// <param name="readLength">
        /// Number of bytes the returned packet occupies. Only meaningful when the result is not null.
        /// </param>
        /// <returns>
        /// The parsed packet; null when the buffer does not yet hold a complete packet, the header
        /// is not "0XFF", the length prefix is not positive, or parsing threw (logged).
        /// </returns>
        private SocketData ResolveBuffer(List<byte> buffer, out int readLength)
        {
            readLength = 0;

            try
            {
                if (buffer.Count < 4) return null;
                byte[] bArrHeader = new byte[4];
                buffer.CopyTo(0, bArrHeader, 0, bArrHeader.Length);
                readLength += bArrHeader.Length;

                // Ordinal comparison: the header is a protocol token, not culture-dependent text.
                string strHeader = Encoding.ASCII.GetString(bArrHeader);
                if (!string.Equals(strHeader, "0XFF", StringComparison.OrdinalIgnoreCase))
                {
                    LogUtil.Error("不是0XFF");
                    return null;
                }

                if (buffer.Count < 5) return null;
                SocketData socketData = new SocketData();
                socketData.Type = buffer[4];
                readLength += 1;

                if (socketData.Type == 2 || socketData.Type == 4)
                {
                    string jsonString = ReadJsonBody(buffer, ref readLength);
                    if (jsonString == null) return null;

                    if (socketData.Type == 2)
                    {
                        socketData.SocketRegisterData = JsonConvert.DeserializeObject<SocketRegisterData>(jsonString);
                    }
                    else
                    {
                        socketData.SocketResult = JsonConvert.DeserializeObject<SocketResult>(jsonString);
                    }
                }

                return socketData;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "解析字节数组 出错");
                return null;
            }
        }

        /// <summary>
        /// Reads the length-prefixed UTF-8 body that follows the 5-byte header+type prefix.
        /// Returns null when the body is not completely buffered yet or the length is not positive.
        /// </summary>
        private string ReadJsonBody(List<byte> buffer, ref int readLength)
        {
            const int prefixLength = 9; // 4 header + 1 type + 4 length

            if (buffer.Count < prefixLength) return null;
            byte[] bArrLength = new byte[4];
            buffer.CopyTo(5, bArrLength, 0, bArrLength.Length);
            readLength += bArrLength.Length;
            int dataLength = BitConverter.ToInt32(bArrLength, 0);

            if (dataLength < 0)
            {
                // Previously surfaced as an exception from `new byte[dataLength]`.
                LogUtil.Error("数据长度无效:" + dataLength);
                return null;
            }

            // Subtraction instead of `dataLength + 9` so a huge prefix cannot overflow.
            if (dataLength == 0 || buffer.Count - prefixLength < dataLength) return null;

            byte[] dataBody = new byte[dataLength];
            buffer.CopyTo(prefixLength, dataBody, 0, dataBody.Length);
            readLength += dataBody.Length;
            return Encoding.UTF8.GetString(dataBody);
        }
        #endregion

        #region CopyTo
        /// <summary>
        /// Appends up to <paramref name="length"/> bytes of <paramref name="bArrSource"/>, starting at
        /// <paramref name="sourceIndex"/>, to the end of <paramref name="listTarget"/>.
        /// Positions past the end of the source array are skipped, so fewer bytes may be appended.
        /// </summary>
        /// <param name="bArrSource">Array to read from.</param>
        /// <param name="listTarget">List the bytes are appended to.</param>
        /// <param name="sourceIndex">Index of the first byte to copy.</param>
        /// <param name="length">Maximum number of bytes to copy.</param>
        private void CopyTo(byte[] bArrSource, List<byte> listTarget, int sourceIndex, int length)
        {
            int offset = 0;
            while (offset < length)
            {
                int position = sourceIndex + offset;
                bool insideSource = position < bArrSource.Length;
                if (insideSource)
                {
                    listTarget.Add(bArrSource[position]);
                }
                offset++;
            }
        }

        /// <summary>
        /// Copies up to <paramref name="length"/> bytes from <paramref name="listSource"/> (starting at
        /// <paramref name="sourceIndex"/>) into <paramref name="bArrTarget"/> (starting at
        /// <paramref name="targetIndex"/>). Positions that fall outside the end of either collection
        /// are skipped, leaving the corresponding target bytes unchanged.
        /// </summary>
        /// <param name="listSource">List to read from.</param>
        /// <param name="bArrTarget">Array to write into.</param>
        /// <param name="sourceIndex">Index of the first byte to read.</param>
        /// <param name="targetIndex">Index of the first byte to write.</param>
        /// <param name="length">Maximum number of bytes to copy.</param>
        private void CopyTo(List<byte> listSource, byte[] bArrTarget, int sourceIndex, int targetIndex, int length)
        {
            int offset = 0;
            while (offset < length)
            {
                int writeAt = targetIndex + offset;
                int readAt = sourceIndex + offset;
                bool inRange = writeAt < bArrTarget.Length && readAt < listSource.Count;
                if (inRange)
                {
                    bArrTarget[writeAt] = listSource[readAt];
                }
                offset++;
            }
        }
        #endregion

        #region 停止服务
        /// <summary>
        /// Stops the server: disconnects and closes every client socket, releases each client's
        /// SocketAsyncEventArgs, clears the client list and both lookups, and closes the listening
        /// socket. Failures of individual steps are swallowed by ActionUtil.TryDoAction; any other
        /// exception is logged.
        /// </summary>
        /// <remarks>The client-check timer is not touched here; it is stopped by Dispose.</remarks>
        public void StopServer()
        {
            try
            {
                foreach (ClientSocket clientSocket in clientSocketList.Keys.ToArray())
                {
                    ClientSocket current = clientSocket;
                    Socket socket = current.Socket;
                    ActionUtil.TryDoAction(() => { if (socket.Connected) socket.Disconnect(false); });
                    ActionUtil.TryDoAction(() =>
                    {
                        socket.Close();
                        socket.Dispose();
                    });

                    // Same clean-up CheckClient performs for a timed-out client; previously the
                    // event args and their handler were left alive after a stop.
                    ActionUtil.TryDoAction(() =>
                    {
                        if (current.SocketAsyncArgs != null)
                        {
                            if (current.SocketAsyncCompleted != null)
                            {
                                current.SocketAsyncArgs.Completed -= current.SocketAsyncCompleted;
                            }
                            current.SocketAsyncArgs.Dispose();
                        }
                        current.SocketAsyncCompleted = null;
                        current.SocketAsyncArgs = null;
                    });
                }
                clientSocketList.Clear();
                _dictDevNoClientSocket.Clear();
                _dictRoomNoClientSocket.Clear();
                if (serverSocket != null)
                {
                    ActionUtil.TryDoAction(() => { if (serverSocket.Connected) serverSocket.Disconnect(false); });
                    ActionUtil.TryDoAction(() =>
                    {
                        serverSocket.Close();
                        serverSocket.Dispose();
                    });
                }
                LogUtil.Log("服务已停止");
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "停止服务出错");
            }
        }
        #endregion

        #region 释放资源
        /// <summary>
        /// Stops and closes the client-check timer, if one has been created.
        /// Sockets are not touched here; they are closed by StopServer.
        /// </summary>
        public void Dispose()
        {
            if (_checkClientTimer == null)
            {
                return;
            }

            _checkClientTimer.Stop();
            _checkClientTimer.Close();
        }
        #endregion

        #region Send
        /// <summary>
        /// Sends a message (packet type 3) to one client and blocks until its result arrives or
        /// the wait-result timeout expires.
        /// </summary>
        /// <remarks>
        /// The target is looked up by room number first and then by device number. When
        /// <paramref name="devNo"/> is not null its lookup always decides the target: a miss resets
        /// the target to null even if the room lookup had found a client.
        /// </remarks>
        /// <param name="msgContent">Message to send; receives a generated callbackId when it has none.</param>
        /// <param name="roomNo">Room number of the target client, or null.</param>
        /// <param name="devNo">Device number of the target client, or null.</param>
        /// <returns>
        /// The client's result; a failed result with errorMsg "超时" on timeout, or with
        /// errorMsg "客户端不存在" when no client is registered under the given numbers.
        /// </returns>
        public SocketResult Send(WebApiMsgContent msgContent, string roomNo, string devNo)
        {
            SocketData packet = new SocketData();
            packet.Type = 3;
            packet.MsgContent = msgContent;

            ClientSocket target = null;
            if (roomNo != null)
            {
                _dictRoomNoClientSocket.TryGetValue(roomNo, out target);
            }
            if (devNo != null)
            {
                _dictDevNoClientSocket.TryGetValue(devNo, out target);
            }

            if (target == null)
            {
                SocketResult notFound = new SocketResult();
                notFound.success = false;
                notFound.errorMsg = "客户端不存在";
                return notFound;
            }

            bool needsCallbackId = string.IsNullOrWhiteSpace(msgContent.callbackId);
            if (needsCallbackId)
            {
                msgContent.callbackId = Guid.NewGuid().ToString("N");
            }

            Send(target, packet);
            return WaitSocketResult(target, msgContent.callbackId);
        }

        /// <summary>
        /// Sends a message (packet type 3) to one client without blocking. When
        /// <paramref name="callback"/> is given it is invoked later with the client's result or
        /// with a timeout result.
        /// </summary>
        /// <remarks>
        /// The target is looked up by room number first and then by device number. When
        /// <paramref name="devNo"/> is not null its lookup always decides the target: a miss resets
        /// the target to null even if the room lookup had found a client.
        /// When no client is found, <paramref name="callback"/> (if any) is invoked immediately
        /// with a failed result whose errorMsg is "客户端不存在".
        /// </remarks>
        /// <param name="msgContent">Message to send; receives a generated callbackId when it has none.</param>
        /// <param name="roomNo">Room number of the target client, or null.</param>
        /// <param name="devNo">Device number of the target client, or null.</param>
        /// <param name="callback">Optional receiver of the result.</param>
        public void Send(WebApiMsgContent msgContent, string roomNo, string devNo, Action<SocketResult> callback = null)
        {
            SocketData packet = new SocketData();
            packet.Type = 3;
            packet.MsgContent = msgContent;

            ClientSocket target = null;
            if (roomNo != null)
            {
                _dictRoomNoClientSocket.TryGetValue(roomNo, out target);
            }
            if (devNo != null)
            {
                _dictDevNoClientSocket.TryGetValue(devNo, out target);
            }

            if (target == null)
            {
                SocketResult notFound = new SocketResult();
                notFound.success = false;
                notFound.errorMsg = "客户端不存在";
                if (callback != null)
                {
                    callback(notFound);
                }
                return;
            }

            bool needsCallbackId = string.IsNullOrWhiteSpace(msgContent.callbackId);
            if (needsCallbackId)
            {
                msgContent.callbackId = Guid.NewGuid().ToString("N");
            }

            // Start watching for the reply before the packet goes out.
            if (callback != null)
            {
                WaitCallback(target, msgContent.callbackId, callback);
            }

            Send(target, packet);
        }

        /// <summary>
        /// Polls the client's CallbackDict every 100 ms for the result stored under
        /// <paramref name="callbackId"/> and hands it to <paramref name="callback"/>. When the
        /// callback timeout expires first, the callback receives a failed result with errorMsg "超时".
        /// Does not block the caller; the polling runs on a one-shot timer that re-arms itself.
        /// </summary>
        /// <param name="clientSocket">Client whose results are watched.</param>
        /// <param name="callbackId">Id the expected result is stored under.</param>
        /// <param name="callback">Receiver of the result; may be null, in which case the result is only removed.</param>
        private void WaitCallback(ClientSocket clientSocket, string callbackId, Action<SocketResult> callback = null)
        {
            DateTime dt = DateTime.Now.AddSeconds(_CallbackTimeout);
            System.Timers.Timer timer = new System.Timers.Timer();
            timer.AutoReset = false;
            timer.Interval = 100;
            timer.Elapsed += (s, e) =>
            {
                // Stays true unless the timer was successfully re-armed for another poll.
                bool finished = true;
                try
                {
                    SocketResult socketResult;
                    if (!clientSocket.CallbackDict.TryGetValue(callbackId, out socketResult) && DateTime.Now < dt)
                    {
                        timer.Start();
                        finished = false;
                        return;
                    }
                    SocketResult sktResult;
                    clientSocket.CallbackDict.TryRemove(callbackId, out sktResult);
                    if (socketResult == null)
                    {
                        socketResult = new SocketResult();
                        socketResult.success = false;
                        socketResult.errorMsg = "超时";
                    }

                    if (callback != null) callback(socketResult);
                }
                catch (Exception ex)
                {
                    LogUtil.Error("WaitCallback error" + ex);
                }
                finally
                {
                    // Fixed: the timer used to be closed only on the success path, so an
                    // exception thrown by the callback left it undisposed.
                    if (finished) timer.Close();
                }
            };
            timer.Start();
        }

        /// <summary>
        /// Blocks the calling thread, polling every 10 ms, until the client's CallbackDict holds a
        /// result for <paramref name="callbackId"/> or the wait-result timeout expires. The entry is
        /// removed from the dictionary before returning.
        /// </summary>
        /// <param name="clientSocket">Client whose results are watched.</param>
        /// <param name="callbackId">Id the expected result is stored under.</param>
        /// <returns>The stored result, or a failed result with errorMsg "超时" when none arrived in time.</returns>
        private SocketResult WaitSocketResult(ClientSocket clientSocket, string callbackId)
        {
            DateTime deadline = DateTime.Now.AddSeconds(_WaitResultTimeout);

            SocketResult received;
            while (true)
            {
                if (clientSocket.CallbackDict.TryGetValue(callbackId, out received)) break;
                if (DateTime.Now >= deadline) break;
                Thread.Sleep(10);
            }

            SocketResult discarded;
            clientSocket.CallbackDict.TryRemove(callbackId, out discarded);

            if (received != null)
            {
                return received;
            }

            SocketResult timedOut = new SocketResult();
            timedOut.success = false;
            timedOut.errorMsg = "超时";
            return timedOut;
        }

        /// <summary>
        /// Writes one packet to the client's socket while holding the client's send lock:
        /// the "0XFF" header, then the type byte and, for type 3 with a message, the 4-byte body
        /// length followed by the UTF-8 JSON body. Each part is sent only if the previous part was
        /// sent successfully. The outcome is not reported to the caller.
        /// </summary>
        /// <param name="clientSocket">Target client.</param>
        /// <param name="data">Packet to send; only types 1 and 3 produce anything after the header.</param>
        private void Send(ClientSocket clientSocket, SocketData data)
        {
            Socket target = clientSocket.Socket;

            lock (clientSocket.LockSend)
            {
                // header
                bool sent = SocketHelper.Send(target, Encoding.ASCII.GetBytes("0XFF"));

                switch (data.Type)
                {
                    case 1:
                        if (sent) SocketHelper.Send(target, new byte[] { 0x01 }); // type
                        break;

                    case 3:
                        if (sent) sent = SocketHelper.Send(target, new byte[] { 0x03 }); // type

                        if (data.MsgContent != null && sent)
                        {
                            byte[] payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data.MsgContent));
                            sent = SocketHelper.Send(target, BitConverter.GetBytes(payload.Length)); // length
                            if (sent) SocketHelper.Send(target, payload); // body
                        }
                        break;
                }
            }
        }
        #endregion

    }

}
View Code

    SocketClientHelper代码:

C# .NET Socket SocketHelper 高性能 5000客户端 异步接收数据
using Models;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// Socket client helper: connects to the server, receives frames asynchronously, sends frames
    /// synchronously, and reconnects and re-registers when no heartbeat reply has been seen for 60 seconds.
    /// </summary>
    /// <remarks>
    /// Wire format of one frame: 4-byte ASCII header "0XFF", 1-byte type, and for payload-carrying frames
    /// a 4-byte length (BitConverter byte order of the sending machine) followed by a UTF-8 JSON body.
    /// Types sent by this class: 0 heartbeat, 2 register, 4 result.
    /// Types accepted from the server: 1 heartbeat reply, 3 message, 5 register feedback.
    /// </remarks>
    public class SocketClientHelper : IDisposable
    {
        #region Fields and properties
        // Frame layout: header (4 bytes) + type (1 byte) + length (4 bytes) + body.
        private const string FrameHeader = "0XFF";
        private const int HeaderLength = 4;
        private const int TypeLength = 1;
        private const int LengthFieldLength = 4;
        private const int BodyOffset = HeaderLength + TypeLength + LengthFieldLength;

        /// <summary>
        /// Result of trying to read one frame from the front of the receive buffer.
        /// </summary>
        private enum FrameState
        {
            /// <summary>More bytes are needed before the frame can be read.</summary>
            Incomplete,
            /// <summary>A whole frame is available and can be removed from the buffer.</summary>
            Complete,
            /// <summary>The buffer does not start with a valid frame and must be discarded.</summary>
            Corrupt
        }

        private string _serverIP;
        private int _serverPort;
        private object _lockSend = new object();
        private readonly object _lockBuffer = new object();
        private Socket clientSocket;
        private SocketAsyncEventArgs _socketAsyncArgs;
        public EventHandler<SocketAsyncEventArgs> _socketAsyncCompleted { get; set; }
        private System.Timers.Timer heartbeatTimer;
        public event EventHandler<SocketReceivedEventArgs> SocketReceivedEvent;
        private System.Timers.Timer _checkServerTimer;
        private DateTime _lastHeartbeat; // UTC, so clock/DST changes cannot fake or hide a timeout
        private List<byte> _buffer = new List<byte>();
        private string _roomNo;
        private string _devNo;
        // Written by the receive thread and polled by RegisterToServer, hence volatile.
        private volatile bool _registerSuccess = false;
        // False once StopHeartbeat/Dispose ran, so an in-flight tick does not re-arm the timer.
        private volatile bool _heartbeatEnabled = false;

        /// <summary>
        /// Room number passed to the most recent RegisterToServer call.
        /// </summary>
        public string RoomNo
        {
            get { return _roomNo; }
        }

        /// <summary>
        /// Device number passed to the most recent RegisterToServer call.
        /// </summary>
        public string DevNo
        {
            get { return _devNo; }
        }

        /// <summary>
        /// Removes up to <paramref name="count"/> bytes from the front of the receive buffer.
        /// </summary>
        private void RemoveBufferData(int count)
        {
            int removable = Math.Min(count, _buffer.Count);
            if (removable > 0)
            {
                _buffer.RemoveRange(0, removable);
            }
        }
        #endregion

        #region Constructor
        /// <summary>
        /// Creates the helper. The arguments are used as a fallback when the "ServerIP" / "ServerPort"
        /// application settings are missing or unusable.
        /// </summary>
        public SocketClientHelper(string serverIP, int serverPort)
        {
            _serverIP = serverIP;
            _serverPort = serverPort;
        }
        #endregion

        #region Connect
        /// <summary>
        /// Connects to the server if not already connected, starts the asynchronous receive loop and
        /// starts the server-liveness timer.
        /// </summary>
        /// <returns>true when connected (or already connected); false when the attempt failed.</returns>
        public bool ConnectServer()
        {
            try
            {
                if (clientSocket != null && clientSocket.Connected) return true;

                if (clientSocket != null)
                {
                    clientSocket.Close();
                    clientSocket.Dispose();
                }

                IPEndPoint ipep = ResolveServerEndPoint();
                Socket socket = new Socket(ipep.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                socket.SendTimeout = 20000;
                socket.ReceiveTimeout = 20000;
                socket.SendBufferSize = 10240;
                socket.ReceiveBufferSize = 10240;
                clientSocket = socket;

                try
                {
                    socket.Connect(ipep);
                }
                catch (Exception ex)
                {
                    LogUtil.Error(ex);
                    return false;
                }

                if (!socket.Connected) return false;

                _lastHeartbeat = DateTime.UtcNow;

                // Bytes left over from a previous connection would break framing on the new one.
                lock (_lockBuffer)
                {
                    _buffer.Clear();
                }

                StartReceiving(socket);
                StartCheckServerTimer();

                LogUtil.Log("已连接服务器");
                return true;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "连接服务器失败");
                return false;
            }
        }

        /// <summary>
        /// Builds the server endpoint from the "HostName" / "ServerIP" / "ServerPort" application settings,
        /// falling back to the constructor arguments for a missing IP or an unparsable port.
        /// </summary>
        private IPEndPoint ResolveServerEndPoint()
        {
            string ip = ConfigurationManager.AppSettings["ServerIP"];
            string hostName = ConfigurationManager.AppSettings["HostName"];
            string portText = ConfigurationManager.AppSettings["ServerPort"];

            if (string.IsNullOrWhiteSpace(ip)) ip = _serverIP;

            int port;
            if (!int.TryParse(portText, out port)) port = _serverPort;

            // An empty host name would make Dns.GetHostEntry resolve the local machine, so treat it as absent.
            if (!string.IsNullOrWhiteSpace(hostName))
            {
                IPHostEntry host = Dns.GetHostEntry(hostName);
                return new IPEndPoint(host.AddressList[0], port);
            }

            return new IPEndPoint(IPAddress.Parse(ip), port);
        }

        /// <summary>
        /// Allocates the receive arguments for <paramref name="socket"/> and starts the receive loop
        /// on a thread-pool thread.
        /// </summary>
        private void StartReceiving(Socket socket)
        {
            ReleaseReceiveArgs();

            byte[] buffer = new byte[10240];
            SocketAsyncEventArgs args = new SocketAsyncEventArgs();
            args.SetBuffer(buffer, 0, buffer.Length);
            // Capture the socket itself: the clientSocket field may point at a newer connection later.
            EventHandler<SocketAsyncEventArgs> completed = (s, e) =>
            {
                ReceiveData(socket, e);
            };
            args.Completed += completed;
            _socketAsyncArgs = args;
            _socketAsyncCompleted = completed;

            Task.Run(() =>
            {
                try
                {
                    ContinueReceive(socket, args);
                }
                catch (Exception ex)
                {
                    LogUtil.Error(ex);
                }
            });
        }

        /// <summary>
        /// Detaches and disposes the current receive arguments, if any.
        /// </summary>
        private void ReleaseReceiveArgs()
        {
            SocketAsyncEventArgs args = _socketAsyncArgs;
            EventHandler<SocketAsyncEventArgs> completed = _socketAsyncCompleted;
            _socketAsyncArgs = null;
            _socketAsyncCompleted = null;

            if (args == null) return;
            if (completed != null) args.Completed -= completed;
            args.Dispose();
        }

        /// <summary>
        /// Creates the server-liveness timer on first use and (re)starts it.
        /// </summary>
        private void StartCheckServerTimer()
        {
            if (_checkServerTimer == null)
            {
                System.Timers.Timer timer = new System.Timers.Timer();
                timer.AutoReset = false;
                timer.Interval = 1000;
                timer.Elapsed += CheckServer;
                _checkServerTimer = timer;
            }
            _checkServerTimer.Start();
        }
        #endregion

        #region Server liveness check
        /// <summary>
        /// Timer callback: when no heartbeat reply arrived for more than 60 seconds, drops the
        /// connection, reconnects (retrying every 3 seconds) and registers again.
        /// </summary>
        private void CheckServer(object sender, System.Timers.ElapsedEventArgs e)
        {
            Task.Run(() =>
            {
                try
                {
                    if (DateTime.UtcNow.Subtract(_lastHeartbeat).TotalSeconds > 60)
                    {
                        LogUtil.Log("服务端已失去连接");
                        try
                        {
                            Socket socket = clientSocket;
                            if (socket != null)
                            {
                                if (socket.Connected) socket.Disconnect(false);
                                socket.Close();
                                socket.Dispose();
                            }
                        }
                        catch (Exception ex)
                        {
                            LogUtil.Error(ex);
                        }

                        try
                        {
                            ReleaseReceiveArgs();
                        }
                        catch (Exception ex)
                        {
                            LogUtil.Error(ex);
                        }

                        Thread.Sleep(3000);
                        bool connected = ConnectServer();
                        for (int tryCount = 0; !connected && tryCount < 10000; tryCount++) // reconnect
                        {
                            Thread.Sleep(3000);
                            connected = ConnectServer();
                        }

                        // Registering over a socket that never connected would only wait out the timeout.
                        if (connected)
                        {
                            RegisterToServer(_roomNo, _devNo); // register again
                        }
                    }
                }
                catch (Exception ex)
                {
                    LogUtil.Error(ex, "检测服务端出错");
                }
                finally
                {
                    System.Timers.Timer timer = _checkServerTimer;
                    if (timer != null)
                    {
                        try
                        {
                            timer.Start();
                        }
                        catch (ObjectDisposedException)
                        {
                            // Dispose() ran while this check was in progress; nothing left to re-arm.
                        }
                    }
                }
            });
        }
        #endregion

        #region Disconnect
        /// <summary>
        /// Disconnects from the server and closes the socket. Timers are not stopped here.
        /// </summary>
        public void DisconnectServer()
        {
            try
            {
                if (clientSocket != null)
                {
                    if (clientSocket.Connected) clientSocket.Disconnect(false);
                    clientSocket.Close();
                    clientSocket.Dispose();
                }
                LogUtil.Log("已断开服务器");
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "断开服务器失败");
            }
        }
        #endregion

        #region Dispose
        /// <summary>
        /// Stops and releases the heartbeat timer and the server-liveness timer.
        /// The socket is left untouched; use DisconnectServer to close it.
        /// </summary>
        public void Dispose()
        {
            _heartbeatEnabled = false;

            System.Timers.Timer heartbeat = heartbeatTimer;
            heartbeatTimer = null;
            if (heartbeat != null)
            {
                heartbeat.Stop();
                heartbeat.Close();
            }

            System.Timers.Timer checkServer = _checkServerTimer;
            _checkServerTimer = null;
            if (checkServer != null)
            {
                checkServer.Stop();
                checkServer.Close();
            }
        }
        #endregion

        #region Heartbeat
        /// <summary>
        /// Starts sending a heartbeat frame (type 0) every 10 seconds. Calling it again replaces the
        /// previous heartbeat timer instead of adding a second one.
        /// </summary>
        public void StartHeartbeat()
        {
            System.Timers.Timer previous = heartbeatTimer;
            if (previous != null)
            {
                previous.Stop();
                previous.Close();
            }

            System.Timers.Timer timer = new System.Timers.Timer();
            timer.AutoReset = false;
            timer.Interval = 10000;
            timer.Elapsed += new System.Timers.ElapsedEventHandler((obj, eea) =>
            {
                try
                {
                    SocketData heartbeat = new SocketData();
                    heartbeat.Type = 0;
                    Send(heartbeat);
                }
                catch (Exception ex)
                {
                    LogUtil.Error("向服务器发送心跳包出错:" + ex.Message);
                }
                finally
                {
                    // Re-arm only while this timer is still the active one and heartbeats are wanted.
                    if (_heartbeatEnabled && ReferenceEquals(timer, heartbeatTimer))
                    {
                        try
                        {
                            timer.Start();
                        }
                        catch (ObjectDisposedException)
                        {
                            // The timer was closed while the heartbeat was being sent.
                        }
                    }
                }
            });

            heartbeatTimer = timer;
            _heartbeatEnabled = true;
            timer.Start();
        }
        #endregion

        #region Stop heartbeat
        /// <summary>
        /// Stops the heartbeat. Safe to call when the heartbeat was never started.
        /// </summary>
        public void StopHeartbeat()
        {
            _heartbeatEnabled = false;
            System.Timers.Timer timer = heartbeatTimer;
            if (timer != null)
            {
                timer.Stop();
            }
        }
        #endregion

        #region Register
        /// <summary>
        /// Sends a register frame (type 2) and waits up to 5 seconds for the register feedback (type 5).
        /// </summary>
        /// <param name="roomNo">Room number to register; remembered for re-registration after a reconnect.</param>
        /// <param name="devNo">Device number to register; remembered for re-registration after a reconnect.</param>
        /// <returns>true when the feedback frame arrived within the timeout.</returns>
        public bool RegisterToServer(string roomNo, string devNo)
        {
            _registerSuccess = false;
            SocketData data = new SocketData();
            data.Type = 2;
            data.SocketRegisterData = new SocketRegisterData();
            data.SocketRegisterData.RoomNo = roomNo;
            data.SocketRegisterData.DevNo = devNo;
            _roomNo = roomNo;
            _devNo = devNo;
            Send(data);

            DateTime deadline = DateTime.UtcNow.AddMilliseconds(5000);
            while (!_registerSuccess && DateTime.UtcNow < deadline)
            {
                Thread.Sleep(100);
            }
            return _registerSuccess;
        }
        #endregion

        #region Receive
        /// <summary>
        /// Completion callback of an asynchronous receive: consumes the received bytes and posts the
        /// next receive.
        /// </summary>
        private void ReceiveData(Socket socket, SocketAsyncEventArgs e)
        {
            try
            {
                if (AppendAndProcess(socket, e))
                {
                    ContinueReceive(socket, e);
                }
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "处理接收的数据包 异常");
            }
        }

        /// <summary>
        /// Posts receives until one is pending. Socket.ReceiveAsync returns false when the operation
        /// completed synchronously; in that case the Completed event is not raised, so the result is
        /// processed here.
        /// </summary>
        private void ContinueReceive(Socket socket, SocketAsyncEventArgs e)
        {
            while (socket.Connected)
            {
                if (socket.ReceiveAsync(e)) return; // pending: ReceiveData runs on completion
                if (!AppendAndProcess(socket, e)) return;
            }
        }

        /// <summary>
        /// Appends the bytes of a finished receive to the buffer and dispatches every complete frame.
        /// </summary>
        /// <returns>false when receiving on this socket must stop (error, peer closed, or stale socket).</returns>
        private bool AppendAndProcess(Socket socket, SocketAsyncEventArgs e)
        {
            // Zero bytes means the peer closed the connection; reposting would complete immediately again.
            if (e.SocketError != SocketError.Success || e.BytesTransferred <= 0)
            {
                LogUtil.Log("接收结束:" + e.SocketError);
                return false;
            }

            // Ignore completions of a socket that has since been replaced by a reconnect.
            if (!ReferenceEquals(socket, clientSocket)) return false;

            lock (_lockBuffer)
            {
                CopyTo(e.Buffer, _buffer, e.Offset, e.BytesTransferred);

                SocketData data;
                int readLength;
                FrameState state;
                while ((state = ResolveBuffer(_buffer, out data, out readLength)) == FrameState.Complete)
                {
                    RemoveBufferData(readLength);
                    // data is null when the frame was well-formed but its body could not be parsed.
                    if (data != null) ProcessSocketData(socket, data);
                }

                if (state == FrameState.Corrupt)
                {
                    _buffer.Clear(); // drop the bad data and resynchronise on the next receive
                }
            }
            return true;
        }
        #endregion

        #region Handle a received frame
        /// <summary>
        /// Acts on one parsed frame: records heartbeat replies, raises SocketReceivedEvent for messages
        /// and marks registration as successful on register feedback.
        /// </summary>
        private void ProcessSocketData(Socket socket, SocketData data)
        {
            if (data.Type == 1) // heartbeat reply
            {
                _lastHeartbeat = DateTime.UtcNow;
                LogUtil.Log("收到心跳应答包,服务端正常");
            }

            if (data.Type == 3) // message
            {
                // Copy the delegate so an unsubscribe between the check and the call cannot null it.
                EventHandler<SocketReceivedEventArgs> handler = SocketReceivedEvent;
                if (handler != null)
                {
                    SocketReceivedEventArgs args = new SocketReceivedEventArgs(data.MsgContent);
                    args.Callback = new CallbackSocket(socket);
                    ThreadHelper.Run((obj) =>
                    {
                        try
                        {
                            handler(this, obj as SocketReceivedEventArgs);
                        }
                        catch (Exception ex)
                        {
                            LogUtil.Error(ex);
                        }
                    }, args);
                }
            }

            if (data.Type == 5) // register feedback
            {
                _registerSuccess = true;
                LogUtil.Log("收到注册反馈包,注册成功");
            }
        }
        #endregion

        #region ResolveBuffer
        /// <summary>
        /// Tries to read one frame from the front of <paramref name="buffer"/> without modifying it.
        /// </summary>
        /// <param name="buffer">Received bytes not yet consumed.</param>
        /// <param name="socketData">The parsed frame; null unless the result is Complete, and also null
        /// for a Complete frame whose JSON body could not be deserialized.</param>
        /// <param name="readLength">Total length of the frame in bytes when the result is Complete.</param>
        /// <returns>Whether a frame is complete, still incomplete, or the buffer is corrupt.</returns>
        private FrameState ResolveBuffer(List<byte> buffer, out SocketData socketData, out int readLength)
        {
            socketData = null;
            readLength = 0;

            if (buffer.Count < HeaderLength) return FrameState.Incomplete;

            byte[] bArrHeader = new byte[HeaderLength];
            CopyTo(buffer, bArrHeader, 0, 0, bArrHeader.Length);
            string strHeader = Encoding.ASCII.GetString(bArrHeader);
            if (!string.Equals(strHeader, FrameHeader, StringComparison.OrdinalIgnoreCase))
            {
                LogUtil.Error("不是0XFF,丢掉错误数据,重新接收");
                return FrameState.Corrupt;
            }

            if (buffer.Count < HeaderLength + TypeLength) return FrameState.Incomplete;

            byte bType = buffer[HeaderLength];
            if (bType == 1 || bType == 5) // heartbeat reply / register feedback: no payload
            {
                socketData = new SocketData();
                socketData.Type = bType;
                readLength = HeaderLength + TypeLength;
                return FrameState.Complete;
            }

            if (bType != 3)
            {
                LogUtil.Error("type错误,丢掉错误数据,重新接收");
                return FrameState.Corrupt;
            }

            if (buffer.Count < BodyOffset) return FrameState.Incomplete;

            byte[] bArrLength = new byte[LengthFieldLength];
            CopyTo(buffer, bArrLength, HeaderLength + TypeLength, 0, bArrLength.Length);
            int dataLength = BitConverter.ToInt32(bArrLength, 0);
            if (dataLength <= 0)
            {
                // Waiting for more bytes can never complete such a frame; treat it as corrupt.
                LogUtil.Error("length错误,丢掉错误数据,重新接收");
                return FrameState.Corrupt;
            }

            // Written as a subtraction so a huge length cannot overflow the comparison.
            if (buffer.Count - BodyOffset < dataLength) return FrameState.Incomplete;

            readLength = BodyOffset + dataLength;
            byte[] dataBody = new byte[dataLength];
            CopyTo(buffer, dataBody, BodyOffset, 0, dataBody.Length);

            try
            {
                string jsonString = Encoding.UTF8.GetString(dataBody);
                SocketData parsed = new SocketData();
                parsed.Type = bType;
                parsed.MsgContent = JsonConvert.DeserializeObject<WebApiMsgContent>(jsonString);
                socketData = parsed;
            }
            catch (Exception ex)
            {
                // The frame boundaries are known, so skip just this frame instead of stalling on it.
                LogUtil.Error(ex, "解析字节数组 出错");
                socketData = null;
            }

            return FrameState.Complete;
        }
        #endregion

        #region CopyTo
        /// <summary>
        /// Appends up to <paramref name="length"/> bytes of <paramref name="bArrSource"/>, starting at
        /// <paramref name="sourceIndex"/>, to <paramref name="listTarget"/>. Copies fewer bytes when the
        /// source is shorter.
        /// </summary>
        private void CopyTo(byte[] bArrSource, List<byte> listTarget, int sourceIndex, int length)
        {
            int count = Math.Min(length, bArrSource.Length - sourceIndex);
            if (count > 0)
            {
                listTarget.AddRange(new ArraySegment<byte>(bArrSource, sourceIndex, count));
            }
        }

        /// <summary>
        /// Copies up to <paramref name="length"/> bytes from <paramref name="listSource"/> into
        /// <paramref name="bArrTarget"/>. Copies fewer bytes when either side is shorter.
        /// </summary>
        private void CopyTo(List<byte> listSource, byte[] bArrTarget, int sourceIndex, int targetIndex, int length)
        {
            int count = Math.Min(length, Math.Min(bArrTarget.Length - targetIndex, listSource.Count - sourceIndex));
            if (count > 0)
            {
                listSource.CopyTo(sourceIndex, bArrTarget, targetIndex, count);
            }
        }
        #endregion

        #region Send
        /// <summary>
        /// Sends <paramref name="data"/> over the current server connection.
        /// </summary>
        public void Send(SocketData data)
        {
            Send(clientSocket, data);
        }

        /// <summary>
        /// Builds one frame from <paramref name="data"/> and writes it to <paramref name="socket"/>.
        /// </summary>
        /// <remarks>
        /// Supported types: 0 (heartbeat, no payload), 2 (register, payload SocketRegisterData) and
        /// 4 (result, payload SocketResult). A null payload produces a frame of header and type only.
        /// Any other type is logged and nothing is written, because a header without a type byte would
        /// corrupt the peer's framing. The send result is not reported to the caller.
        /// </remarks>
        /// <param name="socket">Socket to write to.</param>
        /// <param name="data">Packet to send.</param>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
        public void Send(Socket socket, SocketData data)
        {
            if (data == null) throw new ArgumentNullException("data");

            object payload = null;
            switch (data.Type)
            {
                case 0:
                    break;
                case 2:
                    payload = data.SocketRegisterData;
                    break;
                case 4:
                    payload = data.SocketResult;
                    break;
                default:
                    LogUtil.Error("不支持的type:" + data.Type);
                    return;
            }

            byte[] bArrData = null;
            if (payload != null)
            {
                bArrData = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(payload));
            }

            // Assemble the whole frame first so it goes out in a single write.
            List<byte> frame = new List<byte>(BodyOffset + (bArrData == null ? 0 : bArrData.Length));
            frame.AddRange(Encoding.ASCII.GetBytes(FrameHeader)); // header
            frame.Add((byte)data.Type); // type
            if (bArrData != null)
            {
                frame.AddRange(BitConverter.GetBytes(bArrData.Length)); // length
                frame.AddRange(bArrData); // body
            }

            // The lock keeps frames from different threads from interleaving on the wire.
            lock (_lockSend)
            {
                SocketHelper.Send(socket, frame.ToArray());
            }
        }
        #endregion

    }
}
View Code

    SocketHelper代码(里面同步接收的方法Receive和ReceiveByte没有用到):

C# .NET Socket SocketHelper 高性能 5000客户端 异步接收数据
using Models;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// Thin wrappers around blocking <see cref="Socket"/> send and receive calls.
    /// </summary>
    public static class SocketHelper
    {
        #region Fields
        #endregion

        #region Send
        /// <summary>
        /// Writes all of <paramref name="data"/> to <paramref name="socket"/>, in pieces of at most 1024 bytes.
        /// </summary>
        /// <param name="socket">Socket to write to.</param>
        /// <param name="data">Bytes to send.</param>
        /// <returns>
        /// true when every byte was handed to the socket (this does not mean the peer received them);
        /// false when the socket is null or not connected, <paramref name="data"/> is null, or the send threw.
        /// </returns>
        public static bool Send(Socket socket, byte[] data)
        {
            try
            {
                if (socket == null || !socket.Connected) return false;
                if (data == null) return false;

                int sendTotal = 0;
                while (sendTotal < data.Length)
                {
                    int sendLength = Math.Min(data.Length - sendTotal, 1024);
                    sendTotal += socket.Send(data, sendTotal, sendLength, SocketFlags.None);
                }
                return true;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex);
                return false;
            }
        }
        #endregion

        #region Receive
        /// <summary>
        /// Reads exactly <paramref name="length"/> bytes from <paramref name="socket"/>.
        /// </summary>
        /// <param name="socket">Socket to read from.</param>
        /// <param name="length">Number of bytes to read; 0 yields an empty array.</param>
        /// <returns>
        /// The bytes read, or null when the peer closed the connection before
        /// <paramref name="length"/> bytes arrived or when the read threw.
        /// </returns>
        public static byte[] Receive(Socket socket, int length)
        {
            try
            {
                byte[] buffer = new byte[length];
                int receiveCount = 0;
                while (receiveCount < length)
                {
                    int revCount = socket.Receive(buffer, receiveCount, length - receiveCount, SocketFlags.None);
                    // Socket.Receive returns 0 once the remote host has shut the connection down;
                    // retrying would spin forever.
                    if (revCount == 0) return null;
                    receiveCount += revCount;
                }
                return buffer;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex);
                return null;
            }
        }

        /// <summary>
        /// Reads a single byte from <paramref name="socket"/>.
        /// </summary>
        /// <returns>The byte read, or null when the connection was closed or the read threw.</returns>
        public static byte? ReceiveByte(Socket socket)
        {
            byte[] buffer = Receive(socket, 1);
            if (buffer == null) return null;
            return buffer[0];
        }
        #endregion

        #region IsZero
        /// <summary>
        /// Tells whether every byte of <paramref name="data"/> is zero, logging an error when that is the case.
        /// An empty array counts as all-zero.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
        public static bool IsZero(byte[] data)
        {
            if (data == null) throw new ArgumentNullException("data");

            foreach (byte b in data)
            {
                if (b != 0)
                {
                    return false;
                }
            }
            LogUtil.Error("接收的字节数组内容全是0");
            return true;
        }
        #endregion

    }

}
View Code

    代码中接收消息是异步接收,提高性能,发送消息是同步发送,主要是为了和Android端对接方便,Android端按我这种方式发就可以了。

    下面是模拟500个客户端的程序代码下载链接:

    SocketHelper批量客户端代码

     由于网络、客户端可能不在线等原因,消息不一定能送达。为了保证消息送达,需要使用数据库:将发送失败的消息存入数据库并定时重发,发送成功或者超时2天则删除失败记录。下面是自己画的时序图,可能画得不太专业:

C# .NET Socket SocketHelper 高性能 5000客户端 异步接收数据

    业务相关代码:

    MsgUtil代码:

C# .NET Socket SocketHelper 高性能 5000客户端 异步接收数据
using Models;
using Newtonsoft.Json;
using *WebApi.Controllers.Common;
using *WebApi.DAL;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Configuration;
using System.Linq;
using System.Threading;
using System.Timers;
using System.Web;

namespace Utils
{
    /// <summary>
    /// Web API message utility: owns the socket server, forwards outgoing messages to it and
    /// periodically resends the messages stored in the database.
    /// </summary>
    public static class MsgUtil
    {
        #region Fields
        private static WebApiMsgDal m_WebApiMsgDal = null;
        private static System.Timers.Timer _timer;
        private static SocketServerHelper _socketServerHelper;
        #endregion

        #region Init
        /// <summary>
        /// Starts the socket server on the port from the "SocketServerPort" application setting and
        /// starts the resend timer. Runs through ThreadHelper.Run; failures are logged.
        /// </summary>
        public static void Init()
        {
            ThreadHelper.Run(() =>
            {
                m_WebApiMsgDal = ServiceHelper.Get<WebApiMsgDal>();
                int port = int.Parse(ConfigurationManager.AppSettings["SocketServerPort"]);
                _socketServerHelper = new SocketServerHelper(port);
                _socketServerHelper.ReceivedSocketResultEvent += _socketServerHelper_ReceivedSocketResultEvent;
                _socketServerHelper.StartServer();

                System.Timers.Timer timer = new System.Timers.Timer();
                timer.AutoReset = false;
                timer.Interval = 40000; // must be larger than the socket callback timeout (CallbackTimeout)
                timer.Elapsed += MsgTask;
                _timer = timer;
                timer.Start();

                LogUtil.Log("Web API 消息工具类 初始化成功");
            }, (ex) =>
            {
                // Keep the exception so the cause of the failure ends up in the log.
                LogUtil.Error(ex, "Web API 消息工具类 初始化失败");
            });
        }
        #endregion

        #region Resend task
        /// <summary>
        /// Timer callback: deletes timed-out messages, resends every stored message and re-arms the timer.
        /// </summary>
        private static void MsgTask(object sender, ElapsedEventArgs e)
        {
            ThreadHelper.Run(() =>
            {
                try
                {
                    m_WebApiMsgDal.DeleteTimeoutMsg(); // delete messages that timed out
                    List<WEBAPI_MSG> list = m_WebApiMsgDal.GetMsgList();
                    int resent = 0;
                    if (list != null)
                    {
                        foreach (WEBAPI_MSG msg in list)
                        {
                            // One unreadable or unsendable record must not block the rest of the batch.
                            try
                            {
                                WebApiMsgContent msgContent = JsonConvert.DeserializeObject<WebApiMsgContent>(msg.MSGCONTENT);
                                msgContent.callbackId = msg.ID;

                                // NOTE(review): RECEIVER is passed as both roomNo and devNo - presumably the
                                // stored receiver may be either one; confirm against SocketServerHelper.Send.
                                Send(msgContent, msg.RECEIVER, msg.RECEIVER, null);
                                resent++;
                            }
                            catch (Exception ex)
                            {
                                LogUtil.Error(ex);
                            }
                        }
                    }
                    if (resent > 0)
                    {
                        LogUtil.Log("已重发" + resent.ToString() + "条消息");
                    }
                }
                catch (Exception ex)
                {
                    LogUtil.Error(ex);
                }
                finally
                {
                    // Dispose() may have cleared or closed the timer while this run was in progress.
                    System.Timers.Timer timer = _timer;
                    if (timer != null)
                    {
                        try
                        {
                            timer.Start();
                        }
                        catch (ObjectDisposedException)
                        {
                            // Nothing left to re-arm.
                        }
                    }
                }
            });
        }
        #endregion

        #region Receive result
        /// <summary>
        /// Handles a result reported by the socket server by deleting the stored message with the same
        /// callback id, retrying once per second for up to 10 retries when the deletion fails.
        /// </summary>
        private static void _socketServerHelper_ReceivedSocketResultEvent(object sender, ReceivedSocketResultEventArgs e)
        {
            if (e == null || e.SocketResult == null) return;

            string callbackId = e.SocketResult.callbackId;
            int tryCount = 0;
            while (!TryDeleteMsg(callbackId) && tryCount++ < 10)
            {
                Thread.Sleep(1000);
            }
        }

        /// <summary>
        /// Deletes the stored message with the given id if it exists.
        /// </summary>
        /// <returns>false when the database access threw; true otherwise.</returns>
        private static bool TryDeleteMsg(string callbackId)
        {
            try
            {
                if (m_WebApiMsgDal.Exists(callbackId))
                {
                    m_WebApiMsgDal.DeleteById(callbackId);
                }
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "删除消息出错");
                return false;
            }
            return true;
        }
        #endregion

        #region Send
        /// <summary>
        /// Forwards the message to the socket server's callback-based Send. Exceptions are not caught here.
        /// </summary>
        /// <param name="msgContent">Message to send.</param>
        /// <param name="roomNo">Room number passed through to the socket server.</param>
        /// <param name="devNo">Device number passed through to the socket server.</param>
        /// <param name="callback">Optional callback passed through to the socket server.</param>
        public static void Send(WebApiMsgContent msgContent, string roomNo, string devNo, Action<SocketResult> callback = null)
        {
            _socketServerHelper.Send(msgContent, roomNo, devNo, callback);
        }

        /// <summary>
        /// Forwards the message to the socket server's result-returning Send.
        /// </summary>
        /// <returns>The result from the socket server, or null when the call threw (the error is logged).</returns>
        public static SocketResult Send(WebApiMsgContent msgContent, string roomNo, string devNo)
        {
            try
            {
                return _socketServerHelper.Send(msgContent, roomNo, devNo);
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "发送消息失败");
                return null;
            }
        }
        #endregion

        #region Dispose
        /// <summary>
        /// Stops the resend timer and the socket server. Safe to call when Init never ran or failed part-way.
        /// </summary>
        public static void Dispose()
        {
            ThreadHelper.Run(() =>
            {
                System.Timers.Timer timer = _timer;
                _timer = null;
                if (timer != null)
                {
                    timer.Stop();
                    timer.Elapsed -= MsgTask;
                    timer.Close();
                    timer.Dispose();
                }

                SocketServerHelper server = _socketServerHelper;
                if (server != null)
                {
                    server.StopServer();
                    server.ReceivedSocketResultEvent -= _socketServerHelper_ReceivedSocketResultEvent;
                }

                LogUtil.Log("Web API 消息工具类 释放资源成功");
            }, (ex) =>
            {
                // Keep the exception so the cause of the failure ends up in the log.
                LogUtil.Error(ex, "Web API 消息工具类 释放资源失败");
            });
        }
        #endregion

    }
}
View Code

    Web API 接口 MsgController 代码:

C# .NET Socket SocketHelper 高性能 5000客户端 异步接收数据
using DBUtil;
using Models;
using Newtonsoft.Json;
using *WebApi.DAL;
using Swashbuckle.Swagger.Annotations;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Web;
using System.Web.Http;
using Utils;

namespace *WebApi.Controllers.Common
{
    /// <summary>
    /// Web API controller for messages: pushes messages to devices or rooms through
    /// <c>MsgUtil</c> and exposes feedback endpoints for app installation and roll call.
    /// </summary>
    [RoutePrefix("api/msg")]
    public class MsgController : ApiController
    {
        #region 变量属性
        private WebApiMsgDal m_WebApiMsgDal = ServiceHelper.Get<WebApiMsgDal>();
        private TwoCJsDal m_TwoCJsDal = ServiceHelper.Get<TwoCJsDal>();
        private BackstageAppInstallDal m_BackstageAppInstallDal = ServiceHelper.Get<BackstageAppInstallDal>();
        #endregion

        #region 发送消息
        /// <summary>
        /// Sends a message. The target is chosen from the request:
        /// the listed device codes when <c>devNos</c> is filled, otherwise the single room
        /// when <c>roomNo</c> is filled, otherwise every room returned by
        /// <c>TwoCJsDal.GetRoomNoListAll</c>.
        /// A success response only means that dispatching raised no exception; sends
        /// whose callback reports a failure are stored through <c>WebApiMsgDal.Insert</c>.
        /// </summary>
        /// <param name="data">POST body.</param>
        /// <returns>JSON response describing a parameter error, a send failure or success.</returns>
        [HttpPost]
        [Route("SendMsg")]
        [SwaggerResponse(HttpStatusCode.OK, "返回JSON", typeof(JsonResult<CommonSubmitResult>))]
        public HttpResponseMessage SendMsg([FromBody] SendMsgData data)
        {
            JsonResult jsonResult = null;

            if (data == null || data.msgContent == null)
            {
                jsonResult = new JsonResult("请检查参数格式", ResultCode.参数不正确);
                return ApiHelper.ToJson(jsonResult);
            }

            // Use the same "has a value" test for validation and for dispatching, so an
            // empty string is treated as "not filled" in both places.
            bool hasRoomNo = !string.IsNullOrWhiteSpace(data.roomNo);
            bool hasDevNos = !string.IsNullOrWhiteSpace(data.devNos);

            if (hasRoomNo && hasDevNos)
            {
                jsonResult = new JsonResult("监室号和设备编码(指仓内屏或仓外屏的设备编码)不能都有值,应填写其中一个,或者都不填写", ResultCode.参数不正确);
                return ApiHelper.ToJson(jsonResult);
            }

            if (string.IsNullOrWhiteSpace(data.msgContent.msgTime)) data.msgContent.msgTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");

            try
            {
                if (hasDevNos)
                {
                    // Tolerate "a, b" and trailing commas: drop empty entries and trim each code.
                    foreach (string rawDevNo in data.devNos.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries))
                    {
                        string devNo = rawDevNo.Trim();
                        if (devNo.Length == 0) continue;

                        SendToReceiver(data.msgContent, null, devNo);
                    }
                }
                else if (hasRoomNo)
                {
                    SendToReceiver(data.msgContent, data.roomNo, null);
                }
                else
                {
                    List<string> roomNoList = m_TwoCJsDal.GetRoomNoListAll();
                    foreach (string roomNo in roomNoList)
                    {
                        SendToReceiver(data.msgContent, roomNo, null);
                    }
                }
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "消息发送失败");
                jsonResult = new JsonResult("消息发送失败", ResultCode.操作失败);
                return ApiHelper.ToJson(jsonResult);
            }

            jsonResult = new JsonResult<CommonSubmitResult>(new CommonSubmitResult()
            {
                msg = "消息发送成功"
            });

            return ApiHelper.ToJson(jsonResult);
        }

        /// <summary>
        /// Assigns a fresh callback id to the message and sends it to one receiver.
        /// If the callback reports a missing or unsuccessful result, the message is stored
        /// through <c>WebApiMsgDal.Insert</c> with the receiver it was meant for.
        /// </summary>
        /// <param name="msgContent">Message payload; its <c>callbackId</c> is overwritten here.</param>
        /// <param name="roomNo">Target room number, or null when addressing a device.</param>
        /// <param name="devNo">Target device code, or null when addressing a room.</param>
        private void SendToReceiver(WebApiMsgContent msgContent, string roomNo, string devNo)
        {
            string receiver = devNo ?? roomNo;

            // NOTE(review): the same msgContent instance is reused for every receiver and
            // only callbackId changes; if MsgUtil.Send reads it after returning, a later
            // iteration may already have replaced the id - confirm against MsgUtil.Send.
            msgContent.callbackId = Guid.NewGuid().ToString("N");

            MsgUtil.Send(msgContent, roomNo, devNo, (socketResult) =>
            {
                if (socketResult != null && socketResult.success) return;

                try
                {
                    WEBAPI_MSG info = new WEBAPI_MSG();
                    info.ID = Guid.NewGuid().ToString("N");
                    info.MSGTIME = DateTime.ParseExact(msgContent.msgTime, "yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture);
                    info.RECEIVER = receiver;
                    info.MSGCONTENT = JsonConvert.SerializeObject(msgContent);
                    m_WebApiMsgDal.Insert(info);
                }
                catch (Exception ex)
                {
                    // A client-supplied msgTime in another format, or a database error, must
                    // not escape from the callback unnoticed.
                    LogUtil.Error(ex, "保存未送达消息失败");
                }
            });
        }
        #endregion

        #region APP安装消息反馈
        /// <summary>
        /// Feedback for an app installation: looks up the installation record by
        /// <c>data.id</c> and, when the client reports success, sets its STATUS to "1"
        /// and saves it.
        /// </summary>
        /// <param name="data">POST body.</param>
        /// <returns>JSON response; a failure result when the installation record does not exist.</returns>
        [HttpPost]
        [Route("InstallMsgFeedback")]
        [SwaggerResponse(HttpStatusCode.OK, "返回JSON", typeof(JsonResult<CommonSubmitResult>))]
        public HttpResponseMessage InstallMsgFeedback([FromBody] InstallMsgFeedbackData data)
        {
            JsonResult jsonResult = null;

            if (data == null)
            {
                jsonResult = new JsonResult("请检查参数格式", ResultCode.参数不正确);
                return ApiHelper.ToJson(jsonResult);
            }

            BACKSTAGE_APP_INSTALL info = m_BackstageAppInstallDal.Get(data.id);
            if (info == null)
            {
                jsonResult = new JsonResult("反馈失败:安装记录不存在", ResultCode.操作失败);
                return ApiHelper.ToJson(jsonResult);
            }

            if (data.success)
            {
                // NOTE(review): "1" presumably means "installed" - confirm against the STATUS column definition.
                info.STATUS = "1";
                m_BackstageAppInstallDal.Update(info);
            }

            jsonResult = new JsonResult<CommonSubmitResult>(new CommonSubmitResult()
            {
                msg = "反馈成功",
                id = info.ID
            });

            return ApiHelper.ToJson(jsonResult);
        }
        #endregion

        #region 发起点名成功反馈
        /// <summary>
        /// Feedback for starting a roll call. Currently only validates that a body was
        /// posted and answers with a success result; the real handling is not implemented.
        /// </summary>
        /// <param name="data">POST body.</param>
        /// <returns>JSON response.</returns>
        [HttpPost]
        [Route("RollCallMsgFeedback")]
        [SwaggerResponse(HttpStatusCode.OK, "返回JSON", typeof(JsonResult<CommonSubmitResult>))]
        public HttpResponseMessage RollCallMsgFeedback([FromBody] RollCallMsgFeedbackData data)
        {
            JsonResult jsonResult = null;

            if (data == null)
            {
                jsonResult = new JsonResult("请检查参数格式", ResultCode.参数不正确);
                return ApiHelper.ToJson(jsonResult);
            }

            //TODO: not implemented yet

            jsonResult = new JsonResult<CommonSubmitResult>(new CommonSubmitResult()
            {
                msg = "反馈成功",
                id = null
            });

            return ApiHelper.ToJson(jsonResult);
        }
        #endregion

    }

    #region SendMsgData 发送消息数据
    /// <summary>
    /// Request body of the SendMsg endpoint.
    /// </summary>
    [MyValidate]
    public class SendMsgData
    {
        /// <summary>
        /// Message content (required).
        /// </summary>
        [Required]
        public WebApiMsgContent msgContent { get; set; }

        /// <summary>
        /// Room number. If it is empty and devNos is empty too, the message is sent to
        /// all rooms; if it is empty and devNos is filled, the message is sent by devNos.
        /// </summary>
        public string roomNo { get; set; }

        /// <summary>
        /// Device codes, separated by commas (codes of the in-room or outside-room screens).
        /// </summary>
        public string devNos { get; set; }
    }

    /// <summary>
    /// Request body of the app installation feedback endpoint.
    /// </summary>
    [MyValidate]
    public class InstallMsgFeedbackData
    {
        /// <summary>
        /// ID of the installation record.
        /// </summary>
        [Required]
        public string id { get; set; }

        /// <summary>
        /// Whether the installation succeeded.
        /// </summary>
        [Required]
        public bool success { get; set; }

        /// <summary>
        /// Reason the installation failed.
        /// </summary>
        public string errorMsg { get; set; }
    }

    /// <summary>
    /// Request body of the roll call start feedback endpoint.
    /// </summary>
    [MyValidate]
    public class RollCallMsgFeedbackData
    {
        /// <summary>
        /// Roll call ID.
        /// </summary>
        [Required]
        public string id { get; set; }

        /// <summary>
        /// Whether starting the roll call succeeded.
        /// </summary>
        [Required]
        public bool success { get; set; }

        /// <summary>
        /// Reason starting the roll call failed.
        /// </summary>
        public string errorMsg { get; set; }
    }
    #endregion

}
View Code

     

    C# Socket,没有人比我的代码更简单明了了

 

C# .NET Socket SocketHelper 高性能 5000客户端 异步接收数据

上一篇:Photoshop教程:花卉更加嫩绿清晰


下一篇:MSSQL数据库索引的应用