4-2~4-5 Creating a NioEventLoop

I. Outline of NioEventLoop creation

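 1. new NioEventLoopGroup()  // create the thread group; the default size is 2 * CPU cores

 2. new ThreadPerTaskExecutor()  // create the thread executor

 3. for() { newChild() }  // construct each NioEventLoop

 4. Create a Chooser from the NioEventLoop array  // the thread selector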
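
The default size in step 1 can be sketched in plain Java. This is a minimal illustration, not Netty's code: the class and method names are hypothetical, and the only rules it encodes are "2 * CPU cores unless overridden" and "at least 1". Netty reads the override from the io.netty.eventLoopThreads system property.

```java
// Hypothetical helper; it only models the "2 * cores, at least 1" rule.
final class DefaultThreadCount {
    /**
     * @param configured explicit thread count, or null when none was configured
     * @param cpuCores   number of available processors
     */
    static int defaultThreads(Integer configured, int cpuCores) {
        int n = (configured != null) ? configured : cpuCores * 2;
        return Math.max(1, n);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Netty consults the io.netty.eventLoopThreads system property for an override.
        Integer configured = Integer.getInteger("io.netty.eventLoopThreads");
        System.out.println("event loop threads: " + defaultThreads(configured, cores));
    }
}
```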

 

II. The NioEventLoop creation flow

 1. Step into MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args)

    /**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param executor          the Executor to use, or {@code null} if the default should be used.
     * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        // Create the executor; the default is ThreadPerTaskExecutor
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        // children holds the EventExecutor instances created below
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                /**
                 * The EventLoopGroup creates an EventLoop here; for NioEventLoopGroup the method invoked is
                 * {@link io.netty.channel.nio.NioEventLoopGroup#newChild(java.util.concurrent.Executor, java.lang.Object...)}
                 */
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        // Create the EventExecutorChooser
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

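 MultithreadEventExecutorGroup first creates a ThreadPerTaskExecutor to serve as the executor for the NioEventLoops. It then calls newChild(Executor executor, Object... args), which is abstract in MultithreadEventExecutorGroup and implemented by NioEventLoopGroup, once per thread to create each NioEventLoop. If any newChild call fails, the children created so far are shut down. Finally it calls EventExecutorChooserFactory#newChooser to create the EventExecutorChooser.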
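
The create-then-roll-back loop in this constructor can be reduced to a small, Netty-free sketch. Everything here is hypothetical naming for illustration: Child stands in for EventExecutor, a Supplier stands in for newChild, and shutdown() stands in for shutdownGracefully(). Waiting for termination is left out.

```java
import java.util.function.Supplier;

final class GroupCreationSketch {
    /** Hypothetical stand-in for EventExecutor: something that can be shut down. */
    interface Child {
        void shutdown();
    }

    static Child[] createChildren(int nThreads, Supplier<Child> newChild) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException("nThreads: " + nThreads + " (expected: > 0)");
        }
        Child[] children = new Child[nThreads];
        for (int i = 0; i < nThreads; i++) {
            boolean success = false;
            try {
                children[i] = newChild.get();
                success = true;
            } catch (RuntimeException e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    // Roll back: shut down every child created before the failure.
                    for (int j = 0; j < i; j++) {
                        children[j].shutdown();
                    }
                }
            }
        }
        return children;
    }
}
```

The success flag plus finally block means the rollback runs for any failure, while the original exception is still wrapped and rethrown to the caller.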

 

 A look at ThreadPerTaskExecutor

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        /**
         * @see DefaultThreadFactory#newThread(java.lang.Runnable, java.lang.String)
         */
        threadFactory.newThread(command).start();
    }
}

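 As the execute method shows, every call to ThreadPerTaskExecutor#execute creates a new thread and starts it. The default threadFactory is io.netty.util.concurrent.DefaultThreadFactory, and the threads it creates are FastThreadLocalThread instances. So each time a NioEventLoop invokes this executor's execute method, a new FastThreadLocalThread is created and started to run the submitted task.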
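
To see the "one new thread per execute call" behaviour in isolation, here is a minimal sketch that uses only the JDK. SimpleThreadPerTaskExecutor and distinctThreadsFor are hypothetical names, and a plain Thread replaces FastThreadLocalThread.

```java
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class ThreadPerTaskDemo {
    /** Plain-JDK stand-in for ThreadPerTaskExecutor: one new thread per execute() call. */
    static final class SimpleThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;

        SimpleThreadPerTaskExecutor(ThreadFactory threadFactory) {
            this.threadFactory = Objects.requireNonNull(threadFactory, "threadFactory");
        }

        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }

    /** Submits the given number of tasks and returns how many distinct threads ran them. */
    static int distinctThreadsFor(int tasks) {
        AtomicInteger ids = new AtomicInteger();
        ThreadFactory factory = r -> new Thread(r, "demo-" + ids.incrementAndGet());
        Executor executor = new SimpleThreadPerTaskExecutor(factory);

        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctThreadsFor(3));
    }
}
```

Because the executor never reuses threads, the number of distinct thread names equals the number of submitted tasks.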

 

 Step into newChild(); the concrete method is io.netty.channel.nio.NioEventLoopGroup#newChild(java.util.concurrent.Executor, java.lang.Object...)

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        // Create and return the NioEventLoop
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }

 As the previous step shows, executor here is the ThreadPerTaskExecutor, and args holds, in order, a SelectorProvider, a SelectStrategyFactory, a RejectedExecutionHandler and, only when args.length == 4, an EventLoopTaskQueueFactory. newChild then calls the NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) constructor to create the NioEventLoop and returns it.

 

 Step into the NioEventLoop constructor

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        // Create two non-blocking Queues and pass them on to the superclass constructor
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        // selectorTuple holds both the raw (unwrapped) selector and the wrapped (decorated) selector
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

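 The NioEventLoop constructor first calls newTaskQueue(EventLoopTaskQueueFactory queueFactory) twice to create two non-blocking Queues and passes them to the superclass constructor. The superclass constructors only store fields, so they are not covered further here. The constructor then stores provider and selectStrategy, and finally calls openSelector() to create the selector and stores it.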

 

 Step into newTaskQueue(EventLoopTaskQueueFactory queueFactory)

    private static Queue<Runnable> newTaskQueue(
            EventLoopTaskQueueFactory queueFactory) {
        if (queueFactory == null) {
            return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
        }
        return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
    }

  When no queueFactory is supplied, this goes on to call Queue<Runnable> newTaskQueue0(int maxPendingTasks)

 

 Step into Queue<Runnable> newTaskQueue0(int maxPendingTasks)

    private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
        // This event loop never calls takeTask()
        // By default this creates an MPSC queue via newMpscQueue
        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
    }

 Queue<Runnable> newTaskQueue0(int maxPendingTasks) creates and returns an mpscQueue. MPSC stands for multi-producer single-consumer: many threads may enqueue concurrently and safely, but only one thread (here the event loop thread) dequeues.
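
The access pattern behind an MPSC queue can be shown with the JDK alone. This sketch is an assumption-laden illustration: it uses ConcurrentLinkedQueue as a stand-in (it tolerates multiple consumers too, so it is more general than a true MPSC queue and does not show the performance benefit), and MpscPatternDemo and produceAndConsume are hypothetical names.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class MpscPatternDemo {
    /** Starts `producers` threads that each enqueue `perProducer` tasks; one consumer runs them all. */
    static int produceAndConsume(int producers, int perProducer) {
        // Stand-in queue: safe for many producers and many consumers, so it also covers MPSC.
        Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
        int[] executed = {0}; // touched only by the single consumer thread

        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    queue.offer(() -> executed[0]++);
                }
            });
            threads[p].start();
        }

        // The calling thread plays the event loop: it is the only one that polls.
        int total = producers * perProducer;
        while (executed[0] < total) {
            Runnable task = queue.poll();
            if (task == null) {
                Thread.yield(); // nothing queued yet; let the producers run
            } else {
                task.run();
            }
        }
        return executed[0];
    }
}
```

Since only the consumer thread runs the tasks, the counter needs no synchronization, which mirrors why work confined to the event loop thread can avoid locks.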

 

 Step into openSelector() to see how the Selector is created

    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            // The provider opens a raw JDK NIO selector
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        // If the key-set optimization is disabled, return the raw selector
        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }

        // Load the sun.nio.ch.SelectorImpl class
        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        if (!(maybeSelectorImplClass instanceof Class) ||
            // ensure the current selector implementation is what we can instrument.
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        // Create a SelectedSelectionKeySet
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    // Look up the selectedKeys and publicSelectedKeys fields of sun.nio.ch.SelectorImpl
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                        // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                        // This allows us to also do this in Java9+ without any extra flags.
                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset =
                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            PlatformDependent.putObject(
                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(
                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }
                        // We could not retrieve the offset, lets try reflection as last-resort.
                    }

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }

                    // Use reflection to force both selectedKeys and publicSelectedKeys to point at selectedKeySet
                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }
        // Keep a local reference to selectedKeySet so the ready keys can be read directly instead of through the selector
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }

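 openSelector() does the following:

  1. Uses provider to open a raw JDK NIO selector
  2. Loads the sun.nio.ch.SelectorImpl class
  3. Creates a SelectedSelectionKeySet instance, selectedKeySet
  4. Replaces the selectedKeys and publicSelectedKeys fields of the raw selector with selectedKeySet, through Unsafe on Java 9+ when it is available and through reflection otherwise. This is an important optimization Netty applies on top of JDK NIO: in the JDK, SelectorImpl backs selectedKeys with a HashSet, so every add pays for hashing and possible hash collisions. Netty's SelectedSelectionKeySet is a Set in name, but it is backed by an array, so add is a plain append and hash collisions cannot occur
  5. Keeps a local reference to selectedKeySet, so the ready keys can be read directly without going through the selector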
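
The idea in step 4, a Set that is really an append-only array, can be sketched as follows. This is a simplified illustration and not Netty's SelectedSelectionKeySet: ArrayBackedSet is a hypothetical name, the initial capacity of 4 is chosen only to make growth visible, and the sketch assumes the caller never adds the same element twice between resets, because it does no duplicate check.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

final class ArrayBackedSet<E> extends AbstractSet<E> {
    private Object[] items = new Object[4]; // small on purpose so growth is easy to see
    private int size;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (size == items.length) {
            items = Arrays.copyOf(items, items.length << 1); // double the capacity
        }
        items[size++] = e; // append: no hashing, no bucket lookup
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (E) items[idx++];
            }
        };
    }

    /** Clears the set for the next round of use by nulling the used slots. */
    void reset() {
        Arrays.fill(items, 0, size, null);
        size = 0;
    }
}
```

The trade-off is deliberate: the structure gives up general Set semantics (no deduplication, no efficient contains) in exchange for a constant-time append and cache-friendly iteration in insertion order.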

 

 Step into EventExecutorChooserFactory#newChooser (the implementation in DefaultEventExecutorChooserFactory) to see how the chooser is created

    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        // Check whether executors.length is a power of two (2, 4, 8, 16, ...). If so, create a PowerOfTwoEventExecutorChooser;
        // otherwise create a GenericEventExecutorChooser
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

 EventExecutorChooserFactory#newChooser creates either a PowerOfTwoEventExecutorChooser or a GenericEventExecutorChooser, depending on whether the number of executors is a power of two. PowerOfTwoEventExecutorChooser picks the next EventExecutor with a bitwise AND, while GenericEventExecutorChooser uses an ordinary modulo operation. This is another Netty optimization.
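
The two selection strategies can be compared in a small sketch. It is a simplified model under stated assumptions: ChooserSketch and Chooser are hypothetical names, and the chooser returns an array index rather than an EventExecutor so that it runs without Netty.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ChooserSketch {
    /** Returns the index of the next executor in round-robin order. */
    interface Chooser {
        int next();
    }

    static boolean isPowerOfTwo(int val) {
        // A power of two has exactly one bit set, so isolating the lowest set bit gives val back.
        return (val & -val) == val;
    }

    static Chooser newChooser(int length) {
        if (length <= 0) {
            throw new IllegalArgumentException("length: " + length + " (expected: > 0)");
        }
        AtomicInteger idx = new AtomicInteger();
        if (isPowerOfTwo(length)) {
            // length - 1 is an all-ones mask, so the AND keeps the result in [0, length)
            // even after the counter overflows to a negative value.
            return () -> idx.getAndIncrement() & (length - 1);
        }
        // General case: modulo, with abs() to handle a negative counter after overflow.
        return () -> Math.abs(idx.getAndIncrement() % length);
    }
}
```

For example, with length 4 the mask is 3 (binary 11), so the counter values 0, 1, 2, 3, 4 map to indexes 0, 1, 2, 3, 0, the same sequence modulo would produce but without a division.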

 

 At this point the NioEventLoop has been created

 

III. Summary:

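 NioEventLoopGroup creates nThreads NioEventLoop instances, and they all share the same ThreadPerTaskExecutor instance as their executor. Each time a NioEventLoop calls Executor#execute on it (which it does to start its own thread), a new FastThreadLocalThread instance is created to run the task.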

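  Netty optimizations:

  1. Netty replaces the selectedKeys set in SelectorImpl with its own SelectedSelectionKeySet, which uses an array instead of a hash-based set and so makes add faster
  2. If nThreads is a power of two (for the worker group, for example), Netty uses a bitwise AND instead of a modulo operation to pick the next NioEventLoop faster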