Bootstrap与Unsafe

开篇

今天,给大家分享一下netty中的bootstrap与unsafe。

unsafe

unsafe接口是内部接口,是netty为了方便操作channel而设计的一个辅助接口,它一般不允许被用户直接调用。主要用于实际的IO操作,例如:bind端口、处理accept、read事件、把channel注册到NioEventLoop上…。
下面的图是unsafe的继承类图,我们主要关注下nio相关的类:NioByteUnsafe以及NioMessageUnsafe。
NioByteUnsafe以及NioMessageUnsafe都继承于AbstarctNioUnsafe。
NioByteUnsafe主要是为NioSocketChannel提供相关的IO操作的。
而NioMessageUnsafe主要是为NioServerSocketChannel提供相关的IO操作的。
它们的相关的操作,我们在后续源码中也会提到。
Bootstrap与Unsafe

bootstrap

bootstrap class是netty提供的一个方便我们使用的工厂类。我们可以利用它来初始化netty,完成netty的客户端、服务端所需要的组件(例如:channel、selector、evenloop、pipeline)的组装。
在netty中,有两个启动class,分别用在客户端和服务端。class 关系如下图所示:
Bootstrap与Unsafe

Bootstrap和ServerBootstrap都继承于抽象类AbstarctBootstrap,它们仅仅是使用的地方不同,大致的配置和使用方法都是相同的。
因此,我将着重分享ServerBootstrap。对Bootstrap有兴趣的童鞋,欢迎在分享结束后找我讨论。

原生nio-demo

Bootstrap与Unsafe

先来看看此demo的整体架构:在主线程中,创建一个selector选择器。该选择器监听ServerSocketChannel的accept事件以及SocketChannel的read事件,并对接收到的事件进行处理。

接着,来看看demo运行的整体效果。(详见demo)

接下来,我们来看看详细的代码实现:

public class EasyServer {

    public static void main(String[] args) {
        // 1.a 创建服务器channel即ServerSocketChannel
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            // 1.b 把ServerSocketChannel设置为非阻塞模式
            //     若设置为阻塞模式,则在把ServerSocketChannel注册到selector上的时候会抛出IllegalBlockingModeException
            serverSocketChannel.configureBlocking(false);

            // 1.c 创建selector多路复用器,一个selector可以管理多个channel
            Selector selector = Selector.open();
            // 1.d 把ServerSocketChannel注册到selector上并关注aceept事件
            SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);
            selectionKey.interestOps(SelectionKey.OP_ACCEPT);

            // 1.e ServerSocketChannel绑定端口,开始接收请求
            serverSocketChannel.bind(new InetSocketAddress(8899));
            System.out.println("serverSocketChannel bind success,serverSocketChannel:" + serverSocketChannel);

