Java NIO

一、Java NIO简介

NIO 是一种同步非阻塞的 I/O 模型,在 Java 1.4 中引入了 NIO 框架,对应 java.nio 包,提供了 ChannelSelectorBuffer 等抽象。

NIO 中的 N 可以理解为 Non-blocking,不单纯是 New。它支持面向缓冲的,基于通道的 I/O 操作方法。 NIO 提供了与传统 BIO 模型中的 SocketServerSocket 相对应的 SocketChannelServerSocketChannel 两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。对于低负载、低并发的应用程序,可以使用同步阻塞 I/O 来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。

NIO 和 BIO 的区别

Non-blocking IO(非阻塞)

BIO 是阻塞的,NIO 是非阻塞的

BIO 的各种流是阻塞的。这意味着,当一个线程调用 read()write() 时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。在此期间,该线程不能再干其他任何事。

NIO 使我们可以进行非阻塞 IO 操作。比如说,单线程中从通道读取数据到 buffer,同时可以继续做别的事情,当数据读取到 buffer 中后,线程再继续处理数据。写数据也是一样的。另外,非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。

Buffer(缓冲区)

BIO 面向流(Stream oriented),而 NIO 面向缓冲区(Buffer oriented)

Buffer 是一个对象,它包含一些要写入或者要读出的数据。在 NIO 类库中加入 Buffer 对象,体现了 NIO 与 BIO 的一个重要区别。在面向流的 BIO 中可以将数据直接写入或者将数据直接读到 Stream 对象中。虽然 Stream 中也有 Buffer 开头的扩展类,但只是流的包装类,还是从流读到缓冲区,而 NIO 却是直接读到 Buffer 中进行操作。

在 NIO 厍中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读缓冲区中的数据; 在写入数据时,写入到缓冲区中。任何时候访问 NIO 中的数据,都是通过缓冲区进行操作。

最常用的缓冲区是 ByteBuffer,一个 ByteBuffer 提供了一组功能用于操作 byte 数组。除了 ByteBuffer,还有其他的一些缓冲区,事实上,每一种 Java 基本类型(除了 Boolean 类型)都对应有一种缓冲区。

Channel (通道)

NIO 通过 Channel(通道) 进行读写。

通道是双向的,可读也可写,而流的读写是单向的。无论读写,通道只能和 Buffer 交互。因为 Buffer,通道可以异步地读写。

Selector (选择器)

NIO 有选择器,而 IO 没有。

选择器用于使用单个线程处理多个通道。因此,它需要较少的线程来处理这些通道。线程之间的切换对于操作系统来说是昂贵的。 因此,为了提高系统效率选择器是有用的。

NIO 的基本流程

通常来说 NIO 中的所有 IO 都是从 Channel(通道) 开始的。

  • 从通道进行数据读取 :创建一个缓冲区,然后请求通道读取数据。
  • 从通道进行数据写入 :创建一个缓冲区,填充数据,并要求通道写入数据。

NIO 核心组件

NIO 包含下面几个核心的组件:

  • Channel(通道)
  • Buffer(缓冲区)
  • Selector(选择器)

二、Channel(通道)

通道(Channel)是对 BIO 中的流的模拟,可以通过它读写数据。

Channel,类似在 Linux 之类操作系统上看到的文件描述符,是 NIO 中被用来支持批量式 IO 操作的一种抽象。

File 或者 Socket,通常被认为是比较高层次的抽象,而 Channel 则是更加操作系统底层的一种抽象,这也使得 NIO 得以充分利用现代操作系统底层机制,获得特定场景的性能优化,例如,DMA(Direct Memory Access)等。不同层次的抽象是相互关联的,我们可以通过 Socket 获取 Channel,反之亦然。

通道与流的不同之处在于:

  • 流是单向的 - 一个流只能单纯的负责读或写。
  • 通道是双向的 - 一个通道可以同时用于读写。

通道包括以下类型:

  • FileChannel:从文件中读写数据;
  • DatagramChannel:通过 UDP 读写网络中数据;
  • SocketChannel:通过 TCP 读写网络中数据;
  • ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。

FileChannel

