java的nio 之 select,poll和epoll

我不生产知识,我只是知识的搬运工。努力通过实践与各位博友交流一些自己的见解。

 

引文:

   由于cpu和磁盘等存储设备的处理速度的差异,巧妙的io设计能够极大的提升工作效率。从硬件设计角度包括 SPOOLING(假脱机)技术(实现独占设备的共享),DMA(通过中断的方式实现内存到磁盘的传输通道)大大降低了io传输到cpu的调用和阻塞,通道IO(有自己的指令和程序,相比DMA有更强的独立处理数据能力。并且可以控制多台同类或不同类的设备)。————来自王道考研操作系统

  总结:硬件实现cpu与磁盘的尽可能独立运行,磁盘读取尽可能少的通过中断程序来获取cpu的执行权。

  解决了单个IO的CPU和磁盘独立运行,我们来看下多个IO连接时,操作系统如何优化? 也就是多个io链接如何管理的问题。IO作为计算机的核心功能,用户只能通过系统调用实现用户态到内核态的切换来读写磁盘数据。传统io 每建立一个io链接就要新建一个线程阻塞在当前操作中,在IO密集型任务中会大大降低CPU的利用率。通过IO复用监听多个文件描述符来提升程序的性能。

  注意:IO复用虽然能同时监听多个文件描述符,但它本身是阻塞的。并且当多个文件描述符同时就绪时,如果不采取额外的措施,程序就只能按顺序依次处理其中的每一个文件描述符,这使得程序看起来就像是串行工作。如果要实现并发,只能使用多线程和多进程等编程手段。————来自Linux高性能服务器编程

 

 

理论知识:

select,poll和epoll的区别
系统调用 select poll  epoll
事件集合  用户通过3个参数分别传入感兴趣的可读,可写和异常等事件,内核通过对这些参数的在线修改来反馈其中的就绪事件。这使得用户每次调用select都要重置这3个参数 统一处理所有事件类型,因此只需一个事件集参数。用户通过pollfd.events传入感兴趣的事件,内核通过修改polld.revents反馈其中就绪的事件 内核通过一个时间表直接管理用户感兴趣的所有事件。因此每次调用epoll_wait时,无需反复传入用户感兴趣的事件。epoll_wait系统调用的参数events仅用来反馈就绪的事件
应用程序索引就绪文件描述符的事件复杂度 O(n) O(n) O(1)
最大支持文件描述符数   有最大值限制(内核默认值为1024) 65535 65535
工作模式 LT LT 支持ET高效模式
内核实现和工作效率 采用轮询方法来检测就绪事件,算法事件复杂度为O(n) 采用轮询方法来检测就绪事件,算法事件复杂度为O(n) 采用回调方法来检测就绪事件,算法事件复杂度为O(n)

 

 系统调用API的演进路线:select————》poll(事件处理统一,编程接口更简洁。不需要重置参数)————》epoll(使用回调替换轮询机制,降低事件复杂度为O(1))

注意:当活动链接比较多的时候,epoll_wait的效率未必比selelct和poll高,因为此时回调函数出发得过于频繁。所有epoll_wait适用于连接数量多,但活动链接少的情况。

 

实践代码:

聊天室程序:

服务端:

 

java的nio 之 select,poll和epoll
/**
 * @author: ljf
 * @date: 2020/12/29 19:25
 * @description: 聊天室服务端
 * 功能:
 * 1.数据转发broadcast
 * 功能推导类属性:
 * 1.userNames
 * 2.userThreads
 * 3.port 监听端口
 * <p>
 * accept阻塞监听客户端的链接,将链接添加到userNames,userThreads并启动用户线程接收数据
 * 服务端的userThread只用来转发数据,read and write
 * TODO:多个客户端同时退出的并发问题, 客户端正常段开时给服务器能够检测到。同时给其他客户端发信息
 * socket全双工通信
 * @modified By:
 * @version: $ 1.0
 */
public class ChatServer {
    private final int port;
    private final HashSet<String> userNames = new HashSet<>();
    private final HashSet<UserThread> userThreads = new HashSet<>();
    public AtomicInteger connectedCount = new AtomicInteger(0);

    public ChatServer(int port) {
        this.port = port;
    }

    public void execute() {
        //监听端口
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            System.out.println("chat server listening on port:" + port);
            while (true) {
                //创建服务端进程,accept阻塞监听
                Socket clientSocket = serverSocket.accept();

                UserThread newUser = new UserThread(clientSocket, this);
                userThreads.add(newUser);
                newUser.start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        int port = 12345;
        ChatServer chatServer = new ChatServer(port);
        chatServer.execute();
    }

    public void broadMessage(String serverMessage, UserThread userThread) {
        for (UserThread user : userThreads) {
            if (user != null && user != userThread) { //除当前用户
                user.sendMessage(serverMessage);
            }
        }
    }

    public Set<String> getUserNames() {
        return this.userNames;
    }

    /**
     * 删除用户,删除userName和userThread
     */
    public void removeUser(String user, UserThread userThread) {
        boolean removed = userNames.remove(user);
        if (removed) {
            userThreads.remove(userThread);
            System.out.println(user + " quit group chat");
        }
    }

    public boolean hasUsers() {
        return !this.userNames.isEmpty();
    }

    public void addUserName(String userName) {
        this.userNames.add(userName);
    }
}
View Code

 

服务端每创建一个socket链接,就创建一个用户线程:

java的nio 之 select,poll和epoll
import java.io.*;
import java.net.Socket;

/**
 * @author: ljf
 * @date: 2020/12/29 19:34
 * @description: 服务端socket线程,用来向客户端转发消息
 * @modified By:
 * @version: $ 1.0
 */
public class UserThread extends Thread {
    private final Socket clientSocket;
    private PrintWriter printWriter;
    private String userName; //客户端创建时输入用户名,网络传输获取
    private final ChatServer chatServer;

