一、Netty编解码器
(一)Netty编解码器概述
1、Java的编解码
在Java中编码(Encode)称为序列化, 它将对象序列化为字节数组,⽤于⽹络传输、数据持久化或者其它⽤途。解码(Decode)称为反序列化,它把从⽹络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷⻉),以⽅便后续的业务逻辑操作。
java序列化对象只需要实现java.io.Serializable接⼝并⽣成序列化ID,这个类就能够通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。
java序列化⽬的是为了⽹络传输和对象持久化。java序列化存在一系列的缺点,例如⽆法跨语⾔、序列化后码流太⼤、序列化性能太低等问题。同时java序列化仅仅是Java编解码技术的⼀种,由于它的种种缺陷,衍⽣出了多种编解码技术和框架,这些编解码框架实现了消息的⾼效序列化。
2、Netty编解码器基本说明
在Netty中,ChannelHandler 充当了处理⼊站和出站数据的应⽤程序逻辑的容器。例如,实现ChannelInboundHandler 接⼝(或 ChannelInboundHandlerAdapter),就可以接收⼊站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从 ChannelInboundHandler 冲刷数据。业务逻辑通常写在⼀个或者多个 ChannelInboundHandler 中。ChannelOutboundHandler 原理⼀样,只不过它是⽤来处理出站数据的。
ChannelPipeline 提供了 ChannelHandler 链的容器。如果事件的运动⽅向是从客户端到服务端的,如果我们站在客户端的角度,那么就是出站事件,即客户端发送给服务端的数据会通过pipeline 中的⼀系列 ChannelOutboundHandler,并被这些 Handler 处理,反之则称为⼊站的。
3、编码解码器在⽹络应⽤中需要实现某种编解码器,将原始字节数据与⾃定义的消息对象进⾏互相转换。⽹络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进⾏解码。
netty提供了强⼤的编解码器框架,使得我们编写⾃定义的编解码器很容易,也容易封装重⽤。同时Netty 的编(解)码器实现了 ChannelHandlerAdapter,也是⼀种特殊的 ChannelHandler,所以依赖于 ChannelPipeline,可以将多个编(解)码器链接在⼀起,以实现复杂的转换逻辑。对于Netty⽽⾔,编解码器由两部分组成:编码器、解码器。
解码器:负责将消息从字节或其他序列形式转成指定的消息对象。负责处理⼊站 InboundHandler数据;
编码器:将消息对象转成字节或其他序列形式在⽹络上传输。负责处理出站 OutboundHandler”数据;
不论解码器 handler 还是编码器 handler ,接收的消息类型必须与待处理的消息类型⼀致,否则该 handler 不会被执⾏。Netty提供了很多编解码器,例如:
StringEncoder字符串编码器
StringDecoder字符串解码器
ObjectEncoder 对象编码器
ObjectDecoder 对象解码器
FixedLengthFrameDecoder 固定⻓度的解码器
LineBasedFrameDecoder 以换⾏符为结束标识的解码器
DelimiterBasedFrameDecoder 指定消息分隔符的解码器
LengthFieldBasedFrameDecoder基于⻓度通⽤解码器
(二)解码器(Decoder)
解码器负责解码“⼊站”数据。从⼀种格式到另⼀种格式,解码器处理⼊站数据是抽象ChannelInboundHandler的实现。实践中使⽤解码器很简单,就是将⼊站数据转换格式后传递到ChannelPipeline中的下⼀个ChannelInboundHandler进⾏处理。
对于解码器,Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder
抽象解码器:
1) ByteToMessageDecoder: ⽤于将字节转为消息,需要检查缓冲区是否有⾜够的字节
2) ReplayingDecoder: 继承ByteToMessageDecoder,不需要检查缓冲区是否有⾜够的字节,但是ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都⽀持。选择:项⽬复杂性⾼则使⽤ReplayingDecoder,否则使⽤ ByteToMessageDecoder
3)MessageToMessageDecoder: ⽤于从⼀种消息解码为另外⼀种消息(例如POJO到POJO)
1、ByteToMessageDecoder
⽤于将接收到的⼆进制数据(Byte)解码,得到完整的请求报⽂(Message)。ByteToMessageDecoder是⼀种ChannelInboundHandler,可以称为解码器,负责将byte字节流(ByteBuf)转换成⼀种Message,Message是应⽤可以⾃⼰定义的⼀种java对象。
下⾯列出了ByteToMessageDecoder三个主要⽅法:
protected abstract void decode(ChannelHandlerContext var1, ByteBuf var2, List<Object> var3) throws Exception; final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { ...... this.decode(ctx, in, out); ...... } protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.isReadable()) { this.decodeRemovalReentryProtection(ctx, in, out); } }
通过源码可以看到,decodeLast方法调用的额是decodeRemovalReentryProtection方法,而该方法又最终调用了decode方法,也就是说最终的实现逻辑是在decode方法中。同时decode方法也是该抽象类中唯一一个需要自己实现的方法。
decodeLast方法参数的作⽤如下:
BytuBuf in:需要解码的⼆进制数据。
List out:解码后的有效报⽂列表,我们需要将解码后的报⽂添加到这个List中。之所以使⽤⼀个List表示,是因为考虑到粘包问题,因此⼊参的in中可能包含多个有效报⽂。
当然,也有可能发⽣了拆包,in中包含的数据还不⾜以构成⼀个有效报⽂,此时不往List中添加元素即可。另外特别要注意的是,在解码时,不能直接调⽤ByteBuf的readXXX⽅法来读取数据,⽽是应该⾸先要判断能否构成⼀个有效的报⽂。
案例,假设协议规定传输的数据都是int类型的整数,因为int类型占用四个字节,因此我们需要判断当需要转码的二进制数据大于等于4个字节时,才进行解码操作。
上图中显式输⼊的ByteBuf中包含4个字节,每个字节的值分别为:1,2,3,4。我们⾃定义⼀个ToIntegerDecoder进⾏解码,尽管这⾥我看到了4个字节刚好可以构成⼀个int类型整数,但是在真正解码之前,我们并不知道ByteBuf包含的字节数能否构成完成的有效报⽂,因此需要⾸先判断ByteBuf中剩余可读的字节,是否⼤于等于4,如下:
public class ToIntegerDecoder extends ByteToMessageDecoder { @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() >= 4) { out.add(in.readInt()); } } }
只有在可读字节数>=4的情况下,我们才进⾏解码,即读取⼀个int,并添加到List中。在可读字节数⼩于4的情况下,我们并没有做任何处理,假设剩余可读字节数为3,不⾜以构成1个int。那么⽗类ByteToMessageDecoder发现这次解码List中的元素没有变化,则会对in中的剩余3个字节进⾏缓存,等待下1个字节的到来,之后再回到调⽤ToIntegerDecoder的decode⽅法。
另外需要注意: 在ToIntegerDecoder的decode⽅法中,每次最多只读取⼀个1个int。如果ByteBuf中的字节数很多,例如为16,那么可以构成4个int,⽽这⾥只读取了1个int,那么剩余12字节怎么办?ByteToMessageDecoder再每次回调⼦类的decode⽅法之后,都会判断输⼊的ByteBuf中是否还有剩余字节可读,如果还有,会再次回调⼦类的decode⽅法,直到某个回调decode⽅法List中的元素个数没有变化时才停⽌,元素个数没有变化,实际上意味着⼦类已经没有办法从剩余的字节中读取⼀个有效报⽂。由于存在剩余可读字节时,ByteToMessageDecoder会⾃动再次回调⼦类decode⽅法,在实现ByteToMessageDecoder时,decode⽅法每次只解析⼀个有效报⽂即可,没有必要⼀次全部解析出来。
ByteToMessageDecoder提供的⼀些常⻅的实现类:
FixedLengthFrameDecoder:定⻓协议解码器,我们可以指定固定的字节数算⼀个完整的报⽂
LineBasedFrameDecoder: 换⾏分隔符解码器,遇到\n或者\r\n,则认为是⼀个完整的报⽂
DelimiterBasedFrameDecoder: 分隔符解码器,与LineBasedFrameDecoder类似,只不过分隔符可以⾃⼰指定
LengthFieldBasedFrameDecoder:⻓度编码解码器,将报⽂划分为报⽂头/报⽂体,根据报⽂头中的Length字段确定报⽂体的⻓度,因此报⽂提的⻓度是可变的
JsonObjectDecoder:json格式解码器,当检测到匹配数量的"{" 、”}”或”[””]”时,则认为是⼀个完整的json对象或者json数组。
这些实现类,都只是将接收到的⼆进制数据,解码成包含完整报⽂信息的ByteBuf实例后,就直接交给了之后的ChannelInboundHandler处理。
2、ReplayingDecoder
ReplayingDecoder是byteToMessage的子类,他与byteToMessage最大的不同就是不需要检查缓冲区是否有足够的的字节;若ByteBuf中有⾜够的字节,则会正常读取;若没有⾜够的字节则会停⽌解码。
ReplayingDecoder 使⽤⽅便,但它也有⼀些局限性:
1) 不是所有的操作都被ByteBuf⽀持,如果调⽤⼀个不⽀持的操作会抛出DecoderException。
2) ByteBuf.readableBytes()⼤部分时间不会返回期望值
3)ReplayingDecoder 在某些情况下可能稍慢于 ByteToMessageDecoder,例如⽹络缓慢并且消息格式复杂时,消息会被拆成了多个碎⽚,速度变慢
在满⾜需求的情况下推荐使⽤ByteToMessageDecoder,因为它的处理⽐较简单,没有ReplayingDecoder实现的那么复杂。ReplayingDecoder继承于ByteToMessageDecoder,所以他们提供的接⼝是相同的。下⾯代码是ReplayingDecoder的实现:
/** * Integer解码器,ReplayingDecoder实现 */ public class ToIntegerReplayingDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("ToIntegerReplayingDecoder 被调⽤"); //在 ReplayingDecoder 不需要判断数据是否⾜够读取,内部会进⾏处理判断 out.add(in.readInt()); } }
3、MessageToMessageDecoder
ByteToMessageDecoder是将⼆进制流进⾏解码后,得到有效报⽂。⽽MessageToMessageDecoder则是将⼀个本身就包含完整报⽂信息的对象转换成另⼀个java对象。前⾯介绍了ByteToMessageDecoder的部分⼦类解码后,会直接将包含了报⽂完整信息的ByteBuf实例交由之后的ChannelInboundHandler处理,此时,你可以在ChannelPipeline中,再添加⼀个MessageToMessageDecoder,将ByteBuf中的信息解析后封装到Java对象中,简化之后的ChannelInboundHandler的操作。另外,在⼀些场景下,有可能你的报⽂信息已经封装到了Java对象中,但是还要继续转成另外的Java对象,因此⼀个MessageToMessageDecoder后⾯可能还跟着另⼀个MessageToMessageDecoder。⼀个⽐较容易的理解的类⽐案例是java Web编程,通常客户端浏览器发送过来的⼆进制数据,已经被web容器(如tomcat)解析成了⼀个HttpServletRequest对象,但是我们还是需要将HttpServletRequest中的数据提取出来,封装成我们⾃⼰的POJO类,也就是从⼀个java对象(HttpServletRequest)转换成另⼀个Java对象(我们的POJO类)。
MessageToMessageDecoder的类声明如下:
/** * 其中泛型参数I表示我们要解码的消息类型。例前⾯,我们在ToIntegerDecoder中,把⼆进制字节 流转换成了⼀个int类型的整数。 */ public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
类似的,MessageToMessageDecoder也有⼀个decode⽅法需要覆盖 ,如下:
/** * 参数msg,需要进⾏解码的参数。例如ByteToMessageDecoder解码后的得到的包含完整报⽂信息 ByteBuf * List<Object> out参数:将msg经过解析后得到的java对象,添加到放到List<Object> out中 */ protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
例如,现在我们想编写⼀个IntegerToStringDecoder,把前⾯编写的ToIntegerDecoder输出的int参数转换成字符串,此时泛型I就应该是Integer类型。
integerToStringDecoder源码如下所示:
public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> { @Override public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception { out.add(String.valueOf(msg)); } }
此时我们应该按照如下顺序组织ChannelPipieline中ToIntegerDecoder和IntegerToStringDecoder 的关系:也就是说,前⼀个ChannelInboudHandler输出的参数类型,就是后⼀个ChannelInboudHandler的输⼊类型。特别注意,如果我们指定MessageToMessageDecoder的泛型参数为ByteBuf,表示其可以直接针对ByteBuf进⾏解码,那么其是否能替代ByteToMessageDecoder呢?
答案是不可以的。因为ByteToMessageDecoder除了进⾏解码,还要会对不⾜以构成⼀个完整数据的报⽂拆包数据(拆包)进⾏缓存。⽽MessageToMessageDecoder则没有这样的逻辑。因此通常的使⽤建议是,使⽤⼀个ByteToMessageDecoder进⾏粘包、拆包处理,得到完整的有效报⽂的ByteBuf实例,然后交由之后的⼀个或者多个MessageToMessageDecoder对ByteBuf实例中的数据进⾏解析,转换成POJO类。
4、⾃定义解码器
通过继承ByteToMessageDecoder⾃定义解码器在解码器进⾏数据解码时,需判断缓存区(ByteBuf)的数据是否⾜够,否则收到结果与期望结果可能不⼀致
/** * todo ⾃定义解码器 */ public class ByteToLongDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("ByteToLongDecoder decode 被调⽤"); //todo 因为long占8个字节, 需要判断⼤于等于8个字节时才能读取⼀个long if(in.readableBytes() >= 8) { out.add(in.readLong()); } } }
(三)编码器(Encoder)
Netty提供了对应的编码器实现MessageToByteEncoder和MessageToMessageEncoder,⼆者都实现ChannelOutboundHandler接⼝。
相对来说,编码器⽐解码器的实现要更加简单,原因在于解码器除了要按照协议解析数据,还要要处理粘包、拆包问题;⽽编码器只要将数据转换成协议规定的⼆进制格式发送即可。
1、抽象类MessageToByteEncoder
MessageToByteEncoder也是⼀个泛型类,泛型参数I表示将需要编码的对象的类型,编码的结果是将信息转换成⼆进制流放⼊ByteBuf中。⼦类通过覆写其抽象⽅法encode来实现编码,如下所示:
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter { .... protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; }可以看到,MessageToByteEncoder的输出对象out是⼀个ByteBuf实例,我们应该将泛型参数msg包含的信息写⼊到这个out对象中。
public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> { @Override protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception { out.writeInt(msg);//将Integer转成⼆进制字节流写⼊ByteBuf中 } }
2、抽象类MessageToMessageEncoder
MessageToMessageEncoder同样是⼀个泛型类,泛型参数I表示将需要编码的对象的类型,编码的结果是将信息放到⼀个List中。⼦类通过覆写其抽象⽅法encode,来实现编码,如下所示:
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter { ... protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception; ... }
与MessageToByteEncoder不同的,MessageToMessageEncoder编码后的结果放到的out参数类型是⼀个List中。例如,你⼀次发送2个报⽂,因此msg参数中实际上包含了2个报⽂,因此应该解码出两个报⽂对象放到List中。
MessageToMessageEncoder提供的常⻅⼦类包括:
LineEncoder:按⾏编码,给定⼀个CharSequence(如String),在其之后添加换⾏符\n或者\r\n,并封装到ByteBuf进⾏输出,与LineBasedFrameDecoder相对应。
Base64Encoder:给定⼀个ByteBuf,得到对其包含的⼆进制数据进⾏Base64编码后的新的ByteBuf进⾏输出,与Base64Decoder相对应。
LengthFieldPrepender:给定⼀个ByteBuf,为其添加报⽂头Length字段,得到⼀个新的ByteBuf进⾏输出。Length字段表示报⽂⻓度,与LengthFieldBasedFrameDecoder相对应。
StringEncoder:给定⼀个CharSequence(如:StringBuilder、StringBuffer、String等),将其转换成ByteBuf进⾏输出,与StringDecoder对应。
这些MessageToMessageEncoder实现类最终输出的都是ByteBuf,因为最终在⽹络上传输的都要是⼆进制数据。
3、⾃定义编码器
通过继承MessageToByteEncoder⾃定义编码器
public class LongToByteEncoder extends MessageToByteEncoder<Long> { @Override protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception { System.out.println("LongToByteEncoder encode被调⽤"); System.out.println("msg=" + msg); out.writeLong(msg); } }
(四)编码解码器(Codec)
编码解码器同时具有编码与解码功能,特点是同时实现了ChannelInboundHandler和ChannelOutboundHandler接⼝,因此在数据输⼊和输出时都能进⾏处理。
Netty提供提供了⼀个ChannelDuplexHandler适配器类,编码解码器的抽象基类ByteToMessageCodec 、MessageToMessageCodec都继承与此类。
ByteToMessageCodec内部维护了⼀个ByteToMessageDecoder和⼀个MessageToByteEncoder实例,可以认为是⼆者的功集合,泛型参数I是接受的编码类型:
public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler { private final TypeParameterMatcher outboundMsgMatcher; private final MessageToByteEncoder<I> encoder; private final ByteToMessageDecoder decoder = new ByteToMessageDecoder(){…} ... protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception; ... }MessageToMessageCodec内部维护了⼀个MessageToMessageDecoder和⼀个MessageToMessageEncoder实例,泛型参数INBOUND_IN和OUTBOUND_IN分别表示需要解码和编码的数据类型。
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler { private final MessageToMessageEncoder<Object> encoder= ... private final MessageToMessageDecoder<Object> decoder =… ... protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out) throws Exception; protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out) throws Exception; }
其他编解码⽅式:
使⽤编解码器来充当编码器和解码器的组合失去了单独使⽤编码器或解码器的灵活性,编解码器是要么都有要么都没有。你可能想知道是否有解决这个僵化问题的⽅式,还可以让编码器和解码器在ChannelPipeline中作为⼀个逻辑单元。幸运的是,Netty提供了⼀种解决⽅案,使⽤CombinedChannelDuplexHandler。如何使⽤CombinedChannelDuplexHandler来结合解码器和编码器呢?下⾯我们从两个简单的例⼦看了解。
/** * 解码器,将byte转成char */ public class ByteToCharDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { while(in.readableBytes() >= 2){ out.add(Character.valueOf(in.readChar())); } }
/** * 编码器,将char转成byte */ public class CharToByteEncoder extends MessageToByteEncoder<Character> { @Override protected void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception { out.writeChar(msg); } }
/** * 继承CombinedChannelDuplexHandler,⽤于绑定解码器和编码器 */ public class CharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> { public CharCodec(){ super(new ByteToCharDecoder(), new CharToByteEncoder()); } }
从上⾯代码可以看出,使⽤CombinedChannelDuplexHandler绑定解码器和编码器很容易实现,⽐使⽤Codec更灵活。
二、TCP粘包和拆包
(一)TCP粘包和拆包基本介绍
TCP是个流协议,所谓流,就是没有界限的⼀串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进⾏包的划分,所以在业务上认为,⼀个完整的包可能会被TCP拆分成多个包进⾏发送,也有可能把多个⼩的包封装成⼀个⼤的数据包发送,这就是所谓的TCP粘包和拆包问题。
TCP粘包:把多个⼩的包封装成⼀个⼤的数据包发送,发送⽅发送的若⼲数据包到接收⽅时粘成⼀个包
TCP拆包:把⼀个完整的包拆分为多个⼩包进⾏发送,发送⽅发送⼀个数据包到接收⽅时被拆分为若⼲个⼩包
如上图所示,就是粘包拆包的各个场景演示。假设客户端分别发送了两个数据包 D1 和 D2 给服务端,由于服务端⼀次读取到字节数是不确定的,故可能存在以下四种情况:
1. 服务端分两次读取到了两个独⽴的数据包,分别是 D1 和 D2 ,没有粘包和拆包
2. 服务端⼀次接受到了两个数据包, D1 和 D2 粘合在⼀起,称之为 TCP 粘包
3. 服务端分两次读取到了数据包,第⼀次读取到了完整的 D1 包和 D2 包的部分内容,第⼆次读取到了 D2 包的剩余内容,这称之为 TCP 拆包
4. 服务端分两次读取到了数据包,第⼀次读取到了 D1 包的部分内容 D1_1 ,第⼆次读取到了 D1包的剩余部分内容 D1_2 和完整的 D2 包。
(二)TCP粘包和拆包产生原因
发⽣TCP粘包、拆包主要是由于下⾯⼀些原因:
应⽤程序写⼊的数据⼤于缓冲区⼤⼩,这将会发⽣拆包。
应⽤程序写⼊数据⼩于字缓冲区⼤⼩,⽹卡将应⽤多次写⼊的数据发送到⽹络上,这将会发⽣粘包。
进⾏MSS(最⼤报⽂⻓度)⼤⼩的TCP分段,当TCP报⽂⻓度-TCP头部⻓度>MSS的时候将发⽣拆包。
接收⽅法不及时读取套接字缓冲区数据,这将发⽣粘包。
MSS: 是Maximum Segement Size缩写,表示TCP报⽂中data部分的最⼤⻓度,是TCP协议在OSI五层⽹络模型中传输层对⼀次可以发送的最⼤数据的限制。
MTU: 最⼤传输单元,是Maxitum Transmission Unit的简写,是OSI五层⽹络模型中链路层(datalink layer)对⼀次可以发送的最⼤数据的限制。
当需要传输的数据⼤于MSS或者MTU时,数据会被拆分成多个包进⾏传输。由于MSS是根据MTU计算出来的,因此当发送的数据满⾜MSS时,必然满⾜MTU。
发送端的字节流都会先传⼊缓冲区,再通过⽹络传⼊到接收端的缓冲区中,最终由接收端获取。当我们发送两个完整包到接收端的时候,正常情况会接收到两个完整的报⽂。但也有可能接收到的是⼀个报⽂,它是由发送的两个报⽂组成的,这样对于应⽤程序来说就很难处理了(这样称为粘包);还有可能出现上⾯这样的虽然收到了两个包,但是⾥⾯的内容却是互相包含,对于应⽤来说依然⽆法解析(拆包)。
(三)解决⽅案
拆包解决思路:
基本思路就是不断的从TCP缓冲区中读取数据,每次读取完都需要判断是否是⼀个完整的数据包。
若当前读取的数据不⾜以拼接成⼀个完整的业务数据包,那就保留该数据,继续从tcp缓冲区中读取,直到得到⼀个完整的数据包
若当前读到的数据加上已经读取的数据⾜够拼接成⼀个数据包,那就将已经读取的数据拼接上本次读取的数据,构成⼀个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接
关键点是如何判断是⼀个完整的数据包,解决策略:
(1)设置消息边界(分隔符,对应Netty提供的LineBasedFrameDecoder、DelimiterBasedFrameDecoder解码器),例如换行符或者自定义的分隔符。
(2)设置定⻓消息(对应Netty提供的FixedLengthFrameDecoder解码器),例如对Integer指定4个字节。
(3)使⽤带消息头的协议,消息头存储消息开始标识及消息的⻓度信息Header+Body(对应Netty提供的LengthFieldBasedFrameDecoder解码器),例如http形式的处理,使用报文头中的长度截取报文体。
(4)发送消息⻓度,⾃定义消息解码器
1、LineBasedFrameDecoder
LineBasedFrameDecoder是回⻋换⾏解码器,如果⽤户发送的消息以回⻋换⾏符作为消息结束的标识,则可以直接使⽤Netty的LineBasedFrameDecoder对消息进⾏解码,只需要在初始化Netty服务端或者客户端时将LineBasedFrameDecoder正确的添加到ChannelPipeline中即可,不需要⾃⼰重新实现⼀套换⾏解码器。
LineBasedFrameDecoder的⼯作原理是它依次遍历ByteBuf中的可读字节,判断是否有“\n”或“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了⼀⾏。它是以换⾏符为结束标志的解码器,⽀持携带结束符或不携带结束符两种解码⽅式,同时⽀持配置单⾏的最⼤⻓度。如果连接读取到最⼤⻓度后仍然没有发现换⾏符,就会抛出异常,同时忽略掉之前读到的异常码流。防⽌由于数据报没有携带换⾏符导致接收到 ByteBuf ⽆限制积压,引起系统内存溢出。
通常LineBasedFrameDecoder会和StringDecoder搭配使⽤。StringDecoder的功能⾮常简单,就是将接收到的对象转换成字符串,然后继续调⽤后⾯的Handler。LineBasedFrameDecoder+StringDecoder组合就是按⾏切换的⽂本解码器,⽤来⽀持TCP的粘包和拆包。
对于⽂本类协议的解析,⽂本换⾏解码器⾮常实⽤,例如对 HTTP 消息头的解析、FTP 协议消息的解析等。
LineBasedFrameDecoder使⽤起来⼗分简单,只需要在ChannelPipeline 中添加即可,如下所示
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LineBasedFrameDecoder(1024)); p.addLast(new StringDecoder()); p.addLast(new StringEncoder()); p.addLast(new LineServerHandler()); } });
2、DelimiterBasedFrameDecoder
DelimiterBasedFrameDecoder是分隔符解码器,⽤户可以指定消息结束的分隔符,它可以⾃动完成以分隔符作为码流结束标识的消息的解码。回⻋换⾏解码器实际上是⼀种特殊的DelimiterBasedFrameDecoder解码器。
⾸先将分隔符转换成 ByteBuf 对象,作为参数构造 DelimiterBasedFrameDecoder,将其添加到ChannelPipeline 中,然后依次添加字符串解码器(通常⽤于⽂本解码)和⽤户 Handler。
DelimiterBasedFrameDecoder 原理分析:解码时,判断当前已经读取的 ByteBuf 中是否包含分隔符 ByteBuf,如果包含,则截取对应的 ByteBuf 返回。
示例:发消息时自定义使用&_作为分隔符,解码时也使用其作为分隔符。
// 客户端发送数据 public class ClientHandler extends ChannelInboundHandlerAdapter { /** * 当客户端连接服务器完成就会触发该⽅法 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String message = "aaaaaaaaaaaaaaaa&_bbbbbbbbbbbbbbbbbb&_ccccccccccc&_"; ByteBuf byteBuf = Unpooled.buffer(message.getBytes().length); byteBuf.writeBytes(message.getBytes()); ctx.writeAndFlush(byteBuf); } }
// 服务端添加分隔符解码器 public class EchoServer { public static void main(String[] args) { // 设置两个线程组 serverBootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 向pipeline加⼊分隔符解码器 ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,true,delimiter)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new ServerHandler()); } }); } }
3、FixedLengthFrameDecoder
FixedLengthFrameDecoder是固定⻓度解码器,它能够按照指定的⻓度对消息进⾏⾃动解码,开发者不需要考虑TCP的粘包/拆包等问题,⾮常实⽤。对于定⻓消息,如果消息实际⻓度⼩于定⻓,则往往会进⾏补位操作,它在⼀定程度上导致了空间和资源的浪费。但是它的优点也是⾮常明显的,编解码⽐较简单。
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO))//配置⽇志输出 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new FixedLengthFrameDecoder(10)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ServerHandler()); } });
利⽤ FixedLengthFrameDecoder 解码器,⽆论⼀次接收到多少数据报,它都会按照构造函数中设置的固定⻓度进⾏解码,如果是半包消息,FixedLengthFrameDecoder 会缓存半包消息并等待下个包到达后进⾏拼包,直到读取到⼀个完整的包。
4、LengthFieldBasedFrameDecoder
⼤多数的协议(私有或者公有),协议头中会携带⻓度字段,⽤于标识消息体或者整包消息的⻓度,例如SMPP、HTTP协议等。由于基于⻓度解码需求的通⽤性,以及为了降低⽤户的协议开发难度,Netty提供了LengthFieldBasedFrameDecoder,⾃动屏蔽TCP底层的拆包和粘包问题,只需要传⼊正确的参数,即可轻松解决“读半包“问题。
源码的构造函数:
public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { this(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, true); }
参数解释:
* <pre> * lengthFieldOffset = 0 * lengthFieldLength = 2 * lengthAdjustment = 0 * <b>initialBytesToStrip</b> = <b>2</b> (= the length of the Length field) * BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes) * +--------+----------------+ +----------------+ * | Length | Actual Content |----->| Actual Content | * | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" | * +--------+----------------+ +----------------+ * </pre> lengthFieldOffset = 0,⻓度字段偏移位置为0表示从包的第⼀个字节开始读取; lengthFieldLength = 2,⻓度字段⻓为2,从包的开始位置往后2个字节的⻓度为⻓度字段; lengthAdjustment = 0 ,解析的时候⽆需跳过任何⻓度; initialBytesToStrip = 2,去掉当前数据包的开头2字节,去掉 header。 0x000C 转为 int = 12。
5、⾃定义消息解码器
//协议包 public class MessageProtocol { private int len; //关键 private byte[] content; public int getLen() { return len; } public void setLen(int len) { this.len = len; } public byte[] getContent() { return content; } public void setContent(byte[] content) { this.content = content; } }
服务端示例
public class MyProtocolServer { private int port; public MyProtocolServer(int port) { this.port = port; } public void start(){ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap server = new ServerBootstrap() .group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerChannelInitializer()); try { ChannelFuture future = server.bind(port).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } public static void main(String[] args) { MyProtocolServer server = new MyProtocolServer(7788); server.start(); } }
客户端示例
public class MyProtocolClient { private int port; private String address; public MyProtocolClient(int port, String address) { this.port = port; this.address = address; } public void start(){ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ClientChannelInitializer()); try { ChannelFuture future = bootstrap.connect(address,port).sync(); future.channel().writeAndFlush("Hello world, i'm online"); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); }finally { group.shutdownGracefully(); } } public static void main(String[] args) { MyProtocolClient client = new MyProtocolClient(7788,"127.0.0.1"); client.start(); } }