//使用FileChannel读取文件数据
public class FileChannelDemo1 {
    public static void main(String[] args) throws IOException {
        //创建FileChannel
        RandomAccessFile aFile = new RandomAccessFile("e:\\1.txt", "rw");
        FileChannel fileChannel = aFile.getChannel();

        //创建ByteBuffer容量为1024字节
        ByteBuffer buf = ByteBuffer.allocate(1024);

        //读取数据到buffer中
        int bytesRead = fileChannel.read(buf);
        while(bytesRead != -1){
            System.out.println("读取了" + bytesRead);
            buf.flip();//读写反转
            while(buf.hasRemaining()){
                System.out.print((char)buf.get());
            }
            System.out.println();
            buf.clear();
            bytesRead = fileChannel.read(buf);
        }
        aFile.close();
        System.out.println("操作结束");
    }
}

//使用FileChannel写出数据
public class FileChannelDemo2 {
    public static void main(String[] args) throws IOException {
        //创建FileChanel
        RandomAccessFile accessFile = new RandomAccessFile("e:\\2.txt","rw");
        FileChannel fileChannel = accessFile.getChannel();
        //创建ByteBuffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        String str = "hello world";
        //向缓冲区中写入数据
        buffer.put(str.getBytes());
        buffer.flip();//读写反转
        while(buffer.hasRemaining()){
            fileChannel.write(buffer);
        }
        //关闭通道
        fileChannel.close();
        System.out.println("写入数据完成");
    }
}

DatagramChannel

public class DatagramChannelDemo1 {
    @Test
    public void send() throws Exception {
        DatagramChannel sendChannel = DatagramChannel.open();
        InetSocketAddress sendAddress = new InetSocketAddress("127.0.0.1",9999);
        while(true){
            ByteBuffer buffer = ByteBuffer.wrap("你好 wuyazi".getBytes("UTF-8"));
            sendChannel.send(buffer,sendAddress);
            System.out.println("发送完成");
            Thread.sleep(1000);
        }
    }
    @Test
    public void receive() throws Exception {
        DatagramChannel receiveChannel = DatagramChannel.open();
        receiveChannel.bind(new InetSocketAddress(9999));
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while(true){
            buffer.clear();
            SocketAddress receive = receiveChannel.receive(buffer);
            System.out.println(receive.toString());
            buffer.flip();//读写反转
            System.out.println(StandardCharsets.UTF_8.decode(buffer));

        }
    }
    @Test
    public void connect() throws Exception{
        DatagramChannel channel = DatagramChannel.open();
        channel.bind(new InetSocketAddress(9999));
        //连接
        channel.connect(new InetSocketAddress("127.0.0.1",9999));
        channel.write(ByteBuffer.wrap("你好,yaona".getBytes("UTF-8")));
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while(true){
            buffer.clear();
            channel.read(buffer);
            buffer.flip();
            System.out.println(StandardCharsets.UTF_8.decode(buffer));
        }
    }
}

SocketChannel

public class SocketChannelDemo1 {
    public static void main(String[] args) throws IOException {
        //创建SocketChannel
        //方式1
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com",80));
        //方式2
//        SocketChannel socketChannel1 = SocketChannel.open();
//        socketChannel1.connect(new InetSocketAddress("www.baidu.com",80));
        socketChannel.configureBlocking(false);//设置非阻塞式
        //读操作
        ByteBuffer buffer = ByteBuffer.allocate(16);
        socketChannel.read(buffer);
        socketChannel.close();
        System.out.println("over");

    }
}

ServerSocketChannel

//ServerSocketChannel测试
public class ServerSocketChannelDemo1 {
    public static void main(String[] args) throws IOException, InterruptedException {
        //设置端口号
        int port = 8888;
        //创建Buffer
        ByteBuffer buffer = ByteBuffer.wrap("hello wyz".getBytes());
        //创建ServerSocketChannel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        //绑定一个端口
        ssc.socket().bind(new InetSocketAddress(port));
        //设置非阻塞模式
        ssc.configureBlocking(false);
        //监听是否有连接传入
        while(true){
            System.out.println("Waiting for Connections");
            SocketChannel socketChannel = ssc.accept();
            if(socketChannel == null){
                System.out.println("null");
                Thread.sleep(2000);
            }else{
                System.out.println("Incoming connection from:" + socketChannel.getRemoteAddress());
                buffer.rewind();//指针指向0
                socketChannel.write(buffer);
                socketChannel.close();//关闭
            }

        }
    }
}

