NIO - 非阻塞式IO

NIO, No blocking IO, 非阻塞式IO。指的是在发生IO操作时不会产生阻塞,利用NIO可以处理高并发和高访问的场景。和之相对的是BIO, Blocking IO,这是标准的阻塞IO。

BIO

一个经典的BIO的例子是一个最简单的C/S模型程序。以下为代码:


public class BIODemoServer {

    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket();
        ss.bind(new InetSocketAddress(9999));

        System.out.println("wait for connecting ...");

        Socket socket = ss.accept();    // 此处产生阻塞

        System.out.println("a connector is accessed!");

        InputStream input = socket.getInputStream();
        input.read();

        System.out.println("we had some data!");
    }
}

public class BIODemoClient {

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket();
        System.out.println("trying to connect ...");
        socket.connect(new InetSocketAddress("127.0.0.1", 9999));
        System.out.println("connect success!");
    }

}

accept()和connect()都是阻塞方法,在调用的时候,会启动一个阻塞。accept()方法在等到客户端连接上时会释放阻塞;connect()方法在连接上服务器的时候会释放阻塞。

同样,ServerSocket的getInputStream()获取的也是阻塞的IO流。调用read()方法之后,如果客户端迟迟没有向服务端输出数据,那么服务端将会一直卡这里,直到客户端向服务端写数据。

而实际上,客户端的OutputStream的write()方法也是阻塞的。如果服务端没有接收数据。那么write()方法会一直往外(一般是网卡设备的缓冲区)写数据,直到写出到一定量,没有任何一方接收,也就发生了阻塞。

NIO

和BIO相对,NIO就可以实现非阻塞式的IO。也就是说在IO发生的时候,不会产生任何阻塞。

NIO的核心是通道(Channel),IO就是在这个Channel中实现的。Channel的特点在于它可以打开或关闭。打开的Channel连接了某一个实体(设备,文件,网络套接字等),可以通过对Channel的IO操作来读写数据。并且可以配置整个读写过程是没有任何阻塞的。

Channel允许多个线程同时访问,并且可以保存整个过程是线程安全的。

Channel在java中是一个接口,用的最多的实现类有:ServerSocketChannel, SocketChannel。两个类底层是基于TCP协议的。

以下是最简单的NIO使用示例:


public class NIODemoServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        // 要想实现非阻塞,必须执行这条语句,否则仍为阻塞
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8888));
        System.out.println("a client has been connected!");
        ssc.accept();   // 此处不再阻塞
    }

}

public class NIODemoClient {

    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);   // 设置非阻塞
        sc.connect(new InetSocketAddress("127.0.0.1", 8888));  // 此处不再阻塞
        System.out.println("connect success!");
    }

}

Channel进行IO操作时,用到的元数据是ByteBuffer,这是一个字节缓存区。是一个特定的基本类型元素的有限序列。其本身是一个抽象类,调用allocate(int)静态方法返回对象。

其包含三个重要属性:

  • capacity: 缓存区能够保存的元素最大个数
  • limit: 限制,从缓存区最多能取的数据个数,从0开始计。如果超出则抛出java.nio.BufferUnderflowException异常。
  • position: 当前位置。指的是从哪个位置开始取出数据。

ByteBuffer实际上就是Channel中数据的载体。就好比隧道里的汽车一样,人是数据,汽车是ByteBuffer,隧道是Channel。

allocate(int)方法可以创建一个指定长度的ByteBuffer。

ByteBuffer有一个put(byte)方法,其接受一个字节的数据,指的就是把数据写入缓存区。要想取出数据,则调用get()方法。

每当向ByteBuffer中put一个byte数据之后,position就会递增1。下一次put或get就从position的位置继续,所以以下代码输出的是两个0而不是1.(缓存区中的所有数据默认是0)

public class ByteBufferDemo {

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        byte a = 1;
        byte b = 2;
        buffer.put(a);
        buffer.put(b);
        System.out.println(buffer.get());
        System.out.println(buffer.get());
    }

}

如果想正确取出a和b,那么可以通过position(int)方法修改position位置,通过limit(int)方法可以限制取出数据的个数:

public class ByteBufferDemo {

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        byte a = 1;
        byte b = 2;
        buffer.put(a);
        buffer.put(b);
        buffer.position(0);  // 从第一个数据开始取出
        buffer.limit(2);     // 只允许取出两个数据
        System.out.println(buffer.get());
        System.out.println(buffer.get());
//      System.out.println(buffer.get());   // 报异常
    }
}

