Netty框架编解码之ObjectDecoder码流测量

Netty实战:测量将Java对象编码为ByteBuf(a Java object into a ByteBuf)后的码流大小

1. 背景

java序列化码流比较大,netty提供了一个Java Object到ByteBuf的编码器。

注意使用了ObjectEncoder进行编码的消息,必须使用ObjectDecoder或者ObjectDecoderInputStream进行解码,以保证互操作性。

2. 源码分析

  • ObjectEncoder源码
/**
 * Encoder that serializes a {@link Serializable} message into a {@link ByteBuf}.
 *
 * <p>The output consists of a four-byte length field followed by the serialized
 * object; the length field holds the number of bytes written after it.
 */
@Sharable
public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];

    /**
     * Writes {@code msg} into {@code out} as a length-prefixed serialized object.
     *
     * @param ctx the handler context (not used by this method)
     * @param msg the object to serialize
     * @param out the buffer that receives the length field and the serialized bytes
     * @throws Exception if writing or serializing the object fails
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
        // Remember where the length field starts so it can be back-filled later.
        final int lengthFieldIndex = out.writerIndex();

        final ByteBufOutputStream bufferStream = new ByteBufOutputStream(out);
        ObjectOutputStream objectStream = null;
        try {
            // Reserve four bytes for the length field.
            bufferStream.write(LENGTH_PLACEHOLDER);
            objectStream = new CompactObjectOutputStream(bufferStream);
            // Write the serialized object right after the placeholder.
            objectStream.writeObject(msg);
            objectStream.flush();
        } finally {
            // Close the outermost stream that was successfully created.
            if (objectStream == null) {
                bufferStream.close();
            } else {
                objectStream.close();
            }
        }

        // Payload size = everything written since the start, minus the length field itself.
        final int payloadLength = out.writerIndex() - lengthFieldIndex - LENGTH_PLACEHOLDER.length;
        out.setInt(lengthFieldIndex, payloadLength);
    }
}

3. 实现

3.1 ObjServer

package com.ll.serialization;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Netty server that decodes incoming Java objects with {@code ObjectDecoder},
 * encodes outgoing ones with {@code ObjectEncoder}, and hands decoded messages
 * to {@code ObjHandler}.
 */
public class ObjServer {

    public static final Logger log = LoggerFactory.getLogger(ObjServer.class);

    /** Address the server binds to. */
    private static final String IP = "127.0.0.1";

    /** TCP port the server listens on. */
    private static final int PORT = 8001;

    /**
     * Starts the server, then blocks until the server channel is closed or the
     * calling thread is interrupted. Both event-loop groups are shut down before
     * this method returns, whether it completes normally or fails.
     */
    public void init() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // Configuration lives inside the try block so that a failure here
            // still releases the event-loop threads in the finally clause.
            ServerBootstrap bs = new ServerBootstrap();
            bs.group(bossGroup, workerGroup)
              .channel(NioServerSocketChannel.class)
              .option(ChannelOption.SO_BACKLOG, 1024)
              .childHandler(new ChannelInitializer<Channel>() {
                  @Override
                  protected void initChannel(Channel ch) throws Exception {
                      final ChannelPipeline pipeline = ch.pipeline();
                      pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                      // Decode the object byte stream received from the client.
                      // NOTE(review): this deserializes whatever the peer sends using
                      // Java-native serialization; only expose it to trusted clients.
                      pipeline.addLast(new ObjectDecoder(
                              ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader())));
                      // Encode Java objects sent back to the client.
                      pipeline.addLast(new ObjectEncoder());
                      pipeline.addLast(new ObjHandler());
                  }
              });

            ChannelFuture channelFuture = bs.bind(IP, PORT).sync();
            log.info("Netty Server 启动成功! Ip: {} ! ", channelFuture.channel().localAddress());

            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("Netty server thread interrupted, shutting down", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

3.2 ObjHandler

/**
 * Server-side inbound handler that logs each received {@code Dept} and echoes
 * it back to the sender.
 */
public class ObjHandler extends ChannelInboundHandlerAdapter {

    public static final Logger log = LoggerFactory.getLogger(ObjHandler.class);

    /** Number of messages this handler instance has received so far. */
    int counter = 0;

    /**
     * Logs the department name of the received message and writes the same
     * object back to the peer.
     *
     * @param ctx the channel context used to send the reply
     * @param msg the decoded inbound message; must be a {@code Dept}
     * @throws Exception if the message is not a {@code Dept} (ClassCastException)
     *                   or processing fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Dept body = (Dept) msg;
        log.info("this is {} times receive client [{}]", ++counter, body.getDeptName());

        ctx.writeAndFlush(body);
    }

    /**
     * Logs the failure and closes the connection.
     *
     * @param ctx   the channel context to close
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Record the cause before closing; otherwise failures vanish without a trace.
        log.error("Exception caught on server channel, closing connection", cause);
        ctx.close();
    }
}

3.3 ObjClientHandler

使用熟悉的“Hello world!”字符串进行实验。

/**
 * Client-side inbound handler that sends one {@code Dept} when the connection
 * becomes active and logs every message received from the server.
 */
public class ObjClientHandler extends ChannelInboundHandlerAdapter {

    public static final Logger log = LoggerFactory.getLogger(ObjClientHandler.class);

    /** Number of messages this handler instance has received so far. */
    int counter = 0;

    /**
     * Sends a sample {@code Dept} to the server as soon as the channel is active.
     *
     * @param ctx the channel context used to send the message
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Dept dept = new Dept("Hel", "Hello world!");
        ctx.writeAndFlush(dept);
    }

    /**
     * Logs the message received from the server together with a running count.
     *
     * @param ctx the channel context
     * @param msg the decoded inbound message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("this is {} times receive server [{}]", ++counter, msg);
    }

    /**
     * Flushes any pending outbound data once the current read batch is done.
     *
     * @param ctx the channel context to flush
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Logs the failure and closes the connection.
     *
     * @param ctx   the channel context to close
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Record the cause before closing; otherwise failures vanish without a trace.
        log.error("Exception caught on client channel, closing connection", cause);
        ctx.close();
    }
}

3.4 ObjClient

/**
 * Netty client that connects to the object server, encodes outgoing Java
 * objects with {@code ObjectEncoder} and decodes replies with {@code ObjectDecoder}.
 */
public class ObjClient {
    public static final Logger log = LoggerFactory.getLogger(ObjClient.class);

    /**
     * Connects to the given endpoint and blocks until the channel is closed.
     * The event-loop group is always shut down before returning, so its threads
     * do not keep the JVM alive after the connection ends or the connect fails.
     *
     * @param port the remote TCP port
     * @param host the remote host name or address
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for the connect or close to complete
     */
    public void connect(int port, String host) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        final ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                        // Encode the Java objects the client sends.
                        pipeline.addLast(new ObjectEncoder());
                        // Decode the object byte stream received from the server.
                        pipeline.addLast(new ObjectDecoder(
                                ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader())));
                        pipeline.addLast(new ObjClientHandler());
                    }
                });

            ChannelFuture future = bootstrap.connect(host, port).sync();

            // Block until the connection is closed.
            future.channel().closeFuture().sync();
        } finally {
            // Release the I/O threads whether the connection ended normally or not.
            group.shutdownGracefully();
        }
    }

    /**
     * Connects to the server on 127.0.0.1:8001.
     *
     * @param args ignored
     * @throws InterruptedException if interrupted while the client is running
     */
    public static void main(String[] args) throws InterruptedException {
        new ObjClient().connect(8001, "127.0.0.1");
    }
}

3.5 Dept序列化对象

/**
 * Serializable department value object carrying a name and a remark.
 */
public class Dept implements java.io.Serializable {

    private static final long serialVersionUID = 4663236112565339000L;

    public static final Logger log = LoggerFactory.getLogger(Dept.class);

    /** Department name. */
    private String deptName;

    /** Free-form remark about the department. */
    private String deptRemark;

    /**
     * Creates a department with the given name and remark.
     *
     * @param deptName   the department name
     * @param deptRemark the department remark
     */
    public Dept(String deptName, String deptRemark) {
        setDeptName(deptName);
        setDeptRemark(deptRemark);
    }

    /** @return the department name */
    public String getDeptName() {
        return this.deptName;
    }

    /** @param deptName the new department name */
    public final void setDeptName(String deptName) {
        this.deptName = deptName;
    }

    /** @return the department remark */
    public String getDeptRemark() {
        return this.deptRemark;
    }

    /** @param deptRemark the new department remark */
    public final void setDeptRemark(String deptRemark) {
        this.deptRemark = deptRemark;
    }

    /**
     * Renders both fields as {@code deptName: <name>, deptRemark: <remark>}.
     *
     * @return the textual form of this department
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("deptName: ").append(deptName);
        text.append(", deptRemark: ").append(deptRemark);
        return text.toString();
    }
}

3.6 服务端日志

  • 读取数据日志
    Netty框架编解码之ObjectDecoder码流测量

服务端读取到58字节。
日志打印数据用16进制展示ascii码。
"Hello world!"数据的十进制和十六进制对应关系如下表。

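| 字符 | H | e | l | l | o | (空格) | w | o | r | l | d | ! |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| Hex | 48 | 65 | 6c | 6c | 6f | 20 | 77 | 6f | 72 | 6c | 64 | 21 |
| Dec | 72 | 101 | 108 | 108 | 111 | 32 | 119 | 111 | 114 | 108 | 100 | 33 |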

3.7 客户端日志

  • 发送数据日志
    Netty框架编解码之ObjectDecoder码流测量
  • 接收数据日志
    Netty框架编解码之ObjectDecoder码流测量

4. 总结

同样的Java对象,在上一篇文章中使用Java序列化得到的码流大小为117字节,而使用Netty自带的ObjectEncoder编码后的码流为58字节,仅从体积上就能看出差距。当传输的数据量比较大时,编码后的字节数组越大,存储时越占空间,存储的硬件成本相应变高,网络传输时也更占带宽,系统的吞吐量随之降低。所以相比较而言,Java序列化方式在码流大小方面还是有局限性的。

参照上一篇:Netty框架编解码之Java序列化码流测量

上一篇:Netty实现Rpc调用


下一篇:2021Java精选面试实战总结整理,netty视频教程