JAVA NIO整理

I/O模型

Java共支持3种网络编程模型I/O模式:BIO、NIO、AIO。
BIO: 同步并阻塞。就是Java原生的IO,一请求一应答,一个连接一个线程,即客户端有连接请求时服务端就需要启动一个线程处理,并且如果这个连接没做任何事情会造成不必要的线程开销。可以使用线程池机制来改善。
NIO: 同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),客户端发送的链接请求都会注册到多路复用器上,多路复用器轮训到连接有IO请求就处理。
AIO: 异步非阻塞,NIO2版本,目前来说应用还不是很广泛。

BIO以流的方式处理数据,NIO以块的方式处理数据,块IO的效率比流IO高很多。BIO基于字节流和字符流进行操作,而NIO基于Channel(通道)和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区或从缓冲区写入到通道。Selector选择器用于监听多个通道的事件,比如连接请求,数据到达。因此使用单个线程就可以监听多个客户端通道。

NIO三个核心原理:Selector、Channel、Buffer
1、每个Channel都对应一个Buffer
2、Selelctor对应一个线程,一个线程对应一个Channel连接
3、下图反应了有三个channel注册到该selector
4、Selector会根据不同事件在各个通道上切换
5、Buffer是一个内存块,底层是一个数组
6、NIO的Buffer可以读也可以写,需要filp方法切换
JAVA NIO整理

Buffer: 是一个可以读写数据的内存块,对象里提供了一组方法可以更轻松使用内存块,程序读写数据都必须经过Buffer。常用Buffer子类有ByteBuffer、ShortBuffer、CharBuffer、IntBuffer、LongBuffer、DoubleBuffer、FloatBuffer,它们可以将对应的数据存储到缓冲区。
Buffer类定义了所有的缓冲区都具有的四个属性:

    private int position = 0; // 位置,下一个要被读或写的元素的索引,每次读写缓存区数据时都会改变这个值,为下次读写作准备
    private int limit; // 缓冲区终点,不能对超过极限的位置进行读写操作,可以修改
    private int capacity; // 可以容纳的最大数据量,缓冲区创建时就被设定且不可改变
    private int mark = -1; // 标记

主要方法:

public abstract class Buffer {
    // 返回此缓冲区的容量
    public final int capacity() {}

    // 返回缓冲区容量
    public final int position() {}

    // 设置缓冲区容量
    public final Buffer position(int newPosition) {}

    // 返回此缓冲区的限制
    public final int limit() {}

    // 设置此缓冲区的限制
    public final Buffer limit(int newLimit) {}

    // 清除缓冲区,将各个标记恢复到初始状态,但是数据并没有真正擦除
    public final Buffer clear() {
    }

    // 反转此缓冲区
    public final Buffer flip() {
        limit = position;
        position = 0;
        mark = -1;
        return this;
    }
    // 当前位置和限制之间是否有元素
    public final boolean hasRemaining() {
        return position < limit;
    }

    // 此缓冲区是否为只读 抽象方法
    public abstract boolean isReadOnly();

    // 此缓冲区是否具有可访问的底层实现数组  抽象方法 since 1.6 
    public abstract boolean hasArray();

    // 返回底层数组实现  抽象方法 since 1.6
    public abstract Object array();

}

Buffer类最常用的子类是用于网络传输的ByteBuffer:

public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer>
{
    // 创建直接缓冲区
    public static ByteBuffer allocateDirect(int capacity) {
        return new DirectByteBuffer(capacity);
    }

    // 设置缓冲区的初始容量
    public static ByteBuffer allocate(int capacity) {
        if (capacity < 0)
            throw new IllegalArgumentException();
        return new HeapByteBuffer(capacity, capacity);
    }

    /** 存取相关API  **/
    // 从当前位置position上get,get之后position自动+1
    public abstract byte get();

    // 从当前位置上添加,put之后,position自动+1
    public abstract ByteBuffer put(byte b);

    // 从绝对位置get
    public abstract byte get(int index);

    // 从绝对位置上put
    public abstract ByteBuffer put(int index, byte b);

    // 其他api
    ......
}

