ChannelPipeline 和 ChannelHandler 是 Netty 重要的组件之一,通过这篇文章,重点了解这些组件是如何驱动数据流动和处理的。
一、ChannelHandler
在上一篇的整体架构图里可以看到,ChannelHandler 负责处理入站和出站的数据。对于入站和出站,ChannelHandler 由不同类型的 Handler 进行处理。下面通过一个示例来演示,将上一篇文章里的 Demo 做一些修改:
增加以下类:
// OneChannelInBoundHandler.java
package com.niklai.demo.handler.inbound;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OneChannelInBoundHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(OneChannelInBoundHandler.class.getSimpleName());
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("channel active.....");
ctx.fireChannelActive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8));
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.write(Unpooled.copiedBuffer("OneChannelInBoundHandler answer...", CharsetUtil.UTF_8));
ctx.fireChannelReadComplete();
}
}
// TwoChannelInBoundHandler.java
package com.niklai.demo.handler.inbound;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TwoChannelInBoundHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(TwoChannelInBoundHandler.class.getSimpleName());
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("channel active.....");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.write(Unpooled.copiedBuffer("TwoChannelInBoundHandler answer...", CharsetUtil.UTF_8));
ctx.close().addListener(ChannelFutureListener.CLOSE);
}
}
// OneChannelOutBoundHandler.java
package com.niklai.demo.handler.outbound;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OneChannelOutBoundHandler extends ChannelOutboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(OneChannelOutBoundHandler.class.getSimpleName());
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info("write msg: {}.....", buf.toString(CharsetUtil.UTF_8));
ctx.writeAndFlush(msg, promise);
}
}
修改 Server.java 类初始化的 childHandler 逻辑:
// Server.java
// 省略部分代码
public static void init() {
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
serverBootstrap.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress("localhost", 9999))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 添加ChannelHandler
socketChannel.pipeline().addLast(new OneChannelOutBoundHandler());
socketChannel.pipeline().addLast(new OneChannelInBoundHandler());
socketChannel.pipeline().addLast(new TwoChannelInBoundHandler());
}
});
ChannelFuture future = serverBootstrap.bind().sync();
future.channel().closeFuture().sync();
group.shutdownGracefully().sync();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
// 省略部分代码
在上面的例子里,我们声明了 OneChannelInBoundHandler 和 TwoChannelInBoundHandler 两个类继承 ChannelInBoundHandlerAdapter 处理入站数据,一个 OneChannelOutBoundHandler 类继承 ChannelOutBoundHandlerAdapter 处理出站数据,依次添加到 ChannelPipeline 里。两个 ChannelInBoundHandler 类都重写了 channelActive、channelRead 和 channelReadComplete 方法,OneChannelOutBoundHandler 类重写了 write 方法。
运行单元测试,控制台得到如下结果:
通过日志输出结果,我们可以看到 Client 发送消息后,OneChannelInBoundHandler 的 channelRead 方法被触发先获得消息内容,调用 ctx.fireChannelRead(msg)方法后 TwoChannelInBoundHandler 的 channelRead 方法被触发再次获得到消息内容,此时消息已经到达队尾。在 channelReadComplete 方法里调用 ctx.write(obj)方法依次写入应答消息后,消息将会反向出站,OneChannelOutBoundHandler 的 write 被触发获得应答消息内容,在这个方法里调用 ctx.writeAndFlush(msg, promise)将应答消息继续发送出去。
注意两个 ChannelInBoundHandler 获取消息是有先后顺序的,顺序取决于添加到 ChannelPipeline 的先后,并且只有当前 ChannelInBoundHandler 的 channelRead 方法里调用了 ctx.fireChannelRead(msg)方法后,消息才能被传递到后面的 ChannelInBoundHandler 的 channelRead 方法,channelReadComplete 方法同理。而在出站时,ChannelOutBoundHandler 的 write 方法会获取到将要写出的消息,可以选择是否对消息进行再次处理后发送出去。
ChannelHandler 相关的类关系图如下,ChannelInBoundHandlerAdapter 和 ChannelOutBoundHandlerAdapter 分别实现了 ChannelInBoundHandler 和 ChannelOutBoundHandler。接口一般通过继承 ChannelInBoundHandlerAdapter 和 ChannelOutBoundHandlerAdapter 来实现业务数据处理:
以下两个接口部分事件方法,更多方法可以查阅官方文档
ChannelInBoundHandler
方法 | 描述 |
---|---|
channelActive | Channel 已经连接就绪时被调用 |
channelRead | 当从 Channel 读取数据时被调用 |
channelReadComplete | 当 Channel 的读取操作完成时被调用 |
exceptionCaught | 当入站事件处理过程中出现异常时被调用 |
ChannelOutBoundHandler
方法 | 描述 |
---|---|
write | 当通过 Channel 写数据时被调用 |
read | 当从 Channel 读取数据时被调用 |
二、ChannelPipeline
从上面的例子可以看到,加入到 ChannelPipeline 的一系列 ChannelHandler 组成了一个有序的链。每一个新创建的 Channel 都将被分配一个 ChannelPipeline,Channel 不能自己附加另外一个 ChannelPipeline,也不能取消当前的,这个是由框架决定的,不需要开发人员干预。
从上图可以看到,事件消息会从头部传递到尾部,然后再从尾部传递到头部。在传递过程中,将会识别 ChannelHandler 的类型,入站事件由 InBoundHandler 处理,出站事件由 OutBoundHandler 处理,如果传递到下一个 ChannelHandler 时发现类型与当前方向不匹配,将会直接跳过并前进到下一个。如果某个 ChannelHandler 同时实现了 ChannelInBoundHandler 和 ChannelOutBundHandler 接口,那么当前 ChannelHandler 将会同时处理入站和出站事件。
以下是 ChannelPipeline 的一些主要方法:
方法 | 说明 |
---|---|
addFirst addLast |
添加 ChannelHander 到当前 ChannelPipeline 的头/尾部 |
addBefore addAfter |
添加 ChannelHander 到当前 ChannelPipeline 里某个 ChannelHandler 的前/后面 |
remove | 将某个 ChannelHandler 从当前 ChannelPipeline 里移除 |
replace | 将当前 ChannelPipeline 里的某个 ChannelHandler 替换成另外一个 ChannelHandler |
除此之外,ChannelPipeline 也有一些触发事件的方法,以下列出跟当前演示例子相关的事件方法,更多方法可以查阅官方文档
方法 | 描述 |
---|---|
fireChannelActive | 调用 ChannelPipeline 里下一个 ChannelInBoundHandler 的 channelActive 方法 |
fireChannelRead | 调用 ChannelPipeline 里下一个 ChannelInBoundHandler 的 ChannelRead 方法 |
write | 调用 ChannelPipeline 里下一个 ChannelOutBoundHandler 的 write 方法 |
我们修改一下 Demo 中的例子:
// OneChannelInBoundHandler.java
// 省略代码
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8));
ctx.pipeline().fireChannelRead(msg); // 调用ChannelPipeline的fireChannelRead方法
}
// 省略代码
运行单元测试查看控制台日志,发现事件会反复触发 OneChannelInBoundHandler 的 channelRead 方法,直到死循环。对比之前的运行结果可以看到,ChannelPipeline 的 fireChannelRead 方法会将事件重新从头部开始向后传递,而 ctx.fireChannelRead 方法会将事件从当前的下一个 ChannelHandler 开始向后传递。
三、ChannelHandlerContext
ChannelHandlerContext 是一个接口,它维护了 ChannelHandler 和 ChannelPipeline 两者之间的关系。当一个 ChannelHandler 加入到 ChannelPipeline 里时,就会创建一个 ChannelHandlerContext 关联它们。下图展示了它们之间的关系,当调用 ChannelHandlerContext 的 fire...方法时,事件都将会被传递到它关联的 ChannelHandler 的下一个 ChannelHandler 上
ChannelHandlerContext 部分的 API 如下,更多 API 可以查阅官方文档
方法 | 描述 |
---|---|
pipeline | 获取关联的 ChannelPipeline |
handler | 获取关联的 ChannelHandler |
fireChannelRead | 触发下一个 ChannelInBoundHandler 的 channelRead 方法 |
四、异常处理
入站异常
如果在处理入站事件过程中发生了异常,则该异常将会从它所在的 ChannelInBoundHandler 开始传递直到 ChannelPipeline 尾部。通过重写 exceptionCaught 方法,可以处理异常。
修改一下 Demo,增加 exceptionCaught
// OneChannelInBoundHandler.java
// 省略部分代码
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.write(Unpooled.copiedBuffer("OneChannelInBoundHandler answer...", CharsetUtil.UTF_8));
ctx.fireChannelReadComplete();
throw new Exception("This is an exception!"); // 模拟抛出一个异常
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("OneChannelInBoundHandler exception:{}....", cause.getMessage(), cause);
}
// 省略部分代码
运行测试,可以看到异常信息已经打印到控制台日志:
再次修改 Demo,调用 ChannelHandlerContext 的 fireExceptionCaught 方法将异常继续传递下去
// OneChannelInBoundHandler.java
// 省略部分代码
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("OneChannelInBoundHandler exception:{}....", cause.getMessage(), cause);
ctx.fireExceptionCaught(cause); // 将异常传递下去
}
// 省略部分代码
// TwoChannelInBoundHandler.java
// 省略部分代码
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("TwoChannelInBoundHandler exception:{}....", cause.getMessage(), cause);
}
// 省略部分代码
运行测试,查看控制台日志,两个 ChannelInBoundHandler 都会打印异常日志:
如果,两个 ChannelInBoundHandler 都不重写 exceptionCaught 方法处理异常,会怎样?修改 Demo,删除 exceptionCaught 后再次运行测试,查看控制台日志:
控制台输出一条日志信息:An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
如果异常发生但是没有被处理,异常将会一直传递到 ChannelPipeline 并记录为未处理异常,以 WARN 级别日志输出。
出站异常
出站操作的相关方法是异步的,处理异常信息都是基于通知机制。处理方式有两种:
第一种是通过在方法返回值上注册 listener:
// OneChannelOutBoundHandler.java
// 省略部分代码
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info("write msg: {}.....", buf.toString(CharsetUtil.UTF_8));
ctx.close(); // 在发送消息之前关闭channel,后序写入数据将会引发异常。
ChannelFuture channelFuture = ctx.writeAndFlush(msg);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (!f.isSuccess()) {
logger.error("OneChannelOutBoundHandler cause:{}.......", f.cause().getMessage(), f.cause());
}
}
});
}
// 省略部分代码
第二种是在传入的参数 promise 上注册 listener:
// OneChannelOutBoundHandler.java
// 省略部分代码
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info("write msg: {}.....", buf.toString(CharsetUtil.UTF_8));
ctx.close(); // 在发送消息之前关闭channel,后序写入数据将会引发异常。
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (!f.isSuccess()) {
logger.error("OneChannelOutBoundHandler cause:{}.......", f.cause().getMessage(), f.cause());
}
}
});
ctx.writeAndFlush(msg, promise);
}
// 省略部分代码