1. ChannelPipeline、ChannelHandler、ChannelHandlerContext 的关系
1. 每创建一个Socket 就会分配一个全新的ChannelPipeline (简称pipeline)
2. 每一个 ChannelPipeline 内部包含多个 ChannelHandlerContext (简称Context)
3. 他们一起组成了一个双向链表,这些Context 用于封装我们调用addLast 时添加的Channelhandler(以下简称Handler)
也就是说ChannelSocket 和 ChannelPipeline 是一对一的关系,而pipeline内部的多个Context 行成了链表,Context 只是对Handler 的封装。
当一个请求进来时会进入socket 对应的pipeline,并经过pipeline 所有的handler。 可以理解为过滤器模式。
2. 设计
1. ChannelPipeline 设计
该接口继承了ChannelInboundInvoker、 ChannelOutboundInvoker、 Iterable 接口。 标识可以调用数据出站的方法和入站的方法, 同时也能遍历内部的链表。
public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
处理过程如下:
(1)入站事件由入站处理程序自下而上的方向处理。入站处理程序通常处理由地步的IO线程生成入站数据,入站数据通常从SocketChannel#read(ByteBuffer) 获取
(2) 通常一个pipeline 有多个handler。例如一个典型的服务器在每个通常的管道中都会有一下处理程序:
协议解码器-将二进制数据转换为Java 对象;
协议编码器-将java 对象转换为二进制数据
业务逻辑处理程序-执行实际业务逻辑
(3) 业务程序不能将线程阻塞,会影响IO的速度,进而影响整个Netty 程序的性能。如果业务程序很快,可以放在IO线程中,反之就需要异步执行。 或者在添加handler的时候添加一个线程池。
2. ChannelHandler 作用以及设计
(1) 源码
public interface ChannelHandler { // 当把channelHandler 添加到pipeline 时被调用 void handlerAdded(ChannelHandlerContext ctx) throws Exception; // 当从pipeline 移除时调用 void handlerRemoved(ChannelHandlerContext ctx) throws Exception; // 处理发生异常时调用 @Deprecated void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; @Inherited @Documented @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @interface Sharable { // no value } }
(2) 作用: 作用是处理IO事件或拦截IO事件,并将其转发给下一个handler。 handler 处理事件是分入站和出站的(入站是说读取数据到程序处理的过程,出站是说写出数据到调用内核write方法写出去数据的过程)。两个方向的操作都是不同的,因此,netty 定义了两个子接口继承ChannelHandler。
入站:ChannelInboundHandler
出站: ChannelOutboundHandler
入站出站都可以处理的handler:ChannelDuplexHandler
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {
3. ChannelHandlerContext 作用
ChannelHandlerContext 同时继承了 ChannelInboundInvoker, ChannelOutboundInvoker。ChannelHandlerContext 也 定义了一些自己的方法。这些方法能够获取Context 上下文环境的对象,比如channel、executor、handler、pipeline, 内存分配器,关联的handler 是否被删除等信息。Context 就是包装了handler相关的一切,以方便Contex 可以在pipeline 方便的操作handler。
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
如下自身的方法:
3. 创建过程
1. 任何一个ChannelSocket 创建的同时都会创建一个pipeline
io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel) 源码如下:
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }
io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline:
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
可以看到链表有两个伪节点(头和尾)。
1》头节点:
io.netty.channel.DefaultChannelPipeline.TailContext (入站处理handler)
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); } 。。。 }
2》伪节点:
HeadContext 是一个入站和出站都兼顾的handler
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } 。。。 }
创建是在客户端建立连接时:
2. 当用户或系统内部调用pipeline的addXX 方法添加handler 时,都会创建一个包装这handler 的Context
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler...)
@Override public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); } @Override public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } for (ChannelHandler h: handlers) { if (h == null) { break; } addLast(executor, null, h); } return this; } @Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; } private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; }
解释:
1. io.netty.channel.DefaultChannelPipeline#checkMultiplicity 检查该实例是否是共享的,如果不是并且已经被别的pipeline 使用了,则抛出异常
2. 调用io.netty.channel.DefaultChannelPipeline#newContext 创建一个DefaultChannelHandlerContext
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); }
3. 调用io.netty.channel.DefaultChannelPipeline#addLast0 添加到链条尾部
4. 做一些其他处理
4. 调用过程
读的时候从head开始, 写的时候从tail 开始。
调用过程可以用下图标识:
1. 入站读取数据追踪
io.netty.channel.DefaultChannelPipeline#fireChannelRead代码如下:
@Override public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }
读取数据时调用过程如下:
1》 继续调用io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
2》 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)
private void invokeChannelRead(Object msg) { if (this.invokeHandler()) { try { ((ChannelInboundHandler)this.handler()).channelRead(this, msg); } catch (Throwable var3) { this.invokeExceptionCaught(var3); } } else { this.fireChannelRead(msg); } }
这时候会调用handler的channelRead 方法。也就是具体的handler 的方法。
3》io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead 方法如下: 相当于没做任何逻辑处理,直接调用下一个处理器处理
public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.fireChannelRead(msg); }
4》io.netty.channel.AbstractChannelHandlerContext#fireChannelRead
@Override public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
io.netty.channel.AbstractChannelHandlerContext#findContextInbound 如下:(可以看到入站是找inbound属性为true的context,然后继续进行调用)
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
5》继续调用io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)方法(同1》 一样)
也就是如果希望pipeline 中的context 继续处理,需要在handler中继续调用 ctx.fireXXX 方法,比如io.netty.handler.logging.LoggingHandler#channelRead
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "READ", msg)); } ctx.fireChannelRead(msg); }
2. 出站数据跟踪
1》io.netty.channel.DefaultChannelPipeline#write(java.lang.Object, io.netty.channel.ChannelPromise)
public final ChannelFuture write(Object msg, ChannelPromise promise) { return tail.write(msg, promise); }
2》io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
public ChannelFuture write(final Object msg, final ChannelPromise promise) { write(msg, false, promise); return promise; }
3》io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)
private void write(Object msg, boolean flush, ChannelPromise promise) { ObjectUtil.checkNotNull(msg, "msg"); try { if (isNotValidPromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return; } } catch (RuntimeException e) { ReferenceCountUtil.release(msg); throw e; } final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { final AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } if (!safeExecute(executor, task, promise, m)) { // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // // See https://github.com/netty/netty/issues/8343. task.cancel(); } } }
4》io.netty.channel.AbstractChannelHandlerContext#invokeWriteAndFlush
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }
5》io.netty.channel.AbstractChannelHandlerContext#invokeWrite0 这里调用handler的write 方法
private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
6》handler 处理完成如果需要继续处理调用ctx.write(msg, promise); 会重新调用到 io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)