Channel: 我们知道BIO中stream流是单向的,例如FileInputStream对象只能进行读取。而NIO中的通道是双向的,可读也可写。
Channel类是一个接口:

public interface Channel extends Closeable {}

常用的Channel类有:FileChannel用于文件读写、DatagramChannel用于UDP数据读写、ServerSocketChannel和SocketChannel用于TCP数据读写。
FileChannel抽象类:

// 从通道读取数据放入缓冲区(注意读写要站在内存Buffer的角度看)
public abstract int read(ByteBuffer dst);
public abstract long read(ByteBuffer[] dsts, int offset, int length);

// 将缓冲区的数据写入到通道
public abstract int write(ByteBuffer src);

// 将目标通道中数据复制到当前通道
public abstract long transferFrom(ReadableByteChannel src, long position, long count);
// 将数据从当前通道复制给目标通道
public abstract long transferTo(long position, long count, WritableByteChannel target)

下面一个实例帮助理解,ByteBuffer和FileChannel,实现一个txt文件内容拷贝到另一个txt文件:

public static void main(String[] args) throws Exception {

      // 读取磁盘中的输入流
      FileInputStream inputStream = new FileInputStream("c://test/a.txt");
      FileChannel inputStreamChannel = inputStream.getChannel();

      FileOutputStream outputStream = new FileOutputStream("c://test/b.txt");
      FileChannel outputStreamChannel = outputStream.getChannel();

      ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
      while (true){

          // 清空Buffer
          byteBuffer.clear();

          // 将输入通道中的数据读到缓冲区中
          int read = inputStreamChannel.read(byteBuffer);
          if(read == -1){
                break;
          }
          // 将缓冲区中的数据写入到输出通道中
          byteBuffer.flip();
          outputStreamChannel.write(byteBuffer);
    }

    inputStream.close();
    outputStream.close();
}

关于Buffer和Channel的注意事项和细节:
1、ByteBuffer支持类型化put和get,put放入的是什么类型,get就应该使用相应类型的方法取出,否则可能有BufferUnderflowException。
2、一个普通的Buffer可以转成只读Buffer,使用buffer.asReadOnlyBuffer()方法。
3、NIO还提供一种系统级别比较高的MappedByteBuffer抽象类,可以让文件直接在内存(堆外内存)修改,操作系统不需要拷贝一次。
4、NIO还支持分散读取(Scattering Reads)聚集写入(Gathering Writes):分散读取指将通道中的数据分散读到多个缓冲区中,采用buffer数组依次写入;聚集写入指将多个缓冲区的数据聚集写到通道中,采用buffer数组,依次读。

Selector: Selector能检测多个注册的通道上是否有事件发生,多个Channel以事件的方式可以注册到同一个Selector。

特点说明:
1、IO线程NioEventLoop(时间轮训)聚合了Selector(多路复用),可以同时并发处理成百上千个客户端连接。
2、当线程从某客户端Socket通道进行读写数据时,若没有数据可用,该线程可以进行其他任务。
3、线程通常将非阻塞IO的空闲时间用于在其他通道上执行IO操作,所以单独的线程可以管理多个输入和树池通道。
4、由于读写操作都是非阻塞的,这就可以充分提升IO线程的运行效率,避免由于频繁I/O阻塞导致的线程挂起。
5、一个I/O线程可以并发处理N个客户端连接和读写操作,这从根本上解决了传统同步阻塞IO一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。

Selector API介绍:

public abstract class Selector implements Closeable {
    // 得到一个选择器对象
    public static Selector open() throws IOException {}
    // 监控所有注册的通道,当其中有IO操作可以进行时,将对应的SelectionKey加入到内部集合中并返回,阻塞方法,参数用来设置超时时间,在规定时间没有事件就会返回。
    public abstract int select(long timeout) throws IOException;
    // 从内部集合几种得到所有的SelectionKey
    public abstract Set<SelectionKey> selectedKeys();
    // 不阻塞,立马返回
    public abstract int selectNow() throws IOException;
    // 唤醒Selector
    public abstract Selector wakeup();
}

