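This continues from the previous post. There we saw netty expose a port, with the acceptor, handler, pipeline and eventloop all set up, but not how a new incoming network request is actually handled. That is today's topic.
1. Recap: the eventloop main loop
As covered last time, once netty has started there are a number of eventloop threads running continuously, either doing select or executing tasks. Let's go back over how NioEventLoop is implemented.
First, the class diagram of NioEventLoop:
[Figure: NioEventLoop class diagram]
It looks complicated, but we can set that aside. Its core method is, naturally, run():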
// io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {
// An endless loop that polls for work; this is the heart of the eventloop
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
// With pending tasks, run them; otherwise block waiting for network events or a wake-up
case SelectStrategy.SELECT:
// selector.select() with a timeout
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio is the share of loop time given to I/O versus running tasks; the default is 50, i.e. 50:50
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// step 1: process I/O events
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// step 2: run queued tasks
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
// Upper bound on how long tasks may run, derived from the time just spent on I/O
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
// select(): what drives the event loop
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// Bounded by a timeout: at most 1s by default, or sooner if a scheduled task is due earlier
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
// Deadline reached: return right away
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
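In broad terms: an eventloop is a thread that runs forever, repeatedly checking whether a network event has occurred or a new task has been submitted, and handling whatever it finds.
To keep I/O events and tasks from starving each other, it uses ioRatio. If I/O took ioTime, tasks get a time budget in the corresponding remaining proportion, which keeps the two fair.
In terms of implementation, network I/O events are discovered through selector.select(), while tasks are detected through hasTasks(). Each select sleeps for at most about 1s, so that an unexpected condition cannot leave the system hanging.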
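The two bits of arithmetic in run() and select() are easy to check by hand. Below is a minimal sketch that copies only the formulas from the code above; the class and method names (IoRatioBudget, taskBudgetNanos, timeoutMillis) are made up for illustration and are not netty APIs.

```java
final class IoRatioBudget {
    private IoRatioBudget() {
    }

    /**
     * Task time budget in nanoseconds after ioTimeNanos was spent on I/O,
     * using the same formula as run(): ioTime * (100 - ioRatio) / ioRatio.
     * ioRatio == 100 is excluded because run() handles it in a separate branch
     * that runs all tasks without a time limit.
     */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    /**
     * Remaining time until the deadline, rounded to the nearest millisecond,
     * as select() does with "+ 500000L" before dividing by 1000000L.
     */
    static long timeoutMillis(long deadlineNanos, long nowNanos) {
        return (deadlineNanos - nowNanos + 500_000L) / 1_000_000L;
    }
}
```

With the default ioRatio of 50, 2 ms of I/O gives tasks a 2 ms budget; with ioRatio 80 the same 2 ms of I/O gives tasks only 0.5 ms. A remaining time below half a millisecond rounds to 0, which is the case where select() falls back to selectNow().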
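2. Running I/O operations
I/O handling mostly means watching for network events such as new connection requests, read requests, write requests and close requests. It is one of the core functions of a network application. In the eventloop main loop, this is the job of processSelectedKeys().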
// io.netty.channel.nio.NioEventLoop#processSelectedKeys
private void processSelectedKeys() {
// selectedKeys is set up when the selector is opened (with the optimized key set in use), so it is normally non-null
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
// With no network events, selectedKeys.size is 0 and nothing is processed
for (int i = 0; i < selectedKeys.size; ++i) {
// With network events, selectedKeys holds the keys that are ready
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// The attachment is the channel; cast it and dispatch
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
// Handle one ready channel
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// An invalid key: close the channel, but only if it still belongs to this event loop
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
// Read the ready-op set and branch on it
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
// On a connect event, finish the connection first, which triggers the finishConnect() chain
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
// On a write event, force the channel to flush its pending data
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// Both OP_READ and OP_ACCEPT land here; event handling starts from this call
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
// io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
@Override
public void read() {
// Only the I/O thread itself may call read(); the assertion catches a call from any other thread
assert eventLoop().inEventLoop();
// Fetch the config and the pipeline
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// Get the receive allocator handle (io.netty.channel.AdaptiveRecvByteBufAllocator.HandleImpl)
// and reset its read state
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 1. Read messages; for a server channel this accepts a connection
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
// The allocHandle decides whether to keep reading
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 2. Event notification: fireChannelRead(); the channel returned by accept() is the message passed into the pipeline
// This pipeline is head -> ServerBootstrapAcceptor -> tail
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
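// Event notification: channelReadComplete()
// Note that the work started by the read (registering the child channel) has very likely not finished yet. Is it too early to signal completion?
// It is early, but that is fine: an eventLoop runs submitted work in submission order, so the event can simply be fired here
// This is also why accept does not block the eventLoop: everything shares the eventLoop, but accept returns quickly
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {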
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
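That is the overall flow for handling one I/O event:
1. Obtain the allocHandle from the configured RecvByteBufAllocator and reset it; it tracks how much has been read;
2. Call doReadMessages(), which calls accept() to take in the socket and stores the resulting channel in readBuf (a List) for later;
3. For each accepted socket, call pipeline.fireChannelRead() to run the read handling;
4. Call pipeline.fireChannelReadComplete() to fire the read-complete event;
5. Handle exceptions;
Note that the current thread belongs to bossGroup, whose pipeline is fairly fixed: just head -> acceptor -> tail. Our own handlers live in childGroup, so we will have to wait a little longer for them.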
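The order of the readiness checks in processSelectedKey() shown earlier (connect, then write, then read or accept) can be reproduced with nothing but the JDK's SelectionKey constants. This is only a sketch: ReadyOpsDispatch and actionsFor are hypothetical names, and the returned strings stand in for the calls on unsafe.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

final class ReadyOpsDispatch {
    private ReadyOpsDispatch() {
    }

    /** Lists, in order, the actions processSelectedKey() would take for a given readyOps bit set. */
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        // Connect must be finished before any read or write is attempted.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        // Writes go before reads so queued buffers can be released early.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // OP_READ and OP_ACCEPT share one branch; readyOps == 0 is included as well.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }
}
```

For a server channel the bit set is OP_ACCEPT, so only "read" is returned, and it is the Unsafe implementation behind read() that turns this into an accept().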
Now let's break these steps down.
2.1 The acceptor takes in the socket
After obtaining the allocHandle from the receive allocator, the socket is accepted, which in practice is a call to serverSocketChannel.accept(). Let's look.
// Prepare the allocHandle, which later decides whether reading is finished
// io.netty.channel.AbstractChannel.AbstractUnsafe#recvBufAllocHandle
@Override
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
if (recvHandle == null) {
recvHandle = config().getRecvByteBufAllocator().newHandle();
}
return recvHandle;
}
// Reset the read state
// io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#reset
@Override
public void reset(ChannelConfig config) {
this.config = config;
maxMessagePerRead = maxMessagesPerRead();
totalMessages = totalBytesRead = 0;
}
// The allocHandle decides whether reading is finished
// io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading()
@Override
public boolean continueReading() {
return continueReading(defaultMaybeMoreSupplier);
}
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
// On accept, totalMessages = 1, so this condition holds.
// But totalBytesRead = 0, so the method returns false and the do/while loop in read() ends after one accept
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}
// Accept the new socket
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
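// In other words, netty first learns that an event has arrived and only then calls accept()
// The server channel is in non-blocking mode and select() has already reported readiness, so accept() returns immediately
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// Wrap the accepted channel and add it to the result buf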
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
// io.netty.util.internal.SocketUtils#accept
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
The newly accepted socket is wrapped in a NioSocketChannel and added to readBuf, and we move on to the next step.
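The "select first, accept afterwards" order can be tried out with plain JDK NIO on the loopback interface. The sketch below is not netty code: SelectThenAccept and acceptOnce are hypothetical names, and it assumes the environment allows binding a loopback port. It also shows that accept() on a non-blocking server channel returns null rather than waiting when nothing is pending.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

final class SelectThenAccept {
    private SelectThenAccept() {
    }

    /** Returns how many connections were accepted after one OP_ACCEPT readiness event. */
    static int acceptOnce() {
        try {
            return doAcceptOnce();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static int doAcceptOnce() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 lets the OS pick a free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            // Nothing is pending yet: a non-blocking accept() returns null at once.
            if (server.accept() != null) {
                throw new IllegalStateException("unexpected pending connection");
            }

            // One client connects (blocking connect), then the selector reports readiness.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                List<SocketChannel> readBuf = new ArrayList<>();
                if (selector.select(2000) > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isAcceptable()) {
                            SocketChannel ch = server.accept();
                            if (ch != null) {
                                readBuf.add(ch); // netty wraps this in a NioSocketChannel
                            }
                        }
                    }
                    selector.selectedKeys().clear();
                }
                int accepted = readBuf.size();
                for (SocketChannel ch : readBuf) {
                    ch.close();
                }
                return accepted;
            }
        }
    }
}
```

Calling SelectThenAccept.acceptOnce() should report one accepted connection when the loopback interface is available.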
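2.2 Propagating the read event
Once the socket has been accepted, the entries of readBuf are processed in turn. (readBuf is a list, so one pass could in principle accept several sockets, although with the continueReading() shown above the loop stops after one.) This is where the pipeline mechanism comes in. The pipeline here contains head, acceptor and tail, but only the acceptor really processes the message.
// channelRead() notification: starts at head, handled by the acceptor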
// io.netty.channel.DefaultChannelPipeline#fireChannelRead
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
// Start from the pipeline's head node as the first channelHandler
// head is DefaultChannelPipeline.HeadContext, which handles both inbound and outbound events,
// i.e. it implements both ChannelOutboundHandler and ChannelInboundHandler
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
// io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
// If leak detection is enabled, touch() records the next context as a hint on a ReferenceCounted msg, which helps when debugging leaks
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// Events discovered by the current event loop take this branch directly
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
// io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// Invoke the actual channelRead()
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
// io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// head has nothing special to do here; it just calls fireChannelRead() to pass the event on
ctx.fireChannelRead(msg);
}
// io.netty.channel.AbstractChannelHandlerContext#fireChannelRead
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
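// Find the next inbound handler (as seen before: starting from the current node, find the next inbound channelHandlerContext in the pipeline) and invoke it
// Unlike the call on head, which is hard-coded, this lookup is dynamic and can recurse down the chain
// The real difference lies in the channelHandler implementations, each handling its own business
// For a freshly accepted channel the event always passes through the acceptor, shown below
invokeChannelRead(findContextInbound(), msg);
return this;
}
// After a few hops we arrive at ServerBootstrapAcceptor, which does the real work: handing the channel over to childGroup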
// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Recover the child channel from msg and add the application's childHandler to its pipeline
// This works as before: a name is generated, a ChannelHandlerContext is built, and so on
final Channel child = (Channel) msg;
// Add the application's childHandler to the child pipeline, which is what eventually triggers ChannelInitializer.initChannel()
// Every newly accepted socket triggers initChannel() once
child.pipeline().addLast(childHandler);
// Copy the configured options and attributes to the child
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// Register the child and add a callback
// Registering binds the channel to one eventLoop thread; all later operations run on that thread
// It also registers the channel with the NIO selector
// At this point the acceptor's job is done
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
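The acceptor's main job is to hand the socket over to childGroup. Registration in childGroup works the same way as in bossGroup; the biggest difference is the events they watch: the acceptor watches OP_ACCEPT, while childGroup watches OP_READ.
2.3 Propagating the readComplete event
In bossGroup the readComplete() event is mostly ignored, but we can still use it to see how readComplete propagates. Overall it matches the propagation of the read() event.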
// io.netty.channel.DefaultChannelPipeline#fireChannelReadComplete
@Override
public final ChannelPipeline fireChannelReadComplete() {
// Propagation again starts from head
AbstractChannelHandlerContext.invokeChannelReadComplete(head);
return this;
}
// The generic way of invoking a handler
// io.netty.channel.AbstractChannelHandlerContext#invokeChannelReadComplete(io.netty.channel.AbstractChannelHandlerContext)
static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelReadComplete();
} else {
Runnable task = next.invokeChannelReadCompleteTask;
if (task == null) {
next.invokeChannelReadCompleteTask = task = new Runnable() {
@Override
public void run() {
next.invokeChannelReadComplete();
}
};
}
executor.execute(task);
}
}
// The generic pipeline invocation pattern
// io.netty.channel.AbstractChannelHandlerContext#invokeChannelReadComplete()
private void invokeChannelReadComplete() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelReadComplete(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelReadComplete();
}
}
// io.netty.channel.DefaultChannelPipeline.HeadContext#channelReadComplete
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
readIfIsAutoRead();
}
// io.netty.channel.AbstractChannelHandlerContext#fireChannelReadComplete
@Override
public ChannelHandlerContext fireChannelReadComplete() {
// The generic fireXXX propagation: call fireXXX to pass the event to the next node, otherwise propagation stops here
// Find the next inbound handler, starting from the current node
// In the acceptor's pipeline this ends up in the channelReadComplete() that ServerBootstrapAcceptor inherits
invokeChannelReadComplete(findContextInbound());
return this;
}
// io.netty.channel.ChannelInboundHandlerAdapter#channelReadComplete
/**
* Calls {@link ChannelHandlerContext#fireChannelReadComplete()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// ServerBootstrapAcceptor does not override channelReadComplete, so it simply forwards the event
// and the default onUnhandledInboundChannelReadComplete() in tail does nothing either
ctx.fireChannelReadComplete();
}
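To summarise how the pipeline propagates an event:
1. Propagation is triggered by a call such as pipeline.fireChannelReadComplete();
2. The static invokeChannelReadComplete is called with head as the starting point (outbound events start from tail instead);
3. If the caller is in the eventloop, next.invokeChannelReadComplete() is called directly; otherwise it is submitted to the eventloop as a task;
4. handler.channelReadComplete(this) is called to fire the concrete event;
5. The handler does its work; to pass the event on it calls ctx.fireChannelReadComplete(), otherwise propagation stops;
The walkthrough above used fireChannelReadComplete, but nearly every event propagates the same way.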
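The propagation rule, where an event only travels on if the handler calls fireXXX on its context, fits in a few lines. The following is a deliberately tiny model and not netty's implementation: MiniPipeline, Context and InboundHandler are invented names, there is no tail node, no executor check and no outbound direction.

```java
import java.util.ArrayList;
import java.util.List;

final class MiniPipeline {
    /** Stand-in for ChannelInboundHandler, reduced to a single event. */
    interface InboundHandler {
        void channelRead(Context ctx, Object msg);
    }

    /** Stand-in for a handler context: one node of a singly linked list. */
    static final class Context {
        final String name;
        final InboundHandler handler;
        Context next;

        Context(String name, InboundHandler handler) {
            this.name = name;
            this.handler = handler;
        }

        /** Passes the event to the next context; without this call propagation ends. */
        void fireChannelRead(Object msg) {
            if (next != null) {
                next.handler.channelRead(next, msg);
            }
        }
    }

    // Like netty's head, this node just forwards inbound events.
    private final Context head = new Context("head", (ctx, msg) -> ctx.fireChannelRead(msg));
    private Context last = head;

    MiniPipeline addLast(String name, InboundHandler handler) {
        Context ctx = new Context(name, handler);
        last.next = ctx;
        last = ctx;
        return this;
    }

    /** Entry point: propagation always starts at head. */
    void fireChannelRead(Object msg) {
        head.handler.channelRead(head, msg);
    }

    /** Builds a three-handler pipeline and returns the order in which handlers saw the event. */
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        new MiniPipeline()
            .addLast("decoder", (ctx, msg) -> {
                trace.add("decoder:" + msg);
                ctx.fireChannelRead(msg + "!"); // forwards a transformed message
            })
            .addLast("business", (ctx, msg) -> trace.add("business:" + msg)) // does not forward
            .addLast("never", (ctx, msg) -> trace.add("never:" + msg))
            .fireChannelRead("hi");
        return trace;
    }
}
```

In demo() the third handler is never reached because "business" does not call ctx.fireChannelRead(), which is the "otherwise propagation stops" case from step 5.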
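3. childGroup running I/O operations
The previous section covered the acceptor taking in a socket, handing it to childGroup and returning. So how does childGroup handle it?
It works exactly like bossGroup. The differences are the pipeline and the number of threads, which is how each ends up doing a different job. childGroup handles read and write events, while the acceptor handles OP_ACCEPT. OP_READ is specified as the read interest op when the NioSocketChannel is created. Let's look at that first:
// While bossGroup handles the accept event, it creates the NioSocketChannel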
// io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
// io.netty.channel.socket.nio.NioSocketChannel#NioSocketChannel
/**
* Create a new instance
*
* @param parent the {@link Channel} which created this instance or {@code null} if it was created by the user
* @param socket the {@link SocketChannel} which will be used
*/
public NioSocketChannel(Channel parent, SocketChannel socket) {
// The parent class records which event to listen for
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
// io.netty.channel.nio.AbstractNioByteChannel#AbstractNioByteChannel
/**
* Create a new instance
*
* @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
* @param ch the underlying {@link SelectableChannel} on which it operates
*/
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// Pass OP_READ as the read interest op
super(parent, ch, SelectionKey.OP_READ);
}
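OK, back to how childGroup handles events. Both groups are NioEventLoopGroup, so the eventloops they create are the same kind: each handles I/O events and runs tasks. Recall processSelectedKey() from the previous section: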
// io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// This time a different unsafe implementation is used
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
// io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
@Override
public final void read() {
final ChannelConfig config = config();
// Check whether reading should stop, e.g. because the socket has been closed
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
// step 1: prepare the environment: pipeline, allocator...
// This pipeline is the one holding the handlers we configured ourselves
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
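// Each iteration allocates a fresh buffer; the size starts from an initial guess (1024 bytes here) and adapts afterwards
byteBuf = allocHandle.allocate(allocator);
// step 2: read data into byteBuf; allocHandle records how many bytes were read
allocHandle.lastBytesRead(doReadBytes(byteBuf));
// 0 means there is nothing more to read right now; a negative value means the peer has closed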
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
// Count one more message read
allocHandle.incMessagesRead(1);
readPending = false;
// step 3: fire channelRead() on the pipeline
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
// Fire the channelReadComplete event and let it propagate
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
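That is the basic flow of how childGroup handles an I/O event. It is much like the acceptor's, which shows how well netty's abstraction holds up: the same pattern applies everywhere.
1. Prepare the environment: get the pipeline, the config and the allocator;
2. doReadBytes() reads data into a buffer, up to the buffer's writable size (initially 1024 bytes, then adjusted adaptively);
3. After each read, record it and fire channelRead() down the pipeline so every handler gets a chance to process that chunk;
4. As long as there is more data and the per-read message limit has not been reached, repeat steps 2 and 3;
5. Fire channelReadComplete once for the whole batch, propagated through the pipeline in the same way;
6. Handle exceptions and close;
We have already seen how the pipeline propagates events. The pattern is: read is an inbound event that starts at head and calls each handler's channelRead() in turn until the end of the chain.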
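Steps 2 to 4 above amount to a bounded read loop. Here is a rough JDK-only model of it, under a few assumptions: ReadLoopSketch and readChunks are made-up names, the buffer size is fixed instead of adaptive, and "keep reading only if the last read filled the buffer" stands in for the allocHandle's continueReading() check.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

final class ReadLoopSketch {
    private ReadLoopSketch() {
    }

    /**
     * Reads from the channel in chunks and returns the size of each chunk.
     * Each list entry corresponds to one fireChannelRead() in the real loop.
     */
    static List<Integer> readChunks(ReadableByteChannel in, int bufferSize, int maxMessagesPerRead) {
        List<Integer> chunkSizes = new ArrayList<>();
        try {
            int lastRead;
            do {
                // A fresh buffer per iteration, as in the loop above.
                ByteBuffer buf = ByteBuffer.allocate(bufferSize);
                lastRead = in.read(buf);
                if (lastRead <= 0) {
                    break; // 0: nothing available now; -1: end of stream
                }
                chunkSizes.add(lastRead);
                // Continue only while under the message limit and the buffer was filled,
                // which suggests more data may be waiting.
            } while (chunkSizes.size() < maxMessagesPerRead && lastRead == bufferSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return chunkSizes;
    }

    /** Convenience overload that reads from an in-memory byte array. */
    static List<Integer> readChunks(byte[] data, int bufferSize, int maxMessagesPerRead) {
        return readChunks(Channels.newChannel(new ByteArrayInputStream(data)), bufferSize, maxMessagesPerRead);
    }
}
```

Reading 2500 bytes with a 1024-byte buffer produces three chunks before the loop stops on a partially filled buffer; lowering the message limit to 2 stops it after two chunks and leaves the rest for the next read event.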
Next, let's look at how netty implements a few of the key steps.
3.1. doReadBytes: reading data from the socket
// As you would expect, this reads bytes from the socket into byteBuf
// io.netty.channel.socket.nio.NioSocketChannel#doReadBytes
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
// Get the SocketChannel and read from it into byteBuf, i.e. a copy from the kernel socket buffer into the buffer's memory
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
// io.netty.buffer.AbstractByteBuf#writeBytes
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
ensureWritable(length);
int writtenBytes = setBytes(writerIndex, in, length);
// Keep the writer index in step with the bytes written
if (writtenBytes > 0) {
writerIndex += writtenBytes;
}
return writtenBytes;
}
// io.netty.buffer.PooledUnsafeDirectByteBuf#setBytes
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
checkIndex(index, length);
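// internalNioBuffer() is a ByteBuffer view over the same memory as this ByteBuf, so bytes written into it are visible through the ByteBuf
// This is where the direct ByteBuffer comes into play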
ByteBuffer tmpBuf = internalNioBuffer();
index = idx(index);
tmpBuf.clear().position(index).limit(index + length);
try {
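// Read from the socketChannel into tmpBuf
// The bytes are still copied out of the kernel's socket buffer; reading into direct memory avoids the extra copy through a temporary direct buffer that the JDK makes for a heap buffer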
return in.read(tmpBuf);
} catch (ClosedChannelException ignored) {
return -1;
}
}
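That is how data is read from the socket. Overall it is a copy from kernel memory into the ByteBuf's memory, which is direct (off-heap) memory for the pooled direct buffer shown here.
Once data has been read (possibly only part of it), it is handed to the pipeline: head -> handler... -> tail. Let's look at a concrete decoder that netty provides:
3.2. netty decoding, part 1: ByteToMessageDecoder
It is just a channelRead handler.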
// io.netty.handler.codec.ByteToMessageDecoder#channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
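// On the first call, use data directly; afterwards append to cumulation so the bytes are joined together
// Each connection normally gets its own decoder, so later reads on that connection keep accumulating here; a channel is served by a single eventloop thread, so this is thread-safe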
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// Call decode to turn the bytes into messages
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
// Release the buffer
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
// Notify downstream: iterate over out and pass each message on
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
// io.netty.handler.codec.ByteToMessageDecoder#callDecode
/**
* Called once data should be decoded from the given {@link ByteBuf}. This method will call
* {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
*/
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
// Flush messages decoded in earlier iterations
if (outSize > 0) {
// out has messages: fire channelRead() so downstream handlers see them
fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
// Call the decode method to process in, writing results to out when there are any
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
// Nothing was decoded, or not enough data for a message yet (e.g. a half packet): out is the same size as before
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
// A message was produced but no bytes were consumed, which indicates a bug in decode()
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
// io.netty.handler.codec.ByteToMessageDecoder#decodeRemovalReentryProtection
/**
* Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
* {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
* {@link ByteBuf}.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
* @throws Exception is thrown if an error occurs
*/
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
// Convert the bytes to the desired type; this is where our custom logic goes
decode(ctx, in, out);
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
handlerRemoved(ctx);
}
}
}
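// For example, the following implementation turns bytes into a String
public class MessageDecoder extends ByteToMessageDecoder {
// Take the bytes from the ByteBuf, convert them to an object and add it to the List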
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
buffer.markReaderIndex();
byte[] data=new byte[buffer.readableBytes()];
buffer.readBytes(data);
out.add(new String(data,"UTF-8"));
}
} // 触发pipeline下游handler处理数据
// io.netty.handler.codec.ByteToMessageDecoder#fireChannelRead
/**
* Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
*/
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
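To summarize the decoding process:
1. Receive the ByteBuf that was read from the socket;
2. Check whether there is enough data to decode; if decoding succeeds, add the result to out;
3. Pass the contents of out to the downstream handlers in the pipeline for business processing;
4. Release the buffer data that has already been read, and get ready for the next read;
For short-lived connections, every request arrives on a new connection and therefore gets a fresh encoder and decoder. A long-lived connection reuses the same handlers for every message, so message boundaries have to be handled correctly: a custom protocol must be defined strictly enough to avoid misreading the stream.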
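The four steps above, together with the half-packet case, can be modelled without netty at all. The sketch below is only an illustration under assumed conventions: the class name FrameCumulator, the helper frame(), and the wire format (a 4-byte big-endian length followed by a UTF-8 payload) are all hypothetical and are not part of netty or of the code shown above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, netty-free model of a cumulating length-prefixed decoder. */
final class FrameCumulator {
    // Bytes received so far that have not yet formed a complete frame.
    private ByteBuffer cumulation = ByteBuffer.allocate(0);

    /** Feeds the bytes of one network read and returns every frame that is now complete. */
    List<String> feed(byte[] chunk) {
        // Step 1: append the new bytes to whatever was left over from the previous read.
        ByteBuffer merged = ByteBuffer.allocate(cumulation.remaining() + chunk.length);
        merged.put(cumulation).put(chunk);
        merged.flip();

        List<String> out = new ArrayList<>();
        // Step 2: decode as long as a whole frame (length prefix + payload) is available.
        while (merged.remaining() >= 4) {
            merged.mark();
            int length = merged.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative frame length: " + length);
            }
            if (merged.remaining() < length) {
                merged.reset(); // half packet: rewind and wait for more bytes
                break;
            }
            byte[] payload = new byte[length];
            merged.get(payload);
            out.add(new String(payload, StandardCharsets.UTF_8));
        }
        // Step 4: keep only the unread tail for the next read.
        cumulation = merged.slice();
        // Step 3 is up to the caller: pass the returned messages downstream.
        return out;
    }

    /** Builds one frame in the assumed wire format. */
    static byte[] frame(String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    public static void main(String[] args) {
        FrameCumulator cumulator = new FrameCumulator();
        byte[] frame = frame("hello");
        // The frame arrives in two reads: the first one is only a half packet.
        System.out.println(cumulator.feed(Arrays.copyOfRange(frame, 0, 6)));
        System.out.println(cumulator.feed(Arrays.copyOfRange(frame, 6, frame.length)));
    }
}
```

The first feed() call returns an empty list because only part of the payload has arrived; the second call completes the frame. This is the same situation in which the out size stays unchanged in callDecode and the loop simply waits for more data.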
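4. How write is implemented
Writing is the process of sending data to the peer. It usually consists of a write step and a flush step: write only puts the data into an application-level buffer, and the data is flushed to the peer at a suitable time. writeAndFlush performs both steps in a single call. The implementation lives in AbstractChannelHandlerContext, the parent class of DefaultChannelHandlerContext: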
// io.netty.channel.AbstractChannelHandlerContext#writeAndFlush
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
// io.netty.channel.AbstractChannelHandlerContext#newPromise
@Override
public ChannelPromise newPromise() {
// channel() is obtained from the pipeline; executor() is the I/O thread bound to the channel
return new DefaultChannelPromise(channel(), executor());
}
// io.netty.channel.AbstractChannelHandlerContext#writeAndFlush
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
// Validate the promise (its channel and related state)
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
// Write the data, with flush=true
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
// write is an outbound event: look for the next outbound handler, going from the current node toward head
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
// Let the next node handle it
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
// io.netty.channel.AbstractChannelHandlerContext#invokeWriteAndFlush
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
// Step 1: the write event puts the data into the buffer
invokeWrite0(msg, promise);
// Step 2: the flush event sends the buffered data to the peer
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
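4.1. How netty handles the write event
The meaning of write is clear: write data to somewhere. So how is it implemented? (We only look at the application layer here and leave the underlying TCP protocol aside.)
In fact, it is simply the propagation of the write event through the pipeline, which is finally handled by the head node.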
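Before reading the real code, the idea of propagation can be pictured with a toy pipeline. This is a minimal sketch and not netty's implementation: MiniOutboundPipeline, OutboundHandler and pending() are hypothetical names, and the head is reduced to a list that only accepts byte arrays.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mini pipeline: a write travels from the tail toward the head. */
final class MiniOutboundPipeline {
    /** An outbound handler either transforms the message or passes it on unchanged. */
    interface OutboundHandler {
        Object write(Object msg);
    }

    private final List<OutboundHandler> handlers = new ArrayList<>(); // index 0 is closest to the head
    private final List<byte[]> headBuffer = new ArrayList<>();        // stands in for the output buffer

    MiniOutboundPipeline addLast(OutboundHandler handler) {
        handlers.add(handler);
        return this;
    }

    void write(Object msg) {
        // Outbound direction: start at the last handler and walk back toward the head.
        for (int i = handlers.size() - 1; i >= 0; i--) {
            msg = handlers.get(i).write(msg);
        }
        // The head accepts bytes only; anything else means no encoder handled the message.
        if (!(msg instanceof byte[])) {
            throw new UnsupportedOperationException("unsupported message type: " + msg);
        }
        headBuffer.add((byte[]) msg);
    }

    List<byte[]> pending() {
        return headBuffer;
    }

    public static void main(String[] args) {
        MiniOutboundPipeline pipeline = new MiniOutboundPipeline()
                // plays the role of an encoder: String -> bytes
                .addLast(m -> m instanceof String ? ((String) m).getBytes(StandardCharsets.UTF_8) : m)
                // a business-level outbound handler that runs before the encoder
                .addLast(m -> m instanceof String ? ((String) m).trim() : m);
        pipeline.write("  hi  ");
        System.out.println(pipeline.pending().get(0).length + " byte(s) buffered at the head");
    }
}
```

The handler added last sees the message first, then the encoder turns it into bytes, and only then does the head store it. Nothing has been sent at this point, which matches the write half of writeAndFlush.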
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
// Pass the write along
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
// Here the encoder handles the write
// io.netty.handler.codec.MessageToByteEncoder#write
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
// Allocate a ByteBuf for the output; as with reads, a direct buffer can be used
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// Call the encode method implemented by the application to write the data into buf
encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
// If data was written into buf, propagate the write event
// until head finishes it
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
@Override
public ByteBuf ioBuffer() {
if (PlatformDependent.hasUnsafe()) {
return directBuffer(DEFAULT_INITIAL_CAPACITY);
}
return heapBuffer(DEFAULT_INITIAL_CAPACITY);
}
// The head node handles the actual write details
// io.netty.channel.DefaultChannelPipeline.HeadContext#write
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
// io.netty.channel.AbstractChannel.AbstractUnsafe#write
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
// Convert the message into a direct buffer if it is not one already
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
// Add the data to the outboundBuffer, i.e. the output buffer
outboundBuffer.addMessage(msg, size, promise);
}
// io.netty.channel.nio.AbstractNioByteChannel#filterOutboundMessage
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
// io.netty.channel.ChannelOutboundBuffer#addMessage
/**
* Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
* the message was written.
*/
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false);
}
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
// Above the high-water mark: mark the channel as unwritable and notify the handlers (nothing is flushed here)
setUnwritable(invokeLater);
}
}
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
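In other words, write only puts the data into the outboundBuffer, so it should be fairly fast. It still passes through every layer of the pipeline's event flow, though, so it is also easy to hook extra processing into that path.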
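The bookkeeping behind addMessage, addFlush and the water marks can be summarized in a small model. This is a simplified sketch under stated assumptions: MiniOutboundBuffer and its methods are hypothetical, two queues replace netty's single linked list of entries, and the writability flag is a plain boolean instead of a change event fired into the pipeline.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of write/flush bookkeeping with low and high water marks. */
final class MiniOutboundBuffer {
    private final Deque<byte[]> unflushed = new ArrayDeque<>(); // added by write, not yet sendable
    private final Deque<byte[]> flushed = new ArrayDeque<>();   // marked by flush, ready for the socket
    private final long lowWaterMark;
    private final long highWaterMark;
    private long pendingBytes;
    private boolean writable = true;

    MiniOutboundBuffer(long lowWaterMark, long highWaterMark) {
        this.lowWaterMark = lowWaterMark;
        this.highWaterMark = highWaterMark;
    }

    /** write: only queue the message and account for its size. */
    void addMessage(byte[] msg) {
        unflushed.addLast(msg);
        pendingBytes += msg.length;
        if (pendingBytes > highWaterMark) {
            writable = false; // a signal for the application to slow down, not a flush
        }
    }

    /** flush: everything queued so far becomes eligible for the socket write. */
    void addFlush() {
        while (!unflushed.isEmpty()) {
            flushed.addLast(unflushed.pollFirst());
        }
    }

    /** Pretends the socket accepted the next flushed message completely. */
    byte[] pollFlushed() {
        byte[] msg = flushed.pollFirst();
        if (msg != null) {
            pendingBytes -= msg.length;
            if (pendingBytes < lowWaterMark) {
                writable = true;
            }
        }
        return msg;
    }

    int flushedCount() { return flushed.size(); }
    boolean isWritable() { return writable; }
    long pending() { return pendingBytes; }

    public static void main(String[] args) {
        MiniOutboundBuffer buffer = new MiniOutboundBuffer(4, 8);
        buffer.addMessage(new byte[6]);
        buffer.addMessage(new byte[6]);
        System.out.println("writable after two writes: " + buffer.isWritable());
        buffer.addFlush();
        buffer.pollFlushed();
        buffer.pollFlushed();
        System.out.println("writable after draining: " + buffer.isWritable());
    }
}
```

The point of the two thresholds is hysteresis: the channel turns unwritable once pending bytes exceed the high mark and becomes writable again only after they drop below the low mark, so the flag does not flip on every message.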
4.2. The flush event flow
The previous step put the data into the outboundBuffer, but nothing has been sent to the peer yet. The peer only sees the data after a flush.
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
// io.netty.channel.AbstractChannelHandlerContext#invokeFlush0
private void invokeFlush0() {
try {
// Handled by the MessageEncoder
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
// io.netty.channel.ChannelOutboundHandlerAdapter#flush
/**
* Calls {@link ChannelHandlerContext#flush()} to forward
* to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
// io.netty.channel.AbstractChannelHandlerContext#flush
@Override
public ChannelHandlerContext flush() {
// Outbound handlers are invoked one after another until head is reached
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeFlush();
} else {
Runnable task = next.invokeFlushTask;
if (task == null) {
next.invokeFlushTask = task = new Runnable() {
@Override
public void run() {
next.invokeFlush();
}
};
}
safeExecute(executor, task, channel().voidPromise(), null);
}
return this;
}
private void invokeFlush() {
if (invokeHandler()) {
// Continue walking the pipeline
invokeFlush0();
} else {
flush();
}
}
// The head node is responsible for the final flush of the data
// io.netty.channel.DefaultChannelPipeline.HeadContext#flush
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
// unsafe is a NioSocketChannel$NioSocketChannelUnsafe
unsafe.flush();
}
// io.netty.channel.AbstractChannel.AbstractUnsafe#flush
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
// io.netty.channel.ChannelOutboundBuffer#addFlush
/**
* Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
* and so you will be able to handle them.
*/
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
// unflushedEntry is the first of the entries that are waiting to be flushed
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
// io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#flush0
@Override
protected final void flush0() {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
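// On the first call no OP_WRITE is registered yet, so the data is written right away; OP_WRITE is only registered later if the write cannot complete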
if (!isFlushPending()) {
super.flush0();
}
}
private boolean isFlushPending() {
SelectionKey selectionKey = selectionKey();
return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
// io.netty.channel.AbstractChannel.AbstractUnsafe#flush0
@SuppressWarnings("deprecation")
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
}
} finally {
inFlush0 = false;
}
}
// io.netty.channel.socket.nio.NioSocketChannel#doWrite
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
// All written so clear OP_WRITE
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
// Ensure the pending writes are made of ByteBufs only.
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
// Always use nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
writeSpinCount -= doWrite0(in);
break;
case 1: {
// Only one ByteBuf so use non-gathering write
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
// Write the data to the socket; the number of bytes written is returned so we can tell whether everything went out
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
// Use up one write attempt; the loop exits once the spin count is exhausted
--writeSpinCount;
break;
}
default: {
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// We limit the max amount to int above so cast is safe
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
// Not everything was written: register OP_WRITE (when writeSpinCount < 0) or schedule another flush task
incompleteWrite(writeSpinCount < 0);
}
protected final void clearOpWrite() {
final SelectionKey key = selectionKey();
// Check first if the key is still valid as it may be canceled as part of the deregistration
// from the EventLoop
// See https://github.com/netty/netty/issues/2104
if (!key.isValid()) {
return;
}
final int interestOps = key.interestOps();
// Stop listening for write events
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
}
// Obtaining the nioBuffers ----------------------------------------------------
/**
* Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
* {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
* array and the total number of readable bytes of the NIO buffers respectively.
* <p>
* Note that the returned array is reused and thus should not escape
* {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
* Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
* </p>
* @param maxCount The maximum amount of buffers that will be added to the return value.
* @param maxBytes A hint toward the maximum number of bytes to include as part of the return value. Note that this
* value maybe exceeded because we make a best effort to include at least 1 {@link ByteBuffer}
* in the return value to ensure write progress is made.
*/
public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
assert maxCount > 0;
assert maxBytes > 0;
long nioBufferSize = 0;
int nioBufferCount = 0;
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
Entry entry = flushedEntry;
while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
if (!entry.cancelled) {
ByteBuf buf = (ByteBuf) entry.msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes > 0) {
if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
// If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
// we stop populate the ByteBuffer array. This is done for 2 reasons:
// 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
// and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
// on the architecture and kernel but to be safe we also enforce the limit here.
// 2. There is no sense in putting more data in the array than is likely to be accepted by the
// OS.
//
// See also:
// - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
// - http://linux.die.net/man/2/writev
break;
}
nioBufferSize += readableBytes;
int count = entry.count;
if (count == -1) {
//noinspection ConstantValueVariableUse
entry.count = count = buf.nioBufferCount();
}
int neededSpace = min(maxCount, nioBufferCount + count);
if (neededSpace > nioBuffers.length) {
nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
NIO_BUFFERS.set(threadLocalMap, nioBuffers);
}
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
// derived buffer
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
nioBuffers[nioBufferCount++] = nioBuf;
} else {
ByteBuffer[] nioBufs = entry.bufs;
if (nioBufs == null) {
// cached ByteBuffers as they may be expensive to create in terms
// of Object allocation
entry.bufs = nioBufs = buf.nioBuffers();
}
for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) {
ByteBuffer nioBuf = nioBufs[i];
if (nioBuf == null) {
break;
} else if (!nioBuf.hasRemaining()) {
continue;
}
nioBuffers[nioBufferCount++] = nioBuf;
}
}
if (nioBufferCount == maxCount) {
break;
}
}
}
entry = entry.next;
}
this.nioBufferCount = nioBufferCount;
this.nioBufferSize = nioBufferSize;
return nioBuffers;
}
// Handling data that was not fully written: register OP_WRITE, or reschedule the flush, so that the eventloop finishes the job later
// io.netty.channel.nio.AbstractNioByteChannel#incompleteWrite
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
setOpWrite();
} else {
// It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
// use our write quantum. In this case we no longer want to set the write OP because the socket is still
// writable (as far as we know). We will find out next time we attempt to write if the socket is writable
// and set the write OP if necessary.
clearOpWrite();
// Schedule flush again later so other tasks can be picked up in the meantime
eventLoop().execute(flushTask);
}
}
// io.netty.channel.nio.AbstractNioByteChannel#setOpWrite
protected final void setOpWrite() {
final SelectionKey key = selectionKey();
// Check first if the key is still valid as it may be canceled as part of the deregistration
// from the EventLoop
// See https://github.com/netty/netty/issues/2104
if (!key.isValid()) {
return;
}
final int interestOps = key.interestOps();
// If the data was not written completely, register interest in write events so that the eventloop handles the rest
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
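As shown above, the write path is in principle the same everywhere: data is first written into an application-level buffer and then flushed. netty uses direct buffers to optimize the write, and relies on the eventloop to make sure the data is written completely and in time.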
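The three ways a flush attempt can end are easier to see in isolation. The following is a simplified sketch and not netty's doWrite: SpinWriter, Outcome and ThrottledChannel are hypothetical names, a single ByteBuffer stands in for the whole outbound buffer, and a fake channel that accepts a limited number of bytes replaces the real socket.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

/** Hypothetical, simplified model of a bounded write loop. */
final class SpinWriter {
    enum Outcome { ALL_WRITTEN, WAIT_FOR_OP_WRITE, RESCHEDULE_FLUSH }

    /** Tries to write data with at most writeSpinCount attempts. */
    static Outcome write(WritableByteChannel channel, ByteBuffer data, int writeSpinCount) {
        do {
            if (!data.hasRemaining()) {
                return Outcome.ALL_WRITTEN; // the real code clears OP_WRITE here
            }
            int written;
            try {
                written = channel.write(data);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            if (written <= 0) {
                // The socket send buffer is full: wait until the selector reports writability.
                return Outcome.WAIT_FOR_OP_WRITE;
            }
            --writeSpinCount;
        } while (writeSpinCount > 0);
        // Attempts used up while the socket was still accepting data: yield to other tasks.
        return data.hasRemaining() ? Outcome.RESCHEDULE_FLUSH : Outcome.ALL_WRITTEN;
    }

    /** Fake channel that accepts at most perCall bytes per write and capacity bytes in total. */
    static final class ThrottledChannel implements WritableByteChannel {
        private final int perCall;
        private int capacity;

        ThrottledChannel(int perCall, int capacity) {
            this.perCall = perCall;
            this.capacity = capacity;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(Math.min(perCall, capacity), src.remaining());
            src.position(src.position() + n); // consume n bytes from the source
            capacity -= n;
            return n;
        }

        @Override
        public boolean isOpen() { return true; }

        @Override
        public void close() { }
    }

    public static void main(String[] args) {
        System.out.println(write(new ThrottledChannel(4, 100), ByteBuffer.allocate(10), 16));
        System.out.println(write(new ThrottledChannel(4, 6), ByteBuffer.allocate(10), 16));
        System.out.println(write(new ThrottledChannel(1, 100), ByteBuffer.allocate(10), 3));
    }
}
```

Limiting the number of attempts keeps one busy connection from monopolizing the eventloop thread, while the OP_WRITE case avoids spinning on a socket that cannot accept more data for now.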
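Having walked through how netty handles network events, we should now have a deeper understanding of how network I/O handling is implemented in general.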