前言
在前文中,我们分析了服务器是如何启动的。而服务器启动后肯定是要接受客户端请求并返回客户端想要的信息的,否则要你服务器干啥子呢?所以,我们今天就分析分析 Netty 在启动之后是如何接受客户端请求的。
开始吧!
1. 从源头开始
从之前服务器启动的源码中,我们得知,服务器最终注册了一个 Accept 事件等待客户端的连接。我们也知道,NioServerSocketChannel 将自己注册到了 boss 单例线程池(reactor 线程)上,也就是 EventLoop 。
楼主还没有仔细介绍 EventLoop ,但楼主这里先稍微讲一下他的逻辑:
EventLoop 的作用是一个死循环,而这个循环中做3件事情:
- 有条件的等待 Nio 事件。
- 处理 Nio 事件。
- 处理消息队列中的任务。
而我们今天看的就是第二个步骤。
首先需要进入到 NioEventLoop 源码中。
2. 开始 debug
进入到 NioEventLoop 源码中后,找到 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) 方法 ,断点打在下方:
debug 启动我们的 EchoServer 的 main 方法。在浏览器键入 http://localhost:8007/,开始访问我们的 Netty 服务器,这时候,断点开始卡住。
从上图中的断点我们可以看到, readyOps 是16 ,也就是 Accept 事件。说明浏览器的请求已经进来了。那么这个 unsafe 是谁呢?就是 boss 线程中 NioServerSocketChannel 的AbstractNioMessageChannel$NioMessageUnsafe 对象。
我们进入到 AbstractNioMessageChannel$NioMessageUnsafe 的read 方法中。
AbstractNioMessageChannel$NioMessageUnsafe # read 方法
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
楼主限于篇幅,精简了很多代码,我们拆解一下代码:
- 检查该 eventloop 线程是否是当前线程。
- 执行 doReadMessages 方法,并传入一个 readBuf 变量,这个变量是一个 List,也就是容器。
- 循环容器,执行 pipeline.fireChannelRead(readBuf.get(i));
我们分析一下上面的步骤:doReadMessages 肯定是读取 boss 线程中的 NioServerSocketChannel 接受到的请求。并把这些请求放进容器,然后呢?循环容器中的所有请求,调用 pipeline 的 fireChannelRead 方法,用于处理这些接受的请求或者其他事件。
那么我们就来验证一下。进入 doReadMessages 方法。
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
buf.add(new NioSocketChannel(this, ch));
return 1;
}
楼主精简了代码,可以看到,和我们猜的不差,该方法很简单,通过工具类,调用 NioServerSocketChannel 内部封装的 serverSocketChannel 的 accept 方法,熟悉的 Nio 做法。然后获取到一个 JDK 的 SocketChannel,然后,使用 NioSocketChannel 进行封装。最后添加到容器中。
3. NioSocketChannel 是如何创建的?
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
buf.add(new NioSocketChannel(this, ch));
return 1;
}
我们另起一段研究这段代码,先看 SocketUtils.accept(javaChannel());
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
该方法调用了 NioServerSocketChannel 中的 serverSocketChannel.accept() 方法。返回了一个 Nio 的通道,注意:这个通道,就是我们刚刚 Boss 线程监听到的 Accept 事件,相当于一个 Tcp 连接。
然后我们看 NioSocketChannel 的创建过程,其中参数 this 是 NioServerSocketChannel ,这个就是 SocketChannel 的 parent 属性,ch 是 SocketChannel 。构造方法如下:
和 ServerSocket 类似,还记得 ServerSocket 是怎么创建的吗:
还是很相似的。
我们先略过 config 的创建过程,先看 super。
这里设置了 SelectableChannel 属性为 JDK 的 Nio 的 SocketChannel 和 感兴趣的事件。设置非阻塞。
进入到 super 构造方法中:
也是和 ServerSocket 一样了,注意:这里的 unsafe 就和 ServerSocket 不同了,此方法被重写了,返回的是
NioSocketChannel$NioSocketChannelUnsafe, 是 NioSocketChannel 的内部类。再看 pipeline ,是相同的 DefaultChannelPipeline。同样 pipeline 也会自己创建自己的 head 节点和 tail 节点。
好了,到这里,NioSocketChannel 就创建完毕了。
回到 最初的 read 方法中。
4. 循环执行 pipeline.fireChannelRead 方法
从上面我们可以看到,doReadMessages 方法的作用是通过 ServerSocket 的 accept 方法获取到 Tcp 连接,然后封装成 Netty 的 NioSocketChannel 对象。最后添加到 容器中。
然后再 read 方法中,循环调用 ServerSocket 的 pipeline 的 fireChannelRead 方法。从这个方法的名字可以感受到:开始执行 管道中的 handler 的 ChannelRead 方法。
那么我们就看看:
到这里,楼主就不一个一个 dubug 了,实际上,我们知道,pipeline 里面又 4 个 handler ,分别是 Head,LoggingHandler,ServerBootstrapAcceptor,Tail。我们重点关注 ServerBootstrapAcceptor。debug 之后,断点会进入到 ServerBootstrapAcceptor 中来。我们来看看 ServerBootstrapAcceptor 的 channelRead 方法。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {// 将客户端连接注册到 worker 线程池
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
我们讲该方法拆解:
- msg 强转成 Channel ,实际上就是 NioSocketChannel 。
- 添加 NioSocketChannel 的 pipeline 的 handler ,就是我们 main 方法里面设置的 childHandler 方法里的。
- 设置 NioSocketChannel 的各种属性。
- 最重要的,将该 NioSocketChannel 注册到 childGroup 中的一个 EventLoop 上,并添加一个监听器。
我们重点看最后一步,这个 childGroup 就是我们 main 方法创建的数组大小为 16 的 workerGroup。在创建 ServerBootstrapAcceptor 添加进来的。
进入 register 方法查看:
这里的 next 方法我们之前介绍过了,使用位运算获取数组中的EventLoop。
这里创建 DefaultChannelPromise 我们之前也看过了,最后该方法返回的就是这个 DefaultChannelPromise。
这里链式调用说明一下:
- premise 的 channel 方法返回的是 NioSocketChannel。
- promise.channel().unsafe() 返回的是 NioSocketChannel$NioSocketChannelUnsafe。
所以最终调用的是 NioSocketChannel 的内部类的 register 方法。参数是当前的 EventLoop 和 promise。
查看这个 register 方法:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new Runnable() {// 开始真正的异步,boss 线程开始启动
@Override
public void run() {
register0(promise);
}
});
}
楼主精简了一下代码逻辑,其实就是同步或者异步的调用 register0 方法。大家可以向一下到底是异步还是同步?应该是异步的。应为此时的线程是 boss 线程,而不是 worder 线程,所以肯定无法通过 inEventLoop 判断。
进入到异步线程中查看 register0 方法。其实和我们之前分析的注册 ServerSocket 的过程是一样的。
其中最核心的就是 doRegister 方法。
doRegister 方法
pipeline.invokeHandlerAddedIfNeeded() 方法
回到 register0 中,该方法在成功注册到 selector 的读事件后,继续执行管道中可能存在的任务。那么管道中会存在什么任务呢?我们来看看:
到这里,我们发出疑问,这个 task 从哪里来的?
经过查找,我们发现,这个 pendingHandlerCallbackHead 变量来自我们 addLast 的时候,如果该 pipeline 还没有注册到这个 eventLoop 上,则将这个包装过 handler 的 context 放进变量 pendingHandlerCallbackHead 中,事实上,这个 pendingHandlerCallbackHead 就是个链表的表头,后面的 Context 会被包装成一个任务,追加到链表的尾部。
那么这个 execute 方法如何执行呢?
主要是执行 callHandlerAdded0 方法,并且传入这个 Context:
注意:这里调用了包装了自定义 handler 的 Context 的 handlerAdded 方法,并且传入了这个 Context。然后这个方法我们并没有重写,我们看父类中方法逻辑:
完美,调用了 initChannel 方法。但注意,这里调用的并不是我们重写的 initChannel 方法,因为参数不是同一个类型,我们重写的方法的参数是 SocketChannel,而不是ChannelHandlerContext,所以,肯定还需要再调用一层。
这里才是调用用户代码的地方。
我们的用户代码添加了两个处理器,还有一个自定义的处理器。当然,现在添加处理器不会再添加到那个 pendingHandlerCallbackHead 任务链表里了,因为已经注册过了,if 判断过不了。
operationComplete 方法
然后设置promise 的 operationComplete 方法。还记得我们在ServerBootstrap 的 channelRead 方法中的代码吗?
在这里调用了我们之前的设置的监听器的 operationComplete 方法。
pipeline.fireChannelRegistered() 方法
好,再然后调用 pipeline.fireChannelRegistered() 的方法。大家可以按照之前的 pipeline 的路子想一下,会如何执行?pipeline 作为管道,其中有我们设置的 handler 链表,这里,肯定会顺序执行我们的 handler,比如 main 方法中的 childerHandler。我们继续验证一下。
该方法会继续调用桥梁 Context 的 fireChannelRegistered 方法,Context 包装的就是我们自定义的 handler。当然我们没有重写该方法。我们只重写了 initChannel 方法。
pipeline.fireChannelActive() 方法
回到 register0 方法中,我们继续向下走,如果是第一次注册的话,执行pipeline.fireChannelActive()代码,也就是执行 pipeline 管道中的 handler 的 ChannelActive 方法。
同样,我们也没有重写该方法,父类会继续回调 fireChannelActive 方法。而这个方法里会继续寻找下一个 Context,然后继续调用,直到遇到 pipeline 的 channelActive(ChannelHandlerContext ctx) 方法:
这里有一行 readIfIsAutoRead 方法,我们注意一下,上面的 ChannelActive 方法都执行结束后,也就是已经连接已经成功后,便开始调用read方法。
同样的,如果熟悉服务器启动过程的同学肯定看出来了,这里最终会调用 doBeginRead 方法,也就是 AbstractNioChannel 类的方法。
在之前的 doRegister 方法中,只是注册了0,为什么呢?如果直接注册1,也就是读事件,但系统还没有准备好读取,现在一切都初始化了,就可以读取了。而这里是管道的 head 节点调用 unsafe 方法的。
到这里,针对于这个客户端的连接就完成了,接下来就可以监听读事件了。
总结:服务器接受客户端过程
- 服务器轮询 Accept 事件,获取事件后调用 unsafe 的 read 方法,这个 unsafe 是 ServerSocket 的内部类,该方法内部由2部分组成。
- doReadMessages 用于创建 NioSocketChannel 对象,该对象包装 JDK 的 Nio Channel 客户端。该方法会像创建 ServerSocketChanel 类似创建相关的 pipeline , unsafe,config。
- 随后执行 执行 pipeline.fireChannelRead 方法,并将自己绑定到一个 chooser 选择器选择的 workerGroup 中的一个 EventLoop。并且注册一个0,表示注册成功,但并没有注册读(1)事件.
- 在注册的同时,调用用户程序中设置的 ChannelInitializer handler,向管道中添加一个自定义的处理器,随后立即删除这个 ChannelInitializer handler,为什么呢?因为初始化好了,不再需要。
- 其中再调用管道的 channelActive 方法中,会将曾经注册过的 Nio 事件改成读事件,开始真正的读监听。到此完成所有客户端连接的读前准备。
总的来说就是:接受连接----->创建一个新的NioSocketChannel----------->注册到一个 worker EventLoop 上--------> 注册selecot Read 事件。
当然,关于,获取到读事件后该怎么处理还没有说,限于篇幅,留在下篇文章中。
good luck!!!!