Netty - 粘包和半包(下)

上一篇介绍了粘包和半包及其通用的解决方案,今天重点来看一下 Netty 是如何实现封装成帧(Framing)方案的。

解码核心流程

之前介绍过三种解码器FixedLengthFrameDecoderDelimiterBasedFrameDecoderLengthFieldBasedFrameDecoder,它们都继承自ByteToMessageDecoder,而ByteToMessageDecoder继承自ChannelInboundHandlerAdapter,其核心方法为channelRead。因此,我们来看看ByteToMessageDecoderchannelRead方法:

    @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
// 将传入的消息转化为data
ByteBuf data = (ByteBuf) msg;
// 最终实现的目标是将数据全部放进cumulation中
first = cumulation == null;
// 第一笔数据直接放入
if (first) {
cumulation = data;
} else {
// 不是第一笔数据就进行追加
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 解码
callDecode(ctx, cumulation, out);
}
// 以下代码省略,因为不属于解码过程
}

再来看看callDecode方法:

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size(); if (outSize > 0) {
// 以下代码省略,因为初始状态时,outSize 只可能是0,不可能进入这里
} int oldInputLength = in.readableBytes();
// 在进行 decode 时,不执行handler的remove操作。
// 只有当 decode 执行完之后,开始清理数据。
decodeRemovalReentryProtection(ctx, in, out); // 省略以下代码,因为后面的内容也不是解码的过程

再来看看decodeRemovalReentryProtection方法:

    final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
// 设置当前状态为正在解码
decodeState = STATE_CALLING_CHILD_DECODE;
try {
// 解码
decode(ctx, in, out);
} finally {
// 执行hander的remove操作
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
handlerRemoved(ctx);
}
}
} // 子类都重写了该方法,每种实现都会有自己特殊的解码方式
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

从上面的过程可以总结出,在解码之前,需要先将数据写入cumulation,当解码结束后,需要通过 handler 进行移除。

具体解码过程

刚刚说到decode方法在子类中都有实现,那针对我们说的三种解码方式,一一看其实现。

FixedLengthFrameDecoder

其源码为:

    @Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
} protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// 收集到的数据是否小于固定长度,小于就代表无法解析
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readRetainedSlice(frameLength);
}
}

就和这个类的名字一样简单,就是固定长度进行解码,因此,在设置该解码器的时候,需要在构造方式里传入frameLength

DelimiterBasedFrameDecoder

其源码为:

    @Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
} protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// 当前的分割符是否是换行分割符(\n或者\r\n)
if (lineBasedDecoder != null) {
return lineBasedDecoder.decode(ctx, buffer);
}
// Try all delimiters and choose the delimiter which yields the shortest frame.
int minFrameLength = Integer.MAX_VALUE;
ByteBuf minDelim = null;
// 其他分割符进行一次切分
for (ByteBuf delim: delimiters) {
int frameLength = indexOf(buffer, delim);
if (frameLength >= 0 && frameLength < minFrameLength) {
minFrameLength = frameLength;
minDelim = delim;
}
}
// 以下代码省略

根据它的名字可以知道,分隔符才是它的核心。它将分割符分成两类,只有换行分割符(\n或者\r\n)其他。因此,需要注意的是,你可以定义多种分割符,它都是支持的。

LengthFieldBasedFrameDecoder

该类比较复杂,如果直接看方法容易把自己看混乱,因此我准备结合类上的解释,先看看其私有变量。

2 bytes length field at offset 1 in the middle of 4 bytes header, strip the first header field and the length field, the length field represents the length of the whole message

Let's give another twist to the previous example. The only difference from the previous example is that the length field represents the length of the whole message instead of the message body, just like the third example. We have to count the length of HDR1 and Length into lengthAdjustment. Please note that we don't need to take the length of HDR2 into account because the length field already includes the whole header length.

 * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
* +------+--------+------+----------------+ +------+----------------+
* | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
* | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
* +------+--------+------+----------------+ +------+----------------+

lengthFieldOffset : 该字段代表 Length 字段是从第几个字节开始的。上面的例子里,Length 字段是从第1个字节开始(HDR1 是第0个字节),因此该值即为0。

lengthFieldLength : 该字段代表 Length 字段所占用的字节数。上面的例子里,Length 字段占用2个字节,因此该值为2。

lengthAdjustment : 该字段代表 Length 字段结束位置到真正的内容开始位置的距离。上面例子里,因为 Length 字段的含义是整个消息(包括 HDR1、Length、HDR2、Actual Content,一般 Length 指的只是 Actual Content),所以 Length 末尾到真正的内容开始位置(HDR1的开始处),相当于减少3个字节,所以是-3。

initialBytesToStrip : 展示时需要从 Length 字段末尾开始跳过几个字节。上面例子里,因为真正的内容是从 HDR1 开始的,最终展示的内容是从 HDR2 开始的,所以中间差了3个字节,所以该值是3。

该类的解码方法比较复杂,有兴趣的同学可以试着自己分析一下。

总结

这一篇主要是结合 Netty 里的源代码讲解了 Netty 中封装成帧(Framing)的三种方式,相信你一定有了不一样的理解。

有兴趣的话可以访问我的博客或者关注我的公众号、头条号,说不定会有意外的惊喜。

https://death00.github.io/

Netty - 粘包和半包(下)

Netty - 粘包和半包(下)

上一篇:RESTful架构概念


下一篇:HADOOP docker(六):hive简易使用指南