Netty编解码器&TCP粘包拆包

一、Netty编解码器

(一)Netty编解码器概述

  1、Java的编解码

    在Java中编码(Encode)称为序列化, 它将对象序列化为字节数组,?于?络传输、数据持久化或者其它?途。解码(Decode)称为反序列化,它把从?络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷?),以?便后续的业务逻辑操作。

    java序列化对象只需要实现java.io.Serializable接?并?成序列化ID,这个类就能够通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。

    java序列化?的是为了?络传输和对象持久化。java序列化存在一系列的缺点,例如?法跨语?、序列化后码流太?、序列化性能太低等问题。同时java序列化仅仅是Java编解码技术的?种,由于它的种种缺陷,衍?出了多种编解码技术和框架,这些编解码框架实现了消息的?效序列化。

  2、Netty编解码器基本说明

    在Netty中,ChannelHandler 充当了处理?站和出站数据的应?程序逻辑的容器。例如,实现ChannelInboundHandler 接?(或 ChannelInboundHandlerAdapter),就可以接收?站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从 ChannelInboundHandler 冲刷数据。业务逻辑通常写在?个或者多个 ChannelInboundHandler 中。ChannelOutboundHandler 原理?样,只不过它是?来处理出站数据的。

    ChannelPipeline 提供了 ChannelHandler 链的容器。如果事件的运动?向是从客户端到服务端的,如果我们站在客户端的角度,那么就是出站事件,即客户端发送给服务端的数据会通过pipeline 中的?系列 ChannelOutboundHandler,并被这些 Handler 处理,反之则称为?站的。

   3、编码解码器

    在?络应?中需要实现某种编解码器,将原始字节数据与?定义的消息对象进?互相转换。?络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进?解码。

    netty提供了强?的编解码器框架,使得我们编写?定义的编解码器很容易,也容易封装重?。同时Netty 的编(解)码器实现了 ChannelHandlerAdapter,也是?种特殊的 ChannelHandler,所以依赖于 ChannelPipeline,可以将多个编(解)码器链接在?起,以实现复杂的转换逻辑。对于Netty??,编解码器由两部分组成:编码器、解码器。

      解码器:负责将消息从字节或其他序列形式转成指定的消息对象。负责处理?站 InboundHandler数据;

      编码器:将消息对象转成字节或其他序列形式在?络上传输。负责处理出站 OutboundHandler”数据;

   不论解码器 handler 还是编码器 handler ,接收的消息类型必须与待处理的消息类型?致,否则该 handler 不会被执?。Netty提供了很多编解码器,例如:

    StringEncoder字符串编码器

    StringDecoder字符串解码器

    ObjectEncoder 对象编码器

    ObjectDecoder 对象解码器

    FixedLengthFrameDecoder 固定?度的解码器

    LineBasedFrameDecoder 以换?符为结束标识的解码器

    DelimiterBasedFrameDecoder 指定消息分隔符的解码器

    LengthFieldBasedFrameDecoder基于?度通?解码器