    public UserThread(Socket clientSocket, ChatServer chatServer) {
        this.clientSocket = clientSocket;
        this.chatServer = chatServer;
    }

    public String getUserName() {
        return this.userName;
    }

    @Override
    public void run() {
        String serverMessage;
        try {
            OutputStream outputStream = clientSocket.getOutputStream();
            printWriter = new PrintWriter(outputStream,true);

            //阻塞监听客户端发来的消息,然后转发给其他客户端
            InputStream inputStream = clientSocket.getInputStream();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));

            printUsers();
            //首条消息是 客户端姓名
            userName = bufferedReader.readLine();
            chatServer.addUserName(userName);
            System.out.println("connectedCount: " + chatServer.connectedCount.getAndIncrement()
                    + " new user connected: " + userName);

            serverMessage = "new user " + userName + " connected";
            chatServer.broadMessage(serverMessage, this);

            //read 阻塞,直到客户端发来"bye"消息,断开连接
            String clientMessage;
            while (!(clientMessage = bufferedReader.readLine()).equals("bye")) {
                //拼接上当前socket的用户,转发给其他用户
                serverMessage = "[" + userName + "]: " + clientMessage;
                chatServer.broadMessage(serverMessage, this);
            }

        } catch (IOException e) {
//            e.printStackTrace();
            System.err.println(e.getMessage());
        } finally {
            //与客户端socket断开连接
            chatServer.removeUser(userName, this);
            if(clientSocket!=null && clientSocket.isConnected()){
                try {
                    clientSocket.shutdownOutput();//立即关闭输出流
                    clientSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            //转发我离开的消息
            serverMessage = userName + " has quited";
            chatServer.broadMessage(serverMessage, this);
        }
    }

    /**
     * 向新链接的客户端发送当前服务器的用户列表
     */
    public void printUsers() {
        if (chatServer.hasUsers()) {
            printWriter.println("connected users: " + chatServer.getUserNames());
        } else {
            printWriter.println("no other users connected");
        }
    }

    /**
     * 向客户端发送消息
     *
     * @param message:消息内容
     */
    public void sendMessage(String message) {
        printWriter.println(message);
    }
}
View Code

 

客户端:

public class ChatClient {
    private volatile String userName;
    private final String hostName;
    private final int port;
    private volatile boolean closed = false;

    public ChatClient(String hostName,int port){
        this.hostName = hostName;
        this.port = port;
    }

    public static void main(String[] args) {
        String hostName = "localhost";
        int port =  12345;

        ChatClient chatClient = new ChatClient(hostName, port);
        chatClient.execute();
    }
    /**
     * 与服务端建立连接
     */
    public void execute(){
        try {
            //必须输入用户名字后才能创建socket
            Scanner scanner = new Scanner(System.in);
            System.out.print("\nEnter your name: ");
            userName = scanner.nextLine();

            Socket clientSocket = new Socket(hostName, port);
            System.out.println("connected to chat server");
            new ReadThread(clientSocket,this).start();
            new WriteThread(clientSocket,this).start();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void setClosed(){
        closed = true;
    }
    public boolean isClosed(){
        return closed;
    }

    public String getUserName() {
        return userName;
    }
}

客户端socket读线程

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
import java.net.SocketException;

/**
 * @author: ljf
 * @date: 2020/12/29 20:55
 * @description:
 * @modified By:
 * @version: $ 1.0
 */
public class ReadThread extends Thread{
    private final ChatClient chatClient;
    private final Socket clientSocket;

    public ReadThread(Socket clientSocket,ChatClient chatClient){
        this.chatClient = chatClient;
        this.clientSocket = clientSocket;
    }

    @Override
    public void run() {
        while (!chatClient.isClosed()) {
            try {
                InputStream inputStream = clientSocket.getInputStream();
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
                String response = bufferedReader.readLine();
                System.out.println("\n" + response);

                // prints the username after displaying the server's message
                if (chatClient.getUserName() != null) {
                    System.out.print("[" + chatClient.getUserName() + "]: ");
                }
            } catch (SocketException se){ //TODO:正常退出替代这里
                System.out.println("quit");
                System.err.println(se.getMessage());
                break;
            } catch (IOException ex) {
                System.out.println("Error reading from server: " + ex.getMessage());
                ex.printStackTrace();
                break;
            }
        }
    }
}

客户端socket写线程

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Scanner;

/**
 * @author: ljf
 * @date: 2020/12/29 20:59
 * @description:
 * @modified By:
 * @version: $ 1.0
 */
public class WriteThread extends Thread {
    private final ChatClient chatClient;
    private final Socket clientSocket;
    private PrintWriter printWriter;

    public WriteThread(Socket clientSocket, ChatClient chatClient) {
        this.chatClient = chatClient;
        this.clientSocket = clientSocket;

        try {
            OutputStream outputStream = clientSocket.getOutputStream();
            printWriter = new PrintWriter(outputStream,true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        Scanner scanner = new Scanner(System.in);

        String userName = chatClient.getUserName();
        printWriter.println(userName);

        String text;
//        System.out.print("[" + userName + "]: ");
        while (!(text = scanner.nextLine()).equals("bye")) {
            printWriter.println(text);
            System.out.print("[" + userName + "]: ");
        }

        try {
            printWriter.println(text);
            clientSocket.close();
        } catch (IOException ex) {

            System.out.println("Error writing to server: " + ex.getMessage());
        }
    }
}

TODO:当前是用传统io流实现,后期加入nio,零拷贝。

 

上一篇:leetcode【每日一题】1046. 最后一块石头的重量 java


下一篇:[源码分析] 消息队列 Kombu 之 启动过程