DotNetty实现高性能tcpserver,超时断开链路,垃圾包,断包,粘包处理
初始化类
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using System;
using System.Threading;

namespace DotNettyUtil.tcpserver
{
    /// <summary>
    /// Hosts a DotNetty TCP server. Constructing an instance queues the
    /// start-up on the thread pool; every accepted channel gets an
    /// <see cref="IdleStateHandler"/> followed by a <c>TcpServerHandler</c>
    /// that wraps the supplied protocol handler.
    /// </summary>
    public class TcpServerIntance
    {
        private readonly int port;
        private readonly IProtocolHandler handle;

        // Idle timeouts handed to IdleStateHandler, in seconds; 0 disables that check.
        private const int READ_IDEL_TIME_OUT = 0;  // read idle timeout
        private const int WRITE_IDEL_TIME_OUT = 0; // write idle timeout
        private const int ALL_IDEL_TIME_OUT = 90;  // read-or-write idle timeout

        IEventLoopGroup bossGroup;
        IEventLoopGroup workerGroup;
        IChannel boundChannel;

        /// <summary>
        /// Stores the configuration and schedules the server start on a
        /// thread-pool thread; the constructor returns before the port is bound.
        /// </summary>
        /// <param name="port">TCP port to listen on.</param>
        /// <param name="handle">Protocol handler passed to every channel's <c>TcpServerHandler</c>.</param>
        public TcpServerIntance(int port, IProtocolHandler handle)
        {
            this.port = port;
            this.handle = handle;
            ThreadPool.QueueUserWorkItem(new WaitCallback(RunThread));
        }

        /// <summary>
        /// Builds the bootstrap and binds the listening socket. Any failure is
        /// logged and the event loop groups are shut down again so that their
        /// threads are not leaked.
        /// </summary>
        /// <param name="obj">Unused <see cref="WaitCallback"/> state.</param>
        async void RunThread(object obj)
        {
            try
            {
                // Created inside the try: this method is async void, so an
                // exception escaping it would be unhandled.
                bossGroup = new MultithreadEventLoopGroup();
                workerGroup = new MultithreadEventLoopGroup();

                var bootstrap = new ServerBootstrap();
                bootstrap.Group(bossGroup, workerGroup);
                bootstrap.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
                {
                    IChannelPipeline pipeline = channel.Pipeline;
                    // Peers are expected to send periodic heartbeats so the
                    // all-idle timeout does not fire.
                    pipeline.AddLast(new IdleStateHandler(READ_IDEL_TIME_OUT, WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT));
                    pipeline.AddLast(new TcpServerHandler(handle));
                }));
                bootstrap.Channel<TcpServerSocketChannel>()
                    .Option(ChannelOption.SoBacklog, 128)
                    .ChildOption(ChannelOption.SoKeepalive, true);

                LogHelper.AppendLog("TcpServer 监听端口:" + port);

                boundChannel = await bootstrap.BindAsync(port);
            }
            catch (Exception ex)
            {
                LogHelper.AppendLog("[Error] TcpServer_RunThread,errmsg=" + ex.Message);
                // Start-up failed: release the event loop threads.
                ShutdownGroups();
            }
        }

        /// <summary>
        /// Closes the listening channel (if it was bound) and shuts down both
        /// event loop groups. Safe to call before start-up has completed.
        /// The close/shutdown tasks are started but not awaited.
        /// </summary>
        public void Close()
        {
            try
            {
                if (boundChannel != null)
                {
                    boundChannel.CloseAsync();
                }
            }
            catch (Exception ex)
            {
                LogHelper.AppendLog("[Error] TcpServer_Close,errmsg=" + ex.Message);
            }
            finally
            {
                ShutdownGroups();
            }
        }

        // Starts a graceful shutdown of whichever event loop groups were created.
        private void ShutdownGroups()
        {
            if (bossGroup != null)
            {
                bossGroup.ShutdownGracefullyAsync();
            }
            if (workerGroup != null)
            {
                workerGroup.ShutdownGracefullyAsync();
            }
        }
    }
}
协议解析类
using DotNetty.Buffers;
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels;
using DotNettyUtil.tcpserver;
using System;