            while (true) {
                System.out.println("select wait channel already I/O operations.....");
                // 2.a 用selector的select方法,阻塞等待channel已经ready的IO操作;当客户端connect到服务端暴露的端口后,唤醒select方法
                // 3.a 当客户端与服务器成功建立连接后,客户端开始向服务端发送数据;
                // 3.b 服务端在while true循环中调用selector的select方法,阻塞等待channel已经ready的IO操作;
                //     当客户端向服务端写入数据后,唤醒select方法
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    // 2.b&3.c 迭代获取channel事件,并移除当前事件,防止当前事件滞留在selectKeys集合中。
                    SelectionKey key = iterator.next();
                    //   若未移除的话,还会滞留在selectedKeys集合中。当下次遍历到的时候,此时没有真正的socket连接,会造成空指针异常
                    iterator.remove();
					// OP_READ = 1 << 0;OP_ACCEPT = 1 << 4
                    System.out.println("handle key:" + key.readyOps());

                    // 处理accept事件
                    if (key.isAcceptable()) {
                        ServerSocketChannel tempServerSocketChannel = (ServerSocketChannel) key.channel();
                        // 2.c 若为accept事件,则调用ServerSocketChannel.accept()接收来自客户端的连接SocketChannel
                        SocketChannel acceptSocketChannel = tempServerSocketChannel.accept();
                        System.out.println("selector accept socket channel:" + acceptSocketChannel);
                        // 2.d 把SocketChannel设置为非阻塞模式
                        acceptSocketChannel.configureBlocking(false);
                        ByteBuffer buffer = ByteBuffer.allocate(8);

                        // 2.e 把SocketChannel注册到selector上,并绑定一个8字节的bytebuffer作为附件,
                        //     缓存读取客户端发送过来的数据;并关注read事件
                        SelectionKey socketKey = acceptSocketChannel.register(selector, 0, buffer);
                        socketKey.interestOps(SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        try {
                            SocketChannel socketChannel = (SocketChannel) key.channel();
                            // 3.d 若为read事件,则获取key中绑定的附件byteBuffer,
                            //     这边的byteBuffer每次缓存的是客户端中的完整的一个包(包以\n作为分割)
                            ByteBuffer buffer = (ByteBuffer) key.attachment();
                            // 3.e 从socketChannel中read数据到byteBuffer缓存中
                            int read = socketChannel.read(buffer);
                            // 3.h 当客户端发送完所有的数据断开的时候,则read返回的值为-1,
                            //     事件需要cancel掉(当事件发生后,要么处理,要么cancel,否则下次该事件仍然会触发;因为nio是水平触发的)
                            if (read == -1) {
                                key.cancel();
                                System.out.println("client close socket, so cancel key.");
                            } else {
                                // 3.f 当read值不为-1时,表明socketChannel能正常读取客户端的数据。
                                //     由于客户端每次发送过来的数据不一定是完整的一个包,便会出现粘包、半包问题
                                //     (粘包:abcdefghijklmnopq\nABCDEFGH、半包:01),因此我们得对这2种情况进行处理,以获得完整的一个包
								// 	   主要思路是:以\n为分隔符,当没读到\n时先存入ByteBuffer。直到读到\n,则输出ByteBuffer的内容
                                split(buffer);
                                // 3.g 当byteBuffer的容量不足以容纳下一个包的大小时(也就是buffer.position() == buffer.limit()),
                                //     便需要对其进行扩容,扩容完再作为附件绑定到事件上进行下一次的读取
                                if (buffer.position() == buffer.limit()) {
                                    ByteBuffer enlargeBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                                    buffer.flip();
                                    enlargeBuffer.put(buffer);
                                    key.attach(enlargeBuffer);
                                }
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                            key.cancel();
                        }
                    }
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void split(ByteBuffer source) {
        source.flip();
        for (int i = 0; i < source.limit(); i++) {
            if ('\n' == source.get(i)) {
                int length = i + 1 - source.position();
                // 把这条完整消息存入新的 ByteBuffer
                ByteBuffer target = ByteBuffer.allocate(length);
                // 从 source 读,向 target 写
                for (int j = 0; j < length; j++) {
                    target.put(source.get());
                }
                debugAll(target);
            }
        }
        source.compact();
    }

}

我们不妨把这段代码拆解成3个步骤来看:

服务器绑定端口
创建服务器channel即ServerSocketChannel
把ServerSocketChannel设置为非阻塞模式,防止accept的时候阻塞。若设置为阻塞模式,则在把ServerSocketChannel注册到selector上的时候会抛出IllegalBlockingModeException
创建selector多路复用器,一个selector可以管理多个channel
把ServerSocketChannel注册到selector上并关注aceept事件
ServerSocketChannel绑定端口,开始接收请求
accept客户端的连接请求
调用selector的select方法,阻塞等待channel已经ready的IO操作;当客户端connect到服务端暴露的端口后,唤醒select方法
迭代获取channel事件,并移除当前事件,防止当前事件滞留在selectKeys集合中。
若为accept事件,则调用ServerSocketChannel.accept()接收来自客户端的连接SocketChannel
把SocketChannel设置为非阻塞模式
把SocketChannel注册到selector上,并绑定一个8字节的bytebuffer作为附件,缓存读取客户端发送过来的数据;并关注read事件
读取客户端发送的数据
当客户端与服务器成功建立连接后,客户端开始向服务端发送数据
服务端在while true循环中调用selector的select方法,阻塞等待channel已经ready的IO操作;当客户端向服务端写入数据后,唤醒select方法
迭代获取channel事件,并移除当前事件,防止当前事件滞留在selectKeys集合中。
若为read事件,则获取key中绑定的附件byteBuffer,这边的byteBuffer每次缓存的是客户端中的完整的一个包(包以\n作为分割)
从socketChannel中read数据到byteBuffer缓存中
当read值不为-1时,表明socketChannel能正常读取客户端的数据。由于客户端每次发送过来的数据不一定是完整的一个包,便会出现粘包、半包问题(粘包:abcdefghijklmnopq\nABCDEFGH、半包:01),因此我们得对这2种情况进行处理,以获得完整的一个包
当byteBuffer的容量不足以容纳下一个包的大小时(也就是buffer.position() == buffer.limit()),便需要对其进行扩容,扩容完再作为附件绑定到事件上进行下一次的读取
当客户端发送完所有的数据断开的时候,则read返回的值为-1,事件需要cancel掉(当事件发生后,要么处理,要么cancel,否则下次该事件仍然会触发;因为nio是水平触发的);

水平触发也被称为条件触发: 只要满足条件,就触发一个事件(只要有数据没有被获取,内核就会不断通知你)。 边缘触发:
每当状态变化时,触发一个事件。 水平触发和边缘触发在IO编程的区别: 举个socket
read的例子,假定经过长时间的等待后,客户端发送了100个字节,这时无论边缘触发和水平触发都会产生一个read ready
notification通知应用程序可读。
应用程序读了50个字节,然后重新调用API等待io事件。这时条件触发的api会因为还有50个字节可读从 而立即返回用户一个read
ready notification。
而边缘触发的api会因为可读这个状态没有发生变化而陷入长期等待。因此边缘触发需要一次性处理完read到的数据。

深入源码

上面,我们用nio构造了服务端的demo,相信大家对服务端构造整体的流程有了一个大概的认知。
接下来,我们一起来看看如何使用ServerBootStrap类构建自己的服务端。

ServerBootStrap工具类

public class HelloServer {

    public static void main(String[] args) {
        // 创建boss NioEventLoopGroup,包含1个NioEventLoop,通常用来处理accept事件
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        // 创建worker NioEventLoopGroup,包含2个NioEventLoop,通常用来处理read、write事件以及业务逻辑handler
        NioEventLoopGroup worker = new NioEventLoopGroup(2);

        //1、创建ServerBootstrap实例
        ServerBootstrap serverBootstrap = new ServerBootstrap()
                //2、设置并绑定Reactor线程池
                .group(boss, worker)
                //3、设置并绑定服务端NioServerSocketChannel
                .channel(NioServerSocketChannel.class)
                //4、设置ChannelInitializer处理器,它的作用是:等待SocketChannel注册到eventLoop时,执行initChannel以便添加更多的handler
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //5、添加StringDecoder解码器到pipeline上,把read的数据从ByteBuf转为String
                        nioSocketChannel.pipeline().addLast(new StringDecoder());
                        //6、使用上一个handler的处理结果string,处理业务逻辑
                        nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
                                System.out.println("read msg:" + s);
                            }
                        });
                    }
                });
        //7、把ServerSocketChannel绑定到8080端口
        serverBootstrap.bind(9988);

    }
}

