NIO, No blocking IO, 非阻塞式IO。指的是在发生IO操作时不会产生阻塞,利用NIO可以处理高并发和高访问的场景。和之相对的是BIO, Blocking IO,这是标准的阻塞IO。
BIO
一个经典的BIO的例子是一个最简单的C/S模型程序。以下为代码:
public class BIODemoServer {
public static void main(String[] args) throws IOException {
ServerSocket ss = new ServerSocket();
ss.bind(new InetSocketAddress(9999));
System.out.println("wait for connecting ...");
Socket socket = ss.accept(); // 此处产生阻塞
System.out.println("a connector is accessed!");
InputStream input = socket.getInputStream();
input.read();
System.out.println("we had some data!");
}
}
public class BIODemoClient {
public static void main(String[] args) throws IOException {
Socket socket = new Socket();
System.out.println("trying to connect ...");
socket.connect(new InetSocketAddress("127.0.0.1", 9999));
System.out.println("connect success!");
}
}
accept()和connect()都是阻塞方法,在调用的时候,会启动一个阻塞。accept()方法在等到客户端连接上时会释放阻塞;connect()方法在连接上服务器的时候会释放阻塞。
同样,ServerSocket的getInputStream()获取的也是阻塞的IO流。调用read()方法之后,如果客户端迟迟没有向服务端输出数据,那么服务端将会一直卡这里,直到客户端向服务端写数据。
而实际上,客户端的OutputStream的write()方法也是阻塞的。如果服务端没有接收数据。那么write()方法会一直往外(一般是网卡设备的缓冲区)写数据,直到写出到一定量,没有任何一方接收,也就发生了阻塞。
NIO
和BIO相对,NIO就可以实现非阻塞式的IO。也就是说在IO发生的时候,不会产生任何阻塞。
NIO的核心是通道(Channel),IO就是在这个Channel中实现的。Channel的特点在于它可以打开或关闭。打开的Channel连接了某一个实体(设备,文件,网络套接字等),可以通过对Channel的IO操作来读写数据。并且可以配置整个读写过程是没有任何阻塞的。
Channel允许多个线程同时访问,并且可以保存整个过程是线程安全的。
Channel在java中是一个接口,用的最多的实现类有:ServerSocketChannel, SocketChannel。两个类底层是基于TCP协议的。
以下是最简单的NIO使用示例:
public class NIODemoServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
// 要想实现非阻塞,必须执行这条语句,否则仍为阻塞
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8888));
System.out.println("a client has been connected!");
ssc.accept(); // 此处不再阻塞
}
}
public class NIODemoClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false); // 设置非阻塞
sc.connect(new InetSocketAddress("127.0.0.1", 8888)); // 此处不再阻塞
System.out.println("connect success!");
}
}
Channel进行IO操作时,用到的元数据是ByteBuffer,这是一个字节缓存区。是一个特定的基本类型元素的有限序列。其本身是一个抽象类,调用allocate(int)静态方法返回对象。
其包含三个重要属性:
- capacity: 缓存区能够保存的元素最大个数
- limit: 限制,从缓存区最多能取的数据个数,从0开始计。如果超出则抛出java.nio.BufferUnderflowException异常。
- position: 当前位置。指的是从哪个位置开始取出数据。
ByteBuffer实际上就是Channel中数据的载体。就好比隧道里的汽车一样,人是数据,汽车是ByteBuffer,隧道是Channel。
allocate(int)方法可以创建一个指定长度的ByteBuffer。
ByteBuffer有一个put(byte)方法,其接受一个字节的数据,指的就是把数据写入缓存区。要想取出数据,则调用get()方法。
每当向ByteBuffer中put一个byte数据之后,position就会递增1。下一次put或get就从position的位置继续,所以以下代码输出的是两个0而不是1.(缓存区中的所有数据默认是0)
public class ByteBufferDemo {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
byte a = 1;
byte b = 2;
buffer.put(a);
buffer.put(b);
System.out.println(buffer.get());
System.out.println(buffer.get());
}
}
如果想正确取出a和b,那么可以通过position(int)方法修改position位置,通过limit(int)方法可以限制取出数据的个数:
public class ByteBufferDemo {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
byte a = 1;
byte b = 2;
buffer.put(a);
buffer.put(b);
buffer.position(0); // 从第一个数据开始取出
buffer.limit(2); // 只允许取出两个数据
System.out.println(buffer.get());
System.out.println(buffer.get());
// System.out.println(buffer.get()); // 报异常
}
}
考虑为什么第三个输出会抛异常。因为limit限制了只能取出position为0-1两个byte的数据。第三个输出试图取出position为2的数据,自然抛出异常。
ByteBuffer还有一些很有意思的方法:
- flip(): 反转缓存区,把limit设置为position,把position置为0。
- clear(): 清空缓存区,把position置为0,limit置为capacity。可见这个方法并没有真正地清空缓存区,而是把缓存区的状态设置为初始值。
- hasRemaining(): 返回缓存区中是否还剩余有效的数据。其本质是判断position是否小于limit。
那么,遍历缓存区有效数据的代码如下:
public class ByteBufferDemo {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put((byte) 1);
buffer.put((byte) 2);
buffer.put((byte) 3);
buffer.flip();
while (buffer.hasRemaining()) {
System.out.println(buffer.get());
}
}
}
我们可以把一个byte数组直接封装到ByteBuffer里头。调用静态方法warp(byte[])返回一个大小刚刚好为该byte数组的ByteBuffer。
现在我们有了车(ByteBuffer),并且知道了如何把数据装到车里(put方法),下一步就是如何让车通过隧道了。
ServerSocketChannel的accept()方法会返回一个SocketChannel实例。调用这个实例的read(ByteBuffer)就可以读取数据了。
注意这个返回的实例默认是非阻塞,还是需要调用configureBlocking(boolean)来设置其是否阻塞。
但是因为这个aceept()是不阻塞的,所以如果没有客户端连接服务器,那么将会返回null。可以利用这个来判断是否有客户端连接。
或者是我们可以在accept()处人为设置一个阻塞,来确保有客户端连接上服务端。
public class NIODemoServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8888));
// 要想实现非阻塞,必须执行这条语句,否则仍为阻塞
ssc.configureBlocking(false);
System.out.println("wait for client...");
SocketChannel sc = null;
while (sc == null) { // 手动进行阻塞
sc = ssc.accept();
}
System.out.println("a client has been connected!");
ByteBuffer buffer = ByteBuffer.allocate(10);
sc.configureBlocking(false); // 将通道设置为非阻塞
sc.read(buffer);
System.out.println("We have read some data!");
System.out.println("data: " + new String(buffer.array()));
}
}
public class NIODemoClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false); // 设置非阻塞
System.out.println("trying to connect...");
sc.connect(new InetSocketAddress("127.0.0.1", 8888));
if (!sc.isConnected()) { // 如果连接没有成功,继续进行连接...
sc.finishConnect();
}
System.out.println("connect Server success!");
// 开始向服务器写入数据
ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
sc.write(buffer);
System.out.println("write success!");
while (true); // 保持连接
}
}
Selector
一旦使用NIO,稍有不慎就会产生各种异常。因此我们需要一种设计模式能够应对NIO的情况。
最简单的做法是每有一个客户端连接上服务端时,服务端启动一个单独的线程来处理客户端的请求。
这个模型有如下的缺点:
- 如果并发量大,会导致服务端线程过多,很可能导致其宕机。
- 如果客户端在连接服务端之后不进行任何操作,那么连接将会一直保持着,会占用过多无用的CPU调度。
- 接上,还会导致真正需要被处理的线程无法及时地被服务。
为了解决闲置线程的问题,我们可以将闲置的线程设置为阻塞状态。这样CPU在调度的时候不会调度这种线程。所以需要引入事件的监听机制。
这里引入Selector(多路复用选择器)。它起到的就是事件监听的作用。
Selector会实时监听所有连接上服务器的线程。如果线程没有做任何事情,Selector会让线程沉睡。如果线程发生了accept(),connect(),write(),read()行为,Selector会让线程醒来并执行操作。
Selector从一定程度上减少了线程调度器的压力,让CPU在大部分时间内可以做有意义的事情。
在处理短请求的情况下,为了解决线程过多的情况,可以使用一个线程来处理所有的客户端请求。使用NIO技术可以实现多用户的并发,使用Selector来判断事件的发生。
java已经提供好Selector了,实例化Selector使用Selector的open()静态方法。
要想让Selector提供监听,需要先注册事件,这些事件定义在SelectionKey中,均是int型常数。常见的事件有:
- OP_ACCEPT: 表示接收事件
- OP_CONNCT: 表示连接事件
- OP_WRITE: 表示写事件
- OP_READ: 表示读事件
通过ServerSocketChannel的register(Selector,int)方法可以注册Selector。
调用Selector的select()方法,会产生一个阻塞,直到有注册的监听事件被触发。
当有任何一个事件触发后,我们可以通过selectedKeys()获取触发事件的集合。这表示所有连接上服务端的客户集合。那么遍历这个集合就达到了遍历所有已经连上的客户端的作用。
这个集合保存的是SelectionKey对象。每个对象可以判断是否发生了某个事件(上述4个事件)。通过这样的判断我们可以对不同的事件进行处理。
在accept()事件发生之后,需要调用SelectionKey的channel()方法取得当前会话的ServerSocketChannel对象(需要强制转换),随后调用accept()。
其它事件的处理技巧类似,不过channel()方法将返回SocketChannel,这个SocketChannel就是accept()后通过ServerSocketChannel取得的socketChannel,所以这里可以不用设置非阻塞了。
在本示例中,在客户端连接上服务器之后(也就是ACCEPT事件发生之后),服务器就可以向客户端写数据了。也就是保持连接的客户端的WRITE事件是一直发生的。
而因为NIO,服务端可以持续不断向客户端写数据。所以在写数据的时候,我们需要进行一些处理,防止服务端向客户端过度写数据。
可以通过一些简单的二进制计算,在WRITE或READ事件发生之后,就把发生过的事件给去掉。这里可以通过SelectionKey的interestOps()方法取得现有事件,然后与当前事件的取反结果进行一次按位与计算即可。
public class Server {
// implement a Selector Server
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // use no-blocking.
ssc.bind(new InetSocketAddress(6666));
// create a selector
Selector selector = Selector.open();
// register the selector and event ACCEPT.
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select(); // block here ... wait for client connect ...
// get all events' set.
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> ite = set.iterator();
while (ite.hasNext()) {
SelectionKey key = ite.next();
if (key.isAcceptable()) { // ACCEPT event.
// the serverSocketChannel for new client.
ServerSocketChannel serverSocketChannel =
(ServerSocketChannel) key.channel();
SocketChannel socketChannel = null;
while (socketChannel == null) {
socketChannel = serverSocketChannel.accept();
}
socketChannel.configureBlocking(false);
System.out.println("Thread " + Thread.currentThread().getId()
+ " is now servicing a client ...");
// register WRITE and READ event for socketChannel.
socketChannel.register(selector, SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
if (key.isReadable()) { // READ event.
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(10);
socketChannel.read(buffer);
System.out.println("Server has read some data: "
+ new String(buffer.array()));
// remove current event.
socketChannel.register(selector, key.interestOps() &
~SelectionKey.OP_WRITE);
}
// WRITE event, this will happen after connecting success.
if (key.isWritable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.wrap("nihao!".getBytes());
socketChannel.write(buffer);
System.out.println("Server has wrote some data to client.");
// remove current event.
socketChannel.register(selector, key.interestOps() &
~SelectionKey.OP_WRITE);
}
// delete this event, to prevent repeated calls to the event.
ite.remove();
}
}
}
}
public class Client {
public static void main(String[] args) throws IOException,
InterruptedException {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.connect(new InetSocketAddress(
"127.0.0.1", 6666));
while (!sc.isConnected()) {
sc.finishConnect();
}
Thread.sleep(2000);
// write data to Server.
System.out.println("Write some data to Server...");
ByteBuffer buffer = ByteBuffer.wrap("hello!".getBytes());
sc.write(buffer);
Thread.sleep(2000);
// read data from Server.
ByteBuffer receive = ByteBuffer.allocate(20);
sc.read(receive);
System.out.println("Client received some data: "
+ new String(receive.array()));
while (true);
}
}
NIO因为非阻塞的特性,要实现正常的数据交互代价是很大的。上面的代码通过sleep()方法延迟获取数据,但在实际中是不可能这么用的