1.Netty的介绍
-
Netty
是由JBOSS
提供的一个Java
开源框架,现为Github
上的独立项目。 -
Netty
是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络IO
程序。 -
Netty
主要针对在TCP
协议下,面向Client
端的高并发应用,或者Peer-to-Peer
场景下的大量数据持续传输的应用。 -
Netty
本质是一个NIO
框架,适用于服务器通讯相关的多种应用场景。 - 要透彻理解
Netty
,需要先学习NIO
,这样我们才能阅读Netty
的源码。
2.原生NIO存在的问题
-
NIO
的类库和API
繁杂,使用麻烦:需要熟练掌握Selector
、ServerSocketChannel
、SocketChannel
、ByteBuffer
等。 - 需要具备其他的额外技能:要熟悉
Java
多线程编程,因为NIO
编程涉及到Reactor
模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO
程序。 - 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。4.
JDK NIO
的Bug
:例如臭名昭著的Epoll Bug
,它会导致Selector
空轮询,最终导致CPU100%
。直到JDK1.7
版本该问题仍旧存在,没有被根本解决。
3.Netty的官网说明
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
4.Netty的优点
Netty
对 JDK
自带的 NIO
的 API
进行了封装,解决了上述问题。
- 设计优雅:适用于各种传输类型的统一
API
阻塞和非阻塞Socket
;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型-单线程,一个或多个线程池。 - 使用方便:详细记录的
Javadoc
,用户指南和示例;没有其他依赖项,JDK5(Netty3.x)
或6(Netty4.x)
就足够了。 - 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
- 安全:完整的
SSL/TLS
和StartTLS
支持。 - 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的
Bug
可以被及时修复,同时,更多的新功能会被加入。
5.Netty的模型
5.1 Netty工作原理示意图
Netty
主要基于主从 Reactors
多线程模型(如图)做了一定的改进,其中主从 Reactor
多线程模型有多个 Reactor
5.2 对上图的说明
-
Netty
抽象出两组线程池BossGroup
专门负责接收客户端的连接,WorkerGroup
专门负责网络的读写 -
BossGroup
和WorkerGroup
类型都是NioEventLoopGroup
-
NioEventLoopGroup
相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是NioEventLoop
-
NioEventLoop
表示一个不断循环的执行处理任务的线程,每个NioEventLoop
都有一个Selector
,用于监听绑定在其上的socket
的网络通讯 -
NioEventLoopGroup
可以有多个线程,即可以含有多个NioEventLoop
- 每个
BossNioEventLoop
循环执行的步骤有3
步- 轮询
accept
事件 - 处理
accept
事件,与client
建立连接,生成NioScocketChannel
,并将其注册到某个worker
NIOEventLoop
上的Selector
- 处理任务队列的任务,即
runAllTasks
- 轮询
- 每个
Worker
NIOEventLoop
循环执行的步骤- 轮询
read
,write
事件 - 处理
I/O
事件,即read
,write
事件,在对应NioScocketChannel
处理 - 处理任务队列的任务,即
runAllTasks
- 轮询
- 每个
Worker
NIOEventLoop
处理业务时,会使用pipeline
(管道),pipeline
中包含了channel
,即通过pipeline
可以获取到对应通道,管道中维护了很多的处理器
6.Netty入门实例--TCP服务
实例要求:使用 IDEA
创建 Netty
项目
-
Netty
服务器在9000
端口监听,客户端能发送消息给服务器16进制报文 - 服务器可以回复消息给客户端发送的16进制报文
- 目的:对
Netty
线程模型有一个初步认识,便于理解Netty
模型理论
6.1 引入Netty依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> </dependency>
6.2 服务端
public class Myserver { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) //保持连接数 .option(ChannelOption.SO_BACKLOG, 1024) // 保持连接 .childOption(ChannelOption.SO_KEEPALIVE, true) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { //设置客户端连接socket参数 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new DecodeUtil()); pipeline.addLast(new EncodeUtil()); pipeline.addLast(new ServerHandler()); } }); ChannelFuture future = null; try { future = serverBootstrap.bind(9000).sync(); if (future.isSuccess()) { System.out.println("Netty服务端启动成功"); } else { System.out.println("Netty服务端启动失败"); } future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
6.3 解码器
/** * 解码器 */ public class DecodeUtil extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) { try { //byteBuf的长度 int bufNum = byteBuf.readableBytes(); //byteBuf当前的读索引 int readerIndex = byteBuf.readerIndex(); byte[] bytes = new byte[2]; if (bufNum >= 4) { //byteBuf的长度大于4, //查看前两个字节判断消息头 for (int index = 0; index < 2; index++) { bytes[index] = byteBuf.getByte(readerIndex); readerIndex++; } //将前2个字节转换为16进制 String header = ConvertCode.receiveHexToString(bytes); int length = 0; if (header.toUpperCase().equals("AAF5")) { //获取包长度 bytes = new byte[2]; bytes[0] = byteBuf.getByte(2); bytes[1] = byteBuf.getByte(3); length = ConvertCode.getShort(bytes, 0); } else { return; } if (bufNum >= length) { bytes = new byte[length]; byteBuf.readBytes(bytes); list.add(bytes); } } } catch (Exception e) { e.printStackTrace(); } } }
6.4 编码器
/** * 编码器 */ public class EncodeUtil extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { // netty需要用ByteBuf传输 ByteBuf buff = Unpooled.buffer(); // 对接需要16进制 buff.writeBytes(ConvertCode.hexString2Bytes(o.toString())); channelHandlerContext.writeAndFlush(buff); } }
6.5 业务处理的handler
public class ServerHandler extends SimpleChannelInboundHandler<byte[]> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端" + ctx.channel().remoteAddress() + "连接了"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端" + ctx.channel().remoteAddress() + "离开了"); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] msg) { try { System.out.println("收到消息:" + ConvertCode.receiveHexToString(msg)); //报文原样返回 channelHandlerContext.writeAndFlush(ConvertCode.receiveHexToString(msg)); } catch (Exception e) { e.printStackTrace(); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
6.5 工具类
public class ConvertCode { /** * @Title:bytes2HexString * @Description:字节数组转16进制字符串 * @param b * 字节数组 * @return 16进制字符串 * @throws */ public static String bytes2HexString(byte[] b) { StringBuffer result = new StringBuffer(); String hex; for (int i = 0; i < b.length; i++) { hex = Integer.toHexString(b[i] & 0xFF); if (hex.length() == 1) { hex = ‘0‘ + hex; } result.append(hex.toUpperCase()); } return result.toString(); } /** * @Title:hexString2Bytes * @Description:16进制字符串转字节数组 * @param src 16进制字符串 * @return 字节数组 */ public static byte[] hexString2Bytes(String src) { int l = src.length() / 2; byte[] ret = new byte[l]; for (int i = 0; i < l; i++) { ret[i] = (byte) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue(); } return ret; } /** * @Title:string2HexString * @Description:字符串转16进制字符串 * @param strPart 字符串 * @return 16进制字符串 */ public static String string2HexString(String strPart) { StringBuffer hexString = new StringBuffer(); for (int i = 0; i < strPart.length(); i++) { int ch = (int) strPart.charAt(i); String strHex = Integer.toHexString(ch); hexString.append(strHex); } return hexString.toString(); } /** * @Title:hexString2String * @Description:16进制字符串转字符串 * @param src * 16进制字符串 * @return 字节数组 * @throws */ public static String hexString2String(String src) { String temp = ""; for (int i = 0; i < src.length() / 2; i++) { //System.out.println(Integer.valueOf(src.substring(i * 2, i * 2 + 2),16).byteValue()); temp = temp+ (char)Integer.valueOf(src.substring(i * 2, i * 2 + 2),16).byteValue(); } return temp; } /** * @Title:char2Byte * @Description:字符转成字节数据char-->integer-->byte * @param src * @return * @throws */ public static Byte char2Byte(Character src) { return Integer.valueOf((int)src).byteValue(); } /** * @Title:intToHexString * @Description:10进制数字转成16进制 * @param a 转化数据 * @param len 占用字节数 * @return * @throws */ public static String intToHexString(int a,int len){ len<<=1; String hexString = Integer.toHexString(a); int b = len -hexString.length(); if(b>0){ for(int i=0;i<b;i++) { hexString = "0" + hexString; } } return hexString; } /** * 将16进制的2个字符串进行异或运算 * http://blog.csdn.net/acrambler/article/details/45743157 * @param strHex_X * @param strHex_Y * 注意:此方法是针对一个十六进制字符串一字节之间的异或运算,如对十五字节的十六进制字符串异或运算:1312f70f900168d900007df57b4884 先进行拆分:13 12 f7 0f 90 01 68 d9 00 00 7d f5 7b 48 84 13 xor 12-->1 1 xor f7-->f6 f6 xor 0f-->f9 .... 62 xor 84-->e6 即,得到的一字节校验码为:e6 * @return */ public static String xor(String strHex_X,String strHex_Y){ //将x、y转成二进制形式 String anotherBinary=Integer.toBinaryString(Integer.valueOf(strHex_X,16)); String thisBinary=Integer.toBinaryString(Integer.valueOf(strHex_Y,16)); String result = ""; //判断是否为8位二进制,否则左补零 if(anotherBinary.length() != 8){ for (int i = anotherBinary.length(); i <8; i++) { anotherBinary = "0"+anotherBinary; } } if(thisBinary.length() != 8){ for (int i = thisBinary.length(); i <8; i++) { thisBinary = "0"+thisBinary; } } //异或运算 for(int i=0;i<anotherBinary.length();i++){ //如果相同位置数相同,则补0,否则补1 if(thisBinary.charAt(i)==anotherBinary.charAt(i)) result+="0"; else{ result+="1"; } } return Integer.toHexString(Integer.parseInt(result, 2)); } /** * Convert byte[] to hex string.这里我们可以将byte转换成int * @param src byte[] data * @return hex string */ public static String bytes2Str(byte[] src){ StringBuilder stringBuilder = new StringBuilder(""); if (src == null || src.length <= 0) { return null; } for (int i = 0; i < src.length; i++) { int v = src[i] & 0xFF; String hv = Integer.toHexString(v); if (hv.length() < 2) { stringBuilder.append(0); } stringBuilder.append(hv); } return stringBuilder.toString(); } /** * @return 接收字节数据并转为16进制字符串 */ public static String receiveHexToString(byte[] by) { try { /*io.netty.buffer.WrappedByteBuf buf = (WrappedByteBuf)msg; ByteBufInputStream is = new ByteBufInputStream(buf); byte[] by = input2byte(is);*/ String str = bytes2Str(by); str = str.toUpperCase(); return str; } catch (Exception ex) { ex.printStackTrace(); System.out.println("接收字节数据并转为16进制字符串异常"); } return null; } /** * "7dd",4,‘0‘==>"07dd" * @param input 需要补位的字符串 * @param size 补位后的最终长度 * @param symbol 按symol补充 如‘0‘ * @return * N_TimeCheck中用到了 */ public static String fill(String input, int size, char symbol) { while (input.length() < size) { input = symbol + input; } return input; } // public static void main(String args[]) { // String productNo = "3030303032383838"; // System.out.println(hexString2String(productNo)); // productNo = "04050103000001070302050304"; // System.out.println(hexString2String(productNo)); // } /** * 获取short,小端 * * @param src * @param index * @return */ public static short getShort(byte[] src, int index) { return (short) (((src[index + 1] << 8) | src[index] & 0xff)); } }
6.6 服务运行演示
6.6.1 使用TCP Socket测试工具建立连接,发送报文:
AAF56E0010026A0000000000363130313133303032373030303031000000000000000000000000000000000000A851000001006400000001010000010A02ED0B000020210414180000FF00000000000000000000000000000000000000000000000000000035C901000000000024
6.6.2 我们看到服务器收到了客户端的报文,并将报文原样返回给了客户端
完毕,有什么问题,请大家指正!