Selector(选择器)
Selector能够检测多个注册的通道上是否有事件发生(多个channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个链接和请求。
只有在连接通道真正有读写事件发生时,才会进行读写,这就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程了。并且避免了多线程之间的上下文切换导致的开销。
图解说明:
-
多路复用器Selector可以同时并发处理多个客户端连接
-
当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务
-
线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出
通道。
-
由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂
起。
-
一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线
程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
Selector类相关方法
Selector类是一个抽象类,常用方法如下:
public static Selector open();//得到一个选择器对象
public int select(long timeout);//监控所有注册的通道,当其中有IO操作可以进行时,将对应的SelectionKey加入到内部集合中并返回,参数用来设置超时时间,即阻塞xx毫秒,在xx毫秒后返回
public Set<SelectionKey> selectedKeys();//从内部集合中得到所有的SelectionKey
public abstract int select() throws IOException;//阻塞
public abstract Selector wakeup();//唤醒Selector
public abstract int selectNow() throws IOException;//不阻塞,立马返回
NIO非阻塞网络编程关系梳理图
图解说明:
- Selector进行监听select方法,返回有事件发生的通道的个数
- 当客户端连接时,会通过ServerSocketChannel得到SocketChannel
- 将socketChannel注册到Selector上(一个Selector上可以注册多个SocketChannel)
- 注册后返回一个SelectionKey,会和该selector关联
- 利用连接到服务器上的SelectionKey来判断事件类型
- 判断了事件之后,如是读、写事件,则根据SelectionKey获取SocketChannel,进行业务处理。
SelectionKey
SelectionKey表示selector和网络通道的注册关系,分为四种:
-
SelectionKey.OP_ACCEPT —— 接收连接继续事件,表示服务器监听到了客户连接,服务器可以接收这个连接了
-
SelectionKey.OP_CONNECT —— 连接就绪事件,表示客户端与服务器的连接已经建立成功
-
SelectionKey.OP_READ —— 读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)
-
SelectionKey.OP_WRITE —— 写就绪事件,表示已经可以向通道写数据了(通道目前可以用于写操作)
SelectionKey中常用的方法:
public abstract Selector selector();//得到与该Selectionkey关联的Selector对象
public abstract SelectableChannel channel();//得到与该Selectionkey关联的通道
public final Object attachment();//得到与该SelectionKey关联的共享数据
public abstract SelectionKey intersetOps(int ops);//设置或改变监听事件
public final boolean isAcceptable();//是否可以连接
public final boolean isReadable();//是否可以读
public final boolean isWriteable();//是否可以写
ServerSocketChannel类
ServerSocketChannel在服务器端监听新的客户端Socket连接
ServerSocketChannel常用方法:
public static ServerSocketChannel open() throws IOException;//得到一个ServerSocketChannel通道
public final ServerSocketChannel bind(SocketAddress local) throws IOException;//设置服务器端口号
public final SelectableChannel configureBlocking(boolean block);//设置阻塞或非阻塞模式,false表示非阻塞模式
public abstract SocketChannel accept() throws IOException;//接受一个连接,返回代表这个连接的通道对象
public final SelectionKey register(Selector sel, int ops);//注册一个选择器并设置监听事件
public final SelectionKey register(Selector sel, int ops,Object att);//注册一个选择器并设置监听事件,最后一个参数可以设置共享数据
SocketChannel类
SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。
SocketChannel常用方法:
public static ServerSocketChannel open() throws IOException;//得到一个SocketChannel通道
public final SelectableChannel configureBlocking(boolean block);//设置阻塞或非阻塞模式,false表示非阻塞模式
public boolean connect(SocketAddress remote);//连接服务器
public boolean finishConnect();//如果connect连接失败,那么要通过这个方法完成连接操作
public int write(ByteBuffer src);//往通道里写数据
public int read(ByteBuffer det);//从通道里读数据
public final SelectionKey register(Selector sel, int ops,Object att);//注册一个选择器并设置监听事件,最后一个参数可以设置共享数据
public final void close();//关闭通道
例子
Demo1:利用NIO,实现服务器端和客户端之间的简单数据通讯
服务器端
/**
* 2020/1/8 上午10:09
* 实现服务器端和客户端之间的数据通讯(非阻塞)
*/
public class NIOServer {
private static String IP = "127.0.0.1";
private static int PORT = 6668;
public static void main(String[] args) throws Exception {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(PORT));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
if (selector.select(1000L) == 0) {
continue;
}
//如果客户端连接上了
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
SocketChannel channel = serverSocketChannel.accept();
System.out.println(" 客 户 端 连 接 成 功 生 成 了 一 个 socketChannel " +
channel.hashCode());
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ,ByteBuffer.allocate(1024));
}
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
socketChannel.read(buffer);
System.out.println("from 客户端:" + new String(buffer.array()).trim());
}
iterator.remove();
}
}
}
}
selector轮询事件,当客户端连接上服务之后,即产生一个OP_ACCEPT事件,服务器轮询到该事件后,获取到当前客户端连接的通道channel,并为通道注册Read事件。 selector随后会轮询到Read事件,便会执行Read事件的业务逻辑,比如向通道中写数据。这样客户端就能读取到通道中写入的数据,即实现简单的通讯。
客户端
public class NIOClient {
private static String IP = "127.0.0.1";
private static int PORT = 6668;
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
InetSocketAddress inetAddress = new InetSocketAddress(IP, PORT);
if (!socketChannel.connect(inetAddress)) {
while (!socketChannel.finishConnect()) {
System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作..");
}
String str = "Hello";
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
socketChannel.write(buffer);
//客户端阻塞在这里
System.in.read();
}
}
}
客户端主要是执行连接服务器的操作
Demo2:实现一个简单的服务器端与客户端的群聊程序
服务器端:
/**
* 2020/1/8 上午11:18
* 实现服务器端与客户端之间的数据简单通讯
* 服务器端:检测用户上线、离线 并实现消息转发功能
*/
public class NIOChatServer {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
public NIOChatServer() throws Exception {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(6669));
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
public static void main(String[] args) throws Exception {
NIOChatServer server = new NIOChatServer();
server.chat();
}
private void chat() throws Exception {
String user = null;
while (true) {
int connectNum = selector.select();
if (connectNum <= 0) {
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println(socketChannel.getRemoteAddress() + "已经上线");
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if (key.isReadable()) {
readMsg(key);
}
iterator.remove();
}
}
}
private void readMsg(SelectionKey key) {
SocketChannel channel = null;
try {
channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = channel.read(buffer);
if (len > 0) {
String content = new String(buffer.array(), 0, len);
System.out.println(content.trim());
sendMsgToOtherChannel(content,channel);
}
} catch (IOException e) {
try {
System.out.println(channel.getRemoteAddress() + " 离线了..");
key.cancel();
channel.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
//发送消息给其他客户端
private void sendMsgToOtherChannel(String msg, SocketChannel self) throws IOException {
//获取当前选择器中发生的事件
Set<SelectionKey> keys = selector.keys();
for (SelectionKey key : keys) {
//每个事件与channel是对应的,如果获取到的Channel不是当前客户端端的channel,那么可以认为是其他客户端,就可以发送消息
SelectableChannel targetChannel = key.channel();
if (targetChannel instanceof SocketChannel && targetChannel != self) {
SocketChannel socketChannel = (SocketChannel) targetChannel;
socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
}
}
}
}
客户端:
/**
* 2020/1/8 上午11:19
* 无阻塞地发送消息给其他所有用户,同时可以接受其他用户发送的消息(服务器转发)
*/
public class NIOChatClient {
private SocketChannel socketChannel;
private Selector selector;
private String userName;
public NIOChatClient() throws Exception {
socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 6669));
socketChannel.configureBlocking(false);
selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_READ);
userName = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(userName + " is ok...");
}
public static void main(String[] args) throws Exception {
NIOChatClient chatClient = new NIOChatClient();
new Thread(() -> {
try {
while(true){
chatClient.readInfo();
Thread.sleep(3000);
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
chatClient.sendInfo(msg);
}
}
public void sendInfo(String info) throws IOException {
info = userName.concat("说").concat(info);
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
}
public void readInfo() throws IOException {
while (true) {
int select = selector.select();
if (select > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = channel.read(buffer);
if (len > 0) {
String msg = new String(buffer.array(), 0, len);
System.out.println(msg.trim());
}
}
iterator.remove();
}
}
}
}
}
注意:
1.当向通道中注册SelectionKey.OP_READ事件后,如果客户端又向缓存中write数据,下次轮询时,则isReadable()=true;
2.当向通道中注册SelectionKey.OP_WRITE事件后,如果不设置为其他事件,这时你会发现当前轮询线程中isWritable()一直为ture(解决方式:write业务处理完之后,将通道注册为其他事件)
码农的进阶之路 发布了88 篇原创文章 · 获赞 21 · 访问量 8万+ 私信 关注