首先我们必须知道Tcp粘包和拆包的,TCP是个“流”协议,所谓流,就是没有界限的一串数据,TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际数据进行包的划分,一个完整的包可能会被拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包进行发送。这里引用Netty官网的User guide里面的图进行说明:
Dealing with a Stream-based Transport
那么一般情况下我们是如何解决这种问题的呢?我所知道的有这几种方案:
>1.消息定长
>2.在包尾增加一个标识,通过这个标志符进行分割
>3.将消息分为两部分,也就是消息头和消息尾,消息头中写入要发送数据的总长度,通常是在消息头的第一个字段使用int值来标识发送数据的长度。
这里以第三种方式为例,进行对象传输。Netty4本身自带了ObjectDecoder,ObjectEncoder来实现自定义对象的序列化,但是用的是java内置的序列化,由于java序列化的性能并不是很好,所以很多时候我们需要用其他序列化方式,常见的有Kryo,Jackson,fastjson,protobuf等。这里要写的其实用什么序列化不是重点,而是我们怎么设计我们的Decoder和Encoder。
首先我们写一个Encoder,我们继承自MessageToByteEncoder<T> ,把对象转换成byte,继承这个对象,会要求我们实现一个encode方法:
@Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { byte[] body = convertToBytes(msg); //将对象转换为byte,伪代码,具体用什么进行序列化,你们自行选择。可以使用我上面说的一些 int dataLength = body.length; //读取消息的长度 out.writeInt(dataLength); //先将消息长度写入,也就是消息头 out.writeBytes(body); //消息体中包含我们要发送的数据 }
那么当我们在Decode的时候,该怎么处理发送过来的数据呢?这里我们继承ByteToMessageDecoder方法,继承这个对象,会要求我们实现一个decode方法
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (in.readableBytes() < HEAD_LENGTH) { //这个HEAD_LENGTH是我们用于表示头长度的字节数。 由于上面我们传的是一个int类型的值,所以这里HEAD_LENGTH的值为4. return; } in.markReaderIndex(); //我们标记一下当前的readIndex的位置 int dataLength = in.readInt(); // 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4 if (dataLength < 0) { // 我们读到的消息体长度为0,这是不应该出现的情况,这里出现这情况,关闭连接。 ctx.close(); } if (in.readableBytes() < dataLength) { //读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方 in.resetReaderIndex(); return; } byte[] body = new byte[dataLength]; // 嗯,这时候,我们读到的长度,满足我们的要求了,把传送过来的数据,取出来吧~~ in.readBytes(body); // Object o = convertToObject(body); //将byte数据转化为我们需要的对象。伪代码,用什么序列化,自行选择 out.add(o); }
当然我们Netty也有自带的LengthFieldBasedFrameDecoder,但是在使用自定义序列化的时候,我觉得还是自己写比较方便一点,反正总不是要写代码。
我走过的坑:用读取ByteBuf的使用,一定要注意,其中的方法是否会增加readIndex,不然的话会造成无法正常读到我们想要的数据。