三、Buffer(缓冲区)

NIO 与传统 I/O 不同,它是基于块(Block)的,它以块为基本单位处理数据。Buffer 是一块连续的内存块,是 NIO 读写数据的缓冲。Buffer 可以将文件一次性读入内存再做后续处理,而传统的方式是边读文件边处理数据。

Channel 读写的数据都必须先置于缓冲区中。也就是说,不会直接对通道进行读写数据,而是要先经过缓冲区。缓冲区实质上是一个数组,但它不仅仅是一个数组。缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读/写进程。

BIO 和 NIO 已经很好地集成了,java.io.* 已经以 NIO 为基础重新实现了,所以现在它可以利用 NIO 的一些特性。例如,java.io.* 包中的一些类包含以块的形式读写数据的方法,这使得即使在面向流的系统中,处理速度也会更快。

缓冲区包括以下类型:

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

缓冲区状态变量

  • capacity:最大容量;
  • position:当前已经读写的字节数;
  • limit:还可以读写的字节数。
  • mark:记录上一次 postion 的位置,默认是 0,算是一个便利性的考虑,往往不是必须 的。

缓冲区状态变量的改变过程举例:

  1. 新建一个大小为 8 个字节的缓冲区,此时 position 为 0,而 limit = capacity = 8。capacity 变量不会改变,下面的讨论会忽略它。
  2. 从输入通道中读取 5 个字节数据写入缓冲区中,此时 position 移动设置为 5,limit 保持不变。
  3. 在将缓冲区的数据写到输出通道之前,需要先调用 flip() 方法,这个方法将 limit 设置为当前 position,并将 position 设置为 0。
  4. 从缓冲区中取 4 个字节到输出缓冲中,此时 position 设为 4。
  5. 最后需要调用 clear() 方法来清空缓冲区,此时 position 和 limit 都被设置为最初位置。

文件 NIO 示例

//利用NIO实现文件的拷贝
public class FileCopy {
    public static void main(String[] args) throws IOException {
        fastCopy("e:\\java.txt","e:\\javaCopy.txt");
    }
    private static void fastCopy(String src,String desc) throws IOException {
        //定义输入流
        FileInputStream fis = new FileInputStream(src);
        //获取输入通道
        FileChannel fin = fis.getChannel();
        //定义输出流
        FileOutputStream fos = new FileOutputStream(desc);
        //获取输出通道
        FileChannel fout = fos.getChannel();

        //创建缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while(true){
            int len = fin.read(buffer);//读取数据到缓冲区
            if(len == -1){
                break;
            }
            buffer.flip();//读写切换
            //把缓冲区的数据写入文件
            fout.write(buffer);

            //缓冲区清空
            buffer.clear();
        }
        //关闭通道和流
        fout.close();
        fos.close();
        fin.close();
        fis.close();

    }
}

DirectBuffer

NIO 还提供了一个可以直接访问物理内存的类 DirectBuffer。普通的 Buffer 分配的是 JVM 堆内存,而 DirectBuffer 是直接分配物理内存。

数据要输出到外部设备,必须先从用户空间复制到内核空间,再复制到输出设备,而 DirectBuffer 则是直接将步骤简化为从内核空间复制到外部设备,减少了数据拷贝。

这里拓展一点,由于 DirectBuffer 申请的是非 JVM 的物理内存,所以创建和销毁的代价很高。DirectBuffer 申请的内存并不是直接由 JVM 负责垃圾回收,但在 DirectBuffer 包装类被回收时,会通过 Java 引用机制来释放该内存块。

public class BufferDemo1 {
    @Test
    public void buffer1() throws Exception {
        //从文件中读取数据到buffer,然后输出内容
        RandomAccessFile accessFile = new RandomAccessFile("e:\\java.txt", "rw");
        FileChannel fileChannel = accessFile.getChannel();

        //创建buffer
        ByteBuffer buffer = ByteBuffer.allocate(100);
        int len;
        while ((len = fileChannel.read(buffer)) != -1) {
            buffer.flip();//读写模式转换
            while (buffer.hasRemaining()) {
                System.out.print((char) buffer.get());
            }
            buffer.clear();//清空缓冲区
        }
        accessFile.close();
    }

