Netty什么?
Netty项目是一个提供异步事件驱动网络应用框架和快速开发可维护的高性能高扩展性服务端和客户端协议工具集的成果。
换句话说,Netty是一个NIO客户端服务端框架,它使得快速而简单的开发像服务端客户端协议的网络应用成为了可能。它它极大的简化并流线化了如TCP和UDP套接字服务器开发的网络编程。
“快速且简便”不意味着目标应用将容忍维护性和性能上的问题。Netty在吸取了大量协议实现(如FTP,SMTP,HTTP以及各种二进制,基于文本的传统协议)的经验上进行了精心的设计。由此,Netty成功找到了一个无需折衷妥协而让开发、性能、稳定性和灵活性相互协调的方法。
1、回顾
a) http协议是我们接触最多的,定义的API都是基于Http协议的,Http协议属于应用层协议。传输层协议,TCP / UDP协议,TCP是通过三次握手,保障通信的可信,也就是我们常说的长连接。Netty是对TCP协议的封装。
b) JAVA BIO / NIO / AIO
Java1.4版本引入NIO概念,实现了对“多路复用IO”的支持,Java1.7版本引入AIO概念。AIO是最晚提出的,理应是更先进的技术,但是并没有大规模的在商业领域应用。Unix提供了五种参考网络模型,在linux领域,并没有异步IO网络模型的成熟方案(linux发行版内核不支持AIO,需要自己安装扩展包)。JAVA的新版本的AIO,还是用采用多路复用IO实现。在Windows平台通过IOCP协议实现,可参考 IOCP浅析。
2、Netty 编程基础
现在从最简单的Echo程序入手,逐步深入地分享利用Netty如何编程。
Netty Server Exemple
EventLoopGroup group = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup(); try {
ServerBootstrap b = new ServerBootstrap();
b.group(group, workGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
}); ChannelFuture f = b.bind().sync();
System.out.println(EchoServer.class.getName() + " started and listen on " + f.channel().localAddress());
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
Netty Server Handler Exemple
public class EchoServerHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
ctx.write(Unpooled.copiedBuffer("Response from server. You have input \"" + in.toString(CharsetUtil.UTF_8) + "\"!", CharsetUtil.UTF_8));
ctx.flush();
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
Netty Client Exemple
EventLoopGroup group = new NioEventLoopGroup(); try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new EchoClientHandler());
}
}); ChannelFuture f = b.connect().sync();
if (f.channel().isActive()) {
f.channel().writeAndFlush(Unpooled.copiedBuffer("Hello Casper!", CharsetUtil.UTF_8));
} Thread.sleep(1000); } finally {
group.shutdownGracefully().sync();
}
Netty Client Handler Exemple
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
} }
3、BootStrap
一个Netty程序开始于Bootstrap类,Bootstrap类是Netty提供的一个可以通过简单配置来设置或"引导"程序的一个很重要的类。启动Sever端,需要初始化ServerBootStrap。定义两个EventLoopGroup,分别处理连接请求和socket数据传输。Netty巧妙的把接收请求和处理请求,都抽象成ChannelHanndler处理,模式上更加统一。通过阅读代码,接收请求的处理如下:接到连接请求后,设置其初始化参数,然后注册到childGroup处理。
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = this.childGroup;
final ChannelHandler currentChildHandler = this.childHandler; p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ch.eventLoop().execute(new Runnable() {
public void run() {
pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
}
});
}
}});
}
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler, Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
} public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel)msg;
child.pipeline().addLast(new ChannelHandler[]{this.childHandler});
try {
this.childGroup.register(child).addListener(new ChannelFutureListener() {
});
} catch (Throwable var8) {
}
}
}
4、EventLoopGroup
SingleThreadEventLoopGroup 一个线程处理所有的Channel
ThreadPerChannelLoopGroup 每个线程处理一个channel
MultiThreadEventLoopGroup 通过线程组处理channel
NIOEventLoopGroup
EpollEventLoopGroup 根据Selecter的不同实现,不同的处理策略。NIOEventLoopGroup,默认采用sellect方式,参考JAVA NIO实现。EpollEventLoopGroup只能在linux平台使用,更高效。
5、编程模式
宏观上,ChannelPipline串联起所有的ChannelHandler,在特定的时间节点,调用Handler的函数,完成处理流程。处理流程如下图所示:
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
} public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
} public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
} public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
} public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
} public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
} public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
} public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
} public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
ChannelPipline:内部包含一个ChannelHandlerContext的链表数据结构,每次从header item开始,调用context.invoke函数,开始处理流程。
ChannelHandlerContext:在Channelhandler中,调用context的fire函数;fire函数在pipline中查找需要执行的下一个context,调用context.invoke函数;调用channelhandler的函数。
ChannelHandler:channelChandler分为inbound和outbond,inbound处理接收消息,outbound处理传出。如上所示,channelInboundHandler中定义了一系列的函数,一般情况下,编写Netty程序,只需要在ChannelHandler中处理业务逻辑,编程模型相当简单。
7、编码与解码
代码实例中,channelHandler中读到的是ByteBuf对象,其实就是byte数组,在程序内部,把byte数组转换成了字符串。
问题1、半包问题,没有读到完整的数据,就进行了转换,导致错误。
问题2、编解码与业务柔和在一起,设计上不完美。
Netty提供了编码器,解码器以及编解码器。
public class RpcChannelHandler extends SimpleChannelInboundHandler<RpcResponse> implements RpcChannel { // 读取数据,读的是对象
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponse rpcResponse) throws Exception {
String requestId = rpcResponse.getRequestId();
if (rpcFutureMap.containsKey(requestId)) {
RpcFuture rpcFuture = rpcFutureMap.get(requestId);
rpcFutureMap.remove(requestId);
rpcFuture.finish(rpcResponse);
}
} @Override
public RpcFuture call(RpcRequest request, RpcCallback callback) {
this.channel.writeAndFlush(request); // 写数据,写的也是对象
}
}
编码器
public class RpcEncoder extends MessageToByteEncoder { @Override
public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
if (genericClass.isInstance(in)) {
byte[] data = SerializationUtil.serialize(in);
out.writeInt(data.length);
out.writeBytes(data);
}
}
}
解码器
public class RpcDecoder extends ByteToMessageDecoder { @Override
public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int dataLength = in.readInt(); if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data); Object obj = SerializationUtil.deserialize(data, genericClass);
out.add(obj);
}
}
处理流程如下:
编码解码器类结构
编码器是一个ChannelHandler,放在pipline中,作为处理请求消息的一个环节。处理传入参数的是Decoder,负责把二进制的数据,转换成程序可识别的数据结构,其实现了InboundChannelHandler接口;一般情况下,decoder要放在pip的header位置,即addFrist。处理传出参数的是Encoder,负责把程序内部的数据结构,转换成可在网络传输的二进制数据;一般情况下,encoder需要放在pipline的最后处理。
分别实现Encoder和Decoder,可能是代码放在两个类里实现;Netty提供了Codec,在一个类里实现编码和解码。
Netty对协议的支持,都是通过提供编码解码器实现的。例如:http协议
HttpRequestEncoder,将HttpRequest或HttpContent编码成ByteBuf
HttpRequestDecoder,将ByteBuf解码成HttpRequest和HttpContent
HttpResponseEncoder,将HttpResponse或HttpContent编码成ByteBuf
HttpResponseDecoder,将ByteBuf解码成HttpResponse和HttpContent
8、Netty 为什么高效?
1)、线程模型
- Reactor单线程模型;
- 作为NIO服务端,接收客户端的TCP连接;
- 作为NIO客户端,向服务端发起TCP连接;
- 读取通信对端的请求或者应答消息;
- 向通信对端发送消息请求或者应答消息。
- Reactor多线程模型;
- 有专门一个NIO线程-Acceptor线程用于监听服务端,接收客户端的TCP连接请求;
- 网络IO操作-读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送;
- 1个NIO线程可以同时处理N条链路,但是1个链路只对应1个NIO线程,防止发生并发操作问题。
- 主从Reactor多线程模型。
- 为了解决1个服务端监听线程无法有效处理所有客户端连接的性能不足问题,有一组NIO线程处理服务气短的监听。
如上的几种模式,在ServerBootStrap.goup初始化时设置,我们的例子,采用的是主从Reactor多线程模型。
2)、Zero Copy
- Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入Socket中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
ByteBufAllocator 通过ioBuffer分配堆外内存:public ByteBuf ioBuffer() {
return PlatformDependent.hasUnsafe() ? this.directBuffer(256) : this.heapBuffer(256);
} - Netty提供了组合Buffer对象,可以聚合多个ByteBuffer对象,用户可以像操作一个Buffer那样方便的对组合Buffer进行操作,避免了传统通过内存拷贝的方式将几个小Buffer合并成一个大的Buffer。
private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
......
wasAdded = this.components.add(c);
......
return var11;
} - Netty的文件传输采用了transferTo方法,它可以直接将文件缓冲区的数据发送到目标Channel,避免了传统通过循环write方式导致的内存拷贝问题。linux零拷贝,sendfile https://blog.csdn.net/caianye/article/details/7576198
private native long transferTo0(FileDescriptor var1, long var2, long var4, FileDescriptor var6);
3)、Select VS Epoll
多路复用IO有多种实现方式,其中,select/poll是所有操作系统都支持的方式,提出时间较早。JAVA NIO默认实现是用的Select,windows操作系统只支持Select。Epoll提出较晚,在linux系统提供,是目前比较成熟稳定的方案。
- select/poll 函数监视的文件描述符分3类,分别是writefds(可写)、readfds(可读)、和exceptfds(异常)。调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以通过遍历fdset,来找到就绪的描述符。
select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); -
epoll是在2.6内核中提出的,是之前的select和poll的增强版本。首先,epoll使用一组函数来完成,而不是单独的一个函数;其次,epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中,无须向select和poll那样每次调用都要重复传入文件描述符集合事件集。int epoll_create(int size); // 该函数返回的文件描述符指定要访问的内核事件表,是其他所有epoll系统调用的句柄,int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); //告诉操作系统需要关注哪些文件描述符int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); // 等待事件发生
9、Netty能做什么?
1)、Rpc框架,代表 Dubbo
Http VS TCP
- 采用Http协议,增加了http协议的解码编码过程
- http协议本身无状态的,连接的复用不好;(可通过keep-alive解决一部分连接复用的问题)
Tips:
2)、服务代理,代表 NRedis-Proxy 参考: https://my.oschina.net/u/2608504/blog/787976
NRedis-Proxy 是一个Redis中间件服务,第一个Java 版本开源Redis中间件,无须修改业务应用程序任何代码与配置,与业务解耦;以Spring为基础开发自定义标签,让它可配置化,使其更加容易上手;以netty 作为通信传输工具,让它具有高性能,高并发,可分布式扩展部署等特点,单片性能损耗约5%左右。
3)、中间件Vert.x 参考 https://vertx.io/
clipse Vert.x is event driven and non blocking. This means your app can handle a lot of concurrency using a small number of kernel threads. Vert.x lets your app scale with minimal hardware.
参考文章