【Netty4】Netty核心组件之NioEventLoop(二)处理消息

文章目录

系类文章:
《Netty服务端启动源码分析(一)整体流程》
《Netty服务端启动源码分析(二)服务端Channel的端口绑定》
《Netty核心组件之NioEventLoop(一)创建》
《Netty核心组件之NioEventLoop(二)处理消息》

1. 概述

本篇接《Netty核心组件之NioEventLoop(一)创建》之后,上一篇文章中我们讲述了NioEventLoop的创建过程,这里继续讲述消息是如何处理的。

2. processSelectedKeys()

private void c() {
    // 看文章最开头是否启用优化的设置,如果启用了会走这里
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}


为什么processSelectedKeysOptimized不需要入参,而processSelectedKeysPlain需要入参?参考 《netty对selectedKeys进行优化》

从源码可以看到,processSelectedKeysOptimized和processSelectedKeysPlain的大部分处理逻辑是相同的,区别就在于对selectedKey的迭代逻辑,记得一开始说过如果开启了优化,netty对selectedKey的底层集合进行了优化,将HashSet改为了数组,HashSet底层用HashMap实现,迭代的效率是没有数组高的。

既然内部逻辑类似,重点看一下processSelectedKeysOptimized()方法:

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // 方便GC回收
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();

        // 根据类型不同执行不同的处理逻辑
        if (a instanceof AbstractNioChannel) {
           //[1]
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            // [2]一般不会走这个分支,除非用户主动注册NioTask到selector,netty单元测试里有案例
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        // 如果为true,则重置之后的所有selectKey,并调用selectNow()方法
        // 因为run()方法执行本方法前已经置为false,所以不会进这里
        if (needsToSelectAgain) {
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}

【2】NioTask分支一般不会走,感兴趣可以看一下netty的单元测试。

重点看一下【1】AbstractNioChannel分支,如果attachment是AbstractNioChannel类型,说明它是NioServerSocketChannel或者NioSocketChannel,需要进行IO读写相关的操作。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // SelectionKey无效的处理
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // channel没有关联的eventLoop直接返回
            return;
        }
        // channel关联的eventLoop不是本eventLoop,直接返回,不应关闭channel
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        // 关闭channel
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        //[1] 对于NioSocketChannel,连接需要先finishConnect才能继续读写
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // 下面3行的操作只是将OP_CONNECT从感兴趣选项中移除,防止重复触发
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // [2]说明有半包消息未发送完成,调用flush发送即可
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        //[3] unsafe是多态,对于NioServerSocketChannel,read就是接受客户端TCP连接
        // 对于NioSocketChannel,就是从channel中读取ByteBuffer
        // 同时检测readyOps == 0 是解决JDK的循环bug
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
  • 【1】处理connect事件
  • 【2】处理write事件
  • 【3】处理读和Accept事件 疑问:为什么读和Accept在一起处理?

这里可以看到熟悉的代码,当有链接(OP_ACCEPT )进来的时候,便会走 unsafe.read():这个里面就调用了doReadMessages,汇总下事件类型:

  • Connect, 即连接事件(TCP 连接), 对应于SelectionKey.OP_CONNECT int值为16
  • Accept, 即确认事件, 对应于SelectionKey.OP_ACCEPT int值为8
  • Read, 即读事件, 对应于SelectionKey.OP_READ, 表示 buffer 可读 int值为1
  • Write, 即写事件, 对应于SelectionKey.OP_WRITE, 表示 buffer 可写 int值为4
上一篇:模拟栈与队列


下一篇:[源码解析] 深度学习分布式训练框架 horovod (7) --- DistributedOptimizer