从BIO到多路复用

先说明几个概念:
• 阻塞:等待不到预期的事件发生时,代码就不会继续执行。
• 非阻塞:等待不到预期的事件发生时,也不会阻止代码继续执行,而是直接返回方法,比如-1。
• 同步:app自己去内核的buffer读取
• 异步:内核直接帮忙把数据写到app的对应区域

一、 BIO

BIO,即blocking io,是最原始的IO模型。在早期的linux内核中,网络IO中的两个系统调用是阻塞的,这就导致了整个IO模型都是阻塞的。
一个是accpet(),另一个是read(),在IO过程中,如果等待不到客户端连接或者缓冲区里没有可读的数据时候,都会阻塞住。

public static void main(String[] args) throws Exception {
        //打开一个socket,绑定到9090端口,并开启监听 socket-->bind-->listen
        ServerSocket server = new ServerSocket(9090);
        System.out.println("server start--------------------------");
        //开启一个监听端口后,开始处理连接
        while (true) {
            //接受一个连接,并为这个连接开启一个socket fd,fd在内核中会有一个Buffer,后续就是从这个buffer中进行读写  accept()系统调用是阻塞的
            Socket client = server.accept(); //阻塞
            System.out.println("get client, port = " + client.getPort());
            //拿到客户端连接后,放到一个新的线程去处理
            new Thread(() -> {
                InputStream in = null;
                try {
                    in = client.getInputStream();
                    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
                    while (true) {
                        //阻塞住,会一直等着数据来
                        String s = reader.readLine();
                        if (null != s) {
                            System.out.println(s);
                            OutputStream out = client.getOutputStream();
                            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
                            writer.write(s);
                            writer.flush();
                        } else {
                            client.close();
                            System.out.println("client close");
                            break;
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        client.close();
                    } catch (IOException ex) {
                        ex.printStackTrace();
                    }
                }
            }).start();
        }
    }

如以上代码所示,由于read()系统调用是阻塞的,所以在accept接受一个客户端连接后,必须新开启一个线程来处理这个客户端连接的所有事件,不然代码就阻塞在那里,不能继续往下执行,也就不能接收新的客户端连接了。

二、NIO

由于BIO是阻塞的,所以一个线程只能服务于一个客户端,随着网络的发展,客户端越来越多,这样的模型受到了硬件的限制,硬件资源是有限的,所以开启的线程有限,而且随着线程的增多,上下文切换的开销也巨大,所以出现了NIO,non-blocking I/O(非阻塞IO),来解决以上问题。
非阻塞IO中,accept(),接受连接这一步,以及从buffer读取数据,read()这一步,都变成了非阻塞的。当没有连接或者数据到达时候,就直接返回-1,然后继续执行后面的代码。这样的好处就是一个线程可以处理多个客户端了,解决了BIO的弊端。

public class NIOTest {
    private static List<SocketChannel> clients = new LinkedList<>();
    public static void main(String[] args) throws Exception {
        //在内核开启一个socket
        ServerSocketChannel server = ServerSocketChannel.open();
        System.out.println("server start-----------------");
        //设置为非阻塞
        server.configureBlocking(false);
        //绑定到9090端口
        server.bind(new InetSocketAddress(9090));
        while (true) {
            //通过accept调用接受一个客户端,由于前面配置了非阻塞,所以这儿不会阻塞
            SocketChannel client = server.accept();
            //如果client不为空,添加到clients的集合中。null就对应系统调用accept返回-1的情况
            if (null != client) {
                System.out.println("receive cli , port = " + client.getRemoteAddress());
                //客户端也设置成非阻塞
                client.configureBlocking(false);
                clients.add(client);
            }
            ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
            clients.forEach(cli -> {
                try {
                    int num = cli.read(buffer);
                    if (num > 0) {
                        //由于刚刚是写模式,所以要翻转一下,改为读模式
                        buffer.flip();
                        byte[] bytes = new byte[buffer.limit()];
                        buffer.get(bytes);
                        System.out.println(cli.getRemoteAddress() + "--------------" + new String(bytes));
                        //再翻转,改为写模式,把收到的数据写回客户端
                        buffer.flip();
                        cli.write(buffer);
                        buffer.clear();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

如上代码所示,只用了一个线程就可以服务多个客户端,大大节省了系统资源。
但是这样的模型也有问题,随着客户端的增加,每次遍历的clients数量也增加,当clients数量多到一定程度时候,遍历的时间很长,每个客户端等待响应的时间也就变长了。

三、多路复用

NIO的弊端是每次都需要自己去轮询所有已经连接的客户端,看是否有事件发生,然后做处理。比如连接了10w个客户端,但是只有一个客户端有事件发生,我们任然需要去遍历10w个客户端才行,这样就浪费了很多系统资源。
多路复用机制解决了以上问题。linux内核提供了多路复用的系统调用(select/poll),select是posix的标准接口,所有操作系统都会实现,而poll是linux独有的,两个系统调用的定义如下:
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
select中的fd数量限制了为1024,而Poll则没有。两者就只有这点区别。
这两个系统调用实现了多路复用,但是也有缺点,那就是每次调用时候,都必须把注册到多路复用器里面的所有fd从用户空间传入内核空间,而select()调用又会被重复调起,这就导致浪费了很多系统资源!!!!!
为了解决上面说到的问题,推出了epoll().epoll系统调用是对poll的改进!!!epoll提供了三个系统调用:
• epoll_create(int size)。这个系统调用的作用是在内核开启一个epoll.注意:size参数只是告诉内核这个 epoll对象会处理的事件大致数目,而不是能够处理的事件的最大个数。在 Linux最新的一些内核版本的实现中,这个 size参数没有任何意义。
• int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);这个接口主要是对fd进行管理,比如把某个fd注册到epoll中,并关注读事件。
• int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);这个接口的作用类似select(),作用都是返回事件集。
epoll对于poll的改进思路主要如下:
首先要知道,不管有没有多路复用器,当有数据到达网卡时候,内核都会知道!!!
epoll在内核中维护了一颗红黑树来保存所有监控的fd,通过epoll_ctl调用对这棵树进行增删。除了这棵树以外,epoll还维护了一个set,
当监控的fd所持有的buffer有事件发生时候,内核就会收到中断,从而得知这个消息。然后把有事件发生的fd和对应的事件复制一份,放到上面提到的Set中,当用户调用epoll_wait时候,内核就直接把这个Set返回,这样就不用像Poll一样重复传递fd了!!!

public class MultiplexingSingleThread {
    public static void main(String[] args) throws Exception {
        //开启一个socket,设置为Listen状态
        ServerSocketChannel server = ServerSocketChannel.open();
        //设置非阻塞
        server.configureBlocking(false);
        //绑定到9090端口
        server.bind(new InetSocketAddress(9090));
        System.out.println("server start------------------------");
        //开启一个selector
        Selector selector = Selector.open();
        //把server注册到selector中,并关注连接事件
        server.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            //轮询selector,看是否是事件发生
            int num = selector.select();
            System.out.println(num + "keys---------------------");
            if (num > 0) {
                //如果大于0,说明有事件发生
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iter = selectionKeys.iterator();
                while (iter.hasNext()) {
                    //拿到事件
                    SelectionKey key = iter.next();
                    //从当前事件集中移除已经拿到的事件,避免重复处理
                    iter.remove();
                    //如果是连接事件
                    if (key.isAcceptable()) {
                        SocketChannel client = server.accept();
                        //设置客户端为非阻塞
                        client.configureBlocking(false);
                        //把客户端注册到selector,并关心读事件,这儿不能注册写事件,因为多路复用器关心的是能不能读或者能不能写,
                        // 而只要send-queue没满,就是可写的状态,所以如果在这儿关心了写事件,那多路复用器就一定会返回写事件
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        client.register(selector,SelectionKey.OP_READ,buffer);
                        System.out.println("accept client--------" + client.getRemoteAddress());
                    } else if (key.isReadable()) {
                        System.out.println("enter write");
                        //拿到客户端
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        buffer.clear();
                        while (true) {
                            //读buffer
                            int read = client.read(buffer);
                            if (read > 0) {
                                //翻转buffer为读模式
                                buffer.flip();
                                byte[] bytes = new byte[buffer.limit()];
                                //这儿不清除buffer,为后面的写做准备
                                buffer.get(bytes);
                                System.out.println(client.getRemoteAddress() + "-------------" + new String(bytes));
                                //读完buffer后,再注册写事件,这儿注册后,下次再调select就一定会返回写事件
                                client.register(selector, SelectionKey.OP_WRITE,buffer);
                            } else if (read == 0) {
                                break;
                            } else {
                                client.close();
                            }
                        }
                    }else if (key.isWritable()) {
                        System.out.println("enter write----------");
                        SocketChannel client = (SocketChannel) key.channel();
                        //拿到内核分配给这个client fd的buffer
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        //这个Buffer在刚处理读事件时候翻转为读模式了,所以再翻转成写模式
                        buffer.flip();
                        client.write(buffer);
                        buffer.flip();
                        buffer.clear();
                        key.cancel();
                    } else {
                        break;
                    }
                }
            }
        }
    }
}

注意:多路复用器监控的是fd的可读或者可写或可连接的状态!!!!!!!比如,如果关心了写事件,如果操作系统的send-queue为空,那么就是可写状态!!!!!!!!!!!再比如,如果buffer里有数据,那就一定会提醒有读事件!!!
基于上面这个特性,也就导致了多路复用模型不能用一个线程来接收,开启新的线程来处理读或者写事件!!!因为如果开启一个线程来处理读事件,那么在新线程读完buffer之前,都不能执行epoll_wait来取事件集,因为此时buffer还没读完,那epoll就会认为它是可读状态,就不会把它从epoll维护的事件集set中移除,再调用epoll_wait,这个事件就会被重复返回到用户空间!!!!
所以,多路复用器都最好在单线程中执行,一个线程对应一个epoll,线程内线性的去处理事件集!!!然后开启多个epoll!!!
每个线程里处理的工作都是:(select() -->处理selectKeys---->处理其他任务,这样循环下去)每个线程其实就是一个EventLoop!!
为了把IO和业务分开,所以又把selector分为两个组,一个组叫BossGroup,一个组叫workerGroup。boss负责接收新的连接,worker负责处理业务,这就是netty的思路。
如果程序需要关注写事件,那就必须要写完立马把写事件从多路复用器移除。再写的时候,再添加,这样就会导致重复吊起epoll_ctl这个系统调用。

上一篇:tw文化面


下一篇:python之模拟io模式