(二)解码器(Decoder)

  解码器负责解码“?站”数据。从?种格式到另?种格式,解码器处理?站数据是抽象ChannelInboundHandler的实现。实践中使?解码器很简单,就是将?站数据转换格式后传递到ChannelPipeline中的下?个ChannelInboundHandler进?处理。

  对于解码器,Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder

      Netty编解码器&TCP粘包拆包

  抽象解码器:

    1) ByteToMessageDecoder: ?于将字节转为消息,需要检查缓冲区是否有?够的字节

    2) ReplayingDecoder: 继承ByteToMessageDecoder,不需要检查缓冲区是否有?够的字节,但是ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都?持。选择:项?复杂性?则使?ReplayingDecoder,否则使? ByteToMessageDecoder

    3)MessageToMessageDecoder: ?于从?种消息解码为另外?种消息(例如POJO到POJO)

  1、ByteToMessageDecoder

    ?于将接收到的?进制数据(Byte)解码,得到完整的请求报?(Message)。ByteToMessageDecoder是?种ChannelInboundHandler,可以称为解码器,负责将byte字节流(ByteBuf)转换成?种Message,Message是应?可以??定义的?种java对象。

    下?列出了ByteToMessageDecoder三个主要?法:

    protected abstract void decode(ChannelHandlerContext var1, ByteBuf var2, List<Object> var3) throws Exception;

    final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
     ......
       this.decode(ctx, in, out);
       ......
    }
    protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.isReadable()) {
            this.decodeRemovalReentryProtection(ctx, in, out);
        }

    }

    通过源码可以看到,decodeLast方法调用的额是decodeRemovalReentryProtection方法,而该方法又最终调用了decode方法,也就是说最终的实现逻辑是在decode方法中。同时decode方法也是该抽象类中唯一一个需要自己实现的方法。

    decodeLast方法参数的作?如下:

      BytuBuf in:需要解码的?进制数据。

      List out:解码后的有效报?列表,我们需要将解码后的报?添加到这个List中。之所以使??个List表示,是因为考虑到粘包问题,因此?参的in中可能包含多个有效报?。

    当然,也有可能发?了拆包,in中包含的数据还不?以构成?个有效报?,此时不往List中添加元素即可。另外特别要注意的是,在解码时,不能直接调?ByteBuf的readXXX?法来读取数据,?是应该?先要判断能否构成?个有效的报?。

    案例,假设协议规定传输的数据都是int类型的整数,因为int类型占用四个字节,因此我们需要判断当需要转码的二进制数据大于等于4个字节时,才进行解码操作。

      Netty编解码器&TCP粘包拆包

    上图中显式输?的ByteBuf中包含4个字节,每个字节的值分别为:1,2,3,4。我们?定义?个ToIntegerDecoder进?解码,尽管这?我看到了4个字节刚好可以构成?个int类型整数,但是在真正解码之前,我们并不知道ByteBuf包含的字节数能否构成完成的有效报?,因此需要?先判断ByteBuf中剩余可读的字节,是否?于等于4,如下:

public class ToIntegerDecoder extends ByteToMessageDecoder {
  @Override
  public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    if (in.readableBytes() >= 4) {
      out.add(in.readInt());
     } 
  }
}

    只有在可读字节数>=4的情况下,我们才进?解码,即读取?个int,并添加到List中。在可读字节数?于4的情况下,我们并没有做任何处理,假设剩余可读字节数为3,不?以构成1个int。那么?类ByteToMessageDecoder发现这次解码List中的元素没有变化,则会对in中的剩余3个字节进?缓存,等待下1个字节的到来,之后再回到调?ToIntegerDecoder的decode?法。

    另外需要注意: 在ToIntegerDecoder的decode?法中,每次最多只读取?个1个int。如果ByteBuf中的字节数很多,例如为16,那么可以构成4个int,?这?只读取了1个int,那么剩余12字节怎么办?ByteToMessageDecoder再每次回调?类的decode?法之后,都会判断输?的ByteBuf中是否还有剩余字节可读,如果还有,会再次回调?类的decode?法,直到某个回调decode?法List中的元素个数没有变化时才停?,元素个数没有变化,实际上意味着?类已经没有办法从剩余的字节中读取?个有效报?。由于存在剩余可读字节时,ByteToMessageDecoder会?动再次回调?类decode?法,在实现ByteToMessageDecoder时,decode?法每次只解析?个有效报?即可,没有必要?次全部解析出来。

    ByteToMessageDecoder提供的?些常?的实现类:

      FixedLengthFrameDecoder:定?协议解码器,我们可以指定固定的字节数算?个完整的报?

      LineBasedFrameDecoder: 换?分隔符解码器,遇到\n或者\r\n,则认为是?个完整的报?

      DelimiterBasedFrameDecoder: 分隔符解码器,与LineBasedFrameDecoder类似,只不过分隔符可以??指定

      LengthFieldBasedFrameDecoder:?度编码解码器,将报?划分为报?头/报?体,根据报?头中的Length字段确定报?体的?度,因此报?提的?度是可变的

      JsonObjectDecoder:json格式解码器,当检测到匹配数量的"{" 、”}”或”[””]”时,则认为是?个完整的json对象或者json数组。

    这些实现类,都只是将接收到的?进制数据,解码成包含完整报?信息的ByteBuf实例后,就直接交给了之后的ChannelInboundHandler处理。

  2、ReplayingDecoder

    ReplayingDecoder是byteToMessage的子类,他与byteToMessage最大的不同就是不需要检查缓冲区是否有足够的的字节;若ByteBuf中有?够的字节,则会正常读取;若没有?够的字节则会停?解码。

    ReplayingDecoder 使??便,但它也有?些局限性:

      1) 不是所有的操作都被ByteBuf?持,如果调??个不?持的操作会抛出DecoderException。

      2) ByteBuf.readableBytes()?部分时间不会返回期望值

      3)ReplayingDecoder 在某些情况下可能稍慢于 ByteToMessageDecoder,例如?络缓慢并且消息格式复杂时,消息会被拆成了多个碎?,速度变慢

    在满?需求的情况下推荐使?ByteToMessageDecoder,因为它的处理?较简单,没有ReplayingDecoder实现的那么复杂。ReplayingDecoder继承于ByteToMessageDecoder,所以他们提供的接?是相同的。下?代码是ReplayingDecoder的实现:

