Netty服务端启动(一)——创建和初始化channel

以netty的一个小demo为例(使用的源码版本为4.1.50.Final)

b.bind(PORT)跟进

最终调用到AbstractBootstrap#doBind这个方法

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it‘s not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

这里就是服务端创建的主要代码

创建Channel

主要概括如下

1.通过demo里传入的NioServerSocketChannel类反射创建channel

2.在NioServerSocketChannel的构造函数里

  • 通过java底层的SelectorProvider创建ServerSocketChannel
  • 调用父类构造函数设置阻塞模式,创建id,unsafe,pipeline等
  • 创建NioServerSocketChannelConfig,便于进行参数配置

方法体里第一行代码的initAndRegister方法就是在创建和初始化channel

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();   //创建channel,本节分析
        init(channel);
    } catch (Throwable t) {
        //省略
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

创建channel就在channelFactory.newChannel()这行代码里,这里会调用到ReflectiveChannelFactory#newChannel这个方法

public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        //省略
    }
}

可以看到这里用反射创建一个channel,也就是说channel是通过channelFactoryconstructor来反射创建的,而channelFactory是在demo里调用channel方法时初始化的,如下

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)   //在这里初始化了`channelFactory
    //省略

AbstractBootstrap#channel方法里

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
        ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

最终调用到AbstractBootstrap#channelFactory方法里

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    ObjectUtil.checkNotNull(channelFactory, "channelFactory");
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return self();
}

既然这里是通过反射创建了一个NioServerSocketChannel,接下来看看NioServerSocketChannel这个类的构造函数

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));  
}

先看看NioServerSocketChannel#newSocket这个方法

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel(); //provider就是DEFAULT_SELECTOR_PROVIDER,为SelectorProvider.provider()
    } catch (IOException e) {
        //省略
    }
}

上面的构造函数调用到了另一个构造函数

public NioServerSocketChannel(ServerSocketChannel channel) {
    //注意这里传入SelectionKey.OP_ACCEPT,用于之后注册accept事件
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

super方法调用父类做一些简单的初始化,主要在AbstractNioChannelAbstractChannel这两个类这种

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        //省略
    }
}
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

NioServerSocketChannelConfig则是传入java底层的ServerSocketChannel方便以后对做一些配置

初始化channel

先概括一下,主要做了这几件事

  • 将用户设置的options和attrs设置到channel的config里
  • 向channel的pipeline里添加用户传入的handler
  • 创建一个特殊的handler即ServerBootstrapAcceptor(用于处理新连接接入)并添加进pipeline里

接下来分析初始化channel的过程,即init(channel)这行代码

这里会调用到ServerBootstrap#init这个方法

void init(Channel channel) {
    //newOptionsArray这个方法拿到用户设置的options设置到channel的config里
    setChannelOptions(channel, newOptionsArray(), logger);
    //attrs0这个方法拿到用户设置的attrs设置到channel里
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    //拿到用于配置childHandler的childOptions
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
    }
    //拿到用于配置childHandler的childAttrs
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            //拿到demo里调用childHandler方法时添加的handler
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    //ServerBootstrapAcceptor是一个特殊的handler,用于处理新连接接入
                    pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

下篇 Netty服务端启动(二)——注册selector和端口绑定

Netty服务端启动(一)——创建和初始化channel

上一篇:PHP7无法加载curl模块


下一篇:vue动态index.html默认标题