先说明几个概念:
• 阻塞:等待不到预期的事件发生时,代码就不会继续执行。
• 非阻塞:等待不到预期的事件发生时,也不会阻止代码继续执行,而是直接返回方法,比如-1。
• 同步:app自己去内核的buffer读取
• 异步:内核直接帮忙把数据写到app的对应区域
一、 BIO
BIO,即blocking io,是最原始的IO模型。在早期的linux内核中,网络IO中的两个系统调用是阻塞的,这就导致了整个IO模型都是阻塞的。
一个是accpet(),另一个是read(),在IO过程中,如果等待不到客户端连接或者缓冲区里没有可读的数据时候,都会阻塞住。
public static void main(String[] args) throws Exception {
//打开一个socket,绑定到9090端口,并开启监听 socket-->bind-->listen
ServerSocket server = new ServerSocket(9090);
System.out.println("server start--------------------------");
//开启一个监听端口后,开始处理连接
while (true) {
//接受一个连接,并为这个连接开启一个socket fd,fd在内核中会有一个Buffer,后续就是从这个buffer中进行读写 accept()系统调用是阻塞的
Socket client = server.accept(); //阻塞
System.out.println("get client, port = " + client.getPort());
//拿到客户端连接后,放到一个新的线程去处理
new Thread(() -> {
InputStream in = null;
try {
in = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
while (true) {
//阻塞住,会一直等着数据来
String s = reader.readLine();
if (null != s) {
System.out.println(s);
OutputStream out = client.getOutputStream();
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
writer.write(s);
writer.flush();
} else {
client.close();
System.out.println("client close");
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
client.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}).start();
}
}
如以上代码所示,由于read()系统调用是阻塞的,所以在accept接受一个客户端连接后,必须新开启一个线程来处理这个客户端连接的所有事件,不然代码就阻塞在那里,不能继续往下执行,也就不能接收新的客户端连接了。
二、NIO
由于BIO是阻塞的,所以一个线程只能服务于一个客户端,随着网络的发展,客户端越来越多,这样的模型受到了硬件的限制,硬件资源是有限的,所以开启的线程有限,而且随着线程的增多,上下文切换的开销也巨大,所以出现了NIO,non-blocking I/O(非阻塞IO),来解决以上问题。
非阻塞IO中,accept(),接受连接这一步,以及从buffer读取数据,read()这一步,都变成了非阻塞的。当没有连接或者数据到达时候,就直接返回-1,然后继续执行后面的代码。这样的好处就是一个线程可以处理多个客户端了,解决了BIO的弊端。
public class NIOTest {
private static List<SocketChannel> clients = new LinkedList<>();
public static void main(String[] args) throws Exception {
//在内核开启一个socket
ServerSocketChannel server = ServerSocketChannel.open();
System.out.println("server start-----------------");
//设置为非阻塞
server.configureBlocking(false);
//绑定到9090端口
server.bind(new InetSocketAddress(9090));
while (true) {
//通过accept调用接受一个客户端,由于前面配置了非阻塞,所以这儿不会阻塞
SocketChannel client = server.accept();
//如果client不为空,添加到clients的集合中。null就对应系统调用accept返回-1的情况
if (null != client) {
System.out.println("receive cli , port = " + client.getRemoteAddress());
//客户端也设置成非阻塞
client.configureBlocking(false);
clients.add(client);
}
ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
clients.forEach(cli -> {
try {
int num = cli.read(buffer);
if (num > 0) {
//由于刚刚是写模式,所以要翻转一下,改为读模式
buffer.flip();
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);
System.out.println(cli.getRemoteAddress() + "--------------" + new String(bytes));
//再翻转,改为写模式,把收到的数据写回客户端
buffer.flip();
cli.write(buffer);
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
}
如上代码所示,只用了一个线程就可以服务多个客户端,大大节省了系统资源。
但是这样的模型也有问题,随着客户端的增加,每次遍历的clients数量也增加,当clients数量多到一定程度时候,遍历的时间很长,每个客户端等待响应的时间也就变长了。
三、多路复用
NIO的弊端是每次都需要自己去轮询所有已经连接的客户端,看是否有事件发生,然后做处理。比如连接了10w个客户端,但是只有一个客户端有事件发生,我们任然需要去遍历10w个客户端才行,这样就浪费了很多系统资源。
多路复用机制解决了以上问题。linux内核提供了多路复用的系统调用(select/poll),select是posix的标准接口,所有操作系统都会实现,而poll是linux独有的,两个系统调用的定义如下:
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
select中的fd数量限制了为1024,而Poll则没有。两者就只有这点区别。
这两个系统调用实现了多路复用,但是也有缺点,那就是每次调用时候,都必须把注册到多路复用器里面的所有fd从用户空间传入内核空间,而select()调用又会被重复调起,这就导致浪费了很多系统资源!!!!!
为了解决上面说到的问题,推出了epoll().epoll系统调用是对poll的改进!!!epoll提供了三个系统调用:
• epoll_create(int size)。这个系统调用的作用是在内核开启一个epoll.注意:size参数只是告诉内核这个 epoll对象会处理的事件大致数目,而不是能够处理的事件的最大个数。在 Linux最新的一些内核版本的实现中,这个 size参数没有任何意义。
• int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);这个接口主要是对fd进行管理,比如把某个fd注册到epoll中,并关注读事件。
• int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);这个接口的作用类似select(),作用都是返回事件集。
epoll对于poll的改进思路主要如下:
首先要知道,不管有没有多路复用器,当有数据到达网卡时候,内核都会知道!!!
epoll在内核中维护了一颗红黑树来保存所有监控的fd,通过epoll_ctl调用对这棵树进行增删。除了这棵树以外,epoll还维护了一个set,
当监控的fd所持有的buffer有事件发生时候,内核就会收到中断,从而得知这个消息。然后把有事件发生的fd和对应的事件复制一份,放到上面提到的Set中,当用户调用epoll_wait时候,内核就直接把这个Set返回,这样就不用像Poll一样重复传递fd了!!!
public class MultiplexingSingleThread {
public static void main(String[] args) throws Exception {
//开启一个socket,设置为Listen状态
ServerSocketChannel server = ServerSocketChannel.open();
//设置非阻塞
server.configureBlocking(false);
//绑定到9090端口
server.bind(new InetSocketAddress(9090));
System.out.println("server start------------------------");
//开启一个selector
Selector selector = Selector.open();
//把server注册到selector中,并关注连接事件
server.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
//轮询selector,看是否是事件发生
int num = selector.select();
System.out.println(num + "keys---------------------");
if (num > 0) {
//如果大于0,说明有事件发生
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectionKeys.iterator();
while (iter.hasNext()) {
//拿到事件
SelectionKey key = iter.next();
//从当前事件集中移除已经拿到的事件,避免重复处理
iter.remove();
//如果是连接事件
if (key.isAcceptable()) {
SocketChannel client = server.accept();
//设置客户端为非阻塞
client.configureBlocking(false);
//把客户端注册到selector,并关心读事件,这儿不能注册写事件,因为多路复用器关心的是能不能读或者能不能写,
// 而只要send-queue没满,就是可写的状态,所以如果在这儿关心了写事件,那多路复用器就一定会返回写事件
ByteBuffer buffer = ByteBuffer.allocate(1024);
client.register(selector,SelectionKey.OP_READ,buffer);
System.out.println("accept client--------" + client.getRemoteAddress());
} else if (key.isReadable()) {
System.out.println("enter write");
//拿到客户端
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.clear();
while (true) {
//读buffer
int read = client.read(buffer);
if (read > 0) {
//翻转buffer为读模式
buffer.flip();
byte[] bytes = new byte[buffer.limit()];
//这儿不清除buffer,为后面的写做准备
buffer.get(bytes);
System.out.println(client.getRemoteAddress() + "-------------" + new String(bytes));
//读完buffer后,再注册写事件,这儿注册后,下次再调select就一定会返回写事件
client.register(selector, SelectionKey.OP_WRITE,buffer);
} else if (read == 0) {
break;
} else {
client.close();
}
}
}else if (key.isWritable()) {
System.out.println("enter write----------");
SocketChannel client = (SocketChannel) key.channel();
//拿到内核分配给这个client fd的buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
//这个Buffer在刚处理读事件时候翻转为读模式了,所以再翻转成写模式
buffer.flip();
client.write(buffer);
buffer.flip();
buffer.clear();
key.cancel();
} else {
break;
}
}
}
}
}
}
注意:多路复用器监控的是fd的可读或者可写或可连接的状态!!!!!!!比如,如果关心了写事件,如果操作系统的send-queue为空,那么就是可写状态!!!!!!!!!!!再比如,如果buffer里有数据,那就一定会提醒有读事件!!!
基于上面这个特性,也就导致了多路复用模型不能用一个线程来接收,开启新的线程来处理读或者写事件!!!因为如果开启一个线程来处理读事件,那么在新线程读完buffer之前,都不能执行epoll_wait来取事件集,因为此时buffer还没读完,那epoll就会认为它是可读状态,就不会把它从epoll维护的事件集set中移除,再调用epoll_wait,这个事件就会被重复返回到用户空间!!!!
所以,多路复用器都最好在单线程中执行,一个线程对应一个epoll,线程内线性的去处理事件集!!!然后开启多个epoll!!!
每个线程里处理的工作都是:(select() -->处理selectKeys---->处理其他任务,这样循环下去)每个线程其实就是一个EventLoop!!
为了把IO和业务分开,所以又把selector分为两个组,一个组叫BossGroup,一个组叫workerGroup。boss负责接收新的连接,worker负责处理业务,这就是netty的思路。
如果程序需要关注写事件,那就必须要写完立马把写事件从多路复用器移除。再写的时候,再添加,这样就会导致重复吊起epoll_ctl这个系统调用。