接口定义
io.netty.channel.EventLoopGroup extends EventExecutorGroup
方法
|
说明
|
ChannelFuture register(Channel channel)
|
把一个channel注册到一个EventLoop
|
ChannelFuture register(Channel channel, ChannelPromise promise);
|
同上
|
io.netty.channel.EventLoop extends OrderedEventExecutor, EventLoopGroup
方法
|
说明
|
EventLoopGroup parent()
|
得到创建这个eventLoop的EventLoopGroup
|
EventLoopGroup定义的主要方法是register, 这个方法的语义是把channel和eventLoop绑定在一起。一个channel对应一个eventLoop, 一个eventLoop会持有多个channel。
I/O线程EventLoopGroup的抽象实现
io.netty.channel.MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup
io.netty.channel.SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop
两个类主功能都是实现了EventLoopGroup定义的register方法
MultithreadEventLoopGroup
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
SingleThreadEventLoop
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
channel.unsafe().register(this, promise);
return promise;
}
register的实现主要是为了调用Channel.Unsafe实例的register方法。
NIO实现
io.netty.channel.nio.NioEventLoopGroup extends MultithreadEventLoopGroup
io.netty.channel.nio.NioEventLoop extends SingleThreadEventLoop
NioEventLoopGroup是在MultithreadEventLoopGroup基础上实现了对JDK NIO Selector的封装, 它实现以下几个功能:
- 创建selector
- 在selector上注册channel感兴趣的NIO事件
- 实现EventExecutor的run方法,定义NIO事件和Executor任务的处理流程。
- 把NIO事件转换成对channel unsafe的调用或NioTask的调用
- 控制线程执行I/O操作和排队任务的用时比例
- 处理epoll selector cpu 100%的bug
下面来具体分析这几个功能的实现。
创建Selector
NioEventLoop#openSelector()实现了创建selector的功能,默认情况下,使用SelectorProvider#openSelector()方法创建一个新个selector:
final Selector unwrappedSelector = provider.openSelector();
如果设置环境变量io.netty.noKeySetOptimization=true, 会创建一个selectedKeySet = new SelectedSelectionKeySet(), 然后使用java的反射机制把selector的selectedKeys和publicSelectedKeys替换成selectedKeySet,具体步骤是:
1.得到selector的真正类型: sun.nio.ch.SelectorImpl
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
2.替换selector是属性unwrappedSelector
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
之所以会设计一个这样的优化选项,是因为一般情况下调用完selector的select或selectNow方法后需要调用Selector#selectedKeys()得到触发NIO事件的的SelectableChannel,这样优化之后,可以直接从selectedKeySet中得到已经触发了NIO事件的SelectableChannel。
在selector上注册channel感兴趣的NIO事件
NioEventLoop提供了unwrappedSelector方法,这个方法返回了它创建好的Selector实例。这样任何的外部类都可以把任意的SelectableChannel注册到这selector上。在AbstractNioChannel中, doRegister方法的实现就是使用了这个方法:
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
另外,它还提供了一个register方法:
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task)
这个方法会把task当成SelectableChannel的附件注册到selector上:
ch.register(selector, interestOps, task);
实现EventExecutor的run方法,定义NIO事件和Executor任务的处理流程
在NioEventLoop的run方法中实现NIO事件和EventExecutor的任务处理逻辑,这个run方法在io.netty.util.concurrent.SingleThreadEventExecutor中定义。在上一章中,我们看到了DefaultEventExecutor中是如何实现这个run方法的,这里我们将要看到这run方法的另一个实现。和SingleThreadEventExecutor中的run方法相比,NioEventLoop的run方法不仅要及时地执行taskQueue中的任务,还要能及时地处理NIO事件,因此它会同时检查selector中的NIO事件和和taskQueue队列,任何一个中有事件需要处理或有任务需要执行,它不会阻塞线程。同时它也保证了在没有NIO事件和任务的情况下线程不会无谓的空转浪费CUP资源。
run主要实现如下,为了更清晰的说明它的主要功能,我对原来的代码进行了一些删减。
for(;;){
try{
//phase1: 同时检查NIO事件和任务
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false)); //在taskQueue中没有任务的时候执行select
}
//phase2: 进入处理NIO事件,执行executor任务
try{
//处理NIO事件
processSelectedKeys();
}finally{
//处理taskQueu中的任务
runAllTasks();
}
}catch(Throwable t){
handleLoopException(t);
}
}
run方法有两个阶段构成:
phase1: 检查NIO事件或executor任务,如果有任何的NIO事件或executor任务进入phase2。
这样阶段的主要工作在selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())和select中完成。
selectStrategy.calculateStrategy实现
selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())
这行代码的含义是: 如果hasTasks() == true, 调用以下selector#selectNow, 然后进入phase2。 否则调用select。这里使用了strategy模式,默认的strategy实现是io.netty.channe.DefaultSelectStrategy implements SelectStrategy
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
DefaultSelectStrategy实现了SelectStrategy接口,这接口定义了两个常量:
int SELECT = -1;
int CONTINUE = -2;
运行时selectSuppler参数传入的是selectNowSupplier, 它的实现如下:
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
这里的get方法调用了selectNow, selectNow调用的是Selector#selectNew方法,这个方法的返回值是>=0。
hashTasks的传入的参数是hasTask()的返回值: return !taskQueue.isEmpty();
代码读到这里就会发现,使用默认的的SelectStrategy实现,calculateStrategy在hasTasks()==true时返回值>=0, hasTasks() == false时返回值是SelectStrategy.SELECT,不会返回SelectStrategy.CONTINUE。
select实现
select的执行逻辑是:
1. 计算超select方法的结束时间selectDeadLineNanos
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
2. 进入循环,检查超时--超时跳出循环。
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
3. 如果在select执行过程中有executor任务提交或可以当前的wakeUp由false变成true, 跳出循环
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
4. 调用selector#select等待NIO事件。
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
5. 如果满足这些条件的任何一个,跳出循环: 有NIO事件、wakeUp的新旧值都是true、taskQueue中有任务、有定时任务到期。
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
6. 如果线程被中断,跳出循环。
if (Thread.interrupted()) {
break;
}
7. 如果selector.select超时,没有检查到任何NIO事件, 会在下次循环开始时跳出循环。 如果每次超时,跳到第2步继续下一次循环。
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
}
currentTimeNanos = time;
select 最迟会在当前时间>= selectDeadLineNanos时返回,这个时间是最近一个到期的定时任务执行的时间,换言之如果没有任何的NIO事件或executor任务,select会在定时任务到期时返回。如果没有定时任务,delayNanos(currentTimeNanos)返回的值是 TimeUnit.SECONDS.toNanos(1),即1秒。 select会在检查到任何NIO事件或executor任务时返回,为了保证这点,在selector.select(timeoutMillis)前后都会调用hasTasks检查executor任务,为了能在调用executet提交任务时唤醒selector.select,NioEventLoop覆盖了SingleThreadEventExecutor的wake方法:
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
这个方法会及时的唤醒selector.select, 保证新提交的任务可以得到及时的执行。
phase2: 进入处理NIO事件,执行executor任务
这个阶段是先调用processSelectedKeys()处理NIO事件,然后掉用 runAllTasks()处理所有已经到期的定时任务和已经在排队的任务。这个阶段还实现了NIO事件和executor任务的用时比例管理,这个特性稍后会详细分析。