    @Test
    public void buffer2() {
        IntBuffer buffer = IntBuffer.allocate(10);
        //放数据
        for (int i = 0; i < buffer.capacity(); i++) {
            int value = (i + 1) * 2;
            buffer.put(value);
        }
        buffer.flip();
        while (buffer.hasRemaining()) {
            System.out.print(buffer.get() + " ");
        }

    }

    @Test
    public void buffer3() {
        //创建子缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(10);
        for (int i = 0; i < buffer.capacity(); i++) {
            buffer.put((byte) i);
        }
        //设置子缓冲区的起始位置和大小
        buffer.position(3);
        buffer.limit(7);
        //创建子缓冲区
        ByteBuffer slice = buffer.slice();
        for (int i = 0; i < slice.capacity(); i++) {
            byte value = slice.get(i);
            value *= 10;
            slice.put(i, value);
        }
        //重新设置position位置和limit,读取缓冲区数据
        buffer.position(0);
        buffer.limit(buffer.capacity());
        while (buffer.hasRemaining()) {
            System.out.print(buffer.get() + " ");
        }

    }
    @Test
    public void buffer4(){
      //只读缓冲区
      ByteBuffer buffer = ByteBuffer.allocate(10);
      for(int i = 0; i < buffer.capacity(); i++){
          buffer.put((byte)i);
      }
      //创建只读缓冲区
      ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
      //改变原缓冲区
      for(int i = 0; i < buffer.position(); i++){
          byte value = buffer.get(i);
          value *= 10;
          buffer.put(i,value);
      }

      readOnlyBuffer.position(0);
      readOnlyBuffer.limit(buffer.capacity());
      while(readOnlyBuffer.hasRemaining()){
          System.out.println(readOnlyBuffer.get());
      }


    }

    //直接缓冲区
    @Test
    public void buffer5() throws Exception{
        FileInputStream fin = new FileInputStream("e:\\java.txt");
        FileChannel finChannel = fin.getChannel();

        FileOutputStream fout = new FileOutputStream("e:\\java1.txt");
        FileChannel foutChannel = fout.getChannel();

        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);//创建直接缓存
        int len;
        while((len = finChannel.read(buffer)) != -1){
            buffer.flip();
            while(buffer.hasRemaining()){
                foutChannel.write(buffer);
            }
            buffer.clear();
        }
        finChannel.close();
        fin.close();
        foutChannel.close();
        fout.close();
    }
    private static final int START = 0;
    private static final int END = 1024;
    //内存映射文件IO
    public void test6() throws Exception{
        RandomAccessFile accessFile = new RandomAccessFile("e:\\test.txt","rw");
        FileChannel channel = accessFile.getChannel();
        MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, START, END);
        map.put(0,(byte)97);
        map.put(1023,(byte)123);
        accessFile.close();
        channel.close();

    }

}

四、Selector(选择器)

NIO 常常被叫做非阻塞 IO,主要是因为 NIO 在网络通信中的非阻塞特性被广泛使用。

Selector 是 Java NIO 编程的基础。用于检查一个或多个 NIO Channel 的状态是否处于可读、可写。

