2021SC@SDUSC
下面分析一下出站数据传播的细节。我们从ChannelOutboundHandlerAdapter的write方法开始分析:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
如果想要实现自己的业务处理逻辑,需要继承ChannelOutboundHandlerAdapter并且重写write方法。处理完数据之后,还需要继续向下传递数据,也就是需要调用ctx.write(msg, promise)方法,它会调用AbstractChannelHandlerContext的等效方法,下面是一个它调用链路上的方法:
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
这里最关键的是通过findContextOutbound方法来寻找下一个符合要求的handler:
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
这里从当前的handler,顺着prev往回找,找下一个outband为真的handler。看到这个方法可能会觉得很眼熟,因为和上一次博客中分析的findContextInbound(MASK_CHANNEL_READ)方法很相似,这个方法在入站事件传播时用于寻找下一个符合要求的ChannelHandler:
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
其实write事件原理和上次分析的read事件类似,区别就是方向和目标不同。入站事件的方向是next方向,目标是InboundHandler。出站事件的方向与入站事件相反,是prev方向,也就是往回传播,目标是OutboundHandler。所以从代码中我们可以看到,这个寻找是顺着prev的方向往回找,也就是出站事件的传播方向为从后往前。
到这里我们明白了ChannelPipeline的事件传播方向。现在来考虑对于入站数据,如果传递到了tail节点,到头了,会有怎样的处理?对于出站数据,如果传递到了head节点,会有怎样的处理?
来看一下 invokeWriteAndFlush方法,它有一个判断 invokeHandler()方法,下面是它的细节:
private boolean invokeHandler() {
int handlerState = this.handlerState;
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
我们重点关注如果 handlerState == ADD_COMPLETE这个条件成立的时候,会返回true,那么就会执行下面的方法:
invokeWrite0(msg, promise);
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
我们先来考虑这个判断条件什么时候会成立。这里需要回到我们 HeadContext和TailContext的构造函数上:
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
setAddComplete();
}
final void setAddComplete() {
for (;;) {
int oldState = handlerState;
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
// exposing ordering guarantees.
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}
这里我们可以看到两个构造方法后面都会调用setAddComplete()方法,这个方法的代码我直接贴在后面了。这里表达的意思是进入一个无限循环,我们不会改变更新,直到oldState的状态改变为REMOVE_COMPLETE。然后会执行 HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)这个原子操作,将handlerState设置为ADD_COMPLETE。这样上面函数的成立条件就解释清楚了。
然后我们来看之后执行的操作:
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
我们先看handler()这个方法,这个方法是返回当前context。也就是说,会执行TailContext的等效方法。比如read,就会执行TailContext的channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
可以看到消息传递到tail之后,就不会再传递下去了,并且会释放它。出站事件也是类似,当事件传递到head的时候,会调用HeadContext的等效方法,然后就不会再传递下去了。