能在window(IOCP)/linux(epoll)运行,基于C# .net standard2.0 写的socket框架,可使用于.net Framework/dotnet core程序集,.使用异步连接,异步发送,异步接收,性能爆表,并且通过压力测试。
源码下载地址:
https://download.csdn.net/download/guosa542129/11980602
通过并发测试,多线程测试程序地址:
https://download.csdn.net/download/guosa542129/11980605
操作过程:
安装NuGet: https://www.nuget.org/packages/socket.core/
Package Manager: Install-Package socket.core
.Net CLI :dotnet add package socket.core
Paket CLI:paket add socket.core
一:TCP模块介绍
服务端所在socket.core.Server命名空间下,分别为三种模式 push/pull/pack
客户端所在socket.core.Client命名空间下,分别为三种模式 push/pull/pack
主要流程与对应的方法和事件介绍.
注:connectId(int)代表着一个连接对象,data(byte[]),success(bool)
- 1.初始化TCP实现类(对应的三种模式)
实例化服务端类 TcpPushServer/TcpPullServer/TcpPackServer
实例化客户端类 TcpPushClient/TcpPullClient/TcpPackClient
参数介绍int numConnections同时处理的最大连接数,int receiveBufferSize用于每个套接字I/O操作的缓冲区大小(接收端), int overtime超时时长,单位秒.(每10秒检查一次),当值为0时,不设置超时,uint headerFlag包头标记范围0~1023(0x3FF),当包头标识等于0时,不校验包头 - 2.启动监听/连接服务器
服务端 server.Start(port);
客户端 client.Connect(ip,port); - 3.触发连接事件
服务端 server.OnAccept(connectId); 接收到一个连接id,可用他来发送,接收,关闭的标记
客户端 client.OnConnect(success); 接收是否成功连接到服务器 - 4.发送消息
服务端 server.Send(connectId,data,offset,length);
客户端 client.Send(data,offset,length); - 5.触发已发送事件
服务端 server.OnSend(connectId,length);
客户端 client.OnSend(length); - 6.触发接收事件
服务端 server.OnReceive(connectId, data);
客户端 client.OnReceive(data); - 7.关闭连接
服务端 server.Close(connectId);
客户端 client.Close(); - 8.触发关闭连接事件
服务端 server.OnClose(connectId);
客户端 client.OnClose();
三种模型简介
- 一:push
当接收到数据时会触发监听事件OnReceive(connectId,data);把数据立马“推”给应用程序
- 二:pull
当接收到数据时会触发监听事件OnReceive(connectId,length),告诉应用程序当前已经接收到了多少数据长度,应用程序可使用GetLength(connectId)方法检查已接收的数据的长度,如果满足则调用组件的Fetch(connectId,length)方法,把需要的数据“拉”出来
- 三:pack
pack模型组件是push和pull模型的结合体,应用程序不必要处理分包/合包,组件保证每个server.OnReceive(connectId,data)/client.OnReceive(data)事件都向应用程序提供一个完整的数据包
注:pack模型组件会对应用程序发送的每个数据包自动加上4个字节(32bit)的包头,组件接收到数据时,根据包头信息自动分包,每个完整的数据包通过OnReceive(connectId, data)事件发送给应用程序
PACK包头格式(4字节)4*8=32
XXXXXXXXXXYYYYYYYYYYYYYYYYYYYYYY
前10位X为包头标识位,用于数据包校验,有效包头标识取值范围0~1023(0x3FF),当包头标识等于0时,不校验包头,后22位Y为长度位,记录包体长度。有效数据包最大长度不能超过4194303(0x3FFFFF)字节(byte),应用程序可以通过TcpPackServer/TcpPackClient构造函数参数headerFlag设置
服务端其它方法介绍
-
- bool SetAttached(int connectId, object data)
服务端为每个客户端设置附加数据,避免用户自己再建立用户映射表
-
- T GetAttached(int connectId)
获取指定客户端的附加数据
-
- 属性:ConcurrentDictionary<int, string> ClientList
获取正在连接的客户端信息<connectId,ip和端口>
二:核心源码
-
using System; using System.Collections.Generic; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading; using System.Linq; using System.Collections.Concurrent; using socket.core.Common; namespace socket.core.Server { /// <summary> /// tcp Socket监听基库 /// </summary> internal class TcpServer { /// <summary> /// 连接标示 自增长 /// </summary> private int connectId; /// <summary> /// 同时处理的最大连接数 /// </summary> private int m_numConnections; /// <summary> /// 用于每个套接字I/O操作的缓冲区大小 /// </summary> private int m_receiveBufferSize; /// <summary> /// 所有套接字接收操作的一个可重用的大型缓冲区集合。 /// </summary> private BufferManager m_bufferManager; /// <summary> /// 用于监听传入连接请求的套接字 /// </summary> private Socket listenSocket; /// <summary> /// 接受端SocketAsyncEventArgs对象重用池,接受套接字操作 /// </summary> private SocketAsyncEventArgsPool m_receivePool; /// <summary> /// 发送端SocketAsyncEventArgs对象重用池,发送套接字操作 /// </summary> private SocketAsyncEventArgsPool m_sendPool; /// <summary> /// 超时,如果超时,服务端断开连接,客户端需要重连操作 /// </summary> private int overtime; /// <summary> /// 超时检查间隔时间(秒) /// </summary> private int overtimecheck = 1; /// <summary> /// 能接到最多客户端个数的原子操作 /// </summary> private Semaphore m_maxNumberAcceptedClients; /// <summary> /// 已经连接的对象池 /// </summary> internal ConcurrentDictionary<int, ConnectClient> connectClient; /// <summary> /// 发送线程数 /// </summary> private int sendthread = 10; /// <summary> /// 需要发送的数据 /// </summary> private ConcurrentQueue<SendingQueue>[] sendQueues; /// <summary> /// 锁 /// </summary> private Mutex mutex = new Mutex(); /// <summary> /// 连接成功事件 /// </summary> internal event Action<int> OnAccept; /// <summary> /// 接收通知事件 /// </summary> internal event Action<int, byte[], int, int> OnReceive; /// <summary> /// 已送通知事件 /// </summary> internal event Action<int, int> OnSend; /// <summary> /// 断开连接通知事件 /// </summary> internal event Action<int> OnClose; /// <summary> /// 设置基本配置 /// </summary> /// <param name="numConnections">同时处理的最大连接数</param> /// <param name="receiveBufferSize">用于每个套接字I/O操作的缓冲区大小(接收端)</param> /// <param name="overTime">超时时长,单位秒.(每10秒检查一次),当值为0时,不设置超时</param> internal TcpServer(int numConnections, int receiveBufferSize, int overTime) { overtime = overTime; m_numConnections = numConnections; m_receiveBufferSize = receiveBufferSize; m_bufferManager = new BufferManager(receiveBufferSize * m_numConnections, receiveBufferSize); m_receivePool = new SocketAsyncEventArgsPool(m_numConnections); m_sendPool = new SocketAsyncEventArgsPool(m_numConnections); m_maxNumberAcceptedClients = new Semaphore(m_numConnections, m_numConnections); Init(); } /// <summary> /// 初始化服务器通过预先分配的可重复使用的缓冲区和上下文对象。这些对象不需要预先分配或重用,但这样做是为了说明API如何可以易于用于创建可重用对象以提高服务器性能。 /// </summary> private void Init() { connectClient = new ConcurrentDictionary<int, ConnectClient>(); sendQueues = new ConcurrentQueue<SendingQueue>[sendthread]; for (int i = 0; i < sendthread; i++) { sendQueues[i] = new ConcurrentQueue<SendingQueue>(); } //分配一个大字节缓冲区,所有I/O操作都使用一个。这个侍卫对内存碎片 m_bufferManager.InitBuffer(); //预分配的接受对象池socketasynceventargs,并分配缓存 SocketAsyncEventArgs saea_receive; //分配的发送对象池socketasynceventargs,但是不分配缓存 SocketAsyncEventArgs saea_send; for (int i = 0; i < m_numConnections; i++) { //预先接受端分配一组可重用的消息 saea_receive = new SocketAsyncEventArgs(); saea_receive.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); //分配缓冲池中的字节缓冲区的socketasynceventarg对象 m_bufferManager.SetBuffer(saea_receive); m_receivePool.Push(saea_receive); //预先发送端分配一组可重用的消息 saea_send = new SocketAsyncEventArgs(); saea_send.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); m_sendPool.Push(saea_send); } } /// <summary> /// 启动tcp服务侦听 /// </summary> /// <param name="port">监听端口</param> internal void Start(int port) { IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, port); //创建listens是传入的套接字。 listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); listenSocket.NoDelay = true; //绑定端口 listenSocket.Bind(localEndPoint); //挂起的连接队列的最大长度。 listenSocket.Listen(1000); //在监听套接字上接受 StartAccept(null); //发送线程 for (int i = 0; i < sendthread; i++) { Thread thread = new Thread(StartSend); thread.IsBackground = true; thread.Priority = ThreadPriority.AboveNormal; thread.Start(i); } //超时机制 if (overtime > 0) { Thread heartbeat = new Thread(new ThreadStart(() => { Heartbeat(); })); heartbeat.IsBackground = true; heartbeat.Priority = ThreadPriority.Lowest; heartbeat.Start(); } } /// <summary> /// 超时机制 /// </summary> private void Heartbeat() { //计算超时次数 ,超过count就当客户端断开连接。服务端清除该连接资源 int count = overtime / overtimecheck; while (true) { foreach (var item in connectClient.Values) { if (item.keep_alive >= count) { item.keep_alive = 0; CloseClientSocket(item.saea_receive); } } foreach (var item in connectClient.Values) { item.keep_alive++; } Thread.Sleep(overtimecheck * 1000); } } #region Accept /// <summary> /// 开始接受客户端的连接请求的操作。 /// </summary> /// <param name="acceptEventArg">发布时要使用的上下文对象服务器侦听套接字上的接受操作</param> private void StartAccept(SocketAsyncEventArgs acceptEventArg) { if (acceptEventArg == null) { acceptEventArg = new SocketAsyncEventArgs(); acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); } else { // 套接字必须被清除,因为上下文对象正在被重用。 acceptEventArg.AcceptSocket = null; } m_maxNumberAcceptedClients.WaitOne(); //准备一个客户端接入 if (!listenSocket.AcceptAsync(acceptEventArg)) { ProcessAccept(acceptEventArg); } } /// <summary> /// 当异步连接完成时调用此方法 /// </summary> /// <param name="e">操作对象</param> private void ProcessAccept(SocketAsyncEventArgs e) { connectId++; //把连接到的客户端信息添加到集合中 ConnectClient connecttoken = new ConnectClient(); connecttoken.socket = e.AcceptSocket; //从接受端重用池获取一个新的SocketAsyncEventArgs对象 connecttoken.saea_receive = m_receivePool.Pop(); connecttoken.saea_receive.UserToken = connectId; connecttoken.saea_receive.AcceptSocket = e.AcceptSocket; connectClient.TryAdd(connectId, connecttoken); //一旦客户机连接,就准备接收。 if (!e.AcceptSocket.ReceiveAsync(connecttoken.saea_receive)) { ProcessReceive(connecttoken.saea_receive); } //事件回调 if (OnAccept != null) { OnAccept(connectId); } //接受第二连接的请求 StartAccept(e); } #endregion #region 接受处理 receive /// <summary> /// 接受处理回调 /// </summary> /// <param name="e">操作对象</param> private void ProcessReceive(SocketAsyncEventArgs e) { //检查远程主机是否关闭连接 if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) { int connectId = (int)e.UserToken; ConnectClient client; if (!connectClient.TryGetValue(connectId, out client)) { return; } //如果接收到数据,超时记录设置为0 if (overtime > 0) { if (client != null) { client.keep_alive = 0; } } //回调 if (OnReceive != null) { if (client != null) { OnReceive(connectId, e.Buffer, e.Offset, e.BytesTransferred); } } //准备下次接收数据 try { if (!e.AcceptSocket.ReceiveAsync(e)) { ProcessReceive(e); } } catch (ObjectDisposedException ex) { if (OnClose != null) { OnClose(connectId); } } } else { CloseClientSocket(e); } } #endregion #region 发送处理 send /// <summary> /// 开始启用发送 /// </summary> private void StartSend(object thread) { while (true) { SendingQueue sending; if (sendQueues[(int)thread].TryDequeue(out sending)) { Send(sending); } else { Thread.Sleep(100); } } } /// <summary> /// 异步发送消息 /// </summary> /// <param name="connectId">连接ID</param> /// <param name="data">数据</param> /// <param name="offset">偏移位</param> /// <param name="length">长度</param> internal void Send(int connectId, byte[] data, int offset, int length) { sendQueues[connectId % sendthread].Enqueue(new SendingQueue() { connectId = connectId, data = data, offset = offset, length = length }); } /// <summary> /// 异步发送消息 /// </summary> /// <param name="sendQuere">发送消息体</param> private void Send(SendingQueue sendQuere) { ConnectClient client; if (!connectClient.TryGetValue(sendQuere.connectId, out client)) { return; } //如果发送池为空时,临时新建一个放入池中 mutex.WaitOne(); if (m_sendPool.Count == 0) { SocketAsyncEventArgs saea_send = new SocketAsyncEventArgs(); saea_send.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); m_sendPool.Push(saea_send); } SocketAsyncEventArgs sendEventArgs = m_sendPool.Pop(); mutex.ReleaseMutex(); sendEventArgs.UserToken = sendQuere.connectId; sendEventArgs.SetBuffer(sendQuere.data, sendQuere.offset, sendQuere.length); try { if (!client.socket.SendAsync(sendEventArgs)) { ProcessSend(sendEventArgs); } } catch (ObjectDisposedException ex) { if (OnClose != null) { OnClose(sendQuere.connectId); } } sendQuere = null; } /// <summary> /// 发送回调 /// </summary> /// <param name="e">操作对象</param> private void ProcessSend(SocketAsyncEventArgs e) { if (e.SocketError == SocketError.Success) { m_sendPool.Push(e); if (OnSend != null) { OnSend((int)e.UserToken, e.BytesTransferred); } } else { CloseClientSocket(e); } } #endregion /// <summary> /// 每当套接字上完成接收或发送操作时,都会调用此方法。 /// </summary> /// <param name="sender"></param> /// <param name="e">与完成的接收操作关联的SocketAsyncEventArg</param> private void IO_Completed(object sender, SocketAsyncEventArgs e) { //确定刚刚完成哪种类型的操作并调用相关的处理程序 switch (e.LastOperation) { case SocketAsyncOperation.Receive: ProcessReceive(e); break; case SocketAsyncOperation.Send: ProcessSend(e); break; case SocketAsyncOperation.Accept: ProcessAccept(e); break; default: break; } } #region 断开连接处理 /// <summary> /// 客户端断开一个连接 /// </summary> /// <param name="connectId">连接标记</param> internal void Close(int connectId) { ConnectClient client; if (!connectClient.TryGetValue(connectId, out client)) { return; } CloseClientSocket(client.saea_receive); } /// <summary> /// 断开一个连接 /// </summary> /// <param name="e">操作对象</param> private void CloseClientSocket(SocketAsyncEventArgs e) { if (e.LastOperation == SocketAsyncOperation.Receive) { int connectId = (int)e.UserToken; ConnectClient client; if (!connectClient.TryGetValue(connectId, out client)) { return; } if (client.socket.Connected == false) { return; } try { client.socket.Shutdown(SocketShutdown.Both); } // 抛出客户端进程已经关闭 catch (Exception) { } client.socket.Close(); m_receivePool.Push(e); m_maxNumberAcceptedClients.Release(); if (OnClose != null) { OnClose(connectId); } connectClient.TryRemove((int)e.UserToken, out client); client = null; } } #endregion #region 附加数据 /// <summary> /// 给连接对象设置附加数据 /// </summary> /// <param name="connectId">连接标识</param> /// <param name="data">附加数据</param> /// <returns>true:设置成功,false:设置失败</returns> internal bool SetAttached(int connectId, object data) { ConnectClient client; if (!connectClient.TryGetValue(connectId, out client)) { return false; } client.attached = data; return true; } /// <summary> /// 获取连接对象的附加数据 /// </summary> /// <param name="connectId">连接标识</param> /// <returns>附加数据,如果没有找到则返回null</returns> internal T GetAttached<T>(int connectId) { ConnectClient client; if (!connectClient.TryGetValue(connectId, out client)) { return default(T); } else { return (T)client.attached; } } #endregion } } socket核心类
1.初始化UDP实现类UdpServer/UdpClients
服务端socket.core.Server.UdpServer
客户端socket.core.Client.UdpClients
参数int receiveBufferSize用于每个套接字I/O操作的缓冲区大小(接收端) -
2.发送数据
服务端 server.Send(remoteEndPoint,data,offset,length)
客户端 client.Send(data,offset,length)
客户端 client.Send(remoteEndPoint,data,offset,length) -
3.触发已发送事件
服务端 server.OnSend(remoteEndPoint,length)
客户端 client.OnSend(length) -
3.触发接收事件
服务端 server.OnReceive(remoteEndPoint,data,offset,length)
客户端 client.OnReceive(data,offset,length)