BIO
同步阻塞 IO
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("127.0.0.1", 8081));
while(true) {
// 同步阻塞
Socket socket = serverSocket.accept();
new Thread(() -> {
try {
byte[] bytes = new byte[1024];
// 同步阻塞
int len = socket.getInputStream().read(bytes);
System.out.println(new String(bytes, 0, len));
socket.getOutputStream().write(bytes, 0, len);
socket.getOutputStream().flush();
} catch(IOException e) {
e.printStackTrace();
}
}).start();
}
NIO
同步非阻塞 IO
NIOServer:
public class NIOServer extends Thread {
// 1.多路复用器
private Selector selector;
// 2.缓冲区
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
// 3.定义构造方法初始化端口
public NIOServer(int port) {
init(port)
}
// 4.main方法启动线程
public static void main(String[] args) {
new Thread(new NIOServer(8888)).start();
}
// 5.初始化
private void init(int port) {
try {
System.out.println("服务器正在启动......");
// 1)开启多路复用器
this.selector = Selector.open();
// 2)开启服务通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 3)设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 4)绑定端口
serverSocketChannel.bind(new InetSocketAddress(port));
// 5)注册,标记服务通标状态
// SelectionKey.OP_ACCEPT:接收连接继续事件,表示服务器监听到了客户连接,服务器可以接收这个连接了
// SelectionKey.OP_CONNECT:连接就绪事件,表示客户与服务器的连接已经建立成功
// SelectionKey.OP_READ:读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了
// SelectionKey.OP_WRITE:写就绪事件,表示已经可以向通道写数据了
serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器启动完毕");
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
while (true) {
try {
// 1.当有至少一个通道被选中,执行此方法
this.selector.select();
// 2.获取选中的通道编号集合
Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
// 3.遍历 keys
while (keys.hasNext()) {
SelectionKey key = keys.next();
// 4.当前 key 需要从动刀集合中移出,如果不移出,下次循环会执行对应的逻辑,造成业务错乱
keys.remove();
// 5.判断通道是否有效
if (key.isValid()) {
try {
// 6.判断是否可读
if (key.isAcceptable()) {
accept(key);
}
// 7.判断是否可读
if (key.isReadable()) {
read(key);
}
// 8.判断是否可写
if (key.isWritable()) {
write(key);
}
} catch (CancelledKeyException e) {
//出现异常断开连接
key.cancel();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void accept(SelectionKey key) {
try {
// 1.当前通道在 init 方法中注册到了 selector 中的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// 2.阻塞方法, 客户端发起后请求返回.
SocketChannel channel = serverSocketChannel.accept();
// 3.serverSocketChannel 设置为非阻塞
channel.configureBlocking(false);
// 4.设置对应客户端的通道标记,设置次通道为可读时使用
channel.register(this.selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
// 使用通道读取数据
private void read(SelectionKey key) {
try {
// 清空缓存
this.readBuffer.clear();
// 获取当前通道对象
SocketChannel channel = (SocketChannel) key.channel();
// 将通道的数据 (客户发送的 data) 读到缓存中.
int readLen = channel.read(readBuffer);
// 如果通道中没有数据
if(readLen == -1) {
// 关闭通道
key.channel().close();
// 关闭连接
key.cancel();
return;
}
// Buffer 中有游标,游标不会重置,需要我们调用 flip 重置. 否则读取不一致
this.readBuffer.flip();
// 创建有效字节长度数组
byte[] bytes = new byte[readBuffer.remaining()];
// 读取 buffer 中数据保存在字节数组
readBuffer.get(bytes);
System.out.println("收到了从客户端 "+ channel.getRemoteAddress() + " : " + new String(bytes,"UTF-8"));
// 注册通道,标记为写操作
channel.register(this.selector, SelectionKey.OP_WRITE);
} catch (Exception e) {
}
}
// 给通道中写操作
private void write(SelectionKey key) {
// 清空缓存
this.writeBuffer.clear();
// 获取当前通道对象
SocketChannel channel = (SocketChannel) key.channel();
// 录入数据
Scanner scanner = new Scanner(System.in);
try {
System.out.println("即将发送数据到客户端..");
String line = scanner.nextLine();
// 把录入的数据写到 Buffer 中
writeBuffer.put(line.getBytes("UTF-8"));
// 重置缓存游标
writeBuffer.flip();
channel.write(writeBuffer);
channel.register(this.selector,SelectionKey.OP_READ);
} catch (Exception e) {
e.printStackTrace();
}
}
}
NIOClient:
public class NIOClient {
public static void main(String[] args) {
// 创建远程地址
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
SocketChannel channel = null;
// 定义缓存
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
// 开启通道
channel = SocketChannel.open();
// 连接远程远程服务器
channel.connect(address);
Scanner sc = new Scanner(System.in);
while (true) {
System.out.println("客户端即将给 服务器发送数据..");
String line = sc.nextLine();
if(line.equals("exit")) {
break;
}
// 控制台输入数据写到缓存
buffer.put(line.getBytes("UTF-8"));
// 重置 buffer 游标
buffer.flip();
// 数据发送到数据
channel.write(buffer);
// 清空缓存数据
buffer.clear();
// 读取服务器返回的数据
int readLen = channel.read(buffer);
if(readLen == -1) {
break;
}
// 重置 buffer 游标
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
// 读取数据到字节数组
buffer.get(bytes);
System.out.println("收到了服务器发送的数据 : "+ new String(bytes,"UTF-8"));
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
AIO
异步非阻塞 IO
使用场景:连接数目多且连接比较长的架构,如相册服务器。重点调用了 OS 参与并发操作,编程比较复杂