NIO 实现了 IO 多路复用中的 Reactor 模型

  • 一个线程(Thread)使用一个选择器 Selector 通过轮询的方式去监听多个通道 Channel 上的事件(accpetread,如果某个 Channel 上面发生监听事件,这个 Channel 就处于就绪状态,然后进行 I/O 操作。
  • 通过配置监听的通道 Channel 为非阻塞,那么当 Channel 上的 IO 事件还未到达时,就不会进入阻塞状态一直等待,而是继续轮询其它 Channel,找到 IO 事件已经到达的 Channel 执行。
  • 因为创建和切换线程的开销很大,因此使用一个线程来处理多个事件而不是一个线程处理一个事件具有更好的性能。

需要注意的是,只有 SocketChannel 才能配置为非阻塞,而 FileChannel 不能,因为 FileChannel 配置非阻塞也没有意义。

目前操作系统的 I/O 多路复用机制都使用了 epoll,相比传统的 select 机制,epoll 没有最大连接句柄 1024 的限制。所以 Selector 在理论上可以轮询成千上万的客户端。

创建选择器

Selector selector = Selector.open();

将通道注册到选择器上

ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
ssChannel.register(selector, SelectionKey.OP_ACCEPT);

通道必须配置为非阻塞模式,否则使用选择器就没有任何意义了,因为如果通道在某个事件上被阻塞,那么服务器就不能响应其它事件,必须等待这个事件处理完毕才能去处理其它事件,显然这和选择器的作用背道而驰。

在将通道注册到选择器上时,还需要指定要注册的具体事件,主要有以下几类:

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE

它们在 SelectionKey 的定义如下:

public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

可以看出每个事件可以被当成一个位域,从而组成事件集整数。例如:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

监听事件

int num = selector.select();

使用 select() 来监听到达的事件,它会一直阻塞直到有至少一个事件到达。

获取到达的事件

Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = keys.iterator();
while (keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if (key.isAcceptable()) {
        // ...
    } else if (key.isReadable()) {
        // ...
    }
    keyIterator.remove();
}

事件循环

因为一次 select() 调用不能处理完所有的事件,并且服务器端有可能需要一直监听事件,因此服务器端处理事件的代码一般会放在一个死循环内。

while (true) {
    int num = selector.select();
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> keyIterator = keys.iterator();
    while (keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
        if (key.isAcceptable()) {
            // ...
        } else if (key.isReadable()) {
            // ...
        }
        keyIterator.remove();
    }
}

套接字 NIO 示例

public class SelectorDemo1 {
    @Test
    public void server() throws Exception{
        //创建一个选择器
        Selector selector = Selector.open();
        //创建一个服务端Channel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);//配置为非阻塞的
        //绑定端口
        ssc.socket().bind(new InetSocketAddress("127.0.0.1",8080));
        ssc.register(selector, SelectionKey.OP_ACCEPT);//注册到选择器

        while(true){
            selector.select();//监听到达事件,除非有一个事件到达,否则一直阻塞到这里
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while (keyIterator.hasNext()){
                SelectionKey key = keyIterator.next();
                if(key.isAcceptable()){//可连接
                    ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
                    //服务器为每一个新连接建立一个SocketChannel
                    SocketChannel accept = ssChannel.accept();
                    accept.configureBlocking(false);//配置为非阻塞的
                    accept.register(selector,SelectionKey.OP_READ);//这个连接主要用于读取数据

                }else if(key.isReadable()){//可读
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int len;
                    while((len = socketChannel.read(buffer)) != -1){
                        buffer.flip();
                        System.out.print(new String(buffer.array(),0,len));
                        buffer.clear();
                    }
                    System.out.println();
//                    System.out.println(readDataFromSocketChannel(socketChannel));
                    socketChannel.close();//关闭channel
                }
                keyIterator.remove();
            }
        }
    }
    private static String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        StringBuilder data = new StringBuilder();

        while (true) {

            buffer.clear();
            int n = sChannel.read(buffer);
            if (n == -1) {
                break;
            }
            buffer.flip();
            int limit = buffer.limit();
            char[] dst = new char[limit];
            for (int i = 0; i < limit; i++) {
                dst[i] = (char) buffer.get(i);
            }
            data.append(dst);
            buffer.clear();
        }
        return data.toString();
    }
    @Test
    public void client() throws Exception{
        SocketChannel  socketChannel = SocketChannel.open(
                new InetSocketAddress("127.0.0.1",8080));
        socketChannel.configureBlocking(false);
        ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes());
        socketChannel.write(buffer);
        System.out.println("客户端发送数据完成");
        socketChannel.close();
    }
}

五、NIO vs. BIO

BIO 与 NIO 最重要的区别是数据打包和传输的方式:BIO 以流的方式处理数据,而 NIO 以块的方式处理数据

  • 面向流的 BIO 一次处理一个字节数据:一个输入流产生一个字节数据,一个输出流消费一个字节数据。为流式数据创建过滤器非常容易,链接几个过滤器,以便每个过滤器只负责复杂处理机制的一部分。不利的一面是,面向流的 I/O 通常相当慢。
  • 面向块的 NIO 一次处理一个数据块,按块处理数据比按流处理数据要快得多。但是面向块的 NIO 缺少一些面向流的 BIO 所具有的优雅性和简单性。

参考资料

http://tutorials.jenkov.com/java-nio/index.html

Java NIO

上一篇:Python - 3.6 学习二


下一篇:linux select poll epoll IO多路复用简单使用