首先,我们通过一个DEMO来模拟TCP的拆包粘包的情况:客户端连续向服务端发送100个相同消息。服务端的代码如下:
AtomicLong count = new AtomicLong(0);
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
long l = count.incrementAndGet();
System.out.println(l + ": " + byteBuf.toString(StandardCharsets.UTF_8));
}
});
}
});
serverBootstrap.bind(8080);
客户端代码如下:
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 1000; i++) {
byte[] bytes = "欢迎关注我,微信公众号:元本一兀!".getBytes(StandardCharsets.UTF_8);
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(bytes);
ctx.channel().writeAndFlush(buffer);
}
}
}
);
}
});
bootstrap.connect("localhost", 8080);
运行结果如下:
首先,我们发了1000个消息,但是在服务端有49行输出,同时,有些消息是合并在一起的,有些消息解析出了乱码。上面的输出中,包含三种情况:
- 正确的结果输出
- 多个消息拼在一起,也就是粘包情况;
- 另一种情况是半包,导致包不完成,解析出来的数据是乱码
为什么会粘包、拆包?
这个是因为Netty底层是走的TCP协议,说白了传输的是就是字节流,消息与消息之间是没有边界的。发生TCP粘包拆包的原因主要有:
- 当连续发送数据时,由于TCP协议的nagle算法,会将较小的内容拼接成较大的包一次性发送到服务器端,因而导致粘包;
- 当发送的内容较大时,由于服务器端的recv(buffer_size)方法中buffer_size较小,不能一次性读完所有数据,从而导致一个消息分拆成多次读取,产生非拆包的情况。
本质上来讲,TCP协议的包并不是按照业务消息来拆分的,TCP层并不感知发送的消息的大小。
解决粘包拆包的方法
解决粘包拆包的思路,其实就是在接收数据的时候,将字节流拆分成完整的包:
- 如果当前读到的数据不是一个完整的业务数据包,则继续从TCP缓冲区中读取数据,知道读到的数据中包含完整的数据包;
- 如果档次读取到的数据加上内存中已有的数据,包含一个完整的业务数据包,则将完整的业务包拼成一个包,并返回应用层处理;对于多余的数据,仍保留在内存中,待和后续加载的数据合并处理。
Netty中提供了一些拆包器,能够满足大部分的使用场景:
- FixedLengthFrameDecoder
- LineBasedFrameDecoder
- DelimiterBasedFrameDecoder
- LengthFieldBasedFrameDecoder
Netty中常用的拆包器
定长拆包器-FixedLengthFrameDecoder
如果你的业务消息格式很简单,是固定长度的,则使用该拆包器很方便。
比如上面的代码,发送的数据是固定的51个字节,我们在服务端的pipeline中加上定长拆包器:
AtomicLong count = new AtomicLong(0);
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(51)).addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
long l = count.incrementAndGet();
System.out.println(l + ": " + byteBuf.toString(StandardCharsets.UTF_8));
}
});
}
});
serverBootstrap.bind(8080);
结果如下:
可以看到,服务端收到了1000个完整的独立的包。
行拆包器-LineBasedFrameDecoder
这个拆包器拆包的逻辑就是按行拆分,发送端每个数据之间用换行符作为分隔符,接收端通过也会按照换行符将字节流拆分成业务消息。
修改一下的上面的客户端,消息后追加一个\r\n
:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 1000; i++) {
byte[] bytes = ("欢迎关注我,微信公众号:元本一兀!" + i+"\r\n").getBytes(StandardCharsets.UTF_8);
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(bytes);
ctx.channel().writeAndFlush(buffer);
}
}
接收数据段,添加行拆包器:
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE))
.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
long l = count.incrementAndGet();
System.out.println(l + ": " + byteBuf.toString(StandardCharsets.UTF_8));
}
});
}
运行后,我们可以看到接收端能够接收到1000个完整的包。
基于分隔符的拆包器-DelimiterBasedFrameDecoder
DelimiterBasedFrameDecoder允许指定一个分隔符,在收到消息的时候按照指定的分隔符进行拆包。其实上面说的LineBasedFrameDecoder是一个特定的分隔符拆包器,它指定的是使用换行符作为分隔符,下面使用DelimiterBasedFrameDecoder来实现行拆包器:
ch.pipeline().addLast(
new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, delimiterLine, delimiterSharp))
.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
long l = count.incrementAndGet();
System.out.println(l + ": " + byteBuf.toString(StandardCharsets.UTF_8));
}
});
基于分隔符的拆包器允许设置多个分隔符,在设置多个分隔符的情况下,会将包拆分成最小的满足分隔符的包。
基于长度域的拆包器-LengthFieldBasedFrameDecoder
最后一种拆包器是通用性最强的一种拆包器,只要我们协议的中有一个固定的区域来表示数据长度,就可以方便的使用该拆包器。LengthFieldBaesdFrameDecoder有很多可配置的参数,用来应对各种情况的长度域。
长度域的offset为0
假设消息中长度域就在开头,这种情况下不需要考虑长度域之前是否有其他内容,配置LengthFieldBasedFrameDecoder很简单,设置offset为0,以及长度域的字节数。这里以长度域占2字节为例:
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2)
一个解析的例子:
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
长度域offset为0,去掉协议头
上面的例子中,我们保留了协议头,虽然这里的协议头就有长度域。如果我们只想保留数据域,这里需要设置跳过的字节数为2字节:
new LengthFieldBasedFrameDecoder(
Integer.MAX_VALE, // maxFrameLength
0, // lengthFieldOffset
2, // lengthFieldLength
0, // lengthAdjustment
2) // initalBytesToStrip 跳过2字节,也就是跳过长度域
拆包示例:
* BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
* +--------+----------------+ +----------------+
* | Length | Actual Content |----->| Actual Content |
* | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
* +--------+----------------+ +----------------+
长度域占2字节,offset为0,不跳过header,长度域表示所有消息的大小
在前面两个例子,长度的长度表示的是长度之后的数据长度。这里我们考虑长度里面设置的长度表示的是整个消息的大小,包括头部和数据部分。这种情况下我们需要制定lengthAdjustment
,数据部分的长度为 长度域里的长度 - lengthAdjustment
。
new LengthFieldBasedFrameDecoder(
Integer.MAX_VALE, // maxFrameLength
0, // lengthFieldOffset
2, // lengthFieldLength
2, // lengthAdjustment 长度 - lengthAdjustment为数据部分的长度
0) // initalBytesToStrip
拆包示例:
* BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
* +--------+----------------+ +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
* +--------+----------------+ +--------+----------------+
HEADER中不只有长度域的情况
前面的case中,协议头部只有长度域,但是更多的情况中,HEADER中不只有长度域。比如下面这个例子,HEADER部分有三部分,HDR1,Length,HDR2三部分,分别占1个字节,2个字节,1个字节。
在长度域之前还有HDR1,要定位到长度域,需要指定长度域的offset(lengthFieldOffset=1)。
这里长度域存的是所有的数据长度,如果我们希望拆包的结果中包含HDR2+Data两部分,可以通过设置lengthAdjustment=-3,长度域之后的内容长度是HDR2+DATA。拆包的结果中,只想包含HDR2+DATA,所以整个消息跳过钱3个字节(HDR1+Lenght部分)。
* +------+--------+------+----------------+
* | HDR1 | Length | HDR2 | Data |
* | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |
* +------+--------+------+----------------+
代码如下:
new LengthFieldBasedFrameDecoder(
Integer.MAX_VALE, // maxFrameLength
3, // lengthFieldOffset
2, // lengthFieldLength
-3, // lengthAdjustment 长度 - lengthAdjustment为数据部分的长度
3) // initalBytesToStrip
解析结果:
* BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
* +------+--------+------+----------------+ +------+----------------+
* | HDR1 | Length | HDR2 | Data |----->| HDR2 | Data |
* | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
* +------+--------+------+----------------+ +------+----------------+