Netty编解码器源码分析(上)(详细分析在注释中说明)

文章目录


解码定义:解码是指将二进制数据流转换成一个个bytebuf.
首先分析解码器顶层抽象类
Netty编解码器源码分析(上)(详细分析在注释中说明)Netty编解码器源码分析(上)(详细分析在注释中说明)

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 可以看到是基于ByteBuf进行解码的
    if (msg instanceof ByteBuf) {
        // 把callldecode的解析的对象都放到out当中
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
          // 说明是第一次从io流里面读取数据
            first = cumulation == null;
            if (first) {
                // 第一次把累加器赋值给刚读进来的对象
                cumulation = data;
            } else {
                //不是第一次则将读进来的数据与cumulator进行累加
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            // 调用子类的decode方法进行解析
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                // See https://github.com/netty/netty/issues/4275
                numReads = 0;
                discardSomeReadBytes();
            }
           // 将解析出的bytebuf向下传播
            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            
            fireChannelRead(ctx, out, size);
            //跟进 fireChannelRead
            /*
                static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
        if (msgs instanceof CodecOutputList) {
            fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
        } else {
        
        **msgs.get(i)-->Bytebuf **
            for (int i = 0; i < numElements; i++) {
                ctx.fireChannelRead(msgs.get(i));
            }
        }
    }          
            */
            // 回收list(可以看到和之前的entry一样,采用了对象池的机制)
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}

上面提到了comulation,这里我们讲下

public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        ByteBuf buffer;
        if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                || cumulation.refCnt() > 1) {
            // Expand cumulation (by replace it) when either there is not more room in the buffer
            // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
            // duplicate().retain().
            //
            // See:
            // - https://github.com/netty/netty/issues/2327
            // - https://github.com/netty/netty/issues/1764
            //  扩容
            buffer = expandCumulation(alloc, cumulation, in.readableBytes());
        } else {
            buffer = cumulation;
        }
        // 写入cumulation
        buffer.writeBytes(in);
        in.release();
        return buffer;
    }
};

进入到calldecode方法中进行分析:

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        while (in.isReadable()) {
            // 一直检测bytebuf是否有数据可读
            // 记录一下outsize的大小
            int outSize = out.size();
            // 如果outsize的大小>0,说明已经有解析出的对象,则将事件向下传播
            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                // 清空out
                out.clear();

                // Check if this handler was removed before continuing with decoding.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See:
                // - https://github.com/netty/netty/issues/4635
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }
            // 在decode之前记录一下可读长度
            int oldInputLength = in.readableBytes();
            // 进行解码
            decode(ctx, in, out);

            // Check if this handler was removed before continuing the loop.
            // If it was removed, it is not safe to continue to operate on the buffer.
            //
            // See https://github.com/netty/netty/issues/1664
            if (ctx.isRemoved()) {
                break;
            }
			// 说明没有解析出对象
            if (outSize == out.size()) {
                // 说明本次读没有读取数据,当前累加的数据没有拼成一个完整的数据包
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    // 说明读到了数据,则有可能触发解析,所以continue,进行下次循环
                    continue;
                }
            }
			// 走到这里说明解析出对象,但是没有从cumulation中读取数据,则报错
            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                        ".decode() did not read anything but decoded a message.");
            }

            if (isSingleDecode()) {
                break;
            }
            ........

}

行解码器:以/r/n 或者/n结尾的字节流

public class LineBasedFrameDecoder extends ByteToMessageDecoder {

    /** Maximum length of a frame we're willing to decode.  */
    // 数据包最大长度
    private final int maxLength;
    /** Whether or not to throw an exception as soon as we exceed maxLength. */
    //超过最大长度的时候是否立即抛出异常,如果为true,则抛出
    private final boolean failFast;
    // 最终解析出是否带换行符:true:不带换行符
    private final boolean stripDelimiter;

