Netty Source Code Analysis, Part 1: NioEventLoopGroup

1. Introduction to NioEventLoopGroup

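When Netty starts, it builds a NioEventLoopGroup instance. Internally, NioEventLoopGroup maintains a group of NioEventLoop threads that handle I/O events (OP_ACCEPT, OP_CONNECT, OP_READ, OP_WRITE). Each thread handles the events of many Channels, while a given Channel is bound to exactly one thread. This is how Netty achieves serial, lock-free processing.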
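
The "one Channel, one thread" rule can be illustrated without Netty. The following is a minimal sketch using only the JDK: ToyChannel and the single-threaded executors standing in for event loops are hypothetical names, not Netty classes. Because every event of a channel runs on the same thread, the per-channel counter is a plain field with no lock and no atomic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: ToyChannel and the "loops" below are illustrative, not Netty classes.
final class SerialAffinitySketch {

    static final class ToyChannel {
        private final ExecutorService loop; // the one thread this channel is bound to
        private long handled;               // plain field: only the loop thread touches it

        ToyChannel(ExecutorService loop) {
            this.loop = loop;
        }

        // Any thread may fire an event; the handler always runs on the owning loop.
        void fireEvent() {
            loop.execute(() -> handled++);
        }

        // Read the counter on the loop thread as well, after all queued events.
        long handledCount() {
            try {
                return loop.submit(() -> handled).get();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    }

    static long run(int loopCount, int channelCount, int producers, int eventsPerProducer) {
        List<ExecutorService> loops = new ArrayList<>();
        for (int i = 0; i < loopCount; i++) {
            loops.add(Executors.newSingleThreadExecutor());
        }
        // Each loop serves several channels; each channel gets exactly one loop.
        List<ToyChannel> channels = new ArrayList<>();
        for (int i = 0; i < channelCount; i++) {
            channels.add(new ToyChannel(loops.get(i % loopCount)));
        }
        // Several producer threads fire events at every channel concurrently.
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(() -> {
                for (ToyChannel ch : channels) {
                    for (int e = 0; e < eventsPerProducer; e++) {
                        ch.fireEvent();
                    }
                }
            });
            threads.add(t);
            t.start();
        }
        long total = 0;
        try {
            for (Thread t : threads) t.join();
            for (ToyChannel ch : channels) total += ch.handledCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            for (ExecutorService loop : loops) loop.shutdown();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 5, 4, 1000));
    }
}
```

No update is lost even though four threads fire events at once, because the increments for a given channel are queued and executed one after another on its single loop thread.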

2. NioEventLoopGroup class diagram

[Figure: NioEventLoopGroup class diagram]

3. Constructors

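NioEventLoopGroup has several constructors, all of which end up calling the constructor of its parent class MultithreadEventLoopGroup. If the number of threads is not specified (nThreads is 0), it defaults to twice the number of available processors, unless the io.netty.eventLoopThreads system property overrides it. The default is never less than 1.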

private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
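
To make the defaulting rule concrete, here is a minimal sketch that reproduces the same arithmetic with the JDK only. The class and method names are hypothetical, and Integer.getInteger and Runtime.availableProcessors are used as stand-ins for Netty's SystemPropertyUtil.getInt and NettyRuntime.availableProcessors.

```java
// Hypothetical sketch using only the JDK; Integer.getInteger and
// Runtime.availableProcessors stand in for Netty's own helpers.
final class DefaultThreadsSketch {

    // Mirrors Math.max(1, getInt(property, cores * 2)): the property wins when it is set.
    static int defaultThreads(Integer configured, int cores) {
        int value = (configured != null) ? configured : cores * 2;
        return Math.max(1, value);
    }

    // Mirrors "nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads".
    static int resolve(int nThreads, int defaultThreads) {
        return nThreads == 0 ? defaultThreads : nThreads;
    }

    public static void main(String[] args) {
        Integer configured = Integer.getInteger("io.netty.eventLoopThreads");
        int cores = Runtime.getRuntime().availableProcessors();
        int def = defaultThreads(configured, cores);
        System.out.println("default threads = " + def);
        System.out.println("requested 0 -> " + resolve(0, def));
        System.out.println("requested 3 -> " + resolve(3, def));
    }
}
```

Running it with -Dio.netty.eventLoopThreads=4 shows the property taking precedence over the core count. Note that only 0 triggers the default: a negative value is passed through unchanged and is rejected later by the nThreads <= 0 check in MultithreadEventExecutorGroup.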

Next, the constructor of its parent class MultithreadEventExecutorGroup is called. The rough flow is:

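  1. Determine the size of the thread pool. If it was not specified, the default of CPU cores * 2 has already been applied by MultithreadEventLoopGroup; this constructor rejects any value <= 0.
  2. Set up the thread factory used to create threads. When no executor is supplied, it is wrapped in a ThreadPerTaskExecutor.
  3. Instantiate the child threads, which are of type NioEventLoop.
  4. Choose the scheduling algorithm that picks a thread from the pool. The default is round-robin.

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
        EventExecutorChooserFactory chooserFactory, Object... args) {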
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    // Creates and starts one thread per task: threadFactory.newThread(command).start();
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // The thread pool: one EventExecutor per thread
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args); // create the child event loop
            success = true;
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    chooser = chooserFactory.newChooser(children); // strategy for picking the next child
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
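
The last part of the constructor uses a counting listener: the group's termination future completes only when the last child has terminated. Below is a minimal sketch of the same pattern with the JDK's CompletableFuture in place of Netty's Future and FutureListener. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: CompletableFuture stands in for Netty's terminationFuture().
final class TerminationAggregationSketch {

    // Returns a future that completes once every child future has completed.
    // Assumes a non-empty list, matching the nThreads > 0 check in the constructor.
    static CompletableFuture<Void> aggregate(List<CompletableFuture<Void>> children) {
        CompletableFuture<Void> group = new CompletableFuture<>();
        AtomicInteger terminated = new AtomicInteger();
        for (CompletableFuture<Void> child : children) {
            child.whenComplete((value, error) -> {
                // The callback that brings the count up to children.size() finishes the group.
                if (terminated.incrementAndGet() == children.size()) {
                    group.complete(null);
                }
            });
        }
        return group;
    }

    // Returns {group done after 2 of 3 children, group done after 3 of 3 children}.
    static boolean[] demo() {
        List<CompletableFuture<Void>> children = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            children.add(new CompletableFuture<>());
        }
        CompletableFuture<Void> group = aggregate(children);
        children.get(0).complete(null);
        children.get(1).complete(null);
        boolean afterTwo = group.isDone();
        children.get(2).complete(null);
        boolean afterThree = group.isDone();
        return new boolean[] {afterTwo, afterThree};
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("done after 2 children: " + result[0]);
        System.out.println("done after 3 children: " + result[1]);
    }
}
```

The atomic counter matters because children may terminate on different threads; incrementAndGet ensures that exactly one callback observes the final count.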

The subclass NioEventLoopGroup implements the newChild() method, which creates the NioEventLoop thread:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

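Tip: this shows one of Netty's performance optimizations, namely how the thread selection algorithm is implemented: chooser = chooserFactory.newChooser(children);. The default implementation is the DefaultEventExecutorChooserFactory class, which is an application of the strategy pattern. Both choosers are round-robin, but when the size of the thread pool is a power of two, a bitwise AND is used instead of the modulo operation, because the bitwise operation is cheaper than modulo.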

public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}
private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
}
// Bitwise AND version
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;
    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }
    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}
// Modulo version
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;
    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }
    @Override
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}
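
The two choosers can be compared in isolation. The following is a minimal sketch that copies only the index arithmetic from the listing above into a hypothetical ChooserSketch class; it returns array indices instead of EventExecutor instances.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: only the index arithmetic of the two choosers, without Netty types.
final class ChooserSketch {

    // Same test as in the listing. Note that it also returns true for 0,
    // which the constructor rules out earlier by requiring nThreads > 0.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // "-" binds tighter than "&", so this is counter & (length - 1).
    static int maskIndex(int counter, int length) {
        return counter & length - 1;
    }

    // Math.abs keeps the index non-negative once the counter overflows to negative values.
    static int moduloIndex(int counter, int length) {
        return Math.abs(counter % length);
    }

    // Picks "picks" indices in round-robin order, choosing the arithmetic like newChooser does.
    static int[] roundRobin(int length, int picks) {
        AtomicInteger idx = new AtomicInteger();
        boolean powerOfTwo = isPowerOfTwo(length);
        int[] chosen = new int[picks];
        for (int i = 0; i < picks; i++) {
            int counter = idx.getAndIncrement();
            chosen[i] = powerOfTwo ? maskIndex(counter, length) : moduloIndex(counter, length);
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(roundRobin(4, 6)));
        System.out.println(Arrays.toString(roundRobin(3, 5)));
    }
}
```

For non-negative counters and a power-of-two length, both formulas give the same index, which is why the mask can replace the modulo. Both also stay within array bounds after the int counter wraps around to negative values.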