namespace TcpServer
{
    /// <summary>
    /// Reassembles and dispatches client frames received over TCP.
    /// A frame, as shown by the sample in <c>Dispatch</c>, looks like
    /// <c>HTEMP=0263#uid#taskid#cmd#content#</c>: the six-byte header
    /// <c>HTEMP=</c>, a four-byte length field, then a payload of that length
    /// which is delimited and split by <c>#</c>.
    /// Bytes are accumulated per connection in <c>TcpServerMgr.dicSn2Buffer</c>
    /// so that garbage bytes, split frames and several frames in one read are
    /// all handled.
    /// </summary>
    public class ClientProtocol : IProtocolHandler
    {
        const int HEAD1 = 0x48;// H
        const int HEAD2 = 0x54;// T
        const int HEAD3 = 0x45;// E
        const int HEAD4 = 0x4D;// M
        const int HEAD5 = 0x50;// P
        const int HEAD6 = 0x3D;// =

        public const char SPLIT1 = '#'; // field separator and payload delimiter
        // Not used in this class; in the sample frame these characters appear
        // inside the content field.
        const char SPLIT2 = '@';
        const char SPLIT3 = '=';
        const char SPLIT4 = '+';
        const char SPLIT5 = ',';

        // Complete frame header, in wire order.
        private static readonly byte[] FrameHeader = { HEAD1, HEAD2, HEAD3, HEAD4, HEAD5, HEAD6 };

        // Size in bytes of the length field that follows the header.
        private const int LengthFieldSize = 4;

        /// <summary>
        /// Appends the received bytes to the connection's pending buffer,
        /// releases <paramref name="msg"/>, then processes every complete frame
        /// that is now available. Incomplete trailing data stays buffered for
        /// the next call. Exceptions are logged, not propagated.
        /// </summary>
        /// <param name="ctx">Channel context of the connection.</param>
        /// <param name="msg">Received <see cref="IByteBuffer"/>, or null to only re-process buffered data.</param>
        public void ChannelRead(IChannelHandlerContext ctx, object msg)
        {
            string data_content = "";
            try
            {
                string sn;
                IByteBuffer pending;
                try
                {
                    if (!ctx.Channel.Active)
                    {
                        return;
                    }

                    sn = TcpServerMgr.GetSN(ctx);

                    // NOTE(review): if dicSn2Buffer is a plain Dictionary it is not safe
                    // for concurrent use from several event loop threads - confirm.
                    if (!TcpServerMgr.dicSn2Buffer.ContainsKey(sn))
                    {
                        TcpServerMgr.dicSn2Buffer.Add(sn, Unpooled.Buffer(1024));
                    }
                    pending = TcpServerMgr.dicSn2Buffer[sn];

                    IByteBuffer received = (IByteBuffer)msg;
                    if (null != received && received.ReadableBytes > 0)
                    {
                        pending.WriteBytes(received);
                    }
                }
                finally
                {
                    // Release on every path (inactive channel, empty buffer, exception)
                    // so the inbound buffer is never leaked. Null-safe.
                    ReferenceCountUtil.Release(msg);
                }

                // Loop instead of recursing: one read may carry many frames.
                while (ctx.Channel.Active && ProcessNextFrame(ctx, sn, pending, ref data_content))
                {
                }
            }
            catch (Exception ex)
            {
                LogHelper.AppendLog("[Error] ClientProtocol_ChannelRead,data_content=" + data_content + ",errmsg=" + ex.Message);
            }
        }

