Netty-源码分析ChannelFutureListener添加异步回调事件

ChannelHandlerContext.write() (或者writeAndFlush()) 方法返回ChannelFuture对象,一个ChannelFuture对象代表尚未发生的IO操作,因为在Netty中所有的操作都是异步的,下面的方法可能会在发送消息之前关闭连接。

Channel ch = ...;
ch.writeAndFlush(message);
ch.close();

因为在Netty中操作是异步的,ch.writeAndFlush(message)也是异步的,该方法只是把发送消息加入了任务队列,这时直接关闭连接会导致问题。所以我们需要在消息发送完毕后在去关闭连接。

final ChannelFuture f = ctx.writeAndFlush(time);
  f.addListener(new ChannelFutureListener() {
       @Override
       public void operationComplete(ChannelFuture future) {
            ctx.close();
    }
});

通过ChannelFuture我们可以添加Listener,那么在消息发送完成后会进行回调,我们再去处理关闭连接等业务逻辑。

 

下面给出添加Listener的源码分析

@Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");
        //把listener对象加入到数组中
        synchronized (this) {
            addListener0(listener);
        }

        //判断当前任务是否已经完成,如完成这里直接触发回调
        if (isDone()) {
            notifyListeners();
        }

        return this;
    }

addListener0方法,负责把用户一个一个添加的listener对象转换为数组结构DefaultFutureListeners,存储到listeners 成员变量

    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners == null) {
            listeners = listener;
        } else if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).add(listener);
        } else {
            listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
        }
    }

 

notifyListeners方法,将通知回调任务添加到eventloop当中,那么eventloop当中的任务顺序就是- ctx.writeAndFlush(time),notifyListeners,这就保证了再发送消息完毕后,会执行notifyListeners去回调监听器

  private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }

        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }

 

上一篇:SpringBoot2---Web原生组件注入(Servlet、Filter、Listener)和嵌入式Servlet容器


下一篇:Vue3.0 declare it using the "emits" option警告