/**
* Integer解码器,ReplayingDecoder实现
*/
public class ToIntegerReplayingDecoder extends ReplayingDecoder<Void> {
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    System.out.println("ToIntegerReplayingDecoder 被调?");
    //在 ReplayingDecoder 不需要判断数据是否?够读取,内部会进?处理判断
    out.add(in.readInt());
   }
}

  3、MessageToMessageDecoder

    ByteToMessageDecoder是将?进制流进?解码后,得到有效报?。?MessageToMessageDecoder则是将?个本身就包含完整报?信息的对象转换成另?个java对象。前?介绍了ByteToMessageDecoder的部分?类解码后,会直接将包含了报?完整信息的ByteBuf实例交由之后的ChannelInboundHandler处理,此时,你可以在ChannelPipeline中,再添加?个MessageToMessageDecoder,将ByteBuf中的信息解析后封装到Java对象中,简化之后的ChannelInboundHandler的操作。另外,在?些场景下,有可能你的报?信息已经封装到了Java对象中,但是还要继续转成另外的Java对象,因此?个MessageToMessageDecoder后?可能还跟着另?个MessageToMessageDecoder。?个?较容易的理解的类?案例是java Web编程,通常客户端浏览器发送过来的?进制数据,已经被web容器(如tomcat)解析成了?个HttpServletRequest对象,但是我们还是需要将HttpServletRequest中的数据提取出来,封装成我们??的POJO类,也就是从?个java对象(HttpServletRequest)转换成另?个Java对象(我们的POJO类)。

  MessageToMessageDecoder的类声明如下:

/**
* 其中泛型参数I表示我们要解码的消息类型。例前?,我们在ToIntegerDecoder中,把?进制字节
流转换成了?个int类型的整数。
*/
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter

  类似的,MessageToMessageDecoder也有?个decode?法需要覆盖 ,如下: 

/**
* 参数msg,需要进?解码的参数。例如ByteToMessageDecoder解码后的得到的包含完整报?信息
ByteBuf
* List<Object> out参数:将msg经过解析后得到的java对象,添加到放到List<Object> out中
*/
protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;

  例如,现在我们想编写?个IntegerToStringDecoder,把前?编写的ToIntegerDecoder输出的int参数转换成字符串,此时泛型I就应该是Integer类型。

      Netty编解码器&TCP粘包拆包

  integerToStringDecoder源码如下所示:

public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
  @Override
  public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
    out.add(String.valueOf(msg));
   }
}

  此时我们应该按照如下顺序组织ChannelPipieline中ToIntegerDecoder和IntegerToStringDecoder 的关系:也就是说,前?个ChannelInboudHandler输出的参数类型,就是后?个ChannelInboudHandler的输?类型。特别注意,如果我们指定MessageToMessageDecoder的泛型参数为ByteBuf,表示其可以直接针对ByteBuf进?解码,那么其是否能替代ByteToMessageDecoder呢?

  答案是不可以的。因为ByteToMessageDecoder除了进?解码,还要会对不?以构成?个完整数据的报?拆包数据(拆包)进?缓存。?MessageToMessageDecoder则没有这样的逻辑。因此通常的使?建议是,使??个ByteToMessageDecoder进?粘包、拆包处理,得到完整的有效报?的ByteBuf实例,然后交由之后的?个或者多个MessageToMessageDecoder对ByteBuf实例中的数据进?解析,转换成POJO类。

  4、?定义解码器

  通过继承ByteToMessageDecoder?定义解码器在解码器进?数据解码时,需判断缓存区(ByteBuf)的数据是否?够,否则收到结果与期望结果可能不?致

/**
* todo ?定义解码器
*/
public class ByteToLongDecoder extends ByteToMessageDecoder {
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    System.out.println("ByteToLongDecoder decode 被调?");
    //todo 因为long占8个字节, 需要判断?于等于8个字节时才能读取?个long
    if(in.readableBytes() >= 8) {
      out.add(in.readLong());
     }
   }
}

(三)编码器(Encoder)

  Netty提供了对应的编码器实现MessageToByteEncoder和MessageToMessageEncoder,?者都实现ChannelOutboundHandler接?。

       Netty编解码器&TCP粘包拆包

   相对来说,编码器?解码器的实现要更加简单,原因在于解码器除了要按照协议解析数据,还要要处理粘包、拆包问题;?编码器只要将数据转换成协议规定的?进制格式发送即可。

  1、抽象类MessageToByteEncoder

    MessageToByteEncoder也是?个泛型类,泛型参数I表示将需要编码的对象的类型,编码的结果是将信息转换成?进制流放?ByteBuf中。?类通过覆写其抽象?法encode来实现编码,如下所示:

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
  ....
  protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
}
    可以看到,MessageToByteEncoder的输出对象out是?个ByteBuf实例,我们应该将泛型参数msg包含的信息写?到这个out对象中。
public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> {
  @Override
  protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
    out.writeInt(msg);//将Integer转成?进制字节流写?ByteBuf中
  }
}

  2、抽象类MessageToMessageEncoder

    MessageToMessageEncoder同样是?个泛型类,泛型参数I表示将需要编码的对象的类型,编码的结果是将信息放到?个List中。?类通过覆写其抽象?法encode,来实现编码,如下所示:

public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {
  ...
  protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
  ...
}

     与MessageToByteEncoder不同的,MessageToMessageEncoder编码后的结果放到的out参数类型是?个List中。例如,你?次发送2个报?,因此msg参数中实际上包含了2个报?,因此应该解码出两个报?对象放到List中。

    MessageToMessageEncoder提供的常??类包括:

      LineEncoder:按?编码,给定?个CharSequence(如String),在其之后添加换?符\n或者\r\n,并封装到ByteBuf进?输出,与LineBasedFrameDecoder相对应。

      Base64Encoder:给定?个ByteBuf,得到对其包含的?进制数据进?Base64编码后的新的ByteBuf进?输出,与Base64Decoder相对应。

      LengthFieldPrepender:给定?个ByteBuf,为其添加报?头Length字段,得到?个新的ByteBuf进?输出。Length字段表示报??度,与LengthFieldBasedFrameDecoder相对应。

      StringEncoder:给定?个CharSequence(如:StringBuilder、StringBuffer、String等),将其转换成ByteBuf进?输出,与StringDecoder对应。

    这些MessageToMessageEncoder实现类最终输出的都是ByteBuf,因为最终在?络上传输的都要是?进制数据。

  3、?定义编码器

    通过继承MessageToByteEncoder?定义编码器