深入源码

我们不妨以bind为入口,来追踪ServerBootStrap的启动过程。

bind端口

bind方法主要完成了NioServerSocketChannel与ServerSocketChannel的创建、把ServerSocketChannel注册到selector中、绑定端口这三个主要工作。

io.netty.bootstrap.ServerBootstrap#bind

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
io.netty.bootstrap.AbstractBootstrap#doBind
ServerBootstrap#bind->AbstractBootstrap#doBind

bind方法真正调用的是doBind,doBind中有2个重要方法:initAndRegister、doBind0。

顾名思义,initAndRegister主要完成的是初始化和注册:初始化就是初始化NioServerSocketChannel、注册就是把ServerSocketChannel注册到selector中。

而doBind0需要等待回调,然后完成绑定端口这项工作。

由于下面流程较复杂,我们不妨把initAndRegister和doBind分成a、b部分来阅读。

private ChannelFuture doBind(final SocketAddress localAddress) {
	final ChannelFuture regFuture = initAndRegister();
	...
	...
	if (regFuture.isDone()) {
		doBind0(regFuture, channel, localAddress, promise);
	} else {
		regFuture.addListener(new ChannelFutureListener() {
    		@Override
    		public void operationComplete(ChannelFuture future) throws Exception {
				.....
            	doBind0(regFuture, channel, localAddress, promise);
				....
    		}
		});
	}
	...
	...
}
a io.netty.bootstrap.AbstractBootstrap#initAndRegister
ServerBootstrap#bind->AbstractBootstrap#doBind->AbstractBootstrap#initAndRegister

