netty和nio

netty是一个nio客户机-服务器框架,它简化了tcp和udp网络编程,相对于java传统nio,netty还屏蔽了操作系统的差异性,并且兼顾了性能。

Channel

channel封装了对socket的原子操作,实质是对socket的封装和扩展。

netty框架自己定义的通道接口,是对java nio channel的封装和扩展,客户端NIO套接字通道是NioSocketChannel,提供的服务器端NIO套接字通道是NioServerSocketChannel。

NioSocketChannel内部管理了一个Java NIO的SocketChanne实例,用来创建SocketChannel实例和设置该实例的属性,并调用Connect 方法向服务端发起 TCP 链接等。

NioServerSocketChannel内部管理了Java NIO的ServerSocketChannel实例,用来创建ServerSocketChannel实例和设置该实例属性,并调用实例的bind方法在指定端口监听客户端的链接。

EventLoopGroup

netty使用了主从多线程的Reactor模型。在netty中每个EventLoopGroup本身是一个线程池,其中包含了自定义个数的 NioEventLoop,每个NioEventLoop是一个线程,并且每个NioEventLoop都会关联一个selector选择器。

客户端一般只用一个EventLoopGroup来处理网络IO操作,服务器端一般使用两个,boss-group用来接收客户端发来的TCP链接请求,worker-group用来具体处理网络请求。

当Channel是客户端通道NioSocketChannel时,会注册NioSocketChannel管理的SocketChannel实例到自己关联的NioEventLoop的selector选择器上,然后NioEventLoop对应的线程会通过select命令监控感兴趣的网络读写事件。

当 Channel 是服务端通道NioServerSocketChannel时,NioServerSocketChannel本身会被注册到boss EventLoopGroup里面的某一个NioEventLoop管理的selector选择器,而完成三次握手的链接套接字是被注册到了worker EventLoopGroup里面的某一个NioEventLoop管理的selector选择器上。

多个Channel可以注册到同一个NioEventLoop管理的selector选择器上,这时NioEventLoop对应的单个线程就可以处理多个Channel的就绪事件;但是每个Channel只能注册到一个固定的NioEventLoop管理的selector上。

ChannelPipeline

ChannelPipeline持有一个ChannelHandler的双向链结构。每个Channel都有属于自己的ChannelPipeline,对从Channel 中读取或者要写入 Channel 中的数据进行依次处理。

多ChannelPipeline里面可以复用一个ChannelHandler。

Netty 客户端底层与 Java NIO 对应关系

NioSocketChannel是对Java NIO的SocketChannel的封装,从NioSocketChannel的构造函数可以看出:

public class NioSocketChannel extends AbstractNioByteChannel implements SocketChannel {
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public NioSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }

    public NioSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }
    
    private static java.nio.channels.SocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openSocketChannel();
        } catch (IOException var2) {
            throw new ChannelException("Failed to open a socket.", var2);
        }
    }
}

SocketChannel的父类是AbstractNioChannel,在AbstractNioChannel中定义了java nio的SocketChannel类型的成员,并将其模式设置为非阻塞:

public abstract class AbstractNioChannel extends AbstractChannel {
    private final SelectableChannel ch;
    
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        ...
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        ch.configureBlocking(false);
    }
}

所以说NioSocketChannel内部是对java nio的SocketChannel的封装扩展,创建NioSocketChannel实例对象时候相当于执行了Java NIO中:

SocketChannel socketChannel = SocketChannel.open();

NioSocketChannel实例的创建和注册是在Bootstrap的connect阶段。Bootstrap的connect代码:

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
    ...
    public ChannelFuture connect(InetAddress inetHost, int inetPort) {
        return this.connect(new InetSocketAddress(inetHost, inetPort));
    }
 
    public ChannelFuture connect(SocketAddress remoteAddress) {
        ...
        return this.doResolveAndConnect(remoteAddress, this.config.localAddress());
    }
 
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        ChannelFuture regFuture = this.initAndRegister();
        ...
    }
    
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = this.channelFactory.newChannel();
            this.init(channel);
        } catch (Throwable var3) {}

        ChannelFuture regFuture = this.config().group().register(channel);
        ...
    }

channelFactory.newChannel()创建了NIOSocketChannel实例。

继续跟踪this.config().group().register(channel)到:

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    ...
    public ChannelFuture register(Channel channel) {
        return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
    }
    
    public ChannelFuture register(ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

最终会跟踪到AbstractNioChannel的doRegister():

public abstract class AbstractNioChannel extends AbstractChannel {
    ...
    protected void doRegister() throws Exception {
        boolean selected = false;

        while(true) {
            try {
                this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException var3) {
                if (selected) {
                    throw var3;
                }

                this.eventLoop().selectNow();
                selected = true;
            }
        }
    }
}    

其中:

this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this)

将NioSocketChannel实例注册到了当前NioEventLoop的选择器中。

从选择器获取就绪的事件是在该客户端套接关联的NioEventLoop里面的做的,每个NioEventLoop里面有一个线程用来循环从选择器里面获取就绪的事件:

    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }
                ...
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {}
        }
    }

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;

        try {
            while(true) {
                ...
                int selectedKeys = selector.select(timeoutMillis);
                ++selectCnt;
                ...
        } catch (CancelledKeyException var13) {}

    }

从选择器选取就绪的事件后,会最终调用processSelectedKey具体处理每个事件:

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ...
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {}
    }

netty和reactor模型

netty实现单线程reactor:

private EventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap()
    .group(group)
    .childHandler(new HeartbeatInitializer());

netty实现多线程reactor:

// 默认线程数为CPU核心数 * 2,*2是因为考虑到超线程CPU包括两个逻辑线程
private EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap()
    .group(boss)
    .childHandler(new HeartbeatInitializer());

netty实现主从多线程reactor:

private EventLoopGroup boss = new NioEventLoopGroup();
private EventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap()
    .group(boss, work)
    .childHandler(new HeartbeatInitializer());
上一篇:数据库常用SQL操作


下一篇:运行时数据区