NIO非阻塞网络编程原理分析:(Selector、SelectionKey、ServerSocketChannel、SocketChannel)关系。
1、当客户端连接时,会通过ServerSocketChannel得到SocketChannel
2、将SocketChannel注册到Selector上
3、注册后返回一个SelectionKey,会和该Selector关联(集合)
4、Selector进行监听(用select方法),返回有事件发生的通道的个数
5、进一步得到各个SelectionKey(有事件发生)
6、再通过SelectionKey反向获取SocketChannel
7、可以通过得到的Channel完成业务处理
JAVA NIO整理

下面一个NIO实现服务器和客户端之间的简单数据通讯,理解NIO非阻塞网络编程机制:
服务端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class NIOServer {

    public static void main(String[] args) throws IOException {

        // 创建ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        // 创建Selector
        Selector selector = Selector.open();

        // 绑定一个端口6666,在服务端监听
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));

        // 设置非阻塞
        serverSocketChannel.configureBlocking(false);

        // 把serverSocketChannel注册到selector  事件为OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);


        // 循环等待客户端连接
        while (true){
            //等待一秒钟,如果还没有连接时间发生 就去干别的事   selectNow()
            if(selector.select(1000) == 0){
                System.out.println("服务端等待了1秒,无连接");
                continue;
            }

            // 获取相关的SelectionKey集合,通过SelectionKey反向获取通道
            Set<SelectionKey> selectionKeys = selector.selectedKeys();

            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            while (iterator.hasNext()){
                SelectionKey selectionKey = iterator.next();
                // 有新的客户端连接
                if(selectionKey.isAcceptable()){
                    // 为该客户端生成一个socketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();

                    System.out.println("客户端连接成功,生成了一个SocketChannel:" + socketChannel.hashCode());

                    // 将socketChannel设为非阻塞
                    socketChannel.configureBlocking(false);

                    // socketChannel注册到selector,关注事件为OP_READ,同时关联一个buffer
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                }

                if(selectionKey.isReadable()){
                    // 获取到channel和buffer
                    SocketChannel channel = (SocketChannel)selectionKey.channel();

                    ByteBuffer buffer = (ByteBuffer)selectionKey.attachment();

                    channel.read(buffer);

                    System.out.println("from 客户端:" + new String(buffer.array()));
                }

                // 从当前集合中移除selectionKey,防止并发重复
                iterator.remove();
            }

        }

    }

}

客户端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class NIOClient {

    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();

        socketChannel.configureBlocking(false);

        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);

        if(!socketChannel.connect(inetSocketAddress)){
            while (!socketChannel.finishConnect()){
                System.out.println("因为连接需要时间,客户端不会阻塞,可以在这里进行其它操作...");
            }
        }

        String str = "Hello, guy";
        ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());

        socketChannel.write(byteBuffer);
        System.in.read();

    }

}

先运行服务端,再运行客户端,结果为:

服务端等待了1秒,无连接
服务端等待了1秒,无连接
服务端等待了1秒,无连接
服务端等待了1秒,无连接
服务端等待了1秒,无连接
客户端连接成功,生成了一个SocketChannel:681842940
from 客户端:Hello, guy
服务端等待了1秒,无连接
服务端等待了1秒,无连接
服务端等待了1秒,无连接
服务端等待了1秒,无连接

总结

NIO的优点是效率高,原因是它是非阻塞的;使用零拷贝,不需要将数据读取到OS内核缓冲区,直接从内存中读写文件;一个线程处理多个请求(连接),管理多个输入和输出通道,请求都会注册到多路复用器上,多路复用器轮训到连接有IO请求就处理。
缺点是NIO底层由 epoll 实现,该实现饱受诟病的空轮询 bug 会导致 cpu 飙升 100%。项目庞大之后,自行实现的 NIO 很容易出现各类 bug,维护成本较高
由此,netty进行了很大程度的改善。

资料:
Jave Guide BIO,NIO,AIO 总结:https://snailclimb.gitee.io/javaguide/#/docs/java/BIO-NIO-AIO
彻底搞懂NIO效率高的原理:https://mp.weixin.qq.com/s/wVoHfhh28Vh5sgKQbPXk8w
NIO效率高的原理之零拷贝与直接内存映射:https://www.jianshu.com/p/a4325188f974

上一篇:Java NIO-1——示例代码-1——网络传输


下一篇:Java NIO