先来看看initAndRegister方法。
initAndRegister方法主要做三个事情:

  1. 调用channelFactory.newChannel()完成NioServerSocketChannel与ServerSocketChannel的创建(main线程中完成)
  2. 调用init方法向NioServerSocketChannel中的pipeline添加handler:ServerBootstrapAcceptor(main线程中完成)
  3. 把ServerSocketChannel注册到bossGroup的selector中(NIO线程中完成)工作。
    下面,我们来详细看看这三项工作。
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
		a.1、完成NioServerSocketChannel与ServerSocketChannel的创建(main线程中完成)
        channel = channelFactory.newChannel();
		a.2、向NioServerSocketChannel中的pipeline添加handler:ServerBootstrapAcceptor
        init(channel);
    } catch (Throwable t) {
        .....
    }
	.....
	a.3、把ServerSocketChannel注册到bossGroup的selector中
    ChannelFuture regFuture = config().group().register(channel);
	.....
    return regFuture;
}
a.1 channelFactory.newChannel()
ServerBootstrap#bind->AbstractBootstrap#doBind->AbstractBootstrap#initAndRegister->ReflectiveChannelFactory#newChannel

我们先来看看a.1部分。

channel = channelFactory.newChannel();
channelFactory.newChannel()实际上会调用ReflectiveChannelFactory#newChannel。这是因为在构建ServerBootStrap的时候,会设置channel属性,此时传入NioServerSocketChannel.class,此时会new一个
ReflectiveChannelFactory工厂类。

io.netty.bootstrap.AbstractBootstrap#channel
public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

因此,我们调用constructor.newInstance(),会通过反射的机制创建NioServerSocketChannel对象并调用NioServerSocketChannel的构造方法。

io.netty.channel.ReflectiveChannelFactory#newChannel
public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}

在NioServerSocketChannel构造方法中,创建了ServerSocketChannel,并把其设置为非阻塞模式。
同时创建了NioMessageUnsafe、DefaultChannelPipeline,并设置readInterestOp为accept。
因此我们能看出,NioServerSocketChannel中绑定了ServerSocketChannel、NioMessageUnsafe、DefaultChannelPipeline。

io.netty.channel.socket.nio.NioServerSocketChannel
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
	...
    unsafe = newUnsafe();
	pipeline = newChannelPipeline();
	this.readInterestOp = SelectionKey.OP_ACCEPT;
	channel.configureBlocking(false);
	...
}
a.2 init(channel)
ServerBootstrap#bind->AbstractBootstrap#doBind->AbstractBootstrap#initAndRegister
->ReflectiveChannelFactory#newChannel->ServerBootstrap#init

再来看看a.2部分。a.2部分获取到NioServerSocketChannel中的pipeline,并向pipeline中添加了一个ChannelInitializer处理器,当前不会执行initChannel相关逻辑。

