Using the MessagePack framework in Netty
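1. Introduction
MessagePack is an efficient binary serialization framework. Its main strengths are serialization performance and a compact encoded byte stream. It also supports multiple programming languages.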
2. Usage
2.1 Adding dependencies
<!-- https://mvnrepository.com/artifact/org.msgpack/msgpack -->
<dependency>
    <groupId>org.msgpack</groupId>
    <artifactId>msgpack</artifactId>
    <version>0.6.12</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.javassist/javassist -->
<dependency>
    <groupId>org.javassist</groupId>
    <artifactId>javassist</artifactId>
    <version>3.28.0-GA</version>
</dependency>
2.2 POJO definitions
Define two classes, Dept and RetMsg. Dept models the business data, and RetMsg models the data returned by the server.
- Dept.java
import org.msgpack.annotation.Message;
import org.msgpack.template.FieldOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Business object; @Message marks it as serializable by MessagePack
@Message(value = FieldOption.DEFAULT)
public class Dept {
    public static final Logger log = LoggerFactory.getLogger(Dept.class);

    private String deptName;
    private String deptRemark;

    // No-argument constructor
    public Dept() {}

    public Dept(String deptName, String deptRemark) {
        this.deptName = deptName;
        this.deptRemark = deptRemark;
    }

    public String getDeptName() {
        return deptName;
    }

    public void setDeptName(String deptName) {
        this.deptName = deptName;
    }

    public String getDeptRemark() {
        return deptRemark;
    }

    public void setDeptRemark(String deptRemark) {
        this.deptRemark = deptRemark;
    }

    @Override
    public String toString() {
        return "deptName: " + deptName + ", deptRemark: " + deptRemark;
    }
}
- RetMsg.java
import org.msgpack.annotation.Message;
import org.msgpack.template.FieldOption;

// Response object returned by the server
@Message(value = FieldOption.DEFAULT)
public class RetMsg {
    private Integer code;
    private String msg;

    public Integer getCode() {
        return code;
    }

    public void setCode(Integer code) {
        this.code = code;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    @Override
    public String toString() {
        return "code=" + code + ", msg: " + msg;
    }
}
2.3 Custom decoder (MsgpackDecoder)
The decoder performs deserialization: it decodes the received byte array into an org.msgpack.type.Value.
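import java.util.List;

import org.msgpack.MessagePack;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

// Assumes msg holds exactly one complete MessagePack message;
// no frame decoder precedes this handler in the pipeline.
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {
    // One instance per decoder (and therefore per channel), reused for every message
    private final MessagePack messagePack = new MessagePack();

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        // Copy the readable bytes into a plain array without moving the reader index
        final int length = msg.readableBytes();
        final byte[] dst = new byte[length];
        msg.getBytes(msg.readerIndex(), dst, 0, length);
        // Deserialize into a dynamically typed Value and pass it down the pipeline
        out.add(messagePack.read(dst));
    }
}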
2.4 Custom encoder (MsgpackEncoder)
The encoder uses MessagePack to encode a Java object into a byte array.
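import org.msgpack.MessagePack;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MsgpackEncoder extends MessageToByteEncoder<Object> {
    // One instance per encoder (and therefore per channel), reused for every message
    private final MessagePack messagePack = new MessagePack();

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        // Serialize the object and append the bytes to the outbound buffer
        byte[] sendMsg = messagePack.write(msg);
        out.writeBytes(sendMsg);
    }
}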
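2.5 Custom server-side business handler (MsgpackHandler)
The data passed on by the decoder is a Value. This handler simulates business processing and returns data to the client.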
import org.msgpack.type.ArrayValue;
import org.msgpack.type.RawValue;
import org.msgpack.type.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class MsgpackHandler extends ChannelInboundHandlerAdapter {
    public static final Logger log = LoggerFactory.getLogger(MsgpackHandler.class);

    // Number of messages received on this channel
    int counter = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("Received message " + ++counter + " from client: [" + msg + "]");
        // The decoder delivers the Dept object as an array of its field values
        ArrayValue value = (ArrayValue) msg;
        Value[] array = value.getElementArray();
        RawValue v0 = array[0].asRawValue();
        RawValue v1 = array[1].asRawValue();
        // Simulate reading the business data
        Dept dept = new Dept();
        dept.setDeptName(v0.getString());
        dept.setDeptRemark(v1.getString());
        RetMsg retMsg = new RetMsg();
        // Return different data depending on the business data sent by the client
        if ("Hel".equals(dept.getDeptName())) {
            retMsg.setCode(0);
            retMsg.setMsg("成功");
        } else {
            retMsg.setCode(1);
            retMsg.setMsg("失败");
        }
        ctx.write(retMsg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("Unexpected error, closing channel", cause);
        ctx.close();
    }
}
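The handler can cast the message to ArrayValue and read two raw elements because the 18-byte message reported in the server log is consistent with Dept being written as a two-element array of its field values. The following is a minimal sketch of that wire layout, decoded by hand with only the JDK. It does not use the msgpack library; the class name MsgpackArraySketch and the method decodeStringArray are hypothetical names for illustration, and the sketch handles only small arrays (up to 15 elements) of short strings (up to 31 bytes).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class MsgpackArraySketch {

    // Decodes a small MessagePack array (header 0x90..0x9f) whose elements
    // are all short raw strings (header 0xa0..0xbf). Hypothetical helper.
    public static List<String> decodeStringArray(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int arrayHeader = buf.get() & 0xff;
        if ((arrayHeader & 0xf0) != 0x90) {
            throw new IllegalArgumentException("not a small array");
        }
        int size = arrayHeader & 0x0f; // low 4 bits hold the element count
        List<String> fields = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            int rawHeader = buf.get() & 0xff;
            if ((rawHeader & 0xe0) != 0xa0) {
                throw new IllegalArgumentException("not a short string");
            }
            byte[] raw = new byte[rawHeader & 0x1f]; // low 5 bits hold the byte length
            buf.get(raw);
            fields.add(new String(raw, StandardCharsets.UTF_8));
        }
        return fields;
    }

    public static void main(String[] args) {
        // 0x92 = array of 2, 0xa3 = 3-byte string, 0xac = 12-byte string
        byte[] wire = {
            (byte) 0x92,
            (byte) 0xa3, 'H', 'e', 'l',
            (byte) 0xac, 'H', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd', '!'
        };
        List<String> fields = decodeStringArray(wire);
        System.out.println(fields.get(0)); // Hel
        System.out.println(fields.get(1)); // Hello world!
    }
}
```

Because the fields travel as positional array elements rather than named keys, the handler has to know that index 0 is deptName and index 1 is deptRemark.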
2.6 Custom client-side business handler (MsgpackClientHandler)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class MsgpackClientHandler extends ChannelInboundHandlerAdapter {
    public static final Logger log = LoggerFactory.getLogger(MsgpackClientHandler.class);

    // Number of messages received on this channel
    int counter = 0;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Change the contents of dept here to make the server return different data
        Dept dept = new Dept("Hel", "Hello world!");
        ctx.writeAndFlush(dept);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("Received message " + ++counter + " from server: [" + msg + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log the error instead of silently discarding it
        log.error("Unexpected error, closing channel", cause);
        ctx.close();
    }
}
2.7 Server bootstrap (MsgpackServer)
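import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class MsgpackServer {
    public static final Logger log = LoggerFactory.getLogger(MsgpackServer.class);

    private final String ip = "127.0.0.1";
    private final int port = 8001;

    public void init() {
        // One boss thread accepts connections; worker threads handle I/O
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap bs = new ServerBootstrap();
        bs.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childHandler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                    pipeline.addLast("msgpack decoder", new MsgpackDecoder());
                    pipeline.addLast("msgpack encoder", new MsgpackEncoder());
                    pipeline.addLast(new MsgpackHandler());
                }
            });
        try {
            ChannelFuture channelFuture = bs.bind(ip, port).sync();
            log.info("Netty server started. Address: " + channelFuture.channel().localAddress());
            // Block until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe it
            Thread.currentThread().interrupt();
            log.error("Server interrupted", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    // Entry point added so the server can be started directly
    public static void main(String[] args) {
        new MsgpackServer().init();
    }
}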
2.8 Client bootstrap (MsgpackClient)
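import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class MsgpackClient {
    public static final Logger log = LoggerFactory.getLogger(MsgpackClient.class);

    public void connect(int port, String host) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        final ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                        pipeline.addLast("msgpack decoder", new MsgpackDecoder());
                        pipeline.addLast("msgpack encoder", new MsgpackEncoder());
                        pipeline.addLast(new MsgpackClientHandler());
                    }
                });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            // Block until the connection is closed
            future.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads so the JVM can exit
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new MsgpackClient().connect(8001, "127.0.0.1");
    }
}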
2.9 Server log
The server reads 18 bytes, containing "Hel" and "Hello world!", and returns 9 bytes.
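The 18-byte and 9-byte figures can be checked by hand. The sketch below assumes each object is written as a small array of its field values: one header byte for the array, one header byte per short string plus its UTF-8 bytes, and a single byte for a small non-negative integer. It uses only the JDK rather than the msgpack library, and the class and method names (MsgpackSizeSketch, encodeDept, encodeRetMsg) are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class MsgpackSizeSketch {

    // Array header for up to 15 elements: a single byte 0x90 | size
    static void writeSmallArrayHeader(ByteArrayOutputStream out, int size) {
        if (size < 0 || size > 15) {
            throw new IllegalArgumentException("sketch handles only small arrays");
        }
        out.write(0x90 | size);
    }

    // Short string (up to 31 UTF-8 bytes): header byte 0xa0 | length, then the bytes
    static void writeShortString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        if (utf8.length > 31) {
            throw new IllegalArgumentException("sketch handles only short strings");
        }
        out.write(0xa0 | utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Integers 0..127 are stored in a single byte
    static void writeSmallInt(ByteArrayOutputStream out, int value) {
        if (value < 0 || value > 127) {
            throw new IllegalArgumentException("sketch handles only 0..127");
        }
        out.write(value);
    }

    public static byte[] encodeDept(String deptName, String deptRemark) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeSmallArrayHeader(out, 2);
        writeShortString(out, deptName);
        writeShortString(out, deptRemark);
        return out.toByteArray();
    }

    public static byte[] encodeRetMsg(int code, String msg) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeSmallArrayHeader(out, 2);
        writeSmallInt(out, code);
        writeShortString(out, msg);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // 1 (array) + 1 + 3 ("Hel") + 1 + 12 ("Hello world!") = 18
        System.out.println(encodeDept("Hel", "Hello world!").length); // 18
        // 1 (array) + 1 (code 0) + 1 + 6 (two CJK characters, 3 UTF-8 bytes each) = 9
        System.out.println(encodeRetMsg(0, "\u6210\u529f").length); // 9
    }
}
```

The string "\u6210\u529f" is the "成功" message set by the server handler, written with Unicode escapes so the result does not depend on the source file encoding.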
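2.10 Client log
The client sends the Dept object, whose field values are "Hel" and "Hello world!".
The client receives 9 bytes from the server. They represent the RetMsg object, whose contents are 0 and "成功" (success).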
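3. Summary
With the MessagePack binary framework, the same Dept object with the same content ("Hel", "Hello world!") produces a much smaller byte stream than the alternatives compared below: only 18 bytes.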
Java serialization | Netty ObjectDecoder | MessagePack |
---|---|---|
117B | 58B | 18B |
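To reproduce the Java serialization column, a measurement along the following lines could be used. This is a sketch under assumptions: the nested Dept class here is a hypothetical stand-in that implements Serializable, and the exact byte count depends on the fully qualified class name and field names, so it will not necessarily match the 117B in the table.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class JavaSerializationSizeSketch {

    // Hypothetical stand-in for the article's Dept, made Serializable for this test
    static class Dept implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String deptName;
        private final String deptRemark;

        Dept(String deptName, String deptRemark) {
            this.deptName = deptName;
            this.deptRemark = deptRemark;
        }
    }

    // Serializes the object with standard Java serialization and returns the byte count
    public static int serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        int size = serializedSize(new Dept("Hel", "Hello world!"));
        System.out.println("Java serialization: " + size + " bytes");
    }
}
```

Java serialization writes a stream header and a class descriptor (class name, field names and types) along with the values, which is why its output is several times larger than the 18-byte MessagePack encoding.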