序列化在Netty中的使用

  • Java序列化的缺点

1、无法跨语言

  对于Java序列化后的字节数组,别的语言无法进行反序列化

2、序列化后的码流过大

3、序列化性能低

  • 使用JDK自带的序列化进行对象的传输

被传输的,实现了序列化接口的POJO

package org.zln.netty.five.part04.dto;

import org.apache.commons.lang3.builder.ToStringBuilder;

import java.io.Serializable;

/**
 * Subscription (order) request.
 *
 * <p>A mutable bean that implements {@link Serializable} so that instances
 * can be Java-serialized.
 *
 * <p>Created by sherry on 16/11/7.
 */
public class SubscribeReq implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Subscription request number. */
    private int subReqID;

    /** User name. */
    private String userName;

    /** Name of the ordered product. */
    private String productName;

    /** Contact phone number of the subscriber. */
    private String phoneName;

    /** Home address of the subscriber. */
    private String address;

    /** @return the subscription request number */
    public int getSubReqID() {
        return subReqID;
    }

    /** @param subReqID the subscription request number */
    public void setSubReqID(int subReqID) {
        this.subReqID = subReqID;
    }

    /** @return the user name */
    public String getUserName() {
        return userName;
    }

    /** @param userName the user name */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the name of the ordered product */
    public String getProductName() {
        return productName;
    }

    /** @param productName the name of the ordered product */
    public void setProductName(String productName) {
        this.productName = productName;
    }

    /** @return the subscriber's contact phone number */
    public String getPhoneName() {
        return phoneName;
    }

    /** @param phoneName the subscriber's contact phone number */
    public void setPhoneName(String phoneName) {
        this.phoneName = phoneName;
    }

    /** @return the subscriber's home address */
    public String getAddress() {
        return address;
    }

    /** @param address the subscriber's home address */
    public void setAddress(String address) {
        this.address = address;
    }

    /**
     * Renders every field through {@link ToStringBuilder}, in declaration order.
     */
    @Override
    public String toString() {
        ToStringBuilder text = new ToStringBuilder(this);
        text.append("subReqID", subReqID);
        text.append("userName", userName);
        text.append("productName", productName);
        text.append("phoneName", phoneName);
        text.append("address", address);
        return text.toString();
    }
}

SubscribeReq

package org.zln.netty.five.part04.dto;

import org.apache.commons.lang3.builder.ToStringBuilder;

import java.io.Serializable;

/**
 * Response to a subscription (order) request.
 *
 * <p>A mutable bean that implements {@link Serializable} so that instances
 * can be Java-serialized.
 *
 * <p>Created by sherry on 16/11/7.
 */
public class SubscribeResp implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Subscription request number this response belongs to. */
    private int subReqID;

    /** Subscription result code; 0 means the order did not succeed. */
    private int respCode;

    /** Optional detailed description. */
    private String desc;

    /** @return the subscription request number */
    public int getSubReqID() {
        return subReqID;
    }

    /** @param subReqID the subscription request number */
    public void setSubReqID(int subReqID) {
        this.subReqID = subReqID;
    }

    /** @return the subscription result code */
    public int getRespCode() {
        return respCode;
    }

    /** @param respCode the subscription result code */
    public void setRespCode(int respCode) {
        this.respCode = respCode;
    }

    /** @return the optional detailed description */
    public String getDesc() {
        return desc;
    }

    /** @param desc the optional detailed description */
    public void setDesc(String desc) {
        this.desc = desc;
    }

    /**
     * Renders every field through {@link ToStringBuilder}, in declaration order.
     */
    @Override
    public String toString() {
        ToStringBuilder text = new ToStringBuilder(this);
        text.append("subReqID", subReqID);
        text.append("respCode", respCode);
        text.append("desc", desc);
        return text.toString();
    }
}

SubscribeResp

服务端

package org.zln.netty.five.part04.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty server for the subscription example: binds a port and hands every
 * accepted connection to {@link SubReqServerInitializer}.
 *
 * <p>Created by sherry on 16/11/5.
 */
public class SubReqServer {

    /** Logger. */
    private static final Logger logger = LoggerFactory.getLogger(SubReqServer.class);

    /** Port the server binds to. */
    private final int port;

    /**
     * @param PORT port that {@link #bind()} will bind
     */
    public SubReqServer(int PORT) {
        this.port = PORT;
    }

    /**
     * Binds the configured port and blocks the calling thread until the server
     * channel is closed. Both event loop groups are shut down on the way out,
     * whether the method ends normally or through an exception.
     *
     * <p>If the calling thread is interrupted while waiting, the interruption
     * is logged and the thread's interrupt flag is restored.
     */
    public void bind() {
        // Two NIO event loop groups: bossGroup is registered as the parent group
        // (accepting connections), workGroup as the child group (I/O on the
        // accepted channels).
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap is Netty's helper for assembling and starting a server.
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    // TCP option; only the accept backlog (1024) is configured here.
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    // Pipeline setup for every accepted connection.
                    .childHandler(new SubReqServerInitializer());

            logger.debug("绑定端口号:{},等待同步成功", port);
            // bind() starts the bind, sync() blocks until it has completed.
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();

            logger.debug("等待服务端监听窗口关闭");
            // Block until the server channel is closed so the method does not return early.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            logger.debug("优雅退出,释放线程池资源");
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

SubReqServer

package org.zln.netty.five.part04.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Builds the server-side pipeline for the Java-serialization variant of the
 * subscription example: object decoder, object encoder, then the business
 * handler.
 *
 * <p>Created by sherry on 16/11/5.
 */
public class SubReqServerInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs the codec and business handlers on a newly registered channel.
     *
     * @param socketChannel the channel whose pipeline is populated
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        ClassLoader loader = this.getClass().getClassLoader();

        pipeline.addLast(
                // Decodes incoming Serializable request objects. 1024 * 1024 is the
                // maximum serialized size accepted for a single object; classes are
                // resolved through Netty's weak-caching concurrent resolver on this
                // initializer's class loader.
                new ObjectDecoder(1024 * 1024, ClassResolvers.weakCachingConcurrentResolver(loader)),
                // Encodes outgoing response objects.
                new ObjectEncoder(),
                new SubReqServerHandler());
    }
}

SubReqServerInitializer

package org.zln.netty.five.part04.server;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zln.netty.five.part03.SessionUtils;
import org.zln.netty.five.part04.dto.SubscribeReq;
import org.zln.netty.five.part04.dto.SubscribeResp;

/**
 * Server-side business handler: reads a {@link SubscribeReq} and answers it
 * with a {@link SubscribeResp}.
 *
 * <p>Created by sherry on 16/11/5.
 */
public class SubReqServerHandler extends ChannelHandlerAdapter {

    /** Logger. */
    private static final Logger logger = LoggerFactory.getLogger(SubReqServerHandler.class);

    /**
     * Logs the received request and writes the matching response back.
     *
     * @param ctx context of this handler
     * @param msg the inbound message; cast to {@link SubscribeReq}
     * @throws ClassCastException if {@code msg} is not a {@link SubscribeReq}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SubscribeReq subscribeReq = (SubscribeReq) msg;
        logger.debug("收到的订单请求信息:\n{}", subscribeReq);

        // Build the response from the request and send it immediately.
        ctx.writeAndFlush(getSubscribeResp(subscribeReq));
    }

    /**
     * Creates the response for a request.
     *
     * @param subscribeReq the request being answered
     * @return a response carrying the request's id and a fixed description
     */
    private SubscribeResp getSubscribeResp(SubscribeReq subscribeReq) {
        SubscribeResp subscribeResp = new SubscribeResp();
        subscribeResp.setSubReqID(subscribeReq.getSubReqID());
        // NOTE(review): the response code is set to the request id. SubscribeResp
        // documents respCode 0 as "not successful", so request 0 is reported as
        // failed and every other request gets an arbitrary code — confirm intent.
        subscribeResp.setRespCode(subscribeReq.getSubReqID());
        subscribeResp.setDesc("你订购的是一双红皮鞋,三天后发货");
        return subscribeResp;
    }