io.netty.bootstrap.ServerBootstrap#init
void init(Channel channel) {
	...
    ChannelPipeline p = channel.pipeline();
	...
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ...
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
a.3 config().group().register(channel)
ServerBootstrap#bind->AbstractBootstrap#doBind->AbstractBootstrap#initAndRegister
->ReflectiveChannelFactory#newChannel->ServerBootstrap#init->AbstractChannel#register

最后来看看a.3部分:ChannelFuture regFuture = config().group().register(channel);。最终会调用到AbstractChannel#register。

a.3部分的主要工作是:把ServerSocketChannel注册到bossGroup的selector中。注意,这个操作是在NIO线程中完成的。

a.3中首先调用eventLoop.inEventLoop判断当前线程是不是创建NioEventLoop绑定的线程;如果是的话,直接调用register0;否则调用eventLoop.execute,若是首次执行execute,那么会在bossGroup中初始化并启动一个nio线程,之后注册的操作会在这个线程中执行。这样就完成了main线程到boss group的nio线程的切换。

eventLoop.inEventLoop以及eventLoop.execute会在异步串行无锁化设计中详细介绍。

io.netty.channel.AbstractChannel#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    AbstractChannel.this.eventLoop = eventLoop;
	// 如果当前线程为boss group线程
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
			// 第一次执行execute时会在bossGroup中启动一个NioEventLoop,之后的注册等操作会在这个线程中执行。
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ...
        }
    }
}
a.4 pipeline.invokeHandlerAddedIfNeeded()
ServerBootstrap#bind->AbstractBootstrap#doBind->AbstractBootstrap#initAndRegister->ReflectiveChannelFactory#newChannel
->ServerBootstrap#init->AbstractChannel#register->DefaultChannelPipeline#invokeHandlerAddedIfNeeded

切换完线程后,最终都会调用register0,register0主要做3件事情。

先来看看doRegister方法,doRegister把ServerSocketChannel注册到selector中,并把NioServerSocketChannel作为附件,且未关注任何事件。

io.netty.channel.AbstractChannel
private void register0(ChannelPromise promise) {
    try {
        ......
        doRegister();

      	......


		// a.4唤醒b部分ChannelInitializer处理器的initChannel方法,向NioServerSocketChannel的pipline中添加ServerBootstrapAcceptor
        pipeline.invokeHandlerAddedIfNeeded();

		// a.5回调doBind0方法,开启b线
		safeSetSuccess(promise);
        ......
    } catch (Throwable t) {
        ......
    }
}


protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
           ......
    }
}

在注册完成后,会调用a.4部分pipeline.invokeHandlerAddedIfNeeded()唤醒a.2部分ChannelInitializer处理器的initChannel方法,向NioServerSocketChannel的pipline中添加ServerBootstrapAcceptor。
Bootstrap与Unsafe
此时,NioServerSocketChannel的pipline的结构为:
Bootstrap与Unsafe

a.5 safeSetSuccess(promise)
ServerBootstrap#bind->AbstractBootstrap#doBind->AbstractBootstrap#initAndRegister
->ReflectiveChannelFactory#newChannel->ServerBootstrap#init->AbstractChannel#register
->AbstractChannel#safeSetSuccess

接下来,会调用a.5部分后,把promise设置为success,此时会触发ChannelFuture的operationComplete事件触发回调doBind0。

io.netty.channel.AbstractChannel
private void register0(ChannelPromise promise) {
    try {
        ......
		// a.5回调doBind0方法,开启b线
		safeSetSuccess(promise);
        ......
    } catch (Throwable t) {
        ......
    }
}

AbstractBootstrap#doBind
private ChannelFuture doBind(final SocketAddress localAddress) {
	final ChannelFuture regFuture = initAndRegister();
	...
	...
	if (regFuture.isDone()) {
		doBind0(regFuture, channel, localAddress, promise);
	} else {
		regFuture.addListener(new ChannelFutureListener() {
    		@Override
    		public void operationComplete(ChannelFuture future) throws Exception {
				.....
            	doBind0(regFuture, channel, localAddress, promise);
				....
    		}
		});
	}
	...
	...
}
b io.netty.bootstrap.AbstractBootstrap#doBind0

doBind0最终会调用到io.netty.channel.AbstractChannel.AbstractUnsafe#bind

在bind方法里主要做两件事:

  1. 调用doBind绑定端口
  2. 调用pipeline.fireChannelActive使得ServerSocketChannel关注accept事件
