一、创建 selector
Selector selector = Selector.open();
1、一个 selector 可以管理多个 channel 。
二、channel 如何注册到 selector 中 (建立关联关系,使 selector 能够监测到 channel 发生的事件)
// 创建一个 ServerSocketChannel ServerSocketChannel ssc = ServerSocketChannel.open(); // 此方法默认为 true,设置为 false 后,会使 accept() 不再阻塞 ssc.configureBlocking(false); /* register() 建立 Selector 和 Channel 之间的联系(也称之为注册) SelectionKey 就是将来该 key 对应的 channel 发生事件后可以知道是什么时间,以及是哪个 channel 的时间 四种事件: 1、accept: 客户端发起连接请求时触发 2、connect:客户端建立连接后出发 3、read: 可读事件 3、write: 可写事件 */ SelectionKey sscKey = ssc.register(selector, 0, null); // 指定 SelectionKey 需要关注的事件 sscKey.interestOps(SelectionKey.OP_ACCEPT); // 绑定地址 ssc.bind(new InetSocketAddress("127.0.0.1", 8080));
三、selector 如何监听 channel 发生的事件
/* 事件监听 没有事件发生,线程阻塞;有事件发生,则恢复运行 如果监听到的事件未处理,则不会阻塞 总结:事件发生后,要么处理,要么取消 */ selector.select();
四、监听到事件后如何处理,以及需要注意的点
/* 处理事件 */ // selectedKeys() 获取到 selector 中所有监听到事件的 SelectionKey 集合 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iter = selectionKeys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); // 区分事件类型 if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept();// 建立连接 // key.cancel(); // 处于某种原因不想处理事件,则取消 sc.configureBlocking(false); // 开启非阻塞模式,避免后续因为未读取到数据而照成线程阻塞 // 将 SocketChannel 也注册到selector 中 SelectionKey scKey = sc.register(selector, 0, null); // 设置该 channel 需要关注的事件 scKey.interestOps(SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer buff2 = ByteBuffer.allocate(16); sc.read(buff2); buff2.flip(); System.out.println(buff2.toString()); } // 一个 Selectionkey 的事件处理完成后,应当将其从 selectedKeys 集合中移除 iter.remove(); }
五、为什么一个 SelectionKey 的事件处理完后需要手动移除
*可以这样理解:
selector 会有两个集合,一个存放了所有注册到该 selector 中的 channel,姑且称之为 channels ;另一个用于存放所有 selector 监听到 channel 的事件,姑且称之为 selectedKeys ,里面包含 0 - n 个 selectionKey 。
每当 selector 监听到 channels 集合中的 channel 有新的事件发生时,会将该发生事件的 selectionKey 放入 selectedKeys 中,而我们对 selectedKeys 集合中发生事件的 key 处理后,该集合并不会主动移除此 selectionKey 。
当下次 selector 通过 select() 方法监听到新的事件,我们通过遍历 selectedKeys 集合去处理此事件时,会将之前已经处理过事件的 selectionKey 遍历出来,代码再次处理此 key 对应 channel 的事件时,往往得不到想要的结果。
例如,通过 selectionKey 得到一个已经处理完事件的 channel,使用该 channel 调用 accept() 函数建立连接时,会返回一个类型为 SocketChannel 的空对象,此时如果使用此对象进行相应操作则会导致空指针。
总结,已经处理完事件的 channel 再次处理对应事件得到的结果:1、accept() 返回空对象;2、read() 方法读取到的数据长度为 0 (待验证)。其余两个事件未测试。
# 附练习代码,用于联想所学,不具备实际功能意义
package com.sourceplan.nettydemo.selector; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set; /** * @Author zhou * @Date 2021/7/19 */ public class SelectorDemo { public static void main(String[] args) throws IOException { // 创建 selector,管理多个 channel Selector selector = Selector.open(); // 创建一个 ByteBuffer ByteBuffer buff = ByteBuffer.allocate(16); // 创建一个 ServerSocketChannel ServerSocketChannel ssc = ServerSocketChannel.open(); // 此方法默认为 true,设置为 false 后,会使 accept() 不再阻塞 ssc.configureBlocking(false); /* register() 建立 Selector 和 Channel 之间的联系(也称之为注册) SelectionKey 就是将来该 key 对应的 channel 发生事件后可以知道是什么时间,以及是哪个 channel 的时间 四种事件: 1、accept: 客户端发起连接请求时触发 2、connect:客户端建立连接后出发 3、read: 可读事件 3、write: 可写事件 */ SelectionKey sscKey = ssc.register(selector, 0, null); // 指定 SelectionKey 需要关注的事件 sscKey.interestOps(SelectionKey.OP_ACCEPT); // 绑定地址 ssc.bind(new InetSocketAddress("127.0.0.1", 8080)); // 创建一个 List 用于存储与客户端建立的 channel List<SocketChannel> scList = new ArrayList<>(); while (true) { /* 事件监听 没有事件发生,线程阻塞;有事件发生,则恢复运行 如果监听到的事件未处理,则不会阻塞 总结:事件发生后,要么处理,要么取消 */ selector.select(); /* 处理事件 */ // selectedKeys() 获取到 selector 中所有监听到事件的 SelectionKey 集合 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iter = selectionKeys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); // 区分事件类型 if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept();// 建立连接 // key.cancel(); // 处于某种原因不想处理事件,则取消 sc.configureBlocking(false); // 开启非阻塞模式,避免后续因为未读取到数据而照成线程阻塞 // 将 SocketChannel 也注册到selector 中 SelectionKey scKey = sc.register(selector, 0, null); // 设置该 channel 需要关注的事件 scKey.interestOps(SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer buff2 = ByteBuffer.allocate(16); sc.read(buff2); buff2.flip(); System.out.println(buff2.toString()); } // 一个 key 的事件处理完成后,应当将其从 selectedKeys 集合中移除 iter.remove(); } } } private static void testOne(ByteBuffer buff, ServerSocketChannel ssc, List<SocketChannel> scList) throws IOException { /* accept() 与客户端建立连接,默认阻塞,获取到客户端连接请求后才会往下走 只有 ssc(ServerSocketChannel) 对象调用 configureBlocking(),将模式切换为非阻塞时,accept() 方法才为非阻塞 当 accept() 方法为非阻塞时,如果客户端没有发起连接请求,得到的结果为 null */ SocketChannel sc = ssc.accept(); if (sc != null) { sc.configureBlocking(false); // 切换为非阻塞模式,该 channel 下原本为阻塞的方法将不再阻塞 scList.add(sc); } for (SocketChannel channel : scList) { /* read() 默认阻塞,只有读取到内容才会继续执行后续代码 当调用该方法的 SocketChannel 对象开启非阻塞模式时,read() 才为非阻塞方法 当 read() 方法为非阻塞时,如果未读取到数据,则返回 0 */ int read = channel.read(buff); if (read > 0) { } } } }