Netty Encoding and Decoding: Using the MessagePack Framework

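1. Introduction

MessagePack is an efficient binary serialization framework. Its main advantages are serialization performance and a compact encoded size, and it has implementations for many programming languages.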

2. Application

2.1 Adding dependencies

<!-- https://mvnrepository.com/artifact/org.msgpack/msgpack -->
<dependency>
    <groupId>org.msgpack</groupId>
    <artifactId>msgpack</artifactId>
    <version>0.6.12</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.javassist/javassist -->
<dependency>
    <groupId>org.javassist</groupId>
    <artifactId>javassist</artifactId>
    <version>3.28.0-GA</version>
</dependency>

2.2 POJO definitions

Define two classes, Dept and RetMsg. Dept simulates the business data, and RetMsg simulates the data returned by the server.

  • Dept.java
@Message(value=FieldOption.DEFAULT)
public class Dept {

	public static final Logger log = LoggerFactory.getLogger(Dept.class);

	private String deptName;
	
	private String deptRemark;
	
	public Dept() {}
	
	public Dept(String deptName, String deptRemark) {
		this.deptName = deptName;
		this.deptRemark = deptRemark;
	}

	public String getDeptName() {
		return deptName;
	}

	public void setDeptName(String deptName) {
		this.deptName = deptName;
	}

	public String getDeptRemark() {
		return deptRemark;
	}

	public void setDeptRemark(String deptRemark) {
		this.deptRemark = deptRemark;
	}
	
	@Override
	public String toString() {
		return "deptName: "+deptName+", deptRemark: "+deptRemark;
	}
}
  • RetMsg.java
@Message(value=FieldOption.DEFAULT)
public class RetMsg {

	private Integer code;
	
	private String msg;

	public Integer getCode() {
		return code;
	}

	public void setCode(Integer code) {
		this.code = code;
	}

	public String getMsg() {
		return msg;
	}

	public void setMsg(String msg) {
		this.msg = msg;
	}

	@Override
	public String toString() {
		return "code=" + String.valueOf(getCode()) + ", msg: " + getMsg();
	}
}

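2.3 Custom decoder (MsgpackDecoder)

The decoder performs deserialization: it copies the readable bytes of the incoming ByteBuf into a byte array and decodes that array into an org.msgpack.type.Value.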

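import java.util.List;

import org.msgpack.MessagePack;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf>{

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
		// Copy the readable bytes into a heap array without moving the reader index
		final int length = msg.readableBytes();
		final byte[] dst = new byte[length];
		msg.getBytes(msg.readerIndex(), dst, 0, length);

		// Decode the bytes into a dynamically typed Value and hand it to the next handler
		MessagePack messagePack = new MessagePack();
		out.add(messagePack.read(dst));
	}

}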
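
The decoder above treats all readable bytes of each ByteBuf as exactly one message. TCP gives no such guarantee: one read may carry half a message or several messages together. The following is a minimal sketch of length-prefix framing, using only java.nio so it runs without Netty. The class name LengthPrefixFramingSketch and the 2-byte header size are assumptions made for illustration and are not part of the original example. In a Netty pipeline this role is commonly filled by LengthFieldBasedFrameDecoder and LengthFieldPrepender placed around the MessagePack codec.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch: a 2-byte big-endian length header in front of each payload. */
final class LengthPrefixFramingSketch {

    /** Header size is an assumption for this sketch; it limits payloads to 65535 bytes. */
    static final int HEADER_BYTES = 2;

    /** Prepends the payload length so the receiver knows where the message ends. */
    static byte[] frame(byte[] payload) {
        if (payload.length > 0xFFFF) {
            throw new IllegalArgumentException("payload too large for a 2-byte header");
        }
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + payload.length);
        buf.putShort((short) payload.length);
        buf.put(payload);
        return buf.array();
    }

    /**
     * Extracts every complete frame from a buffer that is in read mode.
     * Bytes of an incomplete trailing frame stay unread for the next call.
     */
    static List<byte[]> unframe(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= HEADER_BYTES) {
            in.mark();
            int length = in.getShort() & 0xFFFF;
            if (in.remaining() < length) {
                // Not enough data yet: rewind to the start of the header and wait
                in.reset();
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        // Two complete frames arrive in one read, followed by one stray byte
        byte[] first = frame(new byte[] {1, 2, 3});
        byte[] second = frame(new byte[] {4});
        ByteBuffer in = ByteBuffer.allocate(first.length + second.length + 1);
        in.put(first).put(second).put((byte) 0);
        in.flip();

        List<byte[]> frames = unframe(in);
        System.out.println("complete frames: " + frames.size());
        System.out.println("bytes left for the next read: " + in.remaining());
    }
}
```

With framing in place, each byte array handed to the MessagePack decoder would hold one whole message regardless of how TCP split the stream.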

2.4 Custom encoder (MsgpackEncoder)

The encoder uses MessagePack to serialize a Java object into a byte array and writes it to the outbound buffer.

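import org.msgpack.MessagePack;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MsgpackEncoder extends MessageToByteEncoder<Object>{

	@Override
	protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
		// Serialize the outbound object and append the bytes to the output buffer
		MessagePack messagePack = new MessagePack();
		byte[] sendMsg = messagePack.write(msg);
		out.writeBytes(sendMsg);
	}
}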

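2.5 Custom server-side business handler (MsgpackHandler)

The decoder passes a Value to this handler. The handler simulates business processing and returns data to the client.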

public class MsgpackHandler extends ChannelInboundHandlerAdapter {

	public static final Logger log = LoggerFactory.getLogger(MsgpackHandler.class);
	
	int counter = 0;

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		log.info("this is " + ++counter + " times receive client [" + msg.toString() + "]");
		ArrayValue value = (ArrayValue)msg;
		
		Value[] array = value.getElementArray();
		RawValue v0 = array[0].asRawValue();
		RawValue v1 = array[1].asRawValue();
		// Simulate extracting the business data
		Dept dept = new Dept();
		dept.setDeptName(v0.getString());
		dept.setDeptRemark(v1.getString());
				
		RetMsg retMsg= new RetMsg();
		
		// Return different data depending on the business data sent by the client
		if ("Hel".equals(dept.getDeptName())) {
			retMsg.setCode(0);
			retMsg.setMsg("成功"); // "success"
		} else {
			retMsg.setCode(1);
			retMsg.setMsg("失败"); // "failure"
		}

		ctx.write(retMsg);
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}

2.6 Custom client-side business handler (MsgpackClientHandler)

public class MsgpackClientHandler extends ChannelInboundHandlerAdapter {

	public static final Logger log = LoggerFactory.getLogger(MsgpackClientHandler.class);
	
	int counter = 0;
	
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		// Change the Dept contents here to trigger a different server response
		Dept dept = new Dept("Hel", "Hello world!");
		ctx.write(dept);
		ctx.flush();
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		log.info("this is " + ++counter + " times receive server [" + msg.toString() + "]");
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}

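	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		// Log the failure instead of swallowing it silently, then close the channel
		log.error("client handler failed", cause);
		ctx.close();
	}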
}

2.7 Server bootstrap (MsgpackServer)

public class MsgpackServer {

    public static final Logger log = LoggerFactory.getLogger(MsgpackServer.class);
    private final String ip = "127.0.0.1";
    private final String port = "8001";

    public void init(){
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap bs = new ServerBootstrap();
        bs.group(bossGroup, workerGroup)
          .channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, 1024)
          .childHandler(new ChannelInitializer<Channel>(){
            @Override
            protected void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                pipeline.addLast("msgpack decoder", new MsgpackDecoder());
                pipeline.addLast("msgpack encoder", new MsgpackEncoder());
                pipeline.addLast(new MsgpackHandler());
            }
        });

        try {
            ChannelFuture channelFuture = bs.bind(ip, Integer.parseInt(port)).sync();
            log.info("Netty server started. Address: " + channelFuture.channel().localAddress());

            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption
            Thread.currentThread().interrupt();
            log.error("server interrupted", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

2.8 Client bootstrap (MsgpackClient)

public class MsgpackClient {
	public static final Logger log = LoggerFactory.getLogger(MsgpackClient.class);

	public void connect(int port, String host) throws InterruptedException {
		EventLoopGroup group = new NioEventLoopGroup();
		
		Bootstrap bootstrap = new Bootstrap();
		bootstrap.group(group)
		    .channel(NioSocketChannel.class)
		    .option(ChannelOption.TCP_NODELAY, true)
		    .handler(new ChannelInitializer<NioSocketChannel>() {
		    	@Override
	            protected void initChannel(NioSocketChannel ch) throws Exception {
	                final ChannelPipeline pipeline = ch.pipeline();
	                pipeline.addLast(new LoggingHandler(LogLevel.INFO));
	                pipeline.addLast("msgpack decoder", new MsgpackDecoder());
	                pipeline.addLast("msgpack encoder", new MsgpackEncoder());
	                pipeline.addLast(new MsgpackClientHandler());
	            }
		    });
		
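		try {
			ChannelFuture future = bootstrap.connect(host, port).sync();

			// Block until the channel is closed
			future.channel().closeFuture().sync();
		} finally {
			// Release the event loop threads so the JVM can exit
			group.shutdownGracefully();
		}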
	}
	
	public static void main(String[] args) throws InterruptedException {
		new MsgpackClient().connect(8001, "127.0.0.1");
	}
}

2.9 Server log

[Figure: server log output]

The server reads 18 bytes containing "Hel" and "Hello world!", and the response it writes back is 9 bytes.
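
The 18-byte and 9-byte figures can be reproduced by hand. The sketch below assumes the layout that the server handler relies on (each object is written as a MessagePack array of its fields in declaration order) and uses the one-byte headers that the MessagePack format defines for small arrays, short strings and small non-negative integers. The class MsgpackLayoutSketch and its methods are hypothetical helpers written for this illustration; they are not part of the msgpack library.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Illustrative sketch: hand-written MessagePack bytes for the Dept and RetMsg examples. */
final class MsgpackLayoutSketch {

    /** Array of up to 15 elements: a single header byte 0x90 | size. */
    static void writeArrayHeader(ByteArrayOutputStream out, int size) {
        if (size < 0 || size > 15) {
            throw new IllegalArgumentException("sketch only covers arrays of 0..15 elements");
        }
        out.write(0x90 | size);
    }

    /** String of up to 31 UTF-8 bytes: header byte 0xa0 | length, then the bytes. */
    static void writeShortString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        if (utf8.length > 31) {
            throw new IllegalArgumentException("sketch only covers strings of 0..31 bytes");
        }
        out.write(0xa0 | utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    /** Integer in 0..127: stored directly in one byte. */
    static void writeSmallInt(ByteArrayOutputStream out, int value) {
        if (value < 0 || value > 127) {
            throw new IllegalArgumentException("sketch only covers integers 0..127");
        }
        out.write(value);
    }

    /** Dept as [deptName, deptRemark]. */
    static byte[] encodeDept(String deptName, String deptRemark) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeArrayHeader(out, 2);
        writeShortString(out, deptName);
        writeShortString(out, deptRemark);
        return out.toByteArray();
    }

    /** RetMsg as [code, msg]. */
    static byte[] encodeRetMsg(int code, String msg) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeArrayHeader(out, 2);
        writeSmallInt(out, code);
        writeShortString(out, msg);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // 1 (array) + 1 + 3 ("Hel") + 1 + 12 ("Hello world!") = 18
        System.out.println(encodeDept("Hel", "Hello world!").length); // prints 18
        // The escaped string is the two-character success message; each character is 3 UTF-8 bytes.
        // 1 (array) + 1 (code 0) + 1 + 6 = 9
        System.out.println(encodeRetMsg(0, "\u6210\u529f").length); // prints 9
    }
}
```

Field names never appear in the stream, which is a large part of why the payload stays this small.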

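2.10 Client log

[Figure: client log output]

The client sends a Dept object whose fields are "Hel" and "Hello world!".
The client receives 9 bytes from the server. The decoded object is a RetMsg whose fields are 0 and "成功" (success).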

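3. Summary

With the MessagePack binary framework, the same Dept object with the same content ("Hel", "Hello world!") produces a smaller byte stream than the other two approaches: only 18 bytes.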

Java serialization    Netty ObjectDecoder    MessagePack
117 B                 58 B                   18 B
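
For readers who want to check the Java serialization side of this comparison themselves, here is a minimal sketch that measures the size of a serialized object with ObjectOutputStream. SerializableDept is a hypothetical stand-in for Dept that implements Serializable. The exact byte count depends on the class and package names written into the stream, so the sketch does not claim to reproduce the 117 B figure.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Illustrative sketch: measuring the size of a Java-serialized object. */
final class JavaSerializationSizeSketch {

    /** Hypothetical Serializable counterpart of Dept with the same two fields. */
    static final class SerializableDept implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String deptName;
        private final String deptRemark;

        SerializableDept(String deptName, String deptRemark) {
            this.deptName = deptName;
            this.deptRemark = deptRemark;
        }
    }

    /** Serializes the value into memory and returns the number of bytes written. */
    static int serializedSize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        int size = serializedSize(new SerializableDept("Hel", "Hello world!"));
        System.out.println("Java serialization size in bytes: " + size);
    }
}
```

The Java stream carries a class descriptor with the class name, field names and field types in addition to the values, which accounts for most of the gap to the 18-byte MessagePack encoding.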

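See also: Netty Encoding and Decoding: Measuring the ObjectDecoder Stream Size
See also: Netty Encoding and Decoding: Measuring the Java Serialization Stream Size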
