Inbound事件传播

ChannelHandler继承体系

Inbound事件传播

ChannelHandler:所有逻辑处理器的抽象。

public interface ChannelHandler {

    // handler添加完成回调
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    // handler删除完成回调
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    // 异常回调
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    // 指定handler为共享handler,可重复添加到pipeline
    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}

 ChannelInboundHandler:基于ChannelHandler,扩展了一些Inbound事件。

public interface ChannelInboundHandler extends ChannelHandler {

    // channel注册回调,当channel注册到NioEventLoop对应的Selector是触发该回调
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    // channel注销回调
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    // channel激活回调
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    // channel失效回调
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    // 读到数据时执行该方法
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    // 数据读完的回调
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    // 用户自定义的触发事件
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    // 改变channel的可写状态
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    // 异常捕获
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

事件传播过程

定义三个自定义的InBoundHandler类:

InBoundHandlerA

public class InBoundHandlerA extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerA: " + msg);
        ctx.fireChannelRead(msg);
    }
}

InBoundHandlerB 

public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerB: " + msg);
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("InBoundHandlerB: channelActive");
        ctx.channel().pipeline().fireChannelRead("hello world");
    }
}

 InBoundHandlerC

public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerC: " + msg);
        ctx.fireChannelRead(msg);
    }
}

 在启动类中添加这三个handler

                    // 配置业务处理链 handler pipeline
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new InBoundHandlerA());
                            ch.pipeline().addLast(new InBoundHandlerB());
                            ch.pipeline().addLast(new InBoundHandlerC());
                        }
                    });

启动服务端,通过“telnet 127.0.0.1 8888”测试一下调用结果:

Inbound事件传播

因为InBoundHandlerB实现了channelActive方法,所以在channel被激活之后会首先调用该方法。

在该方法中,我们通过 ctx.channel().pipeline().fireChannelRead("hello world") 进行事件传播,传播顺序为A->B->C,与添加时的顺序是一样的。

fireChannelRead是从head节点或当前节点开始传播,先找到下一个inbound节点,然后再执行其channelRead方法,最后由tail节点做一些收尾工作并释放。

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
    
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        // findContextInbound方法遍历链表,寻找下一个inbound节点,传播事件
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                // 调用channelRead方法
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

 

SimpleChannelInboundHandler

在上面了解到的事件传播机制中,资源的释放都是由tail节点完成的。如果由于某种原因,资源没有被传递到tail节点,这时就需要节点自己释放资源。

SimpleChannelInboundHandler提供了对channelRead方法的增强,封装了资源释放的逻辑。

当我们自定义的handler节点需要自己释放资源时,可以通过继承SimpleChannelInboundHandler来实现。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

 

Inbound事件传播Inbound事件传播 KeepMoving++ 发布了151 篇原创文章 · 获赞 101 · 访问量 19万+ 私信 关注
上一篇:[论文阅读笔记43]Hierarchical Multi-Label Classification Networks


下一篇:把Cloud for Customer的attachment视图直接放到工作中心视图去