Netty之责任链模式的过滤链实现源码分析(二)

2021SC@SDUSC

下面分析一下出站数据传播的细节。我们从ChannelOutboundHandlerAdapter的write方法开始分析:


    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
       ctx.write(msg, promise);
   }

如果想要实现自己的业务处理逻辑,需要继承ChannelOutboundHandlerAdapter并且重写write方法。处理完数据之后,还需要继续向下传递数据,也就是需要调用ctx.write(msg, promise)方法,它会调用AbstractChannelHandlerContext的等效方法,下面是一个它调用链路上的方法:


    private void write(Object msg, boolean flush, ChannelPromise promise) {
       AbstractChannelHandlerContext next = findContextOutbound();
       final Object m = pipeline.touch(msg, next);
       EventExecutor executor = next.executor();
       if (executor.inEventLoop()) {
           if (flush) {
               next.invokeWriteAndFlush(m, promise);
           } else {
               next.invokeWrite(m, promise);
           }
       } else {
           AbstractWriteTask task;
           if (flush) {
               task = WriteAndFlushTask.newInstance(next, m, promise);
           }  else {
               task = WriteTask.newInstance(next, m, promise);
           }
           safeExecute(executor, task, promise, m);
       }
   }

 

这里最关键的是通过findContextOutbound方法来寻找下一个符合要求的handler:

   private AbstractChannelHandlerContext findContextOutbound() {
       AbstractChannelHandlerContext ctx = this;
       do {
           ctx = ctx.prev;
       } while (!ctx.outbound);
       return ctx;
   }

这里从当前的handler,顺着prev往回找,找下一个outband为真的handler。看到这个方法可能会觉得很眼熟,因为和上一次博客中分析的findContextInbound(MASK_CHANNEL_READ)方法很相似,这个方法在入站事件传播时用于寻找下一个符合要求的ChannelHandler:

    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.next;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
        return ctx;
    }

其实write事件原理和上次分析的read事件类似,区别就是方向和目标不同。入站事件的方向是next方向,目标是InboundHandler。出站事件的方向与入站事件相反,是prev方向,也就是往回传播,目标是OutboundHandler。所以从代码中我们可以看到,这个寻找是顺着prev的方向往回找,也就是出站事件的传播方向为从后往前。

到这里我们明白了ChannelPipeline的事件传播方向。现在来考虑对于入站数据,如果传递到了tail节点,到头了,会有怎样的处理?对于出站数据,如果传递到了head节点,会有怎样的处理?

来看一下 invokeWriteAndFlush方法,它有一个判断 invokeHandler()方法,下面是它的细节:

    private boolean invokeHandler() {
       int handlerState = this.handlerState;
       return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
   }

我们重点关注如果 handlerState == ADD_COMPLETE这个条件成立的时候,会返回true,那么就会执行下面的方法:


           invokeWrite0(msg, promise);


  
private void invokeWrite0(Object msg, ChannelPromise promise) {
       try {
           ((ChannelOutboundHandler) handler()).write(this, msg, promise);
       } catch (Throwable t) {
           notifyOutboundHandlerException(t, promise);
       }
   }

我们先来考虑这个判断条件什么时候会成立。这里需要回到我们 HeadContext和TailContext的构造函数上:


        HeadContext(DefaultChannelPipeline pipeline) {
           super(pipeline, null, HEAD_NAME, false, true);
           unsafe = pipeline.channel().unsafe();
           setAddComplete();

       }
       
       TailContext(DefaultChannelPipeline pipeline) {
           super(pipeline, null, TAIL_NAME, true, false);
           setAddComplete();

       }        

    final void setAddComplete() {
       for (;;) {
           int oldState = handlerState;
           // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
           // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
           // exposing ordering guarantees.
           if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
               return;
           }
       }
   }

这里我们可以看到两个构造方法后面都会调用setAddComplete()方法,这个方法的代码我直接贴在后面了。这里表达的意思是进入一个无限循环,我们不会改变更新,直到oldState的状态改变为REMOVE_COMPLETE。然后会执行 HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)这个原子操作,将handlerState设置为ADD_COMPLETE。这样上面函数的成立条件就解释清楚了。

然后我们来看之后执行的操作:

​

private void invokeWrite0(Object msg, ChannelPromise promise) {
       try {
           ((ChannelOutboundHandler) handler()).write(this, msg, promise);
       } catch (Throwable t) {
           notifyOutboundHandlerException(t, promise);
       }
   }

​

我们先看handler()这个方法,这个方法是返回当前context。也就是说,会执行TailContext的等效方法。比如read,就会执行TailContext的channelRead方法:


        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
           onUnhandledInboundMessage(msg);
       }

    protected void onUnhandledInboundMessage(Object msg) {
       try {
           logger.debug(
                   "Discarded inbound message {} that reached at the tail of the pipeline. " +
                           "Please check your pipeline configuration.", msg);
       } finally {
           ReferenceCountUtil.release(msg);
       }
   }

可以看到消息传递到tail之后,就不会再传递下去了,并且会释放它。出站事件也是类似,当事件传递到head的时候,会调用HeadContext的等效方法,然后就不会再传递下去了。

上一篇:A complete log of this run can be found in & no such file or directory


下一篇:rancher 管理容器集群部署