BIO、NIO、AIO

BIO

同步阻塞 IO

ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("127.0.0.1", 8081));
while(true) {
    // 同步阻塞
    Socket socket = serverSocket.accept();
    new Thread(() -> {
        try {
            byte[] bytes = new byte[1024];
            // 同步阻塞
            int len = socket.getInputStream().read(bytes);
            System.out.println(new String(bytes, 0, len));
            socket.getOutputStream().write(bytes, 0, len);
            socket.getOutputStream().flush();
        } catch(IOException e) {
            e.printStackTrace();
        }
    }).start();
}

NIO

同步非阻塞 IO

NIOServer:

public class NIOServer extends Thread {
    // 1.多路复用器
    private Selector selector;
    // 2.缓冲区
    private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);

    // 3.定义构造方法初始化端口
    public NIOServer(int port) {
        init(port)
    }

    // 4.main方法启动线程 
    public static void main(String[] args) { 
        new Thread(new NIOServer(8888)).start();
    }

    // 5.初始化 
    private void init(int port) {
        try {
            System.out.println("服务器正在启动......"); 
            // 1)开启多路复用器 
            this.selector = Selector.open(); 
            // 2)开启服务通道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            // 3)设置为非阻塞 
            serverSocketChannel.configureBlocking(false); 
            // 4)绑定端口 
            serverSocketChannel.bind(new InetSocketAddress(port)); 
            // 5)注册,标记服务通标状态 
            // SelectionKey.OP_ACCEPT:接收连接继续事件,表示服务器监听到了客户连接,服务器可以接收这个连接了
            // SelectionKey.OP_CONNECT:连接就绪事件,表示客户与服务器的连接已经建立成功
            // SelectionKey.OP_READ:读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了
            // SelectionKey.OP_WRITE:写就绪事件,表示已经可以向通道写数据了
            serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); 		
            System.out.println("服务器启动完毕");
        } catch (IOException e) { 
            e.printStackTrace();
        }
    }

    public void run() { 
        while (true) { 
            try { 
                // 1.当有至少一个通道被选中,执行此方法 
                this.selector.select(); 
                // 2.获取选中的通道编号集合 
                Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator(); 
                // 3.遍历 keys 
                while (keys.hasNext()) { 
                    SelectionKey key = keys.next(); 
                    // 4.当前 key 需要从动刀集合中移出,如果不移出,下次循环会执行对应的逻辑,造成业务错乱 
                    keys.remove(); 
                    // 5.判断通道是否有效 
                    if (key.isValid()) { 
                        try { 
                            // 6.判断是否可读 
                            if (key.isAcceptable()) { 
                                accept(key);
                          }
                            // 7.判断是否可读 
                            if (key.isReadable()) { 
                                read(key);
                          }
                            // 8.判断是否可写 
                            if (key.isWritable()) { 
                                write(key);
                          }
                      } catch (CancelledKeyException e) { 
                            //出现异常断开连接
                            key.cancel(); 
                        } 
                    } 
                }
          } catch (IOException e) { 
                e.printStackTrace();
          }
        }
    }

    private void accept(SelectionKey key) { 
        try { 
            // 1.当前通道在 init 方法中注册到了 selector 中的 ServerSocketChannel 
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); 
            // 2.阻塞方法, 客户端发起后请求返回. 
            SocketChannel channel = serverSocketChannel.accept(); 
            // 3.serverSocketChannel 设置为非阻塞 
            channel.configureBlocking(false); 
            // 4.设置对应客户端的通道标记,设置次通道为可读时使用 
            channel.register(this.selector, SelectionKey.OP_READ);
      } catch (IOException e) { 
            e.printStackTrace();
      } 
    }

    // 使用通道读取数据 
    private void read(SelectionKey key) { 
        try {
          // 清空缓存 
            this.readBuffer.clear(); 
            // 获取当前通道对象 
            SocketChannel channel = (SocketChannel) key.channel(); 
            // 将通道的数据 (客户发送的 data) 读到缓存中. 
            int readLen = channel.read(readBuffer); 
            // 如果通道中没有数据 
            if(readLen == -1) { 
                // 关闭通道 
                key.channel().close(); 
                // 关闭连接 
                key.cancel(); 
                return;
          } 
            // Buffer 中有游标,游标不会重置,需要我们调用 flip 重置. 否则读取不一致 
            this.readBuffer.flip(); 
            // 创建有效字节长度数组 
            byte[] bytes = new byte[readBuffer.remaining()]; 
            // 读取 buffer 中数据保存在字节数组 
            readBuffer.get(bytes); 
            System.out.println("收到了从客户端 "+ channel.getRemoteAddress() + " : " + new String(bytes,"UTF-8")); 
            // 注册通道,标记为写操作
            channel.register(this.selector, SelectionKey.OP_WRITE);
      } catch (Exception e) {
      }
    }

    // 给通道中写操作 
    private void write(SelectionKey key) { 
        // 清空缓存 
        this.writeBuffer.clear(); 
        // 获取当前通道对象 
        SocketChannel channel = (SocketChannel) key.channel(); 
        // 录入数据 
        Scanner scanner = new Scanner(System.in);

        try { 
            System.out.println("即将发送数据到客户端.."); 
            String line = scanner.nextLine(); 
            // 把录入的数据写到 Buffer 中 
            writeBuffer.put(line.getBytes("UTF-8")); 
            // 重置缓存游标 
            writeBuffer.flip(); 
            channel.write(writeBuffer); 
            channel.register(this.selector,SelectionKey.OP_READ);
      } catch (Exception e) { 
            e.printStackTrace();
      } 
    }

}

NIOClient:

public class NIOClient { 
    public static void main(String[] args) { 
        // 创建远程地址 
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888); 
        SocketChannel channel = null; 
        // 定义缓存 
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try { 
            // 开启通道 
            channel = SocketChannel.open();
			// 连接远程远程服务器 
            channel.connect(address);
            Scanner sc = new Scanner(System.in); 
            while (true) { 
                System.out.println("客户端即将给 服务器发送数据.."); 
                String line = sc.nextLine(); 
                if(line.equals("exit")) { 
                    break;
				} 
                // 控制台输入数据写到缓存 
                buffer.put(line.getBytes("UTF-8")); 
                // 重置 buffer 游标 
                buffer.flip(); 
                // 数据发送到数据 
                channel.write(buffer); 
                // 清空缓存数据
                buffer.clear();

                // 读取服务器返回的数据 
                int readLen = channel.read(buffer); 
                if(readLen == -1) { 
                    break;
                } 
                // 重置 buffer 游标 
                buffer.flip(); 
                byte[] bytes = new byte[buffer.remaining()]; 
                // 读取数据到字节数组 
                buffer.get(bytes); 
                System.out.println("收到了服务器发送的数据 : "+ new String(bytes,"UTF-8"));
                buffer.clear();
			}
		} catch (IOException e) { 
            e.printStackTrace();
		} finally { 
            if (null != channel) { 
                try { 
                    channel.close();
				} catch (IOException e) { 
                    e.printStackTrace();
				} 
            } 
        }
    }
}

AIO

异步非阻塞 IO

使用场景:连接数目多且连接比较长的架构,如相册服务器。重点调用了 OS 参与并发操作,编程比较复杂

上一篇:SpringBoot - 整合Mybatis


下一篇:数据连接池