ChannelHandlerContext.write()
(或者writeAndFlush()
) 方法返回ChannelFuture对象,一个ChannelFuture对象代表尚未发生的IO操作,因为在Netty中所有的操作都是异步的,下面的方法可能会在发送消息之前关闭连接。
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
因为在Netty中操作是异步的,ch.writeAndFlush(message)也是异步的,该方法只是把发送消息加入了任务队列,这时直接关闭连接会导致问题。所以我们需要在消息发送完毕后在去关闭连接。
final ChannelFuture f = ctx.writeAndFlush(time);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
ctx.close();
}
});
通过ChannelFuture我们可以添加Listener,那么在消息发送完成后会进行回调,我们再去处理关闭连接等业务逻辑。
下面给出添加Listener的源码分析
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
//把listener对象加入到数组中
synchronized (this) {
addListener0(listener);
}
//判断当前任务是否已经完成,如完成这里直接触发回调
if (isDone()) {
notifyListeners();
}
return this;
}
addListener0方法,负责把用户一个一个添加的listener对象转换为数组结构DefaultFutureListeners,存储到listeners 成员变量
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
notifyListeners方法,将通知回调任务添加到eventloop当中,那么eventloop当中的任务顺序就是- ctx.writeAndFlush(time),notifyListeners,这就保证了再发送消息完毕后,会执行notifyListeners去回调监听器
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}