本文参考
参考《Spark快速大数据分析》动物书中的第六章"ChannelHandler和ChannelPipeline",主要内容为ChannelHandler API 、ChannelPipeline API、检测资源泄漏和异常处理
这一篇文章讲到的内容,在前面几篇文章中或多或少已有涉及,那些重复的部分算作是回顾吧
Channel的生命周期
registered(Channel已经被注册到EventLoop) -> active(Channel处于活动状态,已经连接到远程节点,可以接收和发送数据) - > inactive(Channel没有连接到远程节点)-> unregistered(Channel已经被创建,但还未注册到EventLoop或已从EventLoop中注销)
当这些状态发生改变时,将会生成对应的事件,这些事件将会被转发给ChannelPipeline中的ChannelHandler,如ChannelInboundHandler中的channelRegistered()、channelActive()、channelInactive()、channelUnregistered()方法
ChannelHandler生命周期
handlerAdded() -> 当把ChannelHandler添加到ChannelPipeline中时被调用
handlerRemoved() -> 当从ChannelPipeline中移除ChannelHandler时被调用
exceptionCaught() -> 当处理过程中在ChannelPipeline中有错误产生时被调用
在ChannelHandler 被添加到ChannelPipeline中或者被从ChannelPipeline中移除时会调用这些操作
ChannelInboundHandler接口 API
ChannelHandler which adds callbacks for state changes. This allows the user to hook in to state changes easily
下面是ChannelInboundHandler的生命周期方法。这些方法将会在数据被接收时或者与其对应的Channel状态发生改变时被调用。正如Channel生命周期中所提到的,这些方法和Channel的生命周期密切相关
我们在Netty客户端 / 服务端一文中提到由一方发送的消息可能会被分块接收,channelRead()方法可能会被调用多次,channelReadComplete()方法仅会被调用一次。因此在服务端的业务实现中,我们在channelRead()方法内仅使用write()方法,将接收到的消息写给发送者,而不冲刷出站消息,而是在channelReadComplete()方法内使用writeAndFlush()方法冲刷消息
当某个ChannelInboundHandler的实现重写channelRead()方法时,它将负责显式地释放与池化的 ByteBuf 实例相关的内存。Netty 为此提供了一个实用方法 ReferenceCountUtil.release()。该操作已经在SimpleChannelInboundHandler的channelRead()方法中实现,我们只需重载channelRead0()方法,所以也就不能够存储指向任何消息的引用供将来使用,因为这些引用都将会失效
?
ChannelOutboundHandler 接口 API
ChannelHandler which will get notified for IO-outbound-operations
ChannelOutboundHandler 的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷操作并在稍后继续
ChannelOutboundHandler中的大部分方法都需要一个 ChannelPromise参数,以便在操作完成时得到通知。ChannelPromise是ChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()和setFailure(),当一个 Promise 被完成之后,其对应的 Future 的值便 不能再进行任何修改了
?
ChannelHandlerAdapter适配器
Skeleton implementation of a ChannelHandler
ChannelHandlerAdapter抽象类实现了Channel接口的isSharable()方法,如果其对应的实现被标 注为 Sharable,那么这个方法将返回 true,表示它可以被添加到多个 ChannelPipeline 中
ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter这两个适配器分别提供了ChannelInboundHandler 和ChannelOutboundHandler的基本实现,也通过扩展抽象类ChannelHandlerAdapter获得了它们共同的超接口ChannelHandler的方法
?
资源管理
每当通过调用 ChannelInboundHandler.channelRead()或者 ChannelOutboundHandler.write()方法来处理数据时,都需要确保没有任何的资源泄漏
若ChannelInBoundHandlerAdapter的channelRead()方法直接消费入站消息,不会通过调用 ChannelHandlerContext.fireChannelRead() 方法将入站消息转发给下一个ChannelInboundHandler时,应当通过ReferenceCountUtil.release(msg);释放资源
若ChannelOutBoundHandlerAdapter的write()方法的响应操作完成,不会传递给 ChannelPipeline 中的下一个 ChannelOutboundHandler时,注意不仅要通过ReferenceCountUtil.release(msg);释放资源,还要通知 ChannelPromise数据已经被处理,即promise.setSuccess();
Netty提供了class ResourceLeakDetector 对应用程序的缓冲区分配做大约1%的采样来检测内存泄露
泄露检测级别可以通过将下面的 Java 系统属性设置为表中的一个值来定义
java -Dio.netty.leakDetectionLevel=ADVANCED
Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK:
ByteBuf.release() was not called before it‘s garbage-collected.
Recent access records: 1 #1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString(AdvancedLeakAwareByteBuf.java:697) io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(XmlFrameDecoderTest.java:157) io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(XmlFrameDecoderTest.java:133) ...
?
ChannelPipeline接口操作ChannelHandler API
在前面的学习中我们已经知道ChannelPipeline是一个拦截流经Channel的入站和出站事件的ChannelHandler 实例链
每一个新创建的Channel都将会被分配一个新的ChannelPipeline。这项关联是永久性的;Channel 既不能附加另外一个 ChannelPipeline,也不能分离其当前的。
但是ChannelHandler 可以通过添加、删除或者替换其他的 ChannelHandler 来实时地修改 ChannelPipeline的布局(也可以将它自己从ChannelPipeline中移除)
ChannelPipeline pipeline = channelInstance.pipeline();
//创建一个 FirstHandler 的实例
FirstHandler firstHandler = new FirstHandler();
//将该实例作为"handler1"添加到ChannelPipeline 中
pipeline.addLast("handler1", firstHandler);
//将一个 SecondHandler的实例作为"handler2"添加到 ChannelPipeline的第一个槽中
// 这意味着它将被放置在已有的"handler1"之前
pipeline.addFirst("handler2", new SecondHandler());
//将一个 ThirdHandler 的实例作为"handler3"添加到 ChannelPipeline 的最后一个槽中
pipeline.addLast("handler3", new ThirdHandler());
//通过名称移除"handler3"
pipeline.remove("handler3");
//通过引用移除FirstHandler(它是唯一的,所以不需要它的名称)
pipeline.remove(firstHandler);
//将 SecondHandler("handler2")替换为 FourthHandler:"handler4"
pipeline.replace("handler2", "handler4", new FourthHandler());
除了这些操作,还有别的通过类型或者名称来访问ChannelHandler的方法。
?
ChannelPipeline 接口入站事件API
?
ChannelPipeline 接口出站事件API
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?