文章目录
1. 前言
了解过并发的相信都知道在Java
中有BIO
、NIO
和AIO
三种IO
操作方式。但实际上还有中为IO
多路复用,而大名鼎鼎的Netty
框架使用的就是IO
多路复用模型。
为了了解高并发IO
的底层原理,本文参考《Java
高并发核心编程》的第1
卷来进行展开学习。当然学习的所有代码以及笔记都会放置在项目NettyLearning,感兴趣的可以自取。
Q:什么情况下可以称为高并发?
首先需要了解一个概念:QPS
,即:Query Per Second
,每秒查询率。QPS
峰值可能在1000
以内,没有多少技术挑战性和含金量,属于重复性的CRUD
的体力活。一个高并发的系统面临的QPS
峰值可能在十万、百万、千万甚至上亿级别。
2. IO的底层原理
2.1 内核态和用户态
为了避免用户进程直接操作内核,保证内核安全,操作系统将内存(虚拟内存)划分为两部分:一部分是内核空间(Kernel-Space
),另一部分是用户空间(User-Space
)。
操作系统的核心是内核程序,既有权限访问受保护的内核空间,也有权限访问硬件设备;而普通应用程序(进程)运行在用户态,用户态进程不能访问内核空间中的数据,也不能直接调用内核函数,因此需要将进程切换到内核态才能进行系统调用。
用户态进程必须通过系统调用(System Call
)向内核发出指令,完成调用系统资源之类的操作。
2.2 read和write两大系统调用
用户程序进行IO
的读写依赖于底层的IO
读写,基本上会用到底层的read
和write
两大系统调用。
需要注意的是:read
系统调用并不是直接从物理设备把数据读取到应用的内存中,write
系统调用也不是直接把数据写入物理设备。
因为计算机的外部设备的直接读写涉及操作系统的中断。发生系统中断时,需要保存之前的进程数据和状态等信息,结束中断之后,还需要恢复之前的进程数据和状态等信息。为了减少底层系统的频繁中断所导致的时间损耗、性能损耗,出现了内核缓冲区。
也就是说,操作系统会对内核缓冲区进行监控,等待缓冲区达到一定数量的时候,再进行IO
设备的中断处理,集中执行物理设备的实际IO
操作,通过这种机制来提升系统的性能。至于具体什么时候执行系统中断(包括读中断、写中断)则由操作系统的内核来决定,应用程序不需要关心。
所以说,应用程序的IO
操作实际上不是物理设备级别的读写,而是缓存的复制。上层应用通过操作系统的read
系统调用把数据从内核缓冲区复制到应用程序的进程缓冲区,通过操作系统的write
系统调用把数据从应用程序的进程缓冲区复制到操作系统的内核缓冲区。而具体的底层读写交换操作是由操作系统内核(Kernel
)来完成。这两个系统调用的大致流程如图所示:
上面流程分为两个阶段:
- 第一个阶段,应用程序等待数据通过网络到达网卡,当所等待的分组到达时,数据被操作系统复制到内核缓冲区中。这个工作由操作系统自动完成,用户程序无感知。
- 第二个阶段,内核将数据从内核缓冲区复制到应用的用户缓冲区。
更具体来讲,其完整的流程如下:
- 客户端发送请求:
Java
客户端程序通过write
系统调用将数据复制到内核缓冲区,Linux
将内核缓冲区的请求数据通过客户端机器的网卡发送出去。 - 服务端获取请求:在服务端,这份请求数据会从接收网卡中读取到服务端机器的内核缓冲区。
Java
服务端程序通过read
系统调用从Linux
内核缓冲区读取数据,再送入应用程序的进程缓冲区。 - 服务端业务处理:服务器在自己的用户空间中完成客户端的请求所对应的业务处理。
- 服务端返回数据:处理完成后,构建好的响应数据将通过
write
系统调用,从用户缓冲区写入内核缓冲区。 - 发送给客户端:操作系统将内核缓冲区中的数据写入网卡,网卡通过底层的通信协议将数据发送给目标客户端。
2.3 四种主要的IO模型
常见的IO
模型有四种,分别为:
- 同步阻塞
IO
; - 同步非阻塞
IO
; -
IO
多路复用; - 异步
IO
;
首先需要了解两个概念:
- 同步与异步;可以将同步与异步看成发起
IO
请求的两种方式。同步IO
是指用户空间(进程或者线程)是主动发起IO
请求的一方,系统内核是被动接收方。异步IO
则反过来,系统内核是主动发起IO
请求的一方,用户空间是被动接收方。 - 阻塞与非阻塞;阻塞
IO
指的是需要内核IO
操作彻底完成后,才返回到用户空间执行用户程序的操作指令。阻塞是指用户进程(或者线程)一直在等待,而不能做别的事情;非阻塞是指用户进程(或者线程)获得内核返回的状态值就返回自己的空间,可以去做别的事情。
a). 同步阻塞IO(Blocking IO)
同步阻塞IO
(Blocking IO
)指的是用户空间(或者线程)主动发起,需要等待内核IO
操作彻底完成后才返回到用户空间的IO
操作。在IO
操作过程中,发起IO
请求的用户进程(或者线程)处于阻塞状态。
默认情况下,在Java
应用程序进程中所有对socket
连接进行的IO
操作都是同步阻塞IO
。
在阻塞式IO
模型中,从Java
应用程序发起IO
系统调用开始,一直到系统调用返回,这段时间内发起IO
请求的Java
进程(或者线程)是阻塞的。直到返回成功后,应用进程才能开始处理用户空间的缓冲区数据。
阻塞IO的优点是:应用程序开发非常简单;在阻塞等待数据期间,用户线程挂起,基本不会占用CPU
资源。
阻塞IO的缺点是:一般情况下会为每个连接配备一个独立的线程,一个线程维护一个连接的IO
操作。在并发量小的情况下,这样做没有什么问题。在高并发的应用场景下,阻塞IO
模型需要大量的线程来维护大量的网络连接,内存、线程切换开销会非常巨大,性能很低,基本上是不可用的。
因为在同步阻塞IO
中,每次都会为链接设备配置一个独立的线程,而服务器的资源却是有限的。所以为了解决这个问题,就引入了NIO
。
b). 同步非阻塞IO(Non-Blocking IO,NIO)
非阻塞IO
(Non-Blocking IO,NIO
)指的是用户空间的程序不需要等待内核IO
操作彻底完成,可以立即返回用户空间去执行后续的指令,即发起IO
请求的用户进程(或者线程)处于非阻塞状态,与此同时,内核会立即返回给用户一个IO
状态值。
同步非阻塞IO
指的是用户进程主动发起,不需要等待内核IO
操作彻底完成就能立即返回用户空间的IO
操作。在IO
操作过程中,发起IO
请求的用户进程(或者线程)处于非阻塞状态。
发起一个非阻塞socket
的read
操作的系统调用,流程如下:
- 1)在内核数据没有准备好的阶段,用户线程发起
IO
请求时立即返回。所以,为了读取最终的数据,用户进程(或者线程)需要不断地发起IO
系统调用。 - 2)内核数据到达后,用户进程(或者线程)发起系统调用,用户进程(或者线程)阻塞。内核开始复制数据,它会将数据从内核缓冲区复制到用户缓冲区,然后内核返回结果(例如返回复制到的用户缓冲区的字节数)。
- 3)用户进程(或者线程)读到数据后,才会解除阻塞状态,重新运行起来。也就是说,用户空间需要经过多次尝试才能保证最终真正读到数据,而后继续执行。
同步非阻塞IO
的特点是应用程序的线程需要不断地进行IO
系统调用,轮询数据是否已经准备好,如果没有准备好就继续轮询,直到完成IO
系统调用为止。
因此,同步非阻塞IO
的缺点是不断地轮询内核,这将占用大量的CPU
时间,效率低下。
总体来说,在高并发应用场景中,同步非阻塞IO
是性能很低的,也是基本不可用的,一般Web
服务器都不使用这种IO
模型。在Java
的实际开发中,不会涉及这种IO
模型。
c). IO多路复用(IO Multiplexing)
同样的,为了解决上面的轮询消耗大量CPU
资源的问题,引入了IO
多路复用。
为了提高性能,操作系统引入了一种新的系统调用,专门用于查询IO
文件描述符(含socket
连接)的就绪状态。
在Linux
系统中,新的系统调用为select/epoll
系统调用。通过该系统调用,一个用户进程(或者线程)可以监视多个文件描述符,一旦某个描述符就绪(一般是内核缓冲区可读/可写),内核就能够将文件描述符的就绪状态返回给用户进程(或者线程),用户空间可以根据文件描述符的就绪状态进行相应的IO
系统调用。
IO
多路复用(IO Multiplexing
)属于一种经典的Reactor
模式实现,有时也称为异步阻塞IO
,Java
中的Selector
属于这种模型。
发起一个多路复用IO
的read
操作的系统调用,流程如下:
- 1)选择器注册。首先,将需要
read
操作的目标文件描述符(socket
连接)提前注册到Linux
的select/epoll
选择器中,在Java
中所对应的选择器类是Selector
类。然后,开启整个IO
多路复用模型的轮询流程。 - 2)就绪状态的轮询。通过选择器的查询方法,查询所有提前注册过的目标文件描述符(
socket
连接)的IO
就绪状态。通过查询的系统调用,内核会返回一个就绪的socket
列表。当任何一个注册过的socket
中的数据准备好或者就绪了就说明内核缓冲区有数据了,内核将该socket
加入就绪的列表中,并且返回就绪事件。 - 3)用户线程获得了就绪状态的列表后,根据其中的
socket
连接发起read
系统调用,用户线程阻塞。内核开始复制数据,将数据从内核缓冲区复制到用户缓冲区。 - 4)复制完成后,内核返回结果,用户线程才会解除阻塞的状态,用户线程读取到了数据,继续执行。
IO
多路复用模型的优点是一个选择器查询线程可以同时处理成千上万的网络连接,所以用户程序不必创建大量的线程,也不必维护这些线程,从而大大减少了系统的开销。与一个线程维护一个连接的阻塞IO
模式相比,这一点是IO
多路复用模型的最大优势。
IO
多路复用模型的缺点是,本质上select/epoll
系统调用是阻塞式的,属于同步IO
,需要在读写事件就绪后由系统调用本身负责读写,也就是说这个读写过程是阻塞的。要彻底地解除线程的阻塞,就必须使用异步IO
模型。
d). 异步IO(Asynchronous IO,AIO)
在IO
多路复用中提到,因为select/epoll
这个过程进行查询进程的就绪状态的时候,其实还是一个轮询的过程,而这个过程是阻塞的,所以也就是每次轮询的过程也需要等待结果返回。所以这里其实也就带来了性能上的开销。故而期望内核可以直接通知用户程序,进而引入了AIO
。
异步IO
(Asynchronous IO,AIO
)指的是用户空间的线程变成被动接收者,而内核空间成为主动调用者。在异步IO
模型中,当用户线程收到通知时,数据已经被内核读取完毕并放在了用户缓冲区内,内核在IO
完成后通知用户线程直接使用即可。
理论上来说,异步IO
是真正的异步输入输出,它的吞吐量高于IO
多路复用模型的吞吐量。但JDK
对它的支持目前并不完善,因此异步IO
在性能上没有明显的优势。且,目前这类高并发网络应用程序的开发大多采用IO
多路复用模型。
2.4 并发连接配置
a). Linux操作系统中文件句柄数的限制
在生产环境Linux
系统中,基本上都需要解除文件句柄数的限制。在Linux
中一个进程最多可以接受1024
个socket
连接,这是远远不够的。
文件句柄也叫文件描述符。文件描述符(File Descriptor
)是内核为了高效管理已被打开的文件所创建的索引,是一个非负整数(通常是小整数),用于指代被打开的文件。所有的IO
系统调用(包括socket
的读写调用)都是通过文件描述符完成的。
在Linux下,通过ulimit -n
可以用来查看一个进程能够打开的最大文件句柄数目,通过ulimit -n number
来调整这个系统参数:
ulimit -n 10000000
使用ulimit
命令有一个缺陷,即该命令只能修改当前用户环境的一些基础限制,仅在当前用户环境有效。如果想永久地把最大文件描述符数量值保存下来,可以编辑/etc/rc.local
开机启动文件,在文件中添加如下内容:
ulimit -SHn 10000000
3. BIO案例(同步阻塞IO,Blocking IO)
创建ServerSocket
作为服务器端,使用BIO
的方式为每个连接分配一个线程。调用accept
方法监听Client
的请求,并对hello!
数据做出响应,返回Hello Client!
字符串:
public class BIOServer {
@Test
public void startBIOServer(){
BIOServer bioServer = new BIOServer();
try {
bioServer.createServer(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 创建服务端 ServerSocket
* @param port 运行端口
* @throws Exception 异常
*/
public void createServer(int port) throws Exception{
System.out.println("服务端创建成功...");
ServerSocket serverSocket = new ServerSocket(port);
// 服务器可以一直存在
while(true){
Socket accept = serverSocket.accept();
System.out.println("客户端建立连接成功...");
handleMessage(accept);
}
}
/**
* 处理从客户端发来的消息
* @param accept Socket连接对象
*/
private void handleMessage(Socket accept) {
byte[] bytes = new byte[1024];
int len = -1;
InputStream inputStream = null;
OutputStream outputStream = null;
try {
inputStream = accept.getInputStream();
// 这里read方法会阻塞在这里,所以这里不能写在外部,除非自己加一个时间判断
StringBuilder stringBuilder = new StringBuilder();
long lastTime = System.currentTimeMillis();
while((len = inputStream.read(bytes)) != -1) {
String msg = new String(bytes, 0, len);
stringBuilder.append(msg);
if (System.currentTimeMillis() < 500) {
lastTime = System.currentTimeMillis();
} else {
break;
}
}
String msg = stringBuilder.toString();
System.out.println("服务器端收到消息:" + msg);
// 消息处理
responseMessage(accept, msg);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
accept.close();
if(inputStream != null) {
inputStream.close();
}
System.out.println("客户端断开连接.");
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 返回客户端信息
* @param accept Socket连接
* @param msg 客户端发来的消息
* @throws IOException 异常
*/
private void responseMessage(Socket accept, String msg) throws IOException {
OutputStream outputStream = null;
if(msg.equals("hello!")){
outputStream = accept.getOutputStream();
outputStream.write("Hello Client!".getBytes());
}
if(outputStream != null) outputStream.close();
}
}
这里需要注意的是inputStream.read(bytes)
会一直阻塞在这里,等待客户端传入更多的数据。所以这里我简单做了一个超时的判断,使得程序可以正常响应。
对于客户端,比较简单,建立Socket
连接即可:
/**
* 客户端类
* @author 梦否
* 2021-12-21
*/
public class BIOClient {
@Test
public void startBIOClient(){
String message = "hello!";
Socket socket = null;
try {
socket = new Socket("127.0.0.1", 5000);
// 发送消息
sendMessage(socket, message);
// 接受消息
String msg = handlerMessage(socket);
System.out.println("收到消息:" + msg);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if(socket != null) socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 接受服务器返回的消息
* @param socket socket连接
* @return 从服务器返回的消息对象
* @throws IOException 异常
*/
private String handlerMessage(Socket socket) throws IOException {
InputStream inputStream = socket.getInputStream();
int len = -1;
byte[] bytes = new byte[1024];
StringBuilder stringBuffer = new StringBuilder();
while((len = inputStream.read(bytes)) != -1) {
String msg = new String(bytes, 0, len);
stringBuffer.append(msg);
}
return stringBuffer.toString();
}
/**
* 发送消息
* @param socket socket连接
* @param message 待发送的消息
* @throws IOException 异常
*/
private void sendMessage(Socket socket, String message) throws IOException {
System.out.println("发送消息:" + message);
OutputStream outputStream = socket.getOutputStream();
outputStream.write(message.getBytes());
// 刷新outputStream
outputStream.flush();
}
}
测试结果: