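I. Outline of NioEventLoop creation
1. new NioEventLoopGroup() // create the event loop group; defaults to 2 * CPU cores threads
2. new ThreadPerTaskExecutor() // create the thread executor
3. for() { newChild() } // construct each NioEventLoop
4. Create a Chooser from the NioEventLoop array // the event loop selector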
II. NioEventLoop creation flow
1. Enter MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args)
    /**
     * Create a new instance.
     *
     * @param nThreads       the number of threads that will be used by this instance.
     * @param executor       the Executor to use, or {@code null} if the default should be used.
     * @param chooserFactory the {@link EventExecutorChooserFactory} to use.
     * @param args           arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        // Create the executor; defaults to ThreadPerTaskExecutor
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        // Create the EventExecutor children
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                /**
                 * The EventLoopGroup creates an EventLoop here; for NioEventLoopGroup the method executed is
                 * {@link io.netty.channel.nio.NioEventLoopGroup#newChild(java.util.concurrent.Executor, java.lang.Object...)}
                 */
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        // Create the EventExecutorChooser
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
MultithreadEventExecutorGroup first creates a ThreadPerTaskExecutor to serve as the executor for the NioEventLoop instances, then calls MultithreadEventExecutorGroup#newChild(Executor executor, Object... args) once per thread to create each NioEventLoop, and finally calls EventExecutorChooserFactory#newChooser to create the EventExecutorChooser.
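The three steps of this constructor (default executor, children array, chooser) can be sketched without Netty. The following is a minimal, JDK-only illustration, not Netty code: MiniGroup, Child and next() are hypothetical names, and the chooser is reduced to a plain round-robin counter.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, JDK-only stand-in for the group construction flow; not Netty code.
class MiniGroup {
    // Hypothetical stand-in for a child event loop: it only remembers its executor.
    static final class Child {
        final int id;
        final Executor executor;

        Child(int id, Executor executor) {
            this.id = id;
            this.executor = executor;
        }
    }

    final Child[] children;
    private final AtomicInteger idx = new AtomicInteger();

    MiniGroup(int nThreads, Executor executor) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException("nThreads: " + nThreads + " (expected: > 0)");
        }
        // Step 1: fall back to a thread-per-task executor when none is supplied.
        if (executor == null) {
            executor = command -> new Thread(command).start();
        }
        // Step 2: create the children; every child shares the same executor instance.
        children = new Child[nThreads];
        for (int i = 0; i < nThreads; i++) {
            children[i] = new Child(i, executor);
        }
    }

    // Step 3: a round-robin "chooser" over the children array.
    Child next() {
        return children[Math.abs(idx.getAndIncrement() % children.length)];
    }
}
```

The sketch leaves out the cleanup path (shutting down already-created children when one fails) and the termination listener, which the real constructor handles.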
Look at ThreadPerTaskExecutor
    import io.netty.util.internal.ObjectUtil;

    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadFactory;

    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;

        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
        }

        @Override
        public void execute(Runnable command) {
            /**
             * @see DefaultThreadFactory#newThread(java.lang.Runnable, java.lang.String)
             */
            // Every call creates a brand-new thread for the command and starts it
            threadFactory.newThread(command).start();
        }
    }
As the execute method shows, every call to execute on a ThreadPerTaskExecutor creates a new thread and starts it. Its default threadFactory is io.netty.util.concurrent.DefaultThreadFactory, which creates FastThreadLocalThread instances. So whenever a NioEventLoop reaches this execute method, a new FastThreadLocalThread is created and started.
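To make the "one new thread per execute call" behaviour concrete, here is a small JDK-only sketch. It does not use Netty: PerTaskExecutor mirrors the shape of ThreadPerTaskExecutor, while CountingThreadFactory and runTasks are hypothetical helpers added only to count how many threads get created.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only sketch of the thread-per-task idea; all names here are hypothetical.
class ThreadPerTaskSketch {
    // Counts every thread it hands out.
    static final class CountingThreadFactory implements ThreadFactory {
        final AtomicInteger created = new AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "sketch-" + created.incrementAndGet());
        }
    }

    // Same shape as ThreadPerTaskExecutor: one new thread per execute() call.
    static final class PerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;

        PerTaskExecutor(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
        }

        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }

    // Submits the given number of tasks, waits for them, and returns the thread count.
    static int runTasks(int tasks) {
        CountingThreadFactory factory = new CountingThreadFactory();
        Executor executor = new PerTaskExecutor(factory);
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(done::countDown);
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return factory.created.get();
    }
}
```

Calling ThreadPerTaskSketch.runTasks(3) creates three threads, one per submitted task, because nothing is pooled or reused.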
Enter the newChild() method. The concrete implementation is io.netty.channel.nio.NioEventLoopGroup#newChild(java.util.concurrent.Executor, java.lang.Object...)
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        // The fourth argument is optional; queueFactory stays null when it is absent
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        // Create and return the NioEventLoop
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(),
                (RejectedExecutionHandler) args[2], queueFactory);
    }
As the previous step shows, the executor here is the ThreadPerTaskExecutor, and args are expected to be, in order, SelectorProvider, SelectStrategyFactory, RejectedExecutionHandler and EventLoopTaskQueueFactory (the fourth is optional). newChild then calls the NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) constructor to create the NioEventLoop and return it.
Enter the NioEventLoop constructor
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        // Create two non-blocking queues and pass them on to the superclass constructor
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        // selectorTuple holds both the undecorated (raw) selector and the decorated selector
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
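The NioEventLoop constructor first calls newTaskQueue(EventLoopTaskQueueFactory queueFactory) twice to create two non-blocking queues and passes them to the superclass constructor. The superclass constructor only stores fields, so it is not explored further here. The constructor then stores provider and selectStrategy, and finally calls openSelector() to create the selector and store it.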
Enter the newTaskQueue(EventLoopTaskQueueFactory queueFactory) method
    private static Queue<Runnable> newTaskQueue(EventLoopTaskQueueFactory queueFactory) {
        // No factory supplied: fall back to the default queue
        if (queueFactory == null) {
            return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
        }
        return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
    }
When no queueFactory is supplied, the call continues into Queue<Runnable> newTaskQueue0(int maxPendingTasks). Enter that method:
    private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
        // This event loop never calls takeTask()
        // Creates an MPSC queue by default: unbounded when maxPendingTasks is Integer.MAX_VALUE, bounded otherwise
        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
    }
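Queue<Runnable> newTaskQueue0(int maxPendingTasks) creates and returns an mpscQueue. MPSC stands for multiple producer, single consumer: many threads may add tasks to the queue concurrently, while only one thread (here, the event loop thread) takes tasks out, and the queue is safe under that usage.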
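The multiple-producer, single-consumer usage pattern can be illustrated with the JDK alone. In this sketch, java.util.concurrent.ConcurrentLinkedQueue is only a stand-in for the specialised MPSC queue that PlatformDependent.newMpscQueue() returns; it shows the access pattern, not the implementation. MpscSketch and produceAndDrain are hypothetical names, and the consumer drains only after all producers finish so that the result is deterministic.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

// JDK-only sketch of the MPSC access pattern; ConcurrentLinkedQueue is a stand-in,
// not the queue type Netty actually uses.
class MpscSketch {
    // Starts `producers` threads that each offer `perProducer` tasks,
    // then drains and runs every task on the calling (single consumer) thread.
    static int produceAndDrain(int producers, int perProducer) {
        Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
        int[] executed = {0}; // only modified on the consumer thread, where tasks run
        CountDownLatch done = new CountDownLatch(producers);

        for (int p = 0; p < producers; p++) {
            new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    taskQueue.offer(() -> executed[0]++); // many threads enqueue
                }
                done.countDown();
            }).start();
        }

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        Runnable task;
        while ((task = taskQueue.poll()) != null) { // one thread dequeues
            task.run();
        }
        return executed[0];
    }
}
```

Because only the consumer thread runs the tasks, the counter needs no synchronisation of its own; the queue is the only shared structure.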
Enter the openSelector() method to see how the Selector is created
    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            // The provider creates a native NIO selector
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        // If the key set optimization is disabled, return the native selector as is
        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }

        // Load and obtain the sun.nio.ch.SelectorImpl class
        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        if (!(maybeSelectorImplClass instanceof Class) ||
            // ensure the current selector implementation is what we can instrument.
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        // Create a SelectedSelectionKeySet
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    // Look up the selectedKeys and publicSelectedKeys fields of sun.nio.ch.SelectorImpl
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                        // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                        // This allows us to also do this in Java9+ without any extra flags.
                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset =
                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            PlatformDependent.putObject(
                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(
                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }
                        // We could not retrieve the offset, lets try reflection as last-resort.
                    }

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }

                    // Use reflection to force both selectedKeys and publicSelectedKeys to selectedKeySet
                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }
        // Keep selectedKeySet locally, so the selected keys can be read here
        // instead of going through the selector
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }
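The openSelector() method consists mainly of the following steps
- Use the provider to create a native NIO selector
- Load and obtain the sun.nio.ch.SelectorImpl class
- Create a SelectedSelectionKeySet instance, selectedKeySet
- Replace the selectedKeys and publicSelectedKeys fields of the native selector with selectedKeySet, using reflection (or, on Java 9+ when Unsafe is available, Unsafe). This is an important optimization Netty makes over JDK NIO: in the JDK's SelectorImpl these fields are backed by a hash-based Set, so adding a key involves hashing and may run into hash collisions, whereas Netty's SelectedSelectionKeySet, although called a Set, is backed by an array, so adding a key is a plain array append and hash collisions cannot occur
- Keep selectedKeySet in the event loop's own selectedKeys field, so the selected keys can be read locally instead of through the selector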
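The two ideas in these steps, an array-backed Set and swapping it into a private field via reflection, can be sketched with the JDK alone. Everything below is hypothetical illustration code: ArrayKeySet imitates the idea behind SelectedSelectionKeySet, and FakeSelector stands in for a selector implementation with a private HashSet field. It does not touch sun.nio.ch.SelectorImpl, which on recent JDKs is not reflectively accessible without extra flags.

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;

// JDK-only sketch; all class and method names are hypothetical.
class KeySetSketch {
    // Append-only, array-backed Set: add() writes to the next slot without hashing.
    // Like the idea it imitates, it does not check for duplicates.
    static final class ArrayKeySet<T> extends AbstractSet<T> {
        Object[] keys = new Object[4];
        int size;

        @Override
        public boolean add(T key) {
            if (key == null) {
                return false;
            }
            if (size == keys.length) {
                keys = Arrays.copyOf(keys, size << 1); // double the capacity
            }
            keys[size++] = key;
            return true;
        }

        @Override
        public int size() {
            return size;
        }

        @Override
        public Iterator<T> iterator() {
            return new Iterator<T>() {
                private int i;

                @Override
                public boolean hasNext() {
                    return i < size;
                }

                @SuppressWarnings("unchecked")
                @Override
                public T next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return (T) keys[i++];
                }
            };
        }

        // Empties the set so it can be reused for the next batch of keys.
        void reset() {
            Arrays.fill(keys, 0, size, null);
            size = 0;
        }
    }

    // Hypothetical stand-in for a selector that keeps ready keys in a private HashSet.
    static final class FakeSelector {
        private Set<String> selectedKeys = new HashSet<>();

        void markReady(String key) {
            selectedKeys.add(key);
        }
    }

    // Reflectively replaces the private field and returns the installed set,
    // so the caller can read ready keys directly from its own reference.
    static ArrayKeySet<String> instrument(FakeSelector selector) {
        try {
            ArrayKeySet<String> replacement = new ArrayKeySet<>();
            Field field = FakeSelector.class.getDeclaredField("selectedKeys");
            field.setAccessible(true);
            field.set(selector, replacement);
            return replacement;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After instrument(selector) returns, every key the selector marks as ready lands in the caller's array-backed set in insertion order, which is the same local-reference trick the last step describes.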
Enter the EventExecutorChooserFactory#newChooser method to see how the chooser is created
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        // Check whether the number of executors is a power of two (2, 4, 8, 16, ...).
        // If it is, create a PowerOfTwoEventExecutorChooser;
        // otherwise, create a GenericEventExecutorChooser
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
EventExecutorChooserFactory#newChooser creates either a PowerOfTwoEventExecutorChooser or a GenericEventExecutorChooser, depending on whether the number of executors is a power of two. PowerOfTwoEventExecutorChooser computes the next EventExecutor with a bitwise AND (&), while GenericEventExecutorChooser uses an ordinary modulo operation. This is another Netty optimization.
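The equivalence the power-of-two chooser relies on is that, for a length n that is a power of two, i & (n - 1) equals i % n for non-negative i. A minimal JDK-only sketch follows; ChooserSketch, Chooser and nextIndex are hypothetical names, and the chooser returns an index rather than an EventExecutor to keep the example self-contained.

```java
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only sketch of the two index-selection strategies; names are hypothetical.
class ChooserSketch {
    // For val > 0: true when val has exactly one bit set, i.e. it equals its lowest set bit.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    interface Chooser {
        int nextIndex();
    }

    static Chooser newChooser(int length) {
        AtomicInteger idx = new AtomicInteger();
        if (isPowerOfTwo(length)) {
            // length - 1 is an all-ones mask, so AND keeps only the low bits
            return () -> idx.getAndIncrement() & (length - 1);
        }
        // General case: ordinary modulo, with abs() guarding against counter overflow
        return () -> Math.abs(idx.getAndIncrement() % length);
    }
}
```

With length 8 the mask is 7 (binary 111), so successive calls yield 0, 1, ..., 7, 0, 1, ..., the same sequence the modulo version produces, and the mask version stays non-negative even after the counter overflows.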
At this point, NioEventLoop creation is complete
III. Summary:
NioEventLoopGroup creates nThreads NioEventLoop instances, and the executor inside every NioEventLoop is the same ThreadPerTaskExecutor instance. Whenever a NioEventLoop calls Executor#execute, a new FastThreadLocalThread is created to run the task.
Netty optimizations:
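- Netty replaces the selectedKeys set in SelectorImpl with its own SelectedSelectionKeySet, using an array instead of a hash-based set, which makes add faster
- If the workGroup is given an nThreads that is a power of two, Netty uses a bitwise AND instead of the modulo operation to pick the next NioEventLoop faster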