Netty概述

前言

如果说http对应着《Jetty概述》,那为tcp就对应着netty.Netty对于程序员的重要性可想而知

Netty架构

netty主要有以下三部份构成:

  1. ByteBuf内存
  2. Channel管道
  3. Pipeline事件模式
    Netty概述

ByteBuf内存篇

内核概述与NIO》与《JAVA直接内存(堆外内存)》中详细介绍了buff操作。分别对应Netty中的

  • UnpooledHeapByteBuf(堆内)
  • UnpooledDirectByteBuf (堆外)
  • UnpooledUnsafeHeapByteBuf(unsafe堆内)
  • UnpooledUnsafeDirectByteBuf(unsafe堆外)
  1. Netty对ByteBuf增加了基本控制,诸如 readerIndex, writerIndex, markedReaderIndex, markedWriterIndex, maxCapacity。
  2. 使用虚拟机提但的UNSAFE类可以直接跳过虚拟机的检测,可提交内存分配数据
  3. 使用ResourceLeakDetector封装RefQueue来检测堆外内存的回收

通过阅读AbstractNioByteChannel中的代码,read()方法发生在nio的op_read事件时触发

        @Override
        public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            //每个channel对应一个PPLine
            final ChannelPipeline pipeline = pipeline();
            //ByteBuf分配器
            final ByteBufAllocator allocator = config.getAllocator();
            //容量计算器
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            //重置,把之前计数的值全部清空
            allocHandle.reset(config);
 
            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    //分配内存,关键在于计算分配内存的大小(小了不够,大了浪费)
                    byteBuf = allocHandle.allocate(allocator);
                    //doReadBytes,从socket读取字节到byteBuf,返回真实读取数量
                    //更新容量计算器
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    
                    //如果小于0 则socket关闭,如果等于0则没读取到数据
                    if (allocHandle.lastBytesRead() <= 0) {
                        //释放资源
                        byteBuf.release();
                        byteBuf = null;
                        //如果小于0则意味着socket关闭
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            readPending = false;
                        }
                        break;
                    }
 
                    //增加循环计数器
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    //把读取到的数据,交给管道去处理
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                    //判断是否继续从socket读取数据
                } while (allocHandle.continueReading());
 
                //读取完成后调用readComplete,重新估算内存分配容量
                allocHandle.readComplete();
                //事件激发
                pipeline.fireChannelReadComplete();
 
                //如果需要关闭,则处理关闭
                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                //根据情况移除OP_READ事件
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
  1. ByteBufAllocator 是内存分配者,用于申请内存
  2. RecvByteBufAllocator.Handle 内存读取策略,不可能一次性读完所有的数据
  3. allocHandle.lastBytesRead方法标记已读取的内容大下
  4. doReadBytes是真正的从管道读取数据
  5. 从上述代码可知,如果读取数据过大,会多次触发pipeline.fireChannelRead
ByteToMessageDecoder解决多次触发问题

从上述源码分析得,如果NIO一次性传取的数据过大会多次触发fireChannelRead。可在代码编写的时候感觉不出来,Netty在ByteToMessageDecoder做处理. ByteToMessageDecoder往往是头部pipeline.

   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //只处理字节缓冲区类型的
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                first = cumulation == null;
                // 如果是第一次读取则创建一个,不是则累加
                cumulation = cumulator.cumulate(ctx.alloc(),
                        first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
                
                // 具体解析数据,到数据符合规则解析到out对象中。
                // 如果不合规定,不操作,等待后面的数据累加      
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                 //当数据不为空也不可读,要释放。证明数据有丢失不处理
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    //读取数据的次数大于阈值,则尝试丢弃已读的,避免占着内存
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                //有被添加或者设置,表是有读过了
                firedChannelRead |= out.insertSinceRecycled();
                // 如果size > 0  尝试向下面Handler传递数据
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);//其他类型继续传递
        }
    }

Channel通道

网络IO模型》中epoll框架.Netty框架运行例子如下

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.option(ChannelOption.SO_BACKLOG, 1024);
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer(){
                 @Override
                 protected void initChannel(Channel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     // http加解码器
                     p.addLast(new HttpServerCodec());
                     // 自定义handler
                     p.addLast(new HttpHelloWorldServerHandler());
                 }
             });

            Channel ch = b.bind(PORT).sync().channel();

            System.err.println("Open your web browser and navigate to http://127.0.0.1:" + PORT + '/');

            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

其中bind调用过程如下所示
Netty概述

  1. initAndRegister创建的管道并调用init做初始化
  2. init对channel进行了pipeline的ServerBootstrapAcceptor设置及bossGroup的使用
  3. register0将channel事件注册并获取 selectionKey
  4. run 循环遍历监听nio事件,并分配pipeline处理
  5. 通过pipeline事件,ServerBootstrapAcceptor获取连接进来的childChannel,对其注册并配置workerGroup
  6. childChannel注册流程和channel相似

其中bossGroup,workerGroup的设计为传统Reactor反应堆设计模式。
Netty概述

Pipeline事件模型

在Netty 中每个Channel 都有且仅有一个ChannelPipeline 与之对应。
Netty概述
Pipeline的元素包括2部份:

  • Context 标记Handler的位置,状态及提供上下文的依赖,事件传播
  • Handler 专门处理事件
Pipeline 的事件传播机制

Netty 中的传播事件可以分为两种:Inbound 事件和Outbound 事件。
Netty概述
从上图可以看出,inbound 事件和outbound 事件的流向是不一样的,inbound 事件的流行是从下至上,而outbound刚好相反,是从上到下。

通过 AbstractChannelHandlerContext 中 executionMask属性来标识MASK_ALL_INBOUND,MASK_ALL_OUTBOUND 所支持的事务。

// 向下链路查找符合当前事件传播规则的Context
// mask 标识当前要传递的事件
private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
    }

链表中head 是一个ChannelOutboundHandler,而tail则是一个ChannelInboundHandler.表示当Inbound事件没人处理tail空处理,反之Outbound没人处理head空处理

主要参考

Netty4源码分析——框架总结
源码分析Netty系列
Netty之大动脉Pipeline
Netty 启动过程源码分析

上一篇:不要慌,这个EXCEL模板帮你完成数据报表


下一篇:Redis管道符(pipeline)