public class LongToByteEncoder extends MessageToByteEncoder<Long> {
  @Override
  protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
    System.out.println("LongToByteEncoder encode被调?");
    System.out.println("msg=" + msg);
    out.writeLong(msg);
  }
}

(四)编码解码器(Codec)

  编码解码器同时具有编码与解码功能,特点是同时实现了ChannelInboundHandler和ChannelOutboundHandler接?,因此在数据输?和输出时都能进?处理。

      Netty编解码器&TCP粘包拆包

  Netty提供提供了?个ChannelDuplexHandler适配器类,编码解码器的抽象基类ByteToMessageCodec 、MessageToMessageCodec都继承与此类。

  ByteToMessageCodec内部维护了?个ByteToMessageDecoder和?个MessageToByteEncoder实例,可以认为是?者的功集合,泛型参数I是接受的编码类型:

public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler {
  private final TypeParameterMatcher outboundMsgMatcher;
  private final MessageToByteEncoder<I> encoder;
  private final ByteToMessageDecoder decoder = new ByteToMessageDecoder(){…}
  ...
  protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
  protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
  ...
}
  MessageToMessageCodec内部维护了?个MessageToMessageDecoder和?个MessageToMessageEncoder实例,泛型参数INBOUND_IN和OUTBOUND_IN分别表示需要解码和编码的数据类型。
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {
  private final MessageToMessageEncoder<Object> encoder= ...
  private final MessageToMessageDecoder<Object> decoder =…
  ...
  protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out) throws Exception;
  protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out) throws Exception;
}

   其他编解码?式:

    使?编解码器来充当编码器和解码器的组合失去了单独使?编码器或解码器的灵活性,编解码器是要么都有要么都没有。你可能想知道是否有解决这个僵化问题的?式,还可以让编码器和解码器在ChannelPipeline中作为?个逻辑单元。幸运的是,Netty提供了?种解决?案,使?CombinedChannelDuplexHandler。如何使?CombinedChannelDuplexHandler来结合解码器和编码器呢?下?我们从两个简单的例?看了解。

/**
* 解码器,将byte转成char
*/
public class ByteToCharDecoder extends ByteToMessageDecoder {
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    while(in.readableBytes() >= 2){
    out.add(Character.valueOf(in.readChar()));
  }
}
/**
* 编码器,将char转成byte
*/
public class CharToByteEncoder extends MessageToByteEncoder<Character> {
  @Override
  protected void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception {
    out.writeChar(msg);
  }
}
/**
* 继承CombinedChannelDuplexHandler,?于绑定解码器和编码器
*/
public class CharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
  public CharCodec(){
    super(new ByteToCharDecoder(), new CharToByteEncoder());
  }
}

   从上?代码可以看出,使?CombinedChannelDuplexHandler绑定解码器和编码器很容易实现,?使?Codec更灵活。

二、TCP粘包和拆包

(一)TCP粘包和拆包基本介绍

  TCP是个流协议,所谓流,就是没有界限的?串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进?包的划分,所以在业务上认为,?个完整的包可能会被TCP拆分成多个包进?发送,也有可能把多个?的包封装成?个?的数据包发送,这就是所谓的TCP粘包和拆包问题。

  TCP粘包:把多个?的包封装成?个?的数据包发送,发送?发送的若?数据包到接收?时粘成?个包

  TCP拆包:把?个完整的包拆分为多个?包进?发送,发送?发送?个数据包到接收?时被拆分为若?个?包

      Netty编解码器&TCP粘包拆包

  如上图所示,就是粘包拆包的各个场景演示。假设客户端分别发送了两个数据包 D1 和 D2 给服务端,由于服务端?次读取到字节数是不确定的,故可能存在以下四种情况:

    1. 服务端分两次读取到了两个独?的数据包,分别是 D1 和 D2 ,没有粘包和拆包

    2. 服务端?次接受到了两个数据包, D1 和 D2 粘合在?起,称之为 TCP 粘包

    3. 服务端分两次读取到了数据包,第?次读取到了完整的 D1 包和 D2 包的部分内容,第?次读取到了 D2 包的剩余内容,这称之为 TCP 拆包

    4. 服务端分两次读取到了数据包,第?次读取到了 D1 包的部分内容 D1_1 ,第?次读取到了 D1包的剩余部分内容 D1_2 和完整的 D2 包。

(二)TCP粘包和拆包产生原因

      Netty编解码器&TCP粘包拆包

  发?TCP粘包、拆包主要是由于下??些原因:

    应?程序写?的数据?于缓冲区??,这将会发?拆包。

    应?程序写?数据?于字缓冲区??,?卡将应?多次写?的数据发送到?络上,这将会发?粘包。

    进?MSS(最?报??度)??的TCP分段,当TCP报??度-TCP头部?度>MSS的时候将发?拆包。

    接收?法不及时读取套接字缓冲区数据,这将发?粘包。

  MSS: 是Maximum Segement Size缩写,表示TCP报?中data部分的最??度,是TCP协议在OSI五层?络模型中传输层对?次可以发送的最?数据的限制。

  MTU: 最?传输单元,是Maxitum Transmission Unit的简写,是OSI五层?络模型中链路层(datalink layer)对?次可以发送的最?数据的限制。

  当需要传输的数据?于MSS或者MTU时,数据会被拆分成多个包进?传输。由于MSS是根据MTU计算出来的,因此当发送的数据满?MSS时,必然满?MTU。

  发送端的字节流都会先传?缓冲区,再通过?络传?到接收端的缓冲区中,最终由接收端获取。当我们发送两个完整包到接收端的时候,正常情况会接收到两个完整的报?。但也有可能接收到的是?个报?,它是由发送的两个报?组成的,这样对于应?程序来说就很难处理了(这样称为粘包);还有可能出现上?这样的虽然收到了两个包,但是??的内容却是互相包含,对于应?来说依然?法解析(拆包)。

(三)解决?案

  拆包解决思路:

    基本思路就是不断的从TCP缓冲区中读取数据,每次读取完都需要判断是否是?个完整的数据包。

      若当前读取的数据不?以拼接成?个完整的业务数据包,那就保留该数据,继续从tcp缓冲区中读取,直到得到?个完整的数据包

      若当前读到的数据加上已经读取的数据?够拼接成?个数据包,那就将已经读取的数据拼接上本次读取的数据,构成?个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接

  关键点是如何判断是?个完整的数据包,解决策略:

    (1)设置消息边界(分隔符,对应Netty提供的LineBasedFrameDecoder、DelimiterBasedFrameDecoder解码器),例如换行符或者自定义的分隔符。

    (2)设置定?消息(对应Netty提供的FixedLengthFrameDecoder解码器),例如对Integer指定4个字节。

    (3)使?带消息头的协议,消息头存储消息开始标识及消息的?度信息Header+Body(对应Netty提供的LengthFieldBasedFrameDecoder解码器),例如http形式的处理,使用报文头中的长度截取报文体。

    (4)发送消息?度,?定义消息解码器

  1、LineBasedFrameDecoder

    LineBasedFrameDecoder是回?换?解码器,如果?户发送的消息以回?换?符作为消息结束的标识,则可以直接使?Netty的LineBasedFrameDecoder对消息进?解码,只需要在初始化Netty服务端或者客户端时将LineBasedFrameDecoder正确的添加到ChannelPipeline中即可,不需要??重新实现?套换?解码器。

    LineBasedFrameDecoder的?作原理是它依次遍历ByteBuf中的可读字节,判断是否有“\n”或“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了??。它是以换?符为结束标志的解码器,?持携带结束符或不携带结束符两种解码?式,同时?持配置单?的最??度。如果连接读取到最??度后仍然没有发现换?符,就会抛出异常,同时忽略掉之前读到的异常码流。防?由于数据报没有携带换?符导致接收到 ByteBuf ?限制积压,引起系统内存溢出。

    通常LineBasedFrameDecoder会和StringDecoder搭配使?。StringDecoder的功能?常简单,就是将接收到的对象转换成字符串,然后继续调?后?的Handler。LineBasedFrameDecoder+StringDecoder组合就是按?切换的?本解码器,?来?持TCP的粘包和拆包。

    对于?本类协议的解析,?本换?解码器?常实?,例如对 HTTP 消息头的解析、FTP 协议消息的解析等。

    LineBasedFrameDecoder使?起来?分简单,只需要在ChannelPipeline 中添加即可,如下所示