考虑为什么第三个输出会抛异常。因为limit限制了只能取出position为0-1两个byte的数据。第三个输出试图取出position为2的数据,自然抛出异常。

ByteBuffer还有一些很有意思的方法:

  • flip(): 反转缓存区,把limit设置为position,把position置为0。
  • clear(): 清空缓存区,把position置为0,limit置为capacity。可见这个方法并没有真正地清空缓存区,而是把缓存区的状态设置为初始值。
  • hasRemaining(): 返回缓存区中是否还剩余有效的数据。其本质是判断position是否小于limit。

那么,遍历缓存区有效数据的代码如下:

public class ByteBufferDemo {

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put((byte) 1);
        buffer.put((byte) 2);
        buffer.put((byte) 3);

        buffer.flip();
        while (buffer.hasRemaining()) {
            System.out.println(buffer.get());
        }

    }

}

我们可以把一个byte数组直接封装到ByteBuffer里头。调用静态方法warp(byte[])返回一个大小刚刚好为该byte数组的ByteBuffer。

现在我们有了车(ByteBuffer),并且知道了如何把数据装到车里(put方法),下一步就是如何让车通过隧道了。

ServerSocketChannel的accept()方法会返回一个SocketChannel实例。调用这个实例的read(ByteBuffer)就可以读取数据了。

注意这个返回的实例默认是非阻塞,还是需要调用configureBlocking(boolean)来设置其是否阻塞。

但是因为这个aceept()是不阻塞的,所以如果没有客户端连接服务器,那么将会返回null。可以利用这个来判断是否有客户端连接。

或者是我们可以在accept()处人为设置一个阻塞,来确保有客户端连接上服务端。

public class NIODemoServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8888));
        // 要想实现非阻塞,必须执行这条语句,否则仍为阻塞
        ssc.configureBlocking(false);

        System.out.println("wait for client...");

        SocketChannel sc = null;
        while (sc == null) {   // 手动进行阻塞
            sc = ssc.accept();
        }

        System.out.println("a client has been connected!");

        ByteBuffer buffer = ByteBuffer.allocate(10);
        sc.configureBlocking(false);   // 将通道设置为非阻塞
        sc.read(buffer);

        System.out.println("We have read some data!");
        System.out.println("data: " + new String(buffer.array()));

    }

}

public class NIODemoClient {

    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);   // 设置非阻塞

        System.out.println("trying to connect...");

        sc.connect(new InetSocketAddress("127.0.0.1", 8888));
        if (!sc.isConnected()) {   // 如果连接没有成功,继续进行连接...
            sc.finishConnect();
        }

        System.out.println("connect Server success!");

        // 开始向服务器写入数据
        ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
        sc.write(buffer);

        System.out.println("write success!");

        while (true);  // 保持连接
    }

}

Selector

一旦使用NIO,稍有不慎就会产生各种异常。因此我们需要一种设计模式能够应对NIO的情况。

最简单的做法是每有一个客户端连接上服务端时,服务端启动一个单独的线程来处理客户端的请求。

这个模型有如下的缺点:

  • 如果并发量大,会导致服务端线程过多,很可能导致其宕机。
  • 如果客户端在连接服务端之后不进行任何操作,那么连接将会一直保持着,会占用过多无用的CPU调度。
  • 接上,还会导致真正需要被处理的线程无法及时地被服务。

为了解决闲置线程的问题,我们可以将闲置的线程设置为阻塞状态。这样CPU在调度的时候不会调度这种线程。所以需要引入事件的监听机制。

这里引入Selector(多路复用选择器)。它起到的就是事件监听的作用。

Selector会实时监听所有连接上服务器的线程。如果线程没有做任何事情,Selector会让线程沉睡。如果线程发生了accept(),connect(),write(),read()行为,Selector会让线程醒来并执行操作。

Selector从一定程度上减少了线程调度器的压力,让CPU在大部分时间内可以做有意义的事情。

在处理短请求的情况下,为了解决线程过多的情况,可以使用一个线程来处理所有的客户端请求。使用NIO技术可以实现多用户的并发,使用Selector来判断事件的发生。

java已经提供好Selector了,实例化Selector使用Selector的open()静态方法。

要想让Selector提供监听,需要先注册事件,这些事件定义在SelectionKey中,均是int型常数。常见的事件有:

  • OP_ACCEPT: 表示接收事件
  • OP_CONNCT: 表示连接事件
  • OP_WRITE: 表示写事件
  • OP_READ: 表示读事件

