<2021SC@SDUSC>netty常见编解码器(一)

2021SC@SDUSC

文章目录

前言

从本篇博客开始,会介绍netty中已经实现的几种编码器和解码器。在本篇博客中,将会介绍netty的LineBasedFrameDecoder类,它是基于换行符\r\n将消息分割成不同的部分。

一、LineBasedFrameDecoder

package io.netty.handler.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ByteProcessor;

import java.util.List;

public class LineBasedFrameDecoder extends ByteToMessageDecoder {

    private final int maxLength;
    private final boolean failFast;
    private final boolean stripDelimiter;

    private boolean discarding;
    private int discardedBytes;

    private int offset;

    public LineBasedFrameDecoder(final int maxLength) {
        this(maxLength, true, false);
    }

    public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
        this.maxLength = maxLength;
        this.failFast = failFast;
        this.stripDelimiter = stripDelimiter;
    }

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }
    
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        final int eol = findEndOfLine(buffer);
        if (!discarding) {
            if (eol >= 0) {
                final ByteBuf frame;
                final int length = eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;

                if (length > maxLength) {
                    buffer.readerIndex(eol + delimLength);
                    fail(ctx, length);
                    return null;
                }

                if (stripDelimiter) {
                    frame = buffer.readRetainedSlice(length);
                    buffer.skipBytes(delimLength);
                } else {
                    frame = buffer.readRetainedSlice(length + delimLength);
                }

                return frame;
            } else {
                final int length = buffer.readableBytes();
                if (length > maxLength) {
                    discardedBytes = length;
                    buffer.readerIndex(buffer.writerIndex());
                    discarding = true;
                    offset = 0;
                    if (failFast) {
                        fail(ctx, "over " + discardedBytes);
                    }
                }
                return null;
            }
        } else {
            if (eol >= 0) {
                final int length = discardedBytes + eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
                buffer.readerIndex(eol + delimLength);
                discardedBytes = 0;
                discarding = false;
                if (!failFast) {
                    fail(ctx, length);
                }
            } else {
                discardedBytes += buffer.readableBytes();
                buffer.readerIndex(buffer.writerIndex());
                // We skip everything in the buffer, we need to set the offset to 0 again.
                offset = 0;
            }
            return null;
        }
    }

    private void fail(final ChannelHandlerContext ctx, int length) {
        fail(ctx, String.valueOf(length));
    }

    private void fail(final ChannelHandlerContext ctx, String length) {
        ctx.fireExceptionCaught(
                new TooLongFrameException(
                        "frame length (" + length + ") exceeds the allowed maximum (" + maxLength + ')'));
    }

    private int findEndOfLine(final ByteBuf buffer) {
        int totalLength = buffer.readableBytes();
        int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteProcessor.FIND_LF);
        if (i >= 0) {
            offset = 0;
            if (i > 0 && buffer.getByte(i - 1) == '\r') {
                i--;
            }
        } else {
            offset = totalLength;
        }
        return i;
    }
}

二、分析

首先,LindeBasedFrameDecoder类继承自ByteToMessageDecoder,该类接受的消息是ByteBuf,并将消息转换为其它类型的消息。此外,继承ByteToMessageDecoder类的所有解码器不允许带有@Sharable注解。
在LineBasedFrameDecoder中,有两个构造函数,分别是

public LineBasedFrameDecoder(final int maxLength)

public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast)

阅读源码后,知到第一个构造函数调用了第二个构造函数,这里会解释三个属性的意思。maxLength是解码器支持的最大消息长度,stripDelimiter表示是否需要在解析消息后将换行符去除,failFast为true,则当解析的消息长度一超过maxLength,就会抛出TooLongFrameException异常,否则,等到全部数据读取完后,再抛出异常。
如之前所说的,LineBasedFrameDecoder会将消息根据换行符分割成不同的部分。首先是通过调用findEndOfLine()方法找到消息中换行符的位置。

int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteProcessor.FIND_LF);

在该方法中,通过遍历bytebuf中的内容,找到换行符。offset是偏移,表示上一次扫描到的位置,因为decode方法的调用,所以一个消息并不是一次性分割成多个部分,所以,需要保存一些信息,而正是因为要保存信息,所以这些decoder不能被共享。
ByteProcessor.FIND_LF就是\n,所以,这里的i是消息中\n的位置,之后,再判断该位置前一个的位置是不是\r,如果是,i减去1,返回,所以,这个方法,找到的如果只有\n,直接返回\n的位置,如果找到的是\r\n,返回的是\r的位置,如果没有,返回-1。
接下来,将会分析decode()方法。在LineBasedFrameDecoder类中,有两个decode方法。

protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception

protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception

第一个decode方法重写了父类的decode方法,而具体的实现,则调用了第二个decode方法,所以,会主要分析第二个decode方法。
在这个方法中,首先是找到了换行符的位置。之后,根据是否discarding进入不同分支。discarding为true,表示数据长度超过maxLength,就丢弃。
先分析非丢弃的模式,首先判断eol是否大于等于0,也就是有没有换行符。假设有换行符,首先获取这一次要截取的数据的长度,如果长度超过了maxLength,则跳过本次数据,否则,判断是否要带换行符,根据结果,返回这次的数据。如果没有换行符,则读取的数据为数据的全部,如果读取的数据的长度超过了最大长度设置为丢弃模式,将所有数据跳过。
接下来,分析discarding为true的情况。依然是首先判断是否有换行符,假设有换行符,要读取的数据长度为之前丢弃的数据的长度加上从数据中截下来的部分的长度,然后将数据丢弃。这里,我的理解是,假设一份数据由于在传递过程中由于网络原因分成了两份,总的数据长度应该是两份数据长度之和,而且,由于第一份数据的长度已经超过了最大长度,所以,这两份数据应该都丢弃掉,假如仅仅丢弃了第一份数据,而保留了第二份数据,则数据不正确且不完整,依然是错误的。之后将discarding设置为false。
最后,如果数据中没有换行符,跳过所有的数据即可。

三、总结

在本篇博客中,分析了netty中常见的解码器之一,LineBasedFrameDecoder的代码,该类是为了解决拆包粘包的问题,通过换行符将不同数据包中的同一份数据重新拼起来,或者,将一份数据包中不同的数据分开。据说,在ftp协议中,就是通过在最后增加换行符的方式,将数据分隔开的。

上一篇:CMU15-445 Lecture #05 Buffer Pools


下一篇:Ch12 块设备I/O和缓冲区管理