    /**
     * Flushes any pending outbound data once the current read batch is done.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        logger.debug("channelReadComplete");
        ctx.flush();
    }

    /**
     * Logs the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error(cause.getMessage(), cause);
        ctx.close();
    }
}

SubReqServerHandler

客户端

package org.zln.netty.five.part04.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zln.netty.five.part03.EchoServer;

/**
 * Netty client for the subscription example: connects to a host/port and
 * installs {@link SubReqClientInitializer} on the channel.
 *
 * <p>Created by sherry on 16/11/5.
 */
public class SubReqClient {

    /**
     * Logger. Bound to this class; it was previously created for
     * {@code EchoServer}, which put client log output under the wrong category.
     */
    private static final Logger logger = LoggerFactory.getLogger(SubReqClient.class);

    /** Host to connect to. */
    private final String host;

    /** Port to connect to. */
    private final int port;

    /**
     * @param HOST host that {@link #connect()} will connect to
     * @param PORT port that {@link #connect()} will connect to
     */
    public SubReqClient(String HOST, int PORT) {
        this.host = HOST;
        this.port = PORT;
    }

    /**
     * Connects to the configured endpoint and blocks the calling thread until
     * the channel is closed. The event loop group is shut down on the way out.
     *
     * <p>If the calling thread is interrupted while waiting, the interruption
     * is logged and the thread's interrupt flag is restored.
     */
    public void connect() {
        // NIO event loop group used by the client channel.
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new SubReqClientInitializer());

            // Start the connect and wait for it to complete.
            logger.debug("发起异步连接操作 - start");
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            logger.debug("发起异步连接操作 - end");

            // Wait until the client channel has been closed.
            logger.debug("等待客户端链路关闭 - start");
            channelFuture.channel().closeFuture().sync();
            logger.debug("等待客户端链路关闭 - end");
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            // Graceful shutdown of the event loop threads.
            eventLoopGroup.shutdownGracefully();
        }
    }
}

SubReqClient

package org.zln.netty.five.part04.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Builds the client-side pipeline for the Java-serialization variant of the
 * subscription example: object decoder, object encoder, then the business
 * handler.
 *
 * <p>Created by sherry on 16/11/5.
 */
public class SubReqClientInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs the codec and business handlers on a newly registered channel.
     *
     * @param socketChannel the channel whose pipeline is populated
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        ClassLoader loader = this.getClass().getClassLoader();

        pipeline.addLast(
                // Unlike the server side, the class resolver here has caching
                // disabled; a single decoded object may be at most 1024 bytes.
                new ObjectDecoder(1024, ClassResolvers.cacheDisabled(loader)),
                new ObjectEncoder(),
                new SubReqClientHandler());
    }
}

SubReqClientInitializer

package org.zln.netty.five.part04.client;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zln.netty.five.part04.dto.SubscribeReq;
import org.zln.netty.five.part04.dto.SubscribeResp;

/**
 * Client-side business handler: sends ten {@link SubscribeReq} messages once
 * the channel is active and logs every {@link SubscribeResp} it receives.
 *
 * <p>Created by sherry on 16/11/5.
 */
public class SubReqClientHandler extends ChannelHandlerAdapter {

    /** Logger. */
    private static final Logger logger = LoggerFactory.getLogger(SubReqClientHandler.class);

    /** Number of responses received so far; static, so shared by all instances. */
    private static int count = 0;

    /**
     * Sends ten subscription requests, flushing after each one.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.debug("客户端连接上了服务端");
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush(getSubscribeReq(i));
        }
    }

    /**
     * Builds a sample request whose fields are all derived from {@code i}.
     *
     * @param i request number, also appended to every text field
     * @return the populated request
     */
    private SubscribeReq getSubscribeReq(int i) {
        SubscribeReq subscribeReq = new SubscribeReq();
        subscribeReq.setSubReqID(i);
        subscribeReq.setUserName("张柳宁" + i);
        subscribeReq.setProductName("红皮鞋" + i);
        subscribeReq.setPhoneName("123" + i);
        subscribeReq.setAddress("地址" + i);
        return subscribeReq;
    }