通过ServerSocketChannel的register(Selector,int)方法可以注册Selector。

调用Selector的select()方法,会产生一个阻塞,直到有注册的监听事件被触发。

当有任何一个事件触发后,我们可以通过selectedKeys()获取触发事件的集合。这表示所有连接上服务端的客户集合。那么遍历这个集合就达到了遍历所有已经连上的客户端的作用。

这个集合保存的是SelectionKey对象。每个对象可以判断是否发生了某个事件(上述4个事件)。通过这样的判断我们可以对不同的事件进行处理。

在accept()事件发生之后,需要调用SelectionKey的channel()方法取得当前会话的ServerSocketChannel对象(需要强制转换),随后调用accept()。

其它事件的处理技巧类似,不过channel()方法将返回SocketChannel,这个SocketChannel就是accept()后通过ServerSocketChannel取得的socketChannel,所以这里可以不用设置非阻塞了。

在本示例中,在客户端连接上服务器之后(也就是ACCEPT事件发生之后),服务器就可以向客户端写数据了。也就是保持连接的客户端的WRITE事件是一直发生的。

而因为NIO,服务端可以持续不断向客户端写数据。所以在写数据的时候,我们需要进行一些处理,防止服务端向客户端过度写数据。

可以通过一些简单的二进制计算,在WRITE或READ事件发生之后,就把发生过的事件给去掉。这里可以通过SelectionKey的interestOps()方法取得现有事件,然后与当前事件的取反结果进行一次按位与计算即可。

public class Server {

    // implement a Selector Server
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);   // use no-blocking.
        ssc.bind(new InetSocketAddress(6666));

        // create a selector
        Selector selector = Selector.open();
        // register the selector and event ACCEPT.
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();  // block here ... wait for client connect ...

            // get all events' set.
            Set<SelectionKey> set = selector.selectedKeys();
            Iterator<SelectionKey> ite = set.iterator();
            while (ite.hasNext()) {

                SelectionKey key = ite.next();
                if (key.isAcceptable()) {      // ACCEPT event.

                    // the serverSocketChannel for new client.
                    ServerSocketChannel serverSocketChannel =
                            (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = null;
                    while (socketChannel == null) {
                        socketChannel = serverSocketChannel.accept();
                    }
                    socketChannel.configureBlocking(false);

                    System.out.println("Thread " + Thread.currentThread().getId()
                        + " is now servicing a client ...");

                    // register WRITE and READ event for socketChannel.
                    socketChannel.register(selector, SelectionKey.OP_READ
                            | SelectionKey.OP_WRITE);

                }

                if (key.isReadable()) {  // READ event.

                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(10);
                    socketChannel.read(buffer);
                    System.out.println("Server has read some data: "
                            + new String(buffer.array()));

                    // remove current event.
                    socketChannel.register(selector, key.interestOps() &
                            ~SelectionKey.OP_WRITE);
                }

                // WRITE event, this will happen after connecting success.
                if (key.isWritable()) {

                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.wrap("nihao!".getBytes());
                    socketChannel.write(buffer);
                    System.out.println("Server has wrote some data to client.");

                    // remove current event.
                    socketChannel.register(selector, key.interestOps() &
                            ~SelectionKey.OP_WRITE);

                }

                // delete this event, to prevent repeated calls to the event.
                ite.remove();
            }
        }
    }
}

public class Client {

    public static void main(String[] args) throws IOException,
            InterruptedException {

        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);

        sc.connect(new InetSocketAddress(
                "127.0.0.1", 6666));

        while (!sc.isConnected()) {
            sc.finishConnect();
        }

        Thread.sleep(2000);

        // write data to Server.
        System.out.println("Write some data to Server...");
        ByteBuffer buffer = ByteBuffer.wrap("hello!".getBytes());
        sc.write(buffer);

        Thread.sleep(2000);

        // read data from Server.
        ByteBuffer receive = ByteBuffer.allocate(20);
        sc.read(receive);

        System.out.println("Client received some data: "
                + new String(receive.array()));

        while (true);
    }

}

NIO因为非阻塞的特性,要实现正常的数据交互代价是很大的。上面的代码通过sleep()方法延迟获取数据,但在实际中是不可能这么用的

上一篇:elasticsearch(四) index 跨集群迁移


下一篇:opengl使用VAO和VBO绘制三角形时glVertexAttribPointer的调用位置不正确产生的小问题