    /** True if we're discarding input because we're already over maxLength.  */
    // 丢弃模式
    private boolean discarding;
    // 解码到现在已经丢弃了多少字节
    private int discardedBytes;
 @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

接下来我们就进入decode方法进行分析

protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
    // 找到这一行的结尾位置 
    final int eol = findEndOfLine(buffer);
    //判断是否是丢失模式,第一次为false
    if (!discarding) {
        if (eol >= 0) {
            final ByteBuf frame;
            //计算换行符到可读字节之间的长度
            final int length = eol - buffer.readerIndex();
            // 拿到分隔符长度
            final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
            // 超过最大长度
            if (length > maxLength) {
                //将readIndex移到换行符之后的位置
                buffer.readerIndex(eol + delimLength);
                // 传播异常
                fail(ctx, length);
                // 返回null 什么也没解析
                return null;
            }
            // 判断是否要把分隔符也算在完整数据包下。
            if (stripDelimiter) {
                // 不包含分隔符
                frame = buffer.readRetainedSlice(length);
                buffer.skipBytes(delimLength);
            } else {
                // 不包含分隔符
                frame = buffer.readRetainedSlice(length + delimLength);
            }

            return frame;
        } else {
            // 非丢弃模式下没有找到换行符
            final int length = buffer.readableBytes();
            if (length > maxLength) {
                // 超过最大长度
                // 将length长度丢弃
                discardedBytes = length;
                // 将读指针移动到写指针
                buffer.readerIndex(buffer.writerIndex());
                // 标记丢弃模式
                discarding = true;
                // 传播异常
                if (failFast) {
                    fail(ctx, "over " + discardedBytes);
                }
            }
            // 啥也没解析到
            return null;
        }
    } 
    // 进入丢弃模式
    else {
        // 找到endofline
        if (eol >= 0) {
            // 前面已经丢弃过的+这次要丢弃的
            final int length = discardedBytes + eol - buffer.readerIndex();
            final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
            // 将读指针移到换行符之后第一个有效数据位置
            buffer.readerIndex(eol + delimLength);
            // 标记没有丢弃的数据了
            discardedBytes = 0;
            // 标记discarding为非丢弃
            discarding = false;
            if (!failFast) {
                fail(ctx, length);
            }
        } else {
            //没有找到endline,全部丢失,读指针与写指针置为相同
            discardedBytes += buffer.readableBytes();
            buffer.readerIndex(buffer.writerIndex());
        }
        return null;
    }
}

基于分隔符解码器分析

// 分隔符可以变化
public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters) {
    this(maxFrameLength, true, delimiters);
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        // 由父类向下传播
        out.add(decoded);
    }
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
    // 受限判断是不是lineBasedDecoder,当指定delimiter为/r/n /n 时,在创建 
    //  DelimiterBasedFrameDecoder的时候就会创建lineBasedDecoder
    if (lineBasedDecoder != null) {
        return lineBasedDecoder.decode(ctx, buffer);
    }
    // Try all delimiters and choose the delimiter which yields the shortest frame.
    int minFrameLength = Integer.MAX_VALUE;
    ByteBuf minDelim = null;
    // 遍历分隔符找到其中一个分隔符划分最小数据包的长度,将此分隔符置为 minDelim,将 minFrameLength
    // 置为该分割符划分的长度
    for (ByteBuf delim: delimiters) {
        int frameLength = indexOf(buffer, delim);
        if (frameLength >= 0 && frameLength < minFrameLength) {
            minFrameLength = frameLength;
            minDelim = delim;
        }
    }
    // 找到分割符不为null
    if (minDelim != null) {
        int minDelimLength = minDelim.capacity();
        ByteBuf frame;
        // 判断是否处于丢弃模式,第一次不会处于丢弃模式,
        if (discardingTooLongFrame) {
            // 处于丢弃模式下
            // We've just finished discarding a very large frame.
            // Go back to the initial state.
            // 标记为非丢弃模式
            discardingTooLongFrame = false;
            //跳过这段数据包
            buffer.skipBytes(minFrameLength + minDelimLength);
			//this.tooLongFrameLength:丢弃的字节数
            int tooLongFrameLength = this.tooLongFrameLength;
            this.tooLongFrameLength = 0;
            if (!failFast) {
                fail(tooLongFrameLength);
            }
            return null;
        }
        // 超过规定最大数据包长度
        if (minFrameLength > maxFrameLength) {
            // Discard read frame.
            buffer.skipBytes(minFrameLength + minDelimLength);
            fail(minFrameLength);
            return null;
        }
        // 是否包含分隔符
        if (stripDelimiter) {
            frame = buffer.readRetainedSlice(minFrameLength);
            buffer.skipBytes(minDelimLength);
        } else {
            frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
        }

        return frame;
    } else {
        //没有找到分隔符:所有可读字段全部丢弃
        // 处于非丢弃模式
        if (!discardingTooLongFrame) {
            if (buffer.readableBytes() > maxFrameLength) {
                // 标记丢弃字节长度
                // Discard the content of the buffer until a delimiter is found.
                tooLongFrameLength = buffer.readableBytes();
                buffer.skipBytes(buffer.readableBytes());
                // 转换为丢弃模式
                discardingTooLongFrame = true;
                if (failFast) {
                    fail(tooLongFrameLength);
                }
            }
        }
        // 处于丢弃模式:已经丢弃+当前要丢弃的
        else {
            // Still discarding the buffer since a delimiter is not found.
            tooLongFrameLength += buffer.readableBytes();
            buffer.skipBytes(buffer.readableBytes());
        }
        return null;
    }
}

基于长度域解码器分析

public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
    // 下面介绍几个颇为重要的参数
    private final ByteOrder byteOrder;
    private final int maxFrameLength;
    
    //指出包长度的偏移量 
    private final int lengthFieldOffset;
    // 指出数据包长度
    private final int lengthFieldLength;
    private final int lengthFieldEndOffset;
    // 后面数据包实际长度等于 lengthFieldLength+lengthAdjustment
    private final int lengthAdjustment;
    // 在最后解析的数据包中需要跳过多少字节
    private final int initialBytesToStrip;
    private final boolean failFast;
    private boolean discardingTooLongFrame;
    private long tooLongFrameLength;
    private long bytesToDiscard;
    
        @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }
    
    下面介绍实际的逻辑,逻辑较为复杂,所以下面分为三个部分进行介绍
        

    
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    // 开启丢弃模式
    if (discardingTooLongFrame) {
        long bytesToDiscard = this.bytesToDiscard;
        // 计算现在能够丢弃的数据
        int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
        in.skipBytes(localBytesToDiscard);
        bytesToDiscard -= localBytesToDiscard;
        this.bytesToDiscard = bytesToDiscard;
        // 通过failIfNecessary判断是否丢弃完了,然后从该方法中恢复非丢弃状态
        failIfNecessary(false);
    }
    /*
        private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
        if (bytesToDiscard == 0) {
            // Reset to the initial state and tell the handlers that
            // the frame was too large.
            long tooLongFrameLength = this.tooLongFrameLength;
            this.tooLongFrameLength = 0;
            discardingTooLongFrame = false;
            if (!failFast ||
                failFast && firstDetectionOfTooLongFrame) {
                fail(tooLongFrameLength);
            }
        } else {
            // Keep discarding and notify handlers if necessary.
            if (failFast && firstDetectionOfTooLongFrame) {
                fail(tooLongFrameLength);
            }
        }
    }
    */
    
    
// lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
// 所以 lengthFieldEndOffset即为指向真正数据的前一个指针,
    if (in.readableBytes() < lengthFieldEndOffset) {
        // 说明此时还没有真正的数据可以读
        return null;
    }

    int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
    // 表示从actualLengthFieldOffset起,读lengthFieldLength的value字节数,这个framelength
    // 相当于lengthFieldLength中所表示的值
    long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);

    if (frameLength < 0) {
        in.skipBytes(lengthFieldEndOffset);
        throw new CorruptedFrameException(
                "negative pre-adjustment length field: " + frameLength);
    }
	// 完整的数据包字节数=frame+前面指针值+lengthAdjustment
    frameLength += lengthAdjustment + lengthFieldEndOffset;

    if (frameLength < lengthFieldEndOffset) {
        in.skipBytes(lengthFieldEndOffset);
        throw new CorruptedFrameException(
                "Adjusted frame length (" + frameLength + ") is less " +
                "than lengthFieldEndOffset: " + lengthFieldEndOffset);
    }
      //(在这个逻辑处将会触发进入丢弃模式)
    if (frameLength > maxFrameLength) {
        // 计算可以丢弃的字节数
        long discard = frameLength - in.readableBytes();
        tooLongFrameLength = frameLength;
        // 如果小于0,则代表frameLength小于可以读的数据,所以这次数据全部丢掉
        if (discard < 0) {
            // buffer contains more bytes then the frameLength so we can discard all now
            in.skipBytes((int) frameLength);
        } else {
            // discard>0,则代表这一次无法全部丢弃,保留仍待丢弃的数据,待下次在进行丢弃,并开启丢弃模式,丢弃字节数由bytesToDiscard保存,然后进入上面源码开始的地方,进行分析
            // Enter the discard mode and discard everything received so far.
            discardingTooLongFrame = true;
            bytesToDiscard = discard;
            in.skipBytes(in.readableBytes());
        }
        failIfNecessary(true);
        return null;
    }

    // never overflows because it's less than maxFrameLength
    int frameLengthInt = (int) frameLength;
    // 什么也不做,因为可读的数据流小于我们想要定义读的一个完整数据包
    if (in.readableBytes() < frameLengthInt) {
        return null;
    }
    //(2):进入到跳过多少字节这一个步骤,首先如果initialBytesToStrip(跳过字节数)>frameLengthInt
    // 则抛出异常,因为这样子相当于没有任何意义。
    if (initialBytesToStrip > frameLengthInt) {
        in.skipBytes(frameLengthInt);
        throw new CorruptedFrameException(
                "Adjusted frame length (" + frameLength + ") is less " +
                "than initialBytesToStrip: " + initialBytesToStrip);
    }
    // 如果initialBytesToStrip合理,则跳过指定的字节数
    in.skipBytes(initialBytesToStrip);

    // extract frame
    // 获取当前读指针
    int readerIndex = in.readerIndex();
    // 获取需要读取的真正字节数
    int actualFrameLength = frameLengthInt - initialBytesToStrip;
    // 表示从buf里readindex指针处开始,抽取出actualFrameLength字节
    ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
    // 然后将读指针进行向下偏移	
    in.readerIndex(readerIndex + actualFrameLength);
    return frame;
}

总结步骤:

Netty编解码器源码分析(上)(详细分析在注释中说明)

上一篇:Netty源码(九)之服务端读取数据的过程


下一篇:Netty由浅入深的学习指南(入门)