    /**
     * Counts and logs a received response.
     *
     * @param ctx context of this handler
     * @param msg the inbound message; cast to {@link SubscribeResp}
     * @throws ClassCastException if {@code msg} is not a {@link SubscribeResp}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SubscribeResp subscribeResp = (SubscribeResp) msg;
        logger.debug("这是收到的第 {} 笔响应 -- {}", ++count, subscribeResp);
    }

    /**
     * Logs the failure and closes the channel. The cause used to be dropped
     * silently, which hid decoding and connection errors on the client.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error(cause.getMessage(), cause);
        ctx.close();
    }
}

SubReqClientHandler

  • 使用第三方库 JBoss Marshalling 进行序列化

POJO不变

工具类

package org.zln.netty.five.part05.marshalling;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Factory for the Netty marshalling codec handlers used by the example.
 * Both handlers are built on the "serial" marshaller factory with a
 * configuration version of 5.
 */
public class MarshallingCodeCFactory {

    /** Name of the marshaller factory to look up (the Java-serialization protocol). */
    private static final String PROTOCOL = "serial";

    /** Version set on every {@link MarshallingConfiguration}. */
    private static final int VERSION = 5;

    /** Maximum size of a single decoded object: 1024 << 2 = 4096. */
    private static final int MAX_OBJECT_SIZE = 1024 << 2;

    /**
     * Creates a decoder that unmarshals inbound objects.
     *
     * @return a new {@link MarshallingDecoder} limited to {@link #MAX_OBJECT_SIZE}
     * @throws IllegalStateException if the "serial" marshaller factory is unavailable
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        UnmarshallerProvider provider =
                new DefaultUnmarshallerProvider(serialMarshallerFactory(), newConfiguration());
        return new MarshallingDecoder(provider, MAX_OBJECT_SIZE);
    }

    /**
     * Creates an encoder that marshals outbound objects.
     *
     * @return a new {@link MarshallingEncoder}
     * @throws IllegalStateException if the "serial" marshaller factory is unavailable
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerProvider provider =
                new DefaultMarshallerProvider(serialMarshallerFactory(), newConfiguration());
        return new MarshallingEncoder(provider);
    }

    /**
     * Looks up the "serial" marshaller factory and fails fast when it is missing,
     * instead of handing a null factory to the providers.
     */
    private static MarshallerFactory serialMarshallerFactory() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory(PROTOCOL);
        if (factory == null) {
            throw new IllegalStateException(
                    "No MarshallerFactory found for protocol \"" + PROTOCOL
                            + "\"; is jboss-marshalling-serial on the classpath?");
        }
        return factory;
    }

    /** Creates a fresh marshalling configuration with the shared version. */
    private static MarshallingConfiguration newConfiguration() {
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(VERSION);
        return configuration;
    }
}

MarshallingCodeCFactory

服务端

package org.zln.netty.five.part05.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import org.zln.netty.five.part05.marshalling.MarshallingCodeCFactory;

/**
 * Builds the server-side pipeline for the marshalling variant of the
 * subscription example: marshalling decoder, marshalling encoder, then the
 * business handler.
 *
 * <p>Created by sherry on 16/11/5.
 */
public class SubReqServerInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs the codec and business handlers on a newly registered channel.
     *
     * @param socketChannel the channel whose pipeline is populated
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(
                // Decoder
                MarshallingCodeCFactory.buildMarshallingDecoder(),
                // Encoder
                MarshallingCodeCFactory.buildMarshallingEncoder(),
                new SubReqServerHandler());
    }
}

客户端

package org.zln.netty.five.part05.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import org.zln.netty.five.part05.marshalling.MarshallingCodeCFactory;

/**
 * Builds the client-side pipeline for the marshalling variant of the
 * subscription example: marshalling decoder, marshalling encoder, then the
 * business handler.
 *
 * <p>Created by sherry on 16/11/5.
 */
public class SubReqClientInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs the codec and business handlers on a newly registered channel.
     *
     * @param socketChannel the channel whose pipeline is populated
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(
                // Decoder
                MarshallingCodeCFactory.buildMarshallingDecoder(),
                // Encoder
                MarshallingCodeCFactory.buildMarshallingEncoder(),
                new SubReqClientHandler());
    }
}

其他代码和使用JDK序列化的时候完全一样

上一篇:beaglebone black 安装QNX过程


下一篇:(转)jquery.url.js 插件的使用