064:Netty实战与反序列化与序列化协议
1 回顾上节课TCP协议粘包拆分解决方案
课程内容:
1.实现序列化与反序列化
2.序列化的技术实现方案
3.使用json格式实现rpc通讯
4.Netty使用 MessagePack编码器
常用编码器
DelimiterBasedFrameDecoder 解决TCP的粘包解码器
StringDecoder 消息转成String解码器
LineBasedFrameDecoder 自动完成标识符分隔解码器
FixedLengthFrameDecoder 固定长度解码器,二进制
Base64Decoder解码器
2 Java序列化与反序列化的概念
序列化与反序列化
序列化:把对象转换成字节的过程,称为对象序列化
反序列化:把字节恢复成对象的过程,称为反序列化
网络通讯中不可以直接传递对象。
序列化模式有两种:
对象持久化概念:将对象转换成字节,存放到硬盘或者是数据库中;
网络传输对象概念:客户端将对象转成字节的形式(序列化)、变成二进制的形式发送给服务器端,服务器端接收到字节之后,反序列化成对象(rpc远程通讯)
3 Java实现对象持久化操作
Java实现序列化与反序列化
@Data
@AllArgsConstructor
public class MsgEntity implements Serializable {
private String msgContext;
private String msgId;
}
public class Test001 {
public static void main(String[] args) {
String fileName = "D:\\codes\\msg";
serialize(fileName);
deserialize(fileName);
}
/**
* 序列化对象到文件
*
* @param fileName
*/
public static void serialize(String fileName) {
try {
ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(fileName));
//序列化一个字符串到文件
out.writeObject("序列化的日期是:");
//序列化一个当前日期对象到文件
out.writeObject(new Date());
MsgEntity msgEntity = new MsgEntity(UUID.randomUUID().toString(), "java序列化到硬盘");
//序列化一个MsgEntity对象
out.writeObject(msgEntity);
out.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 反序列化
*
* @param fileName
*/
public static void deserialize(String fileName) {
try {
//从本地文件中读取字节 变为对象
ObjectInputStream in = new ObjectInputStream(new FileInputStream(fileName));
String str = (String) in.readObject();
//日期对象
Date date = (Date) in.readObject();
//MsgEntity对象
MsgEntity userInfo = (MsgEntity) in.readObject();
System.out.println(str);
System.out.println(date);
System.out.println(userInfo);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
测试结果:
注意:序列化的类必须要实现Serializable接口;transient修饰的变量不被序列化
4 构建String类型客户与服务器端通讯
服务端
public class NettyServer {
private static int inetPort = 8080;
public static void main(String[] args) {
// 使用netty创建服务器端的时候,采用两个线程池
// boss线程池 负责接收请求
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// work线程池 处理请求读写操作
NioEventLoopGroup workGroup = new NioEventLoopGroup();
// 创建serverBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap();
// serverSocketChannel管理多个socketChannel
// NioServerSocketChannel标记当前为服务器端
serverBootstrap.group(workGroup, workGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 处理每个请求handler
sc.pipeline().addLast(new StringEncoder());
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ServerHandler());
}
});
try {
// 绑定端口号
ChannelFuture channelFuture = serverBootstrap.bind(inetPort).sync();
System.out.println("服务器端启动成功:" + inetPort);
// 等待监听请求
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭线程池
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
public class ServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 获取数据
*
* @param channelHandlerContext
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println("msg:" + msg);
channelHandlerContext.writeAndFlush("平均月薪突破3w");
}
}
客户端
public class NettyClient {
public static void main(String[] args) {
//创建nioEventLoopGroup
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress("127.0.0.1", 8080))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new StringEncoder());
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ClientHandler());
}
});
try {
// 发起同步连接
ChannelFuture sync = bootstrap.connect().sync();
sync.channel().closeFuture().sync();
} catch (Exception e) {
} finally {
group.shutdownGracefully();
}
}
}
public class ClientHandler extends SimpleChannelInboundHandler<String> {
/**
* 活跃通道
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("蚂蚁课堂第6期平均月薪多少?");
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String resp) throws Exception {
System.out.println("resp:" + resp);
}
}
测试结果:
5 对象序列化协议有那些实现方案
思考:网络传输中如何实现对象的传输?其次需要考虑跨语言
其他方式实现序列化:序列化的协议
1.将对象转换成json类型 实现跨语言
客户端将对象转换成json类型,传递给服务器端 序列化
服务器端获取到json,再将json转换成对象 反序列化
2.Xml类型(银行项目报文形式、更重)
3.ProtoBuf 谷歌自定义协议
4.MessagePack
5.企业内部自定义序列化协议
6 使用json协议实现对象的传输
json协议传输弊端:key值占用一定带宽
{“msgContext”:“e5b2625b-5fb6-4b28-9fbe-e01bca47ae8a”,“msgId”:“蚂蚁课堂”}
==>
{“e5b2625b-5fb6-4b28-9fbe-e01bca47ae8a”,“蚂蚁课堂”}
7 MessagePack编码器基本介绍
MessagePack编码器
它像 JSON,但是更快更小。
MessagePack 是一种高效的二进制序列化格式。它允许在JSON等多种语言之间交换数据,但它更快速更小巧。小整数被编码为单个字节,典型的短字符串除了字符串本身之外只需要一个额外的字节。
支持Python、Ruby、Java、C/C++等众多语言。宣称比Google Protocol Buffers还要快4倍。
maven依赖
<!-- MessagePack dependency -->
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
测试MessagePack
public class Test002 {
public static void main(String[] args) throws IOException {
MsgEntity msgEntity = new MsgEntity(UUID.randomUUID().toString(),"mayikt");
String json = JSONObject.toJSONString(msgEntity);
System.out.println("json:"+json);
MessagePack messagePack = new MessagePack();
// 序列化 将对象转换为字节
byte[] bs = messagePack.write(msgEntity);
// 反序列化
Value read = messagePack.read(bs);
System.out.println(read);
}
// json:{"msgContext":"438d6165-9f01-46e1-b9d6-483111d51d2f","msgId":"mayikt"}
// ["438d6165-9f01-46e1-b9d6-483111d51d2f","mayikt"]
}
注意:
1.MsgEntity类上加注解@Message、@NoArgsConstructor(或自定义无参构造函数)
2.MessagePack处理MsgEntity类字段严格按照字段排序解析,收发双方必须保证对象字段顺序一致。
8 Netty框架使用MessagePack编码器
MessagePack解码器/编码器
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {
/**
* 服务器解码数据
*
* @param channelHandlerContext
* @param byteBuf
* @param list
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
final int length = byteBuf.readableBytes();
byte[] b = new byte[length];
byteBuf.getBytes(byteBuf.readerIndex(), b, 0, length);
MessagePack msgpack = new MessagePack();
list.add(msgpack.read(b));
}
}
public class MsgpackEncoder extends MessageToByteEncoder {
/**
* 对我们数据实现编码
*
* @param channelHandlerContext
* @param msg
* @param byteBuf
* @throws Exception
*/
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf byteBuf) throws Exception {
MessagePack msgpack = new MessagePack();
byteBuf.writeBytes(msgpack.write(msg));
}
}
NettyServer解码
sc.pipeline().addLast(new MsgpackDecoder());
NettyClient编码
sc.pipeline().addLast(new MsgpackEncoder());
Handler收发消息
public class ClientHandler extends SimpleChannelInboundHandler {
/**
* 活跃通道
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
MsgEntity msgEntity = new MsgEntity(UUID.randomUUID().toString(), "蚂蚁课堂");
ctx.writeAndFlush(msgEntity);
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object resp) throws Exception {
System.out.println("resp:" + resp);
}
}
public class ServerHandler extends SimpleChannelInboundHandler {
/**
* 获取数据
*
* @param channelHandlerContext
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
System.out.println("服务器端获取:" + msg);
// 使用反射机制给对象属性赋值
MsgEntity msgEntity = new MsgEntity();
Class<?> cl = msgEntity.getClass();
Field[] declaredFields = cl.getDeclaredFields();
for (int i = 0; i < declaredFields.length; i++) {
Field declaredField = declaredFields[i];
declaredField.setAccessible(true);
declaredField.set(msgEntity, msg.toString().split(",")[i]);
}
System.out.println("msgEntity:" + msgEntity);
}
}
运行结果: