netty是一个nio客户机-服务器框架,它简化了tcp和udp网络编程,相对于java传统nio,netty还屏蔽了操作系统的差异性,并且兼顾了性能。
Channel
channel封装了对socket的原子操作,实质是对socket的封装和扩展。
netty框架自己定义的通道接口,是对java nio channel的封装和扩展,客户端NIO套接字通道是NioSocketChannel,提供的服务器端NIO套接字通道是NioServerSocketChannel。
NioSocketChannel内部管理了一个Java NIO的SocketChanne实例,用来创建SocketChannel实例和设置该实例的属性,并调用Connect 方法向服务端发起 TCP 链接等。
NioServerSocketChannel内部管理了Java NIO的ServerSocketChannel实例,用来创建ServerSocketChannel实例和设置该实例属性,并调用实例的bind方法在指定端口监听客户端的链接。
EventLoopGroup
netty使用了主从多线程的Reactor模型。在netty中每个EventLoopGroup本身是一个线程池,其中包含了自定义个数的 NioEventLoop,每个NioEventLoop是一个线程,并且每个NioEventLoop都会关联一个selector选择器。
客户端一般只用一个EventLoopGroup来处理网络IO操作,服务器端一般使用两个,boss-group用来接收客户端发来的TCP链接请求,worker-group用来具体处理网络请求。
当Channel是客户端通道NioSocketChannel时,会注册NioSocketChannel管理的SocketChannel实例到自己关联的NioEventLoop的selector选择器上,然后NioEventLoop对应的线程会通过select命令监控感兴趣的网络读写事件。
当 Channel 是服务端通道NioServerSocketChannel时,NioServerSocketChannel本身会被注册到boss EventLoopGroup里面的某一个NioEventLoop管理的selector选择器,而完成三次握手的链接套接字是被注册到了worker EventLoopGroup里面的某一个NioEventLoop管理的selector选择器上。
多个Channel可以注册到同一个NioEventLoop管理的selector选择器上,这时NioEventLoop对应的单个线程就可以处理多个Channel的就绪事件;但是每个Channel只能注册到一个固定的NioEventLoop管理的selector上。
ChannelPipeline
ChannelPipeline持有一个ChannelHandler的双向链结构。每个Channel都有属于自己的ChannelPipeline,对从Channel 中读取或者要写入 Channel 中的数据进行依次处理。
多ChannelPipeline里面可以复用一个ChannelHandler。
Netty 客户端底层与 Java NIO 对应关系
NioSocketChannel是对Java NIO的SocketChannel的封装,从NioSocketChannel的构造函数可以看出:
public class NioSocketChannel extends AbstractNioByteChannel implements SocketChannel {
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
private static java.nio.channels.SocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openSocketChannel();
} catch (IOException var2) {
throw new ChannelException("Failed to open a socket.", var2);
}
}
}
SocketChannel的父类是AbstractNioChannel,在AbstractNioChannel中定义了java nio的SocketChannel类型的成员,并将其模式设置为非阻塞:
public abstract class AbstractNioChannel extends AbstractChannel {
private final SelectableChannel ch;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
...
this.ch = ch;
this.readInterestOp = readInterestOp;
ch.configureBlocking(false);
}
}
所以说NioSocketChannel内部是对java nio的SocketChannel的封装扩展,创建NioSocketChannel实例对象时候相当于执行了Java NIO中:
SocketChannel socketChannel = SocketChannel.open();
NioSocketChannel实例的创建和注册是在Bootstrap的connect阶段。Bootstrap的connect代码:
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
...
public ChannelFuture connect(InetAddress inetHost, int inetPort) {
return this.connect(new InetSocketAddress(inetHost, inetPort));
}
public ChannelFuture connect(SocketAddress remoteAddress) {
...
return this.doResolveAndConnect(remoteAddress, this.config.localAddress());
}
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
ChannelFuture regFuture = this.initAndRegister();
...
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = this.channelFactory.newChannel();
this.init(channel);
} catch (Throwable var3) {}
ChannelFuture regFuture = this.config().group().register(channel);
...
}
channelFactory.newChannel()创建了NIOSocketChannel实例。
继续跟踪this.config().group().register(channel)到:
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
...
public ChannelFuture register(Channel channel) {
return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
最终会跟踪到AbstractNioChannel的doRegister():
public abstract class AbstractNioChannel extends AbstractChannel {
...
protected void doRegister() throws Exception {
boolean selected = false;
while(true) {
try {
this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException var3) {
if (selected) {
throw var3;
}
this.eventLoop().selectNow();
selected = true;
}
}
}
}
其中:
this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this)
将NioSocketChannel实例注册到了当前NioEventLoop的选择器中。
从选择器获取就绪的事件是在该客户端套接关联的NioEventLoop里面的做的,每个NioEventLoop里面有一个线程用来循环从选择器里面获取就绪的事件:
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
...
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {}
}
}
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
while(true) {
...
int selectedKeys = selector.select(timeoutMillis);
++selectCnt;
...
} catch (CancelledKeyException var13) {}
}
从选择器选取就绪的事件后,会最终调用processSelectedKey具体处理每个事件:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {}
}
netty和reactor模型
netty实现单线程reactor:
private EventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap()
.group(group)
.childHandler(new HeartbeatInitializer());
netty实现多线程reactor:
// 默认线程数为CPU核心数 * 2,*2是因为考虑到超线程CPU包括两个逻辑线程
private EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap()
.group(boss)
.childHandler(new HeartbeatInitializer());
netty实现主从多线程reactor:
private EventLoopGroup boss = new NioEventLoopGroup();
private EventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap()
.group(boss, work)
.childHandler(new HeartbeatInitializer());