1. Channel
1.1 什么是Channel
- channel类似于Stream,他是读写数据的双向通道。
- 可以从channel中将数据读入buffer,也可以将buffer的数据写入channel。(channel只与buffer打交道)
- 而之前的stream要么是输入,要么是输出,channel比stream更为底层。
1.2 常见的Channel
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
1.3 channel的常用方法
- 创建channel
ServerSocketChannel channel = ServerSocketChannel.open();
- 为channel绑定端口
channel.bind(new InetSocketAddress(8080));
- 设置channel为非阻塞访问
channel.configureBlocking(false);
- 关于udp的channel处理方式
public class UdpServer {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
channel.socket().bind(new InetSocketAddress(9999));
System.out.println("waiting...");
ByteBuffer buffer = ByteBuffer.allocate(32);
channel.receive(buffer);
buffer.flip();
debug(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class UdpClient {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
InetSocketAddress address = new InetSocketAddress("localhost", 9999);
channel.send(buffer, address);
} catch (Exception e) {
e.printStackTrace();
}
}
}
- UDP是无连接的,client发送数据不管server是否开启
- server的receive方法会将接收到的数据存入byte buffer,如果数据报文超过buffer大小,多出来的数据默认会被抛弃。
2. Buffer
2.1 什么是Buffer
- buffer是用来缓冲读写数据。最常用的Buffer是ByteBuffer
- Channel只与Buffer打交道。
- Buffer分为使用堆内存和直接内存
- 堆内存会受到垃圾回收的影响,可能会出现频繁的对象搬迁
- 直接内存则不会受GC影响,它有它自己的回收机制,但是这个内存分配比较耗时
2.2 ByteBuffer常见的方法
- 分配空间
Bytebuffer buf=ByteBuffer.allocate(16);
- 向buffer写入数据
// 调用channel的read方法
int readBytes = channel.read(buf);
// 调用buffer自己的put方法
buf.put((byte)127);
- 从buffer读取数据
// 调用channel的write方法
int writeBytes = channel.write(buf);
// 调用buffer自己的get方法
byte b = buf.get();
- 其他方法
- flip:切换为读模式(会清除mark位置)
- rewind:将position重新置为0(会清除mark位置)
- mark:标记此时的position位置
- reset:将position位置移动到mark处
3. Selector
3.1 什么是Selector
- 单线程通过配合Selector可以实现对多个Channel可读写事件的监控,这个被称为多路复用。
- 多路复用仅针对网络IO,普通文件IO没法利用多路复用
- Selector可以保证
- 有可连接事件时才去连接
- 有可读事件时才去读取
- 有可写事件时才去写入
- 因为网络传输能力,Channel未必时时都可写,所以可能会导致写阻塞,一旦Channel可以写,会触发Selector的可写事件。
3.2 Selector的基本使用
- 创建
Selector selector = Selector.open();
- 绑定Channel事件
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 绑定事件);
- channel必须工作在非阻塞模式,而FileChannel没有非阻塞模式,因此不能与Selector配合使用
- 可以绑定的事件有
- connect:客户端连接成功时会触发
- accept:服务端成功接收连接时会触发
- read:数据可读入时触发,因为接收能力弱,会有数据暂不能读入的情况
- write:数据可写入时触发,因为发送能力弱,会有数据暂不能写出的情况
- 监听所有Channel注册的事件
// 阻塞监听
int count = selector.select();
// 带超时监听
int count = selector.select(long timeout);
// 非阻塞监听
int count = selector.selectNow();
- 阻塞监听什么时候被唤醒
- 当所监听的事件集合中任意事件发生时会唤醒阻塞
- 调用selector.wakeup()
- 调用selector.close()
- selector所在线程被interrupt
- 处理accept事件
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
log.debug("select count: {}", count);
// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();
// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
log.debug("{}", sc);
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 当事件发生后,要么处理,要么取消,不能什么都不做,否则下次select该事件仍会触发,因为NIO底层使用的是水平触发。
- accept事件的处理就是要调用channel的accept方法。
- 处理read事件
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
log.debug("select count: {}", count);
// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();
// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
log.debug("连接已建立: {}", sc);
} else if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = sc.read(buffer);
// 返回的为-1表示这是连接关闭,需要特别处理
if(read == -1) {
key.cancel();
sc.close();
} else {
buffer.flip();
debug(buffer);
}
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 当客户端与服务端断开连接,客户端会向服务端发送一个特殊的读事件,这个读事件的处理方式就是取消注册在selector上的channel。
- 处理write事件
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
// 1. 向客户端发送内容
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
int write = sc.write(buffer);
// 3. write 表示实际写了多少字节
System.out.println("实际写入字节:" + write);
// 4. 如果有剩余未读字节,才需要关注写事件
if (buffer.hasRemaining()) {
// read 1 write 4
// 在原有关注事件的基础上,多关注 写事件
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
// 把 buffer 作为附件加入 sckey
sckey.attach(buffer);
}
} else if (key.isWritable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println("实际写入字节:" + write);
if (!buffer.hasRemaining()) { // 写完了
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
key.attach(null);
}
}
}
}
}
}
- 当socke缓冲可写时,write事件会频繁触发,因此应当只在socket缓冲写不下时再关注可写事件,数据写完之后再取消关注可写事件。