使用JDK序列化不需要额外的类库,只需要让被传输的类实现Serializable接口即可。但是序列化之后的码流只有Java才能反序列化,所以它不是跨语言的;另外Java序列化后的码流比较大,效率也不高,所以在RPC中很少使用,本文只是做学习之用。
编解码器:
/**
 * Inbound codec step that turns one inbound {@link ByteBuf} into a Java object
 * using JDK serialization.
 *
 * <p>All readable bytes of the buffer are treated as exactly one serialized
 * object, so this handler must sit behind a frame decoder that delivers one
 * complete message per buffer.
 *
 * <p><b>Security note:</b> native Java deserialization of bytes received from
 * the network can execute attacker-controlled code paths. This is acceptable
 * for a learning example only; do not expose it to untrusted peers without an
 * {@code ObjectInputFilter} or a different wire format.
 */
public class JdkDecoder extends MessageToMessageDecoder<ByteBuf> {

    /**
     * Deserializes the readable bytes of {@code byteBuf} and appends the
     * resulting object to {@code list}.
     *
     * @param channelHandlerContext context of this handler (unused)
     * @param byteBuf               buffer holding one serialized object
     * @param list                  output list receiving the decoded object
     * @throws Exception if the bytes are not a valid serialized object or the
     *                   class of the object cannot be found
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        final int length = byteBuf.readableBytes();
        final byte[] b = new byte[length];
        // Absolute-index copy: the buffer's readerIndex is intentionally left untouched.
        byteBuf.getBytes(byteBuf.readerIndex(), b, 0, length);
        // try-with-resources closes the stream even when readObject() throws.
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(b))) {
            list.add(ois.readObject());
        }
    }
}

/**
 * Outbound codec step that writes any outgoing object into the target
 * {@link ByteBuf} using JDK serialization.
 */
public class JdkEncoder extends MessageToByteEncoder<Object> {

    /**
     * Serializes {@code o} and appends the resulting bytes to {@code byteBuf}.
     *
     * @param channelHandlerContext context of this handler (unused)
     * @param o                     object to serialize; must be {@code Serializable}
     * @param byteBuf               buffer receiving the serialized bytes
     * @throws Exception if {@code o} (or something it references) cannot be serialized
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        // Both streams are closed automatically, including when writeObject() fails.
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
            // Push everything buffered inside the ObjectOutputStream into bos before copying.
            oos.flush();
            byteBuf.writeBytes(bos.toByteArray());
        }
    }
}
---
传输对象:
/**
 * Simple serializable bean used as the payload exchanged between the echo
 * client and the echo server.
 */
public class Person implements Serializable {

    /**
     * Explicit stream version so that both peers agree on the class version
     * independently of compiler-generated defaults.
     */
    private static final long serialVersionUID = 1L;

    private int age;
    private String name;
    private boolean man;
    private List<String> list;
    private Date birth;
    private Person son;

    /** @return the age */
    public int getAge() {
        return age;
    }

    /** @param age the age to set */
    public void setAge(int age) {
        this.age = age;
    }

    /** @return the name, possibly {@code null} */
    public String getName() {
        return name;
    }

    /** @param name the name to set */
    public void setName(String name) {
        this.name = name;
    }

    /** @return {@code true} if this person is male */
    public boolean isMan() {
        return man;
    }

    /** @param man whether this person is male */
    public void setMan(boolean man) {
        this.man = man;
    }

    /** @return the attached string list, possibly {@code null} */
    public List<String> getList() {
        return list;
    }

    /** @param list the string list to attach */
    public void setList(List<String> list) {
        this.list = list;
    }

    /** @return the birth date, possibly {@code null} */
    public Date getBirth() {
        return birth;
    }

    /** @param birth the birth date to set */
    public void setBirth(Date birth) {
        this.birth = birth;
    }

    /** @return the nested son, possibly {@code null} */
    public Person getSon() {
        return son;
    }

    /** @param son the nested son to set */
    public void setSon(Person son) {
        this.son = son;
    }

    /**
     * Renders all fields in the form
     * {@code Person{age=..., name='...', man=..., list=..., birth=..., son=...}}.
     */
    @Override
    public String toString() {
        return "Person{" +
                "age=" + age +
                ", name='" + name + '\'' +
                ", man=" + man +
                ", list=" + list +
                ", birth=" + birth +
                ", son=" + son +
                '}';
    }
}
---
Server端:
/**
 * Echo server: accepts connections, decodes length-prefixed JDK-serialized
 * objects and sends every received object back to the sender.
 */
public class EchoServer {

    /** Starts the server on port 8080. */
    public static void main(String[] args) {
        new EchoServer().bind(8080);
    }

    /**
     * Binds to {@code port} and blocks until the server channel is closed.
     *
     * @param port TCP port to listen on
     */
    public void bind(int port) {
        // Two thread groups: one accepts client connections, the other performs
        // the network reads/writes of the accepted SocketChannels.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap is the helper class used to start an NIO server.
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    //.handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Inbound: split the byte stream into frames using a 2-byte length prefix.
                            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                            //ch.pipeline().addLast(new MsgpackDecoder());
                            ch.pipeline().addLast(new JdkDecoder());
                            // Outbound: prepend 2 bytes holding the message length.
                            ch.pipeline().addLast(new LengthFieldPrepender(2));
                            //ch.pipeline().addLast(new MsgpackEncoder());
                            ch.pipeline().addLast(new JdkEncoder());
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            // bind() is asynchronous; sync() blocks until the bind has completed.
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("server started");
            // Block until the listening channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            System.out.println("server shuting down");
            // Release the thread resources.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

/**
 * Server-side business handler: prints each decoded message and echoes it
 * back to the client.
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /** Counts the non-Person messages printed by this handler instance. */
    int count = 0;

    /**
     * Prints the received message and writes it back to the peer.
     *
     * @param ctx handler context used to send the reply
     * @param msg message produced by the preceding decoder
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof Person) {
            Person p = (Person) msg;
            Q.p(p.toString());
        } else {
            System.out.println("The server received(" + count++ + "): " + msg);
        }
        // Asynchronous send of the same object back to the client.
        ctx.writeAndFlush(msg);
    }

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
---
Client端:
/**
 * Echo client: connects to the echo server, sends ten {@code Person} objects
 * and prints whatever the server sends back.
 */
public class EchoClient {

    /** Connects to the server at 127.0.0.1:8080. */
    public static void main(String[] args) {
        new EchoClient().connect("127.0.0.1", 8080);
    }

    /**
     * Connects to {@code host:port} and blocks until the connection is closed.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public void connect(String host, int port) {
        // Client-side NIO thread group.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Inbound: split the byte stream into frames using a 2-byte length prefix.
                            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                            //ch.pipeline().addLast(new MsgpackDecoder());
                            ch.pipeline().addLast(new JdkDecoder());
                            // Outbound: prepend 2 bytes holding the message length.
                            ch.pipeline().addLast(new LengthFieldPrepender(2));
                            //ch.pipeline().addLast(new MsgpackEncoder());
                            ch.pipeline().addLast(new JdkEncoder());
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            // connect() is asynchronous; sync() blocks until the connection is established.
            ChannelFuture future = bootstrap.connect(host, port).sync();
            System.out.println("client started");
            // Block until the client connection is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            System.out.println("client shuting down");
            // Release the NIO thread group.
            group.shutdownGracefully();
        }
    }
}

/**
 * Client-side business handler: sends the test objects once the connection
 * is active and prints every reply.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    /** Counts the non-Person replies printed by this handler instance. */
    private int count = 0;

    /**
     * Sends ten messages as soon as the channel becomes active. The same
     * {@code Person} instance is reused; only its age changes between writes.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        List<String> l = new ArrayList<String>();
        l.add("abc");
        l.add("123");
        Person p = new Person();
        p.setName("luangeng");
        p.setMan(true);
        p.setBirth(new Date());
        p.setList(l);
        for (int i = 0; i < 10; i++) {
            p.setAge(i);
            ctx.write(p);
        }
        // Flush all queued writes at once.
        ctx.flush();
    }

    /** Called for each reply from the server; prints it. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof Person) {
            Person p = (Person) msg;
            Q.p(p.toString());
        } else {
            Q.p(count++ + " client get: " + msg);
        }
    }

    /** Flushes any pending outbound data once the current read batch is done. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
---
执行结果:
client started
Person{age=0, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=1, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=2, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=3, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=4, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=5, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=6, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=7, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=8, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=9, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Netty也提供了基于JDK的编解码器,可以直接使用,比如:
// Same handler chain, in the same order, built from Netty's bundled
// JDK-serialization codec and registered with a single varargs addLast call.
// NOTE(review): ObjectDecoder/ObjectEncoder appear to do their own 4-byte length
// framing, so the explicit frame decoder/prepender may be redundant -- confirm
// against the Netty version in use before removing them.
channel.pipeline().addLast(
        new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4),
        new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())),
        new LengthFieldPrepender(4),
        new ObjectEncoder(),
        new ServerHandler());
---
MessagePack工具:
MessagePack是与JSON数据格式类似的二进制序列化格式,更快更小,并且是跨语言的,用于在多个语言之间交换数据。使用MessagePack实现的编解码器如下:
/**
 * Outbound codec step that serializes any outgoing object with MessagePack
 * and appends the bytes to the target {@link ByteBuf}.
 */
public class MsgpackEncoder extends MessageToByteEncoder<Object> {

    /** Created once per handler instead of once per message. */
    private final MessagePack messagePack = new MessagePack();

    /**
     * Serializes {@code o} with MessagePack and writes the bytes to {@code byteBuf}.
     *
     * @param channelHandlerContext context of this handler (unused)
     * @param o                     object to serialize
     * @param byteBuf               buffer receiving the serialized bytes
     * @throws Exception if MessagePack cannot serialize {@code o}
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        byte[] raw = messagePack.write(o);
        byteBuf.writeBytes(raw);
    }
}

/**
 * Inbound codec step that decodes the readable bytes of one {@link ByteBuf}
 * with MessagePack.
 *
 * <p>{@code read(byte[])} is called without a target class, so the object
 * handed to the next handler is MessagePack's own representation rather than
 * an application type such as {@code Person}.
 * NOTE(review): pass a target class (or register a template) if typed objects
 * are needed -- confirm the exact API against the msgpack version in use.
 */
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {

    /** Created once per handler instead of once per message. */
    private final MessagePack messagePack = new MessagePack();

    /**
     * Decodes all readable bytes of {@code byteBuf} and appends the result to {@code list}.
     *
     * @param channelHandlerContext context of this handler (unused)
     * @param byteBuf               buffer holding one MessagePack-encoded message
     * @param list                  output list receiving the decoded value
     * @throws Exception if the bytes cannot be decoded
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        final int length = byteBuf.readableBytes();
        final byte[] b = new byte[length];
        // Absolute-index copy: the buffer's readerIndex is intentionally left untouched.
        byteBuf.getBytes(byteBuf.readerIndex(), b, 0, length);
        list.add(messagePack.read(b));
    }
}
---
使用这种编解码后,服务端和客户端接收到的对象都不能转换为Person对象:解码时调用的是不带目标类型的mp.read(b),得到的只是MessagePack的通用值对象,而不是Person。
end