ServerBootstrap b = new ServerBootstrap();
  b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .option(ChannelOption.SO_BACKLOG, 1024)
     .childHandler(new ChannelInitializer<SocketChannel>() {
  @Override
  public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline p = ch.pipeline();
    p.addLast(new LineBasedFrameDecoder(1024));
    p.addLast(new StringDecoder());
    p.addLast(new StringEncoder());
    p.addLast(new LineServerHandler());
   }
 });

  2、DelimiterBasedFrameDecoder

    DelimiterBasedFrameDecoder是分隔符解码器,?户可以指定消息结束的分隔符,它可以?动完成以分隔符作为码流结束标识的消息的解码。回?换?解码器实际上是?种特殊的DelimiterBasedFrameDecoder解码器。

    ?先将分隔符转换成 ByteBuf 对象,作为参数构造 DelimiterBasedFrameDecoder,将其添加到ChannelPipeline 中,然后依次添加字符串解码器(通常?于?本解码)和?户 Handler。

    DelimiterBasedFrameDecoder 原理分析:解码时,判断当前已经读取的 ByteBuf 中是否包含分隔符 ByteBuf,如果包含,则截取对应的 ByteBuf 返回。 

    示例:发消息时自定义使用&_作为分隔符,解码时也使用其作为分隔符。

// 客户端发送数据
public class ClientHandler extends ChannelInboundHandlerAdapter {
  /**
  * 当客户端连接服务器完成就会触发该?法
  * @param ctx
  * @throws Exception
  */
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    String message = "aaaaaaaaaaaaaaaa&_bbbbbbbbbbbbbbbbbb&_ccccccccccc&_";
    ByteBuf byteBuf = Unpooled.buffer(message.getBytes().length);
    byteBuf.writeBytes(message.getBytes());
    ctx.writeAndFlush(byteBuf);
 }
}  
// 服务端添加分隔符解码器
public class EchoServer {
  public static void main(String[] args) {
    // 设置两个线程组
    serverBootstrap.group(bossGroup,workerGroup)
       .channel(NioServerSocketChannel.class)
       .option(ChannelOption.SO_BACKLOG,1024)
       .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            // 向pipeline加?分隔符解码器
            ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes());
            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,true,delimiter));
            ch.pipeline().addLast(new StringDecoder());
            ch.pipeline().addLast(new ServerHandler());
           }
       });
   }
}

  3、FixedLengthFrameDecoder

    FixedLengthFrameDecoder是固定?度解码器,它能够按照指定的?度对消息进??动解码,开发者不需要考虑TCP的粘包/拆包等问题,?常实?。对于定?消息,如果消息实际?度?于定?,则往往会进?补位操作,它在?定程度上导致了空间和资源的浪费。但是它的优点也是?常明显的,编解码?较简单。

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
   .channel(NioServerSocketChannel.class)
   .option(ChannelOption.SO_BACKLOG, 100)
   .handler(new LoggingHandler(LogLevel.INFO))//配置?志输出
   .childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new FixedLengthFrameDecoder(10));
        ch.pipeline().addLast(new StringDecoder());
        ch.pipeline().addLast(new StringEncoder());
        ch.pipeline().addLast(new ServerHandler());
       }
   });

 

    利? FixedLengthFrameDecoder 解码器,?论?次接收到多少数据报,它都会按照构造函数中设置的固定?度进?解码,如果是半包消息,FixedLengthFrameDecoder 会缓存半包消息并等待下个包到达后进?拼包,直到读取到?个完整的包。

  4、LengthFieldBasedFrameDecoder

  ?多数的协议(私有或者公有),协议头中会携带?度字段,?于标识消息体或者整包消息的?度,例如SMPP、HTTP协议等。由于基于?度解码需求的通?性,以及为了降低?户的协议开发难度,Netty提供了LengthFieldBasedFrameDecoder,?动屏蔽TCP底层的拆包和粘包问题,只需要传?正确的参数,即可轻松解决“读半包“问题。

  源码的构造函数:

public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
  this(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, true);
 }

  参数解释:

* <pre> 
* lengthFieldOffset = 0 
* lengthFieldLength = 2 
* lengthAdjustment = 0 
* <b>initialBytesToStrip</b> = <b>2</b> (= the length of the Length field) 
* BEFORE DECODE (14 bytes)      AFTER DECODE (12 bytes) 
* +--------+----------------+    +----------------+
* | Length | Actual Content |----->| Actual Content | 
* | 0x000C | "HELLO, WORLD" |    | "HELLO, WORLD" | 
* +--------+----------------+ +----------------+
* </pre>
lengthFieldOffset = 0,?度字段偏移位置为0表示从包的第?个字节开始读取;
lengthFieldLength = 2,?度字段?为2,从包的开始位置往后2个字节的?度为?度字段;
lengthAdjustment = 0 ,解析的时候?需跳过任何?度;
initialBytesToStrip = 2,去掉当前数据包的开头2字节,去掉 header。
0x000C 转为 int = 12。  

  5、?定义消息解码器

//协议包
public class MessageProtocol {
  private int len; //关键
  private byte[] content;
  public int getLen() {
    return len;
  }
  public void setLen(int len) {
    this.len = len;
  }
  public byte[] getContent() {
    return content;
  }
  public void setContent(byte[] content) {
    this.content = content;
  }
}

  服务端示例

public class MyProtocolServer {
  private int port;
  public MyProtocolServer(int port) {
    this.port = port;
  }
  public void start(){
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workGroup = new NioEventLoopGroup();
    ServerBootstrap server = new ServerBootstrap()
      .group(bossGroup,workGroup)
      .channel(NioServerSocketChannel.class)
      .childHandler(new ServerChannelInitializer());
    try {
      ChannelFuture future = server.bind(port).sync();
      future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }finally {
      bossGroup.shutdownGracefully();
      workGroup.shutdownGracefully();
   }
  }
  public static void main(String[] args) {
    MyProtocolServer server = new MyProtocolServer(7788);
    server.start();
  }
} 

  客户端示例

public class MyProtocolClient {
  private int port;
  private String address;
  public MyProtocolClient(int port, String address) {
    this.port = port;
    this.address = address;
  }
  public void start(){
    EventLoopGroup group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group)
       .channel(NioSocketChannel.class)
       .option(ChannelOption.TCP_NODELAY, true)
       .handler(new ClientChannelInitializer());
    try {
      ChannelFuture future = bootstrap.connect(address,port).sync();
      future.channel().writeAndFlush("Hello world, i‘m online");
      future.channel().closeFuture().sync();
    } catch (Exception e) {
      e.printStackTrace();
    }finally {
      group.shutdownGracefully();
    }
  }
  public static void main(String[] args) {
    MyProtocolClient client = new MyProtocolClient(7788,"127.0.0.1");
    client.start();
  }
}

 

  

Netty编解码器&TCP粘包拆包

上一篇:Spring MVC 解析JSON入参意外的丢失参数


下一篇:4,Spark中 join的原理