文章目录
系类文章:
《Netty服务端启动源码分析(一)整体流程》
《Netty服务端启动源码分析(二)服务端Channel的端口绑定》
《Netty核心组件之NioEventLoop(一)创建》
《Netty核心组件之NioEventLoop(二)处理消息》
1. 概述
本篇接《Netty核心组件之NioEventLoop(一)创建》之后,上一篇文章中我们讲述了NioEventLoop的创建过程,这里继续讲述消息是如何处理的。
2. processSelectedKeys()
private void c() {
// 看文章最开头是否启用优化的设置,如果启用了会走这里
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
为什么processSelectedKeysOptimized不需要入参,而processSelectedKeysPlain需要入参?参考 《netty对selectedKeys进行优化》
从源码可以看到,processSelectedKeysOptimized和processSelectedKeysPlain的大部分处理逻辑是相同的,区别就在于对selectedKey的迭代逻辑,记得一开始说过如果开启了优化,netty对selectedKey的底层集合进行了优化,将HashSet改为了数组,HashSet底层用HashMap实现,迭代的效率是没有数组高的。
既然内部逻辑类似,重点看一下processSelectedKeysOptimized
()方法:
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// 方便GC回收
selectedKeys.keys[i] = null;
final Object a = k.attachment();
// 根据类型不同执行不同的处理逻辑
if (a instanceof AbstractNioChannel) {
//[1]
processSelectedKey(k, (AbstractNioChannel) a);
} else {
// [2]一般不会走这个分支,除非用户主动注册NioTask到selector,netty单元测试里有案例
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// 如果为true,则重置之后的所有selectKey,并调用selectNow()方法
// 因为run()方法执行本方法前已经置为false,所以不会进这里
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
【2】NioTask分支一般不会走,感兴趣可以看一下netty的单元测试。
重点看一下【1】AbstractNioChannel分支,如果attachment是AbstractNioChannel类型,说明它是NioServerSocketChannel或者NioSocketChannel,需要进行IO读写相关的操作。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// SelectionKey无效的处理
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// channel没有关联的eventLoop直接返回
return;
}
// channel关联的eventLoop不是本eventLoop,直接返回,不应关闭channel
if (eventLoop != this || eventLoop == null) {
return;
}
// 关闭channel
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
//[1] 对于NioSocketChannel,连接需要先finishConnect才能继续读写
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// 下面3行的操作只是将OP_CONNECT从感兴趣选项中移除,防止重复触发
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// [2]说明有半包消息未发送完成,调用flush发送即可
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
//[3] unsafe是多态,对于NioServerSocketChannel,read就是接受客户端TCP连接
// 对于NioSocketChannel,就是从channel中读取ByteBuffer
// 同时检测readyOps == 0 是解决JDK的循环bug
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
- 【1】处理connect事件
- 【2】处理write事件
- 【3】处理读和Accept事件 疑问:为什么读和Accept在一起处理?
这里可以看到熟悉的代码,当有链接(OP_ACCEPT )进来的时候,便会走 unsafe.read():这个里面就调用了doReadMessages,汇总下事件类型:
- Connect, 即连接事件(TCP 连接), 对应于SelectionKey.OP_CONNECT int值为16
- Accept, 即确认事件, 对应于SelectionKey.OP_ACCEPT int值为8
- Read, 即读事件, 对应于SelectionKey.OP_READ, 表示 buffer 可读 int值为1
- Write, 即写事件, 对应于SelectionKey.OP_WRITE, 表示 buffer 可写 int值为4