Netty 序列化框架

Netty 序列化框架

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vp7ElmXY-1631631213919)(C:\Users\XIAODO~1\AppData\Local\Temp\1631623869664.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NiseYZNc-1631631213923)(C:\Users\XIAODO~1\AppData\Local\Temp\1631623931118.png)]

常见的序列化工具有7中,其中protobuf无论是编解码速度还是字节码流的大小,就效果而言都是最优的选择。因此,这里只研究Protobuf在Netty中的使用。

一、Encoder通用

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-79Ls2Qm7-1631631213926)(C:\Users\XIAODO~1\AppData\Local\Temp\1631624664422.png)]

自定义的编码器需要 继承MessageToByteEncoder这个抽象类,并且实现抽象方法

protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

将需要传输的对象序列化后,写入out这个buf即可。如ObjectEncoder:

@Override
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {   // 在缓冲区的起始地址
    int startIdx = out.writerIndex();
    // 包装一层
    ByteBufOutputStream bout = new ByteBufOutputStream(out);
    // 占位符4个比特位,标志位
    bout.write(LENGTH_PLACEHOLDER);
    // 封装一层
    ObjectOutputStream oout = new CompactObjectOutputStream(bout);
    // 写数据
    oout.writeObject(msg);
    oout.flush();
    oout.close();

    int endIdx = out.writerIndex();
	// 替换分块编码的标志位,设置成数据长度,4byte
    out.setInt(startIdx, endIdx - startIdx - 4);
}

二、Decoder通用

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D1elMPjZ-1631631213930)(C:\Users\XIAODO~1\AppData\Local\Temp\1631625434535.png)]

自定义解码器需要继承ByteToMessageDecoder抽象类,并且实现抽象方法:

protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

ObjectDecoder的实现:

@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    // 从缓冲读取字节流到buf中
    ByteBuf frame = (ByteBuf) super.decode(ctx, in);
    if (frame == null) {
        return null;
    }
	// 返回解析好的object对象,这也是Handler的read,write,等方法中的入参msg
    return new CompactObjectInputStream(
            new ByteBufInputStream(frame), classResolver).readObject();
}

Handler:

@Override
   public void channelRead(ChannelHandlerContext ctx, Object msg)
    throws Exception {
SubscribeReq req = (SubscribeReq) msg;
if ("Lilinfeng".equalsIgnoreCase(req.getUserName())) {
    System.out.println("Service accept client subscrib req : ["
       + req.toString() + "]");
    ctx.writeAndFlush(resp(req.getSubReqID()));
}
}

三、MessagToMessageDecoder

// 将一种message转为另一种mesage
protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;

StringDecoder

将一个事件映射为多个事件在管道上传播。

// out 每多加一个Object,管道中就会多一个事件,
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
    if (msg.length() != 0) {
        out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), this.charset));
        // handler会读到连个读事件
        out.add("-------");
    }
}

ffer.wrap(msg), this.charset));
// handler会读到连个读事件
out.add("-------");
}
}


上一篇:Netty 协议设计与解析 (自定义协议)


下一篇:RPC的通信Netty的底层是Nio,字节跳动三场技术面+HR面