        /// <summary>
        /// Tries to consume one frame from <paramref name="buffer"/>.
        /// </summary>
        /// <param name="ctx">Channel context, forwarded to the command handling.</param>
        /// <param name="sn">Connection key returned by <c>TcpServerMgr.GetSN</c>.</param>
        /// <param name="buffer">Pending bytes of this connection.</param>
        /// <param name="dataContent">Receives the payload text of the frame being processed, for error logging.</param>
        /// <returns>
        /// True when a frame (valid or malformed) was consumed and more bytes
        /// remain; false when the buffer is empty or holds only an incomplete
        /// frame, which is left in place with the reader index at its start.
        /// </returns>
        private static bool ProcessNextFrame(IChannelHandlerContext ctx, string sn, IByteBuffer buffer, ref string dataContent)
        {
            dataContent = "";

            // Refresh the peer's last-activity time and pick a label for log lines.
            UidEntity uidEntity = TcpServerMgr.GetUid(sn);
            string uid = "";
            if (null != uidEntity)
            {
                uid = uidEntity.Uid;
                uidEntity.LastTime = Common.GetNowTimestamp();
            }
            string source = string.IsNullOrEmpty(uid) ? sn : uid;

            if (!SkipToHeader(buffer, source))
            {
                return false;
            }

            // The reader mark now sits on the first header byte, so every
            // "wait for more data" path below rewinds to the frame start.
            if (buffer.ReadableBytes < LengthFieldSize)
            {
                buffer.ResetReaderIndex();
                return false;
            }

            byte[] lengthField = new byte[LengthFieldSize];
            buffer.ReadBytes(lengthField);
            int len = Common.String2Int(Common.ByteToString(lengthField));
            if (len < 0)
            {
                // Unparseable length (the original code tested for -1): drop the
                // header and length field and carry on with what follows.
                return HasMoreData(buffer);
            }

            if (!buffer.IsReadable() || TcpServerMgr.GetWaitRecvRemain(buffer, len))
            {
                // Payload not fully received yet.
                buffer.ResetReaderIndex();
                return false;
            }

            byte[] payload = new byte[len];
            buffer.ReadBytes(payload);
            string data = Common.ByteToString(payload);
            dataContent = data;

            // The payload must end with '#'; its first and last characters are stripped.
            if (!string.IsNullOrEmpty(data) && data.Length >= 2 && SPLIT1 == data[data.Length - 1])
            {
                string[] item = data.Substring(1, data.Length - 2).Split(SPLIT1);
                if (4 == item.Length)
                {
                    // item: [0] uid, [1] task id, [2] command, [3] content
                    Dispatch(ctx, sn, item[0], Common.String2Int(item[2]));
                }
            }

            return HasMoreData(buffer);
        }

        /// <summary>
        /// Discards bytes until a complete frame header has been read.
        /// Every discarded byte is logged.
        /// </summary>
        /// <returns>
        /// True when the header was consumed (reader mark on its first byte);
        /// false when more data is needed. In that case the reader index is
        /// either rewound to a partial header or the fully consumed buffer has
        /// been cleared.
        /// </returns>
        private static bool SkipToHeader(IByteBuffer buffer, string source)
        {
            while (buffer.IsReadable())
            {
                buffer.MarkReaderIndex(); // candidate frame start
                byte first = buffer.ReadByte();
                if (FrameHeader[0] == first)
                {
                    bool mismatch = false;
                    int matched = 1;
                    while (matched < FrameHeader.Length && buffer.IsReadable())
                    {
                        if (buffer.ReadByte() != FrameHeader[matched])
                        {
                            mismatch = true;
                            break;
                        }
                        matched++;
                    }

                    if (!mismatch)
                    {
                        if (matched == FrameHeader.Length)
                        {
                            return true;
                        }
                        // Header is split across reads: keep it for the next call.
                        buffer.ResetReaderIndex();
                        return false;
                    }

                    // False start: drop only this byte and rescan from the next one,
                    // because a real header may begin inside the bytes just examined.
                    buffer.ResetReaderIndex();
                    buffer.ReadByte();
                }

                LogHelper.AppendLog("Error,Unable to parse the data:" + first + " source:" + source);
            }

            // Only garbage was buffered; reset the indices so the buffer does not keep growing.
            buffer.Clear();
            return false;
        }

        /// <summary>Handles one decoded command.</summary>
        private static void Dispatch(IChannelHandlerContext ctx, string sn, string uid, int cmd)
        {
            switch (cmd)
            {
                case 1:
                    // Heartbeat: bind this connection to the reported uid.
                    TcpServerMgr.AddUid(sn, uid, ctx);
                    LogHelper.AppendLog("心跳反馈,uid=" + uid);
                    break;
                case 2:
                    // No action implemented.
                    break;
                case 3:
                    // No action implemented. Sample frame:
                    // HTEMP=0263#WaterMeter-001#1520557004#03#buildid=44@edmid=37@meterid=1228@senddate=2018-02-05 17:36:22@[{132,0.0000}+{132,0.0000}+...]#
                    break;
            }
        }

        // Returns true when unread bytes remain; otherwise resets the buffer for reuse.
        private static bool HasMoreData(IByteBuffer buffer)
        {
            if (buffer.IsReadable())
            {
                return true;
            }
            buffer.Clear();
            return false;
        }
    }
}