io.netty.channel.AbstractChannel.AbstractUnsafe#bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
	......
	//b.1绑定对应端口
    doBind(localAddress);
	......

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
				b.2 ServerSocketChannel关注accept事件
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}
b.1 doBind
ServerBootstrap#bind->AbstractBootstrap#doBind->AbstractBootstrap#initAndRegister
->AbstractUnsafe#bind

调用doBind方法绑定端口

b.2 doBeginRead
ServerBootstrap#bind->AbstractBootstrap#doBind->AbstractBootstrap#initAndRegister
->AbstractUnsafe#bind->AbstractNioChannel#doBeginRead

在绑定端口操作完成后,会判断各种所有初始化操作是否已经完成。若完成,则会调用pipeline.fireChannelActive()。
最终会调用到AbstractNioChannel.doBeginRead方法中:若ServerSocketChannel没有关注Accept事件,则让其关注Accept事件。
这边的interestOps属性就是我们在NioServerSocketChannel中初始化的ACCEPT。

io.netty.channel.nio.AbstractNioChannel#doBeginRead

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
	// 如果ServerSocketChannel没有关注Accept事件
    if ((interestOps & readInterestOp) == 0) {
		// 让其关注Accepet事件
        // readInterestOp取值为16,在NioServerSocketChannel创建时(a.1部分)初始化
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

总结
bind方法做了以下事情:
a、initAndRegister

  1. 【主线程】创建了NioServerSocketChannel,并在初始化的过程中:
    1.1 创建了ServerSocketChannel。真正的实干人员,后面注册到selector上、bind端口都靠它。
    1.2 创建了NioMessageUnsafe
    1.3 创建了DefaultChannelPipeline
    1.4 把readInterestOp属性设置为accept
  2. 【主线程】向NioServerSocketChannel的pipeline中添加了一个ChannelInitializer处理器,以便后续添加ServerBootstrapAcceptor
  3. 【boos nio线程】在bossGroup中初始化并启动一个nio线程,在这个线程中把ServerSocketChannel注册到selector中
  4. 【boss nio线程】回调a.2的ChannelInitializer处理器,向NioServerSocketChannel的pipeline添加ServerBootstrapAcceptor
  5. 【boss nio线程】回调doBind方法

b、doBind0

  1. 【boss nio线程】绑定端口
  2. 【boss nio线程】在初始化工作完成后,让ServerSocketChannel关注accept事件
accept客户端请求

在服务端bind端口以及ServerSocketChannel注册了accept事件后,我们来看看服务端是如何accept客户端请求的?
accept客户端请求可以从io.netty.channel.nio.NioEventLoop#run开始跟踪,主要做了2件事:

  1. 调用selector的select方法,阻塞等待channel已经ready的IO操作
  2. 遍历已经ready的IO操作,处理对应的事件
io.netty.channel.nio.NioEventLoop#run
protected void run() {
    int selectCnt = 0;
    for (;;) {
		int strategy;
		strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
        switch (strategy) {
        	.......
           	case SelectStrategy.SELECT:
            	.....
				// c 调用selector的select方法,阻塞等待channel已经ready的IO操作
                strategy = select(curDeadlineNanos);
               	.....
        }
		// d 遍历已经ready的IO操作,处理对应的事件
		processSelectedKeys();
    }
}
c io.netty.channel.nio.NioEventLoop#select
NioEventLoop#run->NioEventLoop#select

在run方法中,首先会调用selector的select方法,阻塞等待channel已经ready的IO操作。

private int select(long deadlineNanos) throws IOException {
    if (deadlineNanos == NONE) {
        return selector.select();
    }
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
d io.netty.channel.nio.NioEventLoop#processSelectedKeys
NioEventLoop#run->NioEventLoop#select->NioEventLoop#processSelectedKeys

processSelectedKeys主要用来处理已就绪的各种IO事件,例如:accept、read…
当客户端connect到服务端暴露的端口后,唤醒select方法,触发accept事件。
processSelectedKeys最终会调用到io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
在read方法中主要做两件事:doReadMessages、pipeline.fireChannelRead。

@Override
public void read() {
	......
	// d.1 调用accept获得了SocketChannel,并创建了NioSocketChannel作为消息放入readBuf(List<Object> readBuf = new ArrayList<Object>())
	int localRead = doReadMessages(readBuf);
	......
	
	for (int i = 0; i < size; i ++) {
    	readPending = false;
		// d.2 触发pipline上的handler处理read事件
    	pipeline.fireChannelRead(readBuf.get(i));
	}
}
d.1 NioServerSocketChannel#doReadMessages
NioEventLoop#run->NioEventLoop#select->NioEventLoop#processSelectedKeys
->NioServerSocketChannel#doReadMessages

在doReadMessages中:

  • 调用ServerSocketChannel的accept获取SocketChannel
  • 创建NioSocketChannel
    • 把SocketChannel作为NioSocketChannel的属性,并设置为非阻塞模式
    • 创建NioByteUnsafe、DefaultChannelPipeline,并设置readInterestOp为accept
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
	// ServerSocketChannel调用accept获取SocketChannel
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
			// 创建NioSocketChannel
			// 把SocketChannel作为NioSocketChannel的属性,并设置为非阻塞模式
			// 创建NioMessageUnsafe、DefaultChannelPipeline,并设置readInterestOp为accept
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
d.2 pipeline.fireChannelRead
NioEventLoop#run->NioEventLoop#select->NioEventLoop#processSelectedKeys
->NioServerSocketChannel#doReadMessages->DefaultChannelPipeline#fireChannelRead

在accept并创建了NioSocketChannel之后,会调用NioServerSocketChannel的pipline触发read事件。
由于NioServerSocketChannel的plpline的结构如下:
Bootstrap与Unsafe
因此,我们会调用到ServerBootstrapAcceptor的channelRead方法,并传递刚刚创建好的NioSocketChannel作为参数

d.3 ServerBootstrapAcceptor.channelRead
NioEventLoop#run->NioEventLoop#select->NioEventLoop#processSelectedKeys
->NioServerSocketChannel#doReadMessages->DefaultChannelPipeline#fireChannelRead
->ServerBootstrapAcceptor#channelRead

在ServerBootstrapAcceptor的channelRead中,做了以下事情:

  • 往NioSocketChannel的pipline中添加用户自定义的childHandler
  • 并把NioSocketChannel注册到worker group上
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

	// 往NioSocketChannel的pipline中添加用户自定义的childHandler
    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
		// 把NioSocketChannel注册到worker group上
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

这边的把NioSocketChannel注册到worker group上实际上调用的是io.netty.channel.AbstractChannel#register,其流程与a.3类似:

  • 当前线程为boss group的nio线程,需要把当前线程切换为worker group的nio线程。若为首次执行,那么会在workerGroup中初始化并启动一个nio线程,之后的操作会在这个线程中执行。
  • 切换完线程后,会调用doRegister方法把SocketChannel注册到selector中,并把NioSocketChannel作为附件,且未关注任何事件。
  • 在注册完成后,会调用pipeline.invokeHandlerAddedIfNeeded()唤醒用户设置的childHandler中的ChannelInitializer处理器的initChannel方法,向SocketChannel的pipline中添加业务handler。添加完后的pipline的结构为:
    Bootstrap与Unsafe
  • 让SocketChannel关注read事件
总结

accept客户端请求主要做了以下事情:
c NioEventLoop#select:
【boos nio】调用selector的select方法,阻塞等待channel已经ready的IO操作

d NioEventLoop#processSelectedKeys:

  1. 【boss nio】调用ServerSocketChannel的accept获取SocketChannel:把SocketChannel作为NioSocketChannel的属性,并设置为非阻塞模式;创建NioByteUnsafe、DefaultChannelPipeline,并设置readInterestOp为accept。
  2. 【boss nio】触发pipline的read事件,调用ServerBootstrapAcceptor的channelRead方法
  3. 【worker nio】当前线程切换为worker group的nio线程;在此线程上:会向SocketChannel的pipline中添加业务handler;把SocketChannel注册到selector中,并关注read事件。

read客户端数据

当客户端已经和服务端连接后,客户端就开始发送数据了。我们来看看服务端是这么接受数据的?
服务端接收数据可以从NioEventLoop的run方法开始跟踪,同样的会调用selector的select方法,阻塞等待channel已经ready的IO操作。
当客户端发送数据时,会唤醒当前线程,调用processSelectedKeys,processSelectedKeys最终会调用到io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read。

e NioByteUnsafe.read

由于客户端发送的数据未必能够一次读完,因此会触发多次 nio read 事件。
一次事件会把数据读取到byetBuf中,接着会触发 pipeline read。这样read的数据便能在pipline中传递,供我们业务的handler使用。
一次事件会触发多次pipeline read,会触发一次 pipeline read complete。

public final void read() {
	......
	final ChannelPipeline pipeline = pipeline();
	......
	do {
		......
		// 读取数据放到byteBuf中
		allocHandle.lastBytesRead(doReadBytes(byteBuf));
		......
		// 触发多次pipeline read,传递byteBuf
		pipeline.fireChannelRead(byteBuf);
		......
	} // 是否要继续循环
    while (allocHandle.continueReading());
	......
	pipeline.fireChannelReadComplete();
}

设计亮点

学完了上述源码之后,相信我们对ServerBootStrap启动、ServerSocketChannel accpet客户端请求、SocketChannel read客户端数据的整体流程有了大概的了解。
下面我将介绍netty在设计过程中的亮点:

异步串行无锁化设计

在许多场景下,并行处理可以充分利用服务器并行能力提高服务整体的处理能力。但是,如果对于共享资源处理不当,会带来严重的锁竞争,最终也会导致性能下降。
为了尽可能地避免锁竞争带来的性能损耗,netty使用了无锁化的串行设计。

netty的串行无锁化设计主要运用在两次的线程切换:

  • 把线程从主线程切换到boss group线程,以便后续把ServerSocketChannel注册到selector上并关注accept事件以及初始化NioServerSocketChannel的pipline
  • 把线程从boss group的io线程切换到worker group的io线程,以便后续把SocketChannel注册到selector上并关注read事件以及初始化SocketChannel的pipline

那么netty是如何进行串行无锁话设计的呢?

其实,netty的异步串行无锁化设计主要思路是:每个NioEventLoop中只会有一个线程在循环运行。每当有其他任务向NioEventLoop提交时,便会放入队列中。线程在循环的过程中会把队列中的任务取出来运行。

我们先来看下NioEventLoop的结构。
在NioEventLoop中维护了一个Mpsc(ManyProducerSingleConsumer,这里不展开阐述)队列,队列会在创建NioEventLoop的时候被初始化,是用来实现串行无锁化的关键。

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
	....
	this.taskQueue = this.newTaskQueue(this.maxPendingTasks);
	....
}


@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    return newTaskQueue0(maxPendingTasks);
}


private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
    // This event loop never calls takeTask()
    return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
            : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}

NioEventLoop同时也封装了一个线程,用来处理accept等IO事件,同时也用来处理队列中的任务。

private volatile Thread thread;

此时,我们会产生疑问:任务时怎么提交到EventLoop中的呢?

每当调用SingleThreadEventExecutor#execute时:

  1. 调用inEventLoop判断当前线程是否是创建NioEventLoop时绑定的线程
  2. 向NioEventLoop的Mpsc队列中添加任务
  3. 若当前线程不是创建NioEventLoop时绑定的线程,则调用startThread:若NioEventLoop中的线程未启动,则启动线程
  4. 若线程需要立马唤醒,则唤醒线程
@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}


private void execute(Runnable task, boolean immediate) {
	1、调用inEventLoop判断当前线程是否是创建NioEventLoop时绑定的线程
    boolean inEventLoop = inEventLoop();
	2、向NioEventLoop的Mpsc队列中添加任务
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}
上一篇:Bootstrap - blockquote


下一篇:推荐几个好用的Bootstrap后端UI代码库