NIO三大组件

1. Channel

1.1 什么是Channel

  • channel类似于Stream,他是读写数据的双向通道。
  • 可以从channel中将数据读入buffer,也可以将buffer的数据写入channel。(channel只与buffer打交道
  • 而之前的stream要么是输入,要么是输出,channel比stream更为底层。

1.2 常见的Channel

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

1.3 channel的常用方法

  • 创建channel
ServerSocketChannel channel = ServerSocketChannel.open();
  • 为channel绑定端口
channel.bind(new InetSocketAddress(8080));
  • 设置channel为非阻塞访问
 channel.configureBlocking(false);
  • 关于udp的channel处理方式
public class UdpServer {
    public static void main(String[] args) {
        try (DatagramChannel channel = DatagramChannel.open()) {
            channel.socket().bind(new InetSocketAddress(9999));
            System.out.println("waiting...");
            ByteBuffer buffer = ByteBuffer.allocate(32);
            channel.receive(buffer);
            buffer.flip();
            debug(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class UdpClient {
    public static void main(String[] args) {
        try (DatagramChannel channel = DatagramChannel.open()) {
            ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
            InetSocketAddress address = new InetSocketAddress("localhost", 9999);
            channel.send(buffer, address);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • UDP是无连接的,client发送数据不管server是否开启
  • server的receive方法会将接收到的数据存入byte buffer,如果数据报文超过buffer大小,多出来的数据默认会被抛弃。

2. Buffer

2.1 什么是Buffer

  • buffer是用来缓冲读写数据。最常用的Buffer是ByteBuffer
  • Channel只与Buffer打交道。
  • Buffer分为使用堆内存和直接内存
    • 堆内存会受到垃圾回收的影响,可能会出现频繁的对象搬迁
    • 直接内存则不会受GC影响,它有它自己的回收机制,但是这个内存分配比较耗时

2.2 ByteBuffer常见的方法

  • 分配空间

Bytebuffer buf=ByteBuffer.allocate(16);
  • 向buffer写入数据
// 调用channel的read方法
int readBytes = channel.read(buf);

// 调用buffer自己的put方法
buf.put((byte)127);
  • 从buffer读取数据
// 调用channel的write方法
int writeBytes = channel.write(buf);

// 调用buffer自己的get方法
byte b = buf.get();
  • 其他方法
    • flip:切换为读模式(会清除mark位置)
    • rewind:将position重新置为0(会清除mark位置)
    • mark:标记此时的position位置
    • reset:将position位置移动到mark处

3. Selector

3.1 什么是Selector

  • 单线程通过配合Selector可以实现对多个Channel可读写事件的监控,这个被称为多路复用。
  • 多路复用仅针对网络IO,普通文件IO没法利用多路复用
  • Selector可以保证
    • 有可连接事件时才去连接
    • 有可读事件时才去读取
    • 有可写事件时才去写入
      • 因为网络传输能力,Channel未必时时都可写,所以可能会导致写阻塞,一旦Channel可以写,会触发Selector的可写事件。

NIO三大组件

3.2 Selector的基本使用

  • 创建
Selector selector = Selector.open();
  • 绑定Channel事件
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 绑定事件);
  • channel必须工作在非阻塞模式,而FileChannel没有非阻塞模式,因此不能与Selector配合使用
  • 可以绑定的事件有
    • connect:客户端连接成功时会触发
    • accept:服务端成功接收连接时会触发
    • read:数据可读入时触发,因为接收能力弱,会有数据暂不能读入的情况
    • write:数据可写入时触发,因为发送能力弱,会有数据暂不能写出的情况
  • 监听所有Channel注册的事件
// 阻塞监听
int count = selector.select();

// 带超时监听
int count = selector.select(long timeout);

// 非阻塞监听
int count = selector.selectNow();
  • 阻塞监听什么时候被唤醒
    • 当所监听的事件集合中任意事件发生时会唤醒阻塞
    • 调用selector.wakeup()
    • 调用selector.close()
    • selector所在线程被interrupt
  • 处理accept事件
@Slf4j
public class ChannelDemo6 {
    public static void main(String[] args) {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.bind(new InetSocketAddress(8080));
            System.out.println(channel);
            Selector selector = Selector.open();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                int count = selector.select();
                log.debug("select count: {}", count);

                // 获取所有事件
                Set<SelectionKey> keys = selector.selectedKeys();

                // 遍历所有事件,逐一处理
                Iterator<SelectionKey> iter = keys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // 判断事件类型
                    if (key.isAcceptable()) {
                        ServerSocketChannel c = (ServerSocketChannel) key.channel();
                        // 必须处理
                        SocketChannel sc = c.accept();
                        log.debug("{}", sc);
                    }
                    // 处理完毕,必须将事件移除
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
  • 当事件发生后,要么处理,要么取消,不能什么都不做,否则下次select该事件仍会触发,因为NIO底层使用的是水平触发。
  • accept事件的处理就是要调用channel的accept方法。
  • 处理read事件
@Slf4j
public class ChannelDemo6 {
    public static void main(String[] args) {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.bind(new InetSocketAddress(8080));
            System.out.println(channel);
            Selector selector = Selector.open();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                int count = selector.select();
                log.debug("select count: {}", count);

                // 获取所有事件
                Set<SelectionKey> keys = selector.selectedKeys();

                // 遍历所有事件,逐一处理
                Iterator<SelectionKey> iter = keys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // 判断事件类型
                    if (key.isAcceptable()) {
                        ServerSocketChannel c = (ServerSocketChannel) key.channel();
                        // 必须处理
                        SocketChannel sc = c.accept();
                        sc.configureBlocking(false);
                        sc.register(selector, SelectionKey.OP_READ);
                        log.debug("连接已建立: {}", sc);
                    } else if (key.isReadable()) {
                        SocketChannel sc = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(128);
                        int read = sc.read(buffer);
                        // 返回的为-1表示这是连接关闭,需要特别处理
                        if(read == -1) {
                            key.cancel();
                            sc.close();
                        } else {
                            buffer.flip();
                            debug(buffer);
                        }
                    }
                    // 处理完毕,必须将事件移除
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
  • 当客户端与服务端断开连接,客户端会向服务端发送一个特殊的读事件,这个读事件的处理方式就是取消注册在selector上的channel。
  • 处理write事件
public class WriteServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));

        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while(true) {
            selector.select();

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
                    // 1. 向客户端发送内容
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 3000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    int write = sc.write(buffer);
                    // 3. write 表示实际写了多少字节
                    System.out.println("实际写入字节:" + write);
                    // 4. 如果有剩余未读字节,才需要关注写事件
                    if (buffer.hasRemaining()) {
                        // read 1  write 4
                        // 在原有关注事件的基础上,多关注 写事件
                        sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
                        // 把 buffer 作为附件加入 sckey
                        sckey.attach(buffer);
                    }
                } else if (key.isWritable()) {
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    SocketChannel sc = (SocketChannel) key.channel();
                    int write = sc.write(buffer);
                    System.out.println("实际写入字节:" + write);
                    if (!buffer.hasRemaining()) { // 写完了
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                        key.attach(null);
                    }
                }
            }
        }
    }
}
  • 当socke缓冲可写时,write事件会频繁触发,因此应当只在socket缓冲写不下时再关注可写事件,数据写完之后再取消关注可写事件。

 

上一篇:百度日常实习一面面经(Java后端)


下一篇:Tomcat架构