在日常的网络开发当中,协议解析都是必须的工作内容,Netty中虽然内置了基于长度、分隔符的编解码器,但在大部分场景中我们使用的都是自定义协议,所以Netty提供了 MessageToByteEncoder<I> 与 ByteToMessageDecoder 两个抽象类,通过继承重写其中的encode与decode方法实现私有协议的编解码。这篇文章我们就对Netty中的自定义编解码器进行实践与分析。
一、编解码器的使用
下面是MessageToByteEncoder与ByteToMessageDecoder使用的简单示例,其中不涉及具体的协议编解码。
创建一个sever端服务
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final CodecHandler codecHandler = new CodecHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //添加编解码handler p.addLast(new MessagePacketDecoder(),new MessagePacketEncoder()); //添加自定义handler p.addLast(codecHandler); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync();
继承MessageToByteEncoder并重写encode方法,实现编码功能
public class MessagePacketEncoder extends MessageToByteEncoder<byte[]> { @Override protected void encode(ChannelHandlerContext ctx, byte[] bytes, ByteBuf out) throws Exception { //进行具体的编码处理 这里对字节数组进行打印 System.out.println("编码器收到数据:"+BytesUtils.toHexString(bytes)); //写入并传送数据 out.writeBytes(bytes); } }
继承ByteToMessageDecoder 并重写decode方法,实现解码功能
public class MessagePacketDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out){ try { if (buffer.readableBytes() > 0) { // 待处理的消息包 byte[] bytesReady = new byte[buffer.readableBytes()]; buffer.readBytes(bytesReady); //进行具体的解码处理 System.out.println("解码器收到数据:"+ByteUtils.toHexString(bytesReady)); //这里不做过多处理直接把收到的消息放入链表中,并向后传递 out.add(bytesReady); } }catch(Exception ex) { } } }
实现自定义的消息处理handler,到这里其实你拿到的已经是编解码后的数据
public class CodecHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("CodecHandler收到数据:"+ByteUtils.toHexString((byte[])msg)); byte[] sendBytes = new byte[] {0x7E,0x01,0x02,0x7e}; ctx.write(sendBytes); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
运行一个客户端模拟发送字节0x01,0x02,看一下输出的执行结果
解码器收到数据:0102 CodecHandler收到数据:0102 编码器收到数据:7E01027E
根据输出的结果可以看到消息的入站与出站会按照pipeline中自定义的顺序传递,同时通过重写encode与decode方法实现我们需要的具体协议编解码操作。
二、源码分析
通过上面的例子可以看到MessageToByteEncoder<I>与ByteToMessageDecoder分别继承了ChannelInboundHandlerAdapter与ChannelOutboundHandlerAdapter,所以它们也是channelHandler的具体实现,并在创建sever时被添加到pipeline中, 同时为了方便我们使用,netty在这两个抽象类中内置与封装了一些其操作;消息的出站和入站会分别触发write与channelRead事件方法,所以上面例子中我们重写的encode与decode方法,也都是在父类的write与channelRead方法中被调用,下面我们就别从这两个方法入手,对整个编解码的流程进行梳理与分析。
1、MessageToByteEncoder
编码需要操作的是出站数据,所以在MessageToByteEncoder的write方法中会调用我们重写的encode具体实现, 把我们内部定义的消息实体编码为最终要发送的字节流数据发送出去。
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) {//判断传入的msg与你定义的类型是否一致 @SuppressWarnings("unchecked") I cast = (I) msg;//转为你定义的消息类型 buf = allocateBuffer(ctx, cast, preferDirect);//包装成一个ByteBuf try { encode(ctx, cast, buf);//传入声明的ByteBuf,执行具体编码操作 } finally { /** * 如果你定义的类型就是ByteBuf 这里可以帮助你释放资源,不需要在自己释放 * 如果你定义的消息类型中包含ByteBuf,这里是没有作用,需要你自己主动释放 */ ReferenceCountUtil.release(cast);//释放你传入的资源 } //发送buf if (buf.isReadable()) { ctx.write(buf, promise); } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { //类型不一致的话,就直接发送不再执行encode方法,所以这里要注意如果你传递的消息与泛型类型不一致,其实是不会执行的 ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { buf.release();//释放资源 } } }
MessageToByteEncoder的write方法要实现的功能还是比较简单的,就是把你传入的数据类型进行转换和发送;这里有两点需要注意:
- 一般情况下,需要通过重写encode方法把定义的泛型类型转换为ByteBuf类型, write方法内部自动帮你执行传递或发送操作;
- 代码中虽然有通过ReferenceCountUtil.release(cast)释放你定义的类型资源,但如果定义的消息类中包含ByteBuf对象,仍需要主动释放该对象资源;
2、ByteToMessageDecoder
从命名上就可以看出ByteToMessageDecoder解码器的作用是把字节流数据编码转换为我们需要的数据格式
作为入站事件,解码操作的入口自然是channelRead方法
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) {//如果消息是bytebuff CodecOutputList out = CodecOutputList.newInstance();//实例化一个链表 try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { cumulation = data; } else { cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } callDecode(ctx, cumulation, out);//开始解码 } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { if (cumulation != null && !cumulation.isReadable()) {//不为空且没有可读数据,释放资源 numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 numReads = 0; discardSomeReadBytes(); } int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); fireChannelRead(ctx, out, size);//向下传递消息 out.recycle(); } } else { ctx.fireChannelRead(msg); } }
callDecode方法内部通过while循环的方式对ByteBuf数据进行解码,直到其中没有可读数据
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) {//判断ByteBuf是还有可读数据 int outSize = out.size();//获取记录链表大小 if (outSize > 0) {//判断链表中是否已经有数据 fireChannelRead(ctx, out, outSize);//如果有数据继续向下传递 out.clear();//清空链表 // Check if this handler was removed before continuing with decoding. // If it was removed, it is not safe to continue to operate on the buffer. // // See: // - https://github.com/netty/netty/issues/4635 if (ctx.isRemoved()) { break; } outSize = 0; } int oldInputLength = in.readableBytes(); decodeRemovalReentryProtection(ctx, in, out);//开始调用decode方法 // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } //这里如果链表为空且bytebuf没有可读数据,就跳出循环 if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break; } else {//有可读数据继续读取 continue; } } if (oldInputLength == in.readableBytes()) {//beytebuf没有读取,但却进行了解码 throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) {//是否设置了每条入站数据只解码一次,默认false break; } } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException(cause); } }
decodeRemovalReentryProtection方法内部会调用我们重写的decode解码实现
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { decodeState = STATE_CALLING_CHILD_DECODE;//标记状态 try { decode(ctx, in, out);//调用我们重写的decode解码实现 } finally { boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING; decodeState = STATE_INIT; if (removePending) {//这里判断标记,防止handlerRemoved事件与解码操作冲突 handlerRemoved(ctx); } } }
channelRead方法中接受到数据经过一系列逻辑处理,最终会调用我们重写的decode方法实现具体的解码功能;在decode方法中我们只需要ByteBuf类型的数据解析为我们需要的数据格式直接放入 List<Object> out链表中即可,ByteToMessageDecoder会自动帮你向下传递消息。
三、总结
通过上面的讲解,我们可以对Netty中内置自定义编解码器MessageToByteEncoder与ByteToMessageDecoder有一定的了解,其实它们本质上是Netty封装的一组专门用于自定义编解码的channelHandler实现类。在实际开发当中基于这两个抽象类的实现非常具有实用性,所以在这里稍作分析, 其中如有不足与不正确的地方还望指出与海涵。
关注微信公众号,查看更多技术文章。
转载说明:未经授权不得转载,授权后务必注明来源(注明:来源于公众号:架构空间, 作者:大凡)