前言: (Netty官网 、GitHub)
Netty 是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。
Netty 是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。
Netty 主要针对在 TCP 协议下,面向 Clients 端的高并发应用,或者 Peer-to-Peer 场景下的大量数据持续传输的
应用。
Netty 本质是一个 NIO 框架,适用于服务器通讯相关的多种应用场景
“快速和简单”并不意味着生成的应用程序会受到可维护性或性能问题的影响。Netty 是根据从实现许多协议(如 FTP、SMTP、HTTP 以及各种二进制和基于文本的旧协议)中获得的经验精心设计的。因此,Netty 成功地找到了一种方法,可以在不妥协的情况下实现易于开发、性能、稳定性和灵活性。
运行条件最低要求只有两个: 最新版本的 Netty 和 JDK 1.6 或更高版本, 要透彻理解 Netty , 需要先学习 NIO
1.0 I/O 模型
I/O 模型简单的理解:就是用什么样的通道进行数据的发送和接收,很大程度上决定了程序通信的性能
Java 共支持 3 种网络编程模型/IO 模式:BIO、NIO、AIO
- Java BIO : 同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销
BIO 就是传统的 java io 编程,其相关的类和接口在 java.io
- Java NIO : 同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注
册到多路复用器上,多路复用器轮询到连接有 I/O 请求就进行处理
- Java AIO(NIO.2) : 异步非阻塞,AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效
的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较
多且连接时间较长的应用
2.0 BIO、NIO、AIO 适用场景
-
BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4 以前的唯一选择,但程序简单易理解。
-
NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。 编程比较复杂,JDK1.4 开始支持。
-
AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作, 编程比较复杂,JDK7 开始支持。
3.0 同步(Synchronous)与异步(Asynchronous)
同步和异步都是基于应用程序和操作系统处理 IO 事件所采用的方式。
同步:是应用程序要直接参与 IO 读写的操作。
异步:所有的 IO 读写交给操作系统去处理,应用程序只需要等待通知。
同步方式在处理 IO 事件的时候,必须阻塞在某个方法上面等待我们的 IO 事件完成(阻塞 IO 事件或者通过轮询 IO事件的方式),对于异步来说,所有的 IO 读写都交给了操作系统。这个时候,我们可以去做其他的事情,并不需要去完成真正的 IO 操作,当操作完成 IO 后,会给我们的应用程序一个通知。
所以异步相比较于同步带来的直接好处就是在我们处理IO数据的时候,异步的方式我们可以把这部分等待所消耗的资源用于处理其他事务,提升我们服务自身的性能。
4.0 BIO与NIO对比
4.1 BIO
- 它基于流模型实现,提供了我们最熟知的一些 IO 功能,比如File抽象、输入输出流等。交互方式是同步、阻塞的方式,也就是说,在读取输入流或者写入输出流时,在读、写动作完成之前,线程会一直阻塞在那里,它们之间的调用是可靠的线性顺序。
BIO的缺点: 每个请求都需要创建独立的线程,与对应的客户端进行数据 Read,业务处理,数据 Write 。
当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。
连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费。
BIO的好处: 编码简单, 便于调试、理解
4.2 NIO
- 是一种同步非阻塞的 I/O 模型,于 Java 1.4 中引入,对应 java.nio 包,NIO 有三大核心部分: Channel(通道),Selector(选择器),Buffer(缓冲区) 等抽象。NIO 中的 N 可以理解为 Non-blocking,不单纯是 New。它支持面向缓冲的,基于通道的 I/O 操作方法。NIO 提供了与传统 BIO 模型中的
Socket
和ServerSocket
相对应的SocketChannel
和ServerSocketChannel
两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。
NIO
的缺点: 编码需要一定学习成本, 不容易调试、理解
NIO
的好处: 使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入, 这个线程同时可以去做别的事情, 通过Selector(选择器)
用于监听多个通道的事件(比如:连接请求, 数据到达等),因此使用单个线程就可以监听多个客户端通道。NIO是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配 50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个, 这就大量节约了系统资源。
同样HTTP2.0 也使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比 HTTP1.1 大了好几个数量级
5 NIO
三大核心部分通信的简单模型
6.0 缓冲区(Buffer
)
缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。
Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer
在 NIO 中,Buffer 是一个顶层父类,它是一个抽象类, 类的层级关系图:
Buffer 类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息:
以及相关方法:
从前面可以看出对于 Java 中的基本数据类型(boolean 除外),都有一个 Buffer 类型与之相对应,最常用的自然是 ByteBuffer 类(二进制数据),
该类的主要方法如下:
7.0 通道(Channel)
7.1 基本介绍
NIO 的通道类似于流,但有些区别如下:
-
通道可以同时进行读写,而流只能读或者只能写
-
通道可以实现异步读写数据
-
通道可以从缓冲读数据,也可以写数据到缓冲
BIO 中的 stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道(Channel) 是双向的,可以读操作,也可以写操作。
7.2 Channel
在 NIO 中是一个接口
public interface Channel extends Closeable {
public boolean isOpen();
public void close() throws IOException;
}
常 用 的 Channel 类有:
FileChannel
(文件的读写) ,DatagramChannel
(UDP的数据读写) ,ServerSocketChannel
和SocketChannel
(TCP的数据读写)。【ServerSocketChanne 类似 ServerSocket , SocketChannel 类似 Socket】
7.2.1 FileChannel 类 (例子在基础笔记中的FileChannel读写文件)
FileChannel 主要用来对本地文件进行 IO 操作,FileChannel只能在阻塞模式下工作,所以无法搭配Selector,
常见的方法有
-
public int read(ByteBuffer dst)
,从通道读取数据并放到缓冲区中 -
public int write(ByteBuffer src)
,把缓冲区的数据写到通道中 -
public long transferFrom(ReadableByteChannel src, long position, long count)
,从目标通道中复制数据到当前通道 -
public long transferTo(long position, long count, WritableByteChannel target)
,把数据从当前通道复制给目标通道, 但一次只能传输2G的内容, 底层使用了零拷贝技术
7.3 关于 Buffer 和 Channel 的注意事项和细节
- ByteBuffer 支持类型化的 put 和 get, put 放入的是什么数据类型,get 就应该使用相应的数据类型来取出,否则可能有 BufferUnderflowException 异常。
-
public abstract ByteBuffer asReadOnlyBuffer()
// 获取一个只读Buffer
//得到一个只读的
Buffer ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
//读取
while (readOnlyBuffer.hasRemaining()) {
System.out.println(readOnlyBuffer.get());
}
readOnlyBuffer.put((byte)100); //ReadOnlyBufferException
- NIO还提供了 MappedByteBuffer,可以让文件直接在内存(堆外的内存)中进行修改, 而如何同步到文件由 NIO 来完成
RandomAccessFile randomAccessFile=new RandomAccessFile("1.txt", "rw");
//获取对应的通道
FileChannel channel=randomAccessFile.getChannel();
/*** 参数 1: FileChannel.MapMode.READ_WRITE 使用的读写模式
* 参数 2: 0 : 可以直接修改的起始位置
* 参数 3: 5: 是映射到内存的大小(不是索引位置) ,即将 1.txt 的多少个字节映射到内存
* 可以直接修改的范围就是 0-5
* mappedByteBuffer实际类型是 DirectByteBuffer
*/
MappedByteBuffer mappedByteBuffer=channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
mappedByteBuffer.put(0, (byte)'H');
mappedByteBuffer.put(3, (byte)'9');
mappedByteBuffer.put(5, (byte)'Y'); //这里抛出异常IndexOutOfBoundsException
randomAccessFile.close();
- NIO 还支持 通过多个 Buffer (即 Buffer 数组) 完成读写操作,即 Scattering 和 Gathering
Scattering:将数据写入到 buffer 时,可以采用 buffer 数组,依次写入 [分散]
Gathering: 从 buffer 读取数据时,可以采用 buffer
void contextLoads() throws Exception {
RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw");
//获取对应的通道
FileChannel channel = randomAccessFile.getChannel();
//创建 buffer 数组
ByteBuffer[] byteBuffers = new ByteBuffer[2];
byteBuffers[0] = ByteBuffer.allocate(5);
byteBuffers[1] = ByteBuffer.allocate(3);
// 循环添加
for (ByteBuffer byteBuffer : byteBuffers) {
byteBuffer.put("8".getBytes(StandardCharsets.UTF_8));
}
channel.write(byteBuffers); // byteBuffers数组写 (也可以读)
}
8.0 Selector
8.1 基本介绍
- Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到 Selector(选择器)
- Selector 能够检测多个注册的通道上是否有事件发生(注意:多个 Channel 以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
- 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程, 避免了多线程之间的上下文切换导致的开销
单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用 (如下图)
多路复用仅针对网络 IO,普通文件 IO 无法利用多路复用
- Netty 的 IO 线程 NioEventLoop 聚合了 Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接。
- 当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。
- 线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。
- 由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
- 一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
8.2 Selector
相关方法
- selector.select() //阻塞的操作, 必须返回有事件发生通道数
- selector.select(1000); //阻塞 1000 毫秒,在 1000 毫秒后返回
- selector.wakeup(); //唤醒 selector
- selector.selectNow(); //不阻塞,立马返还
8.2.1 重要的属性字段
selector
=>AbstractSelector
=>SelectorImpl
重要的属性字段:
-
protected HashSet<SelectionKey> keys = new HashSet();
// 里面存储的是所有注册到selector通道的SelectionKey
-
protected Set<SelectionKey> selectedKeys = new HashSet();
// 里面存储的是通道里面发生了事件的SelectionKey
, 没有事件发生时为空! 当有事件发生的时候selector
会将相应SelectionKey
添加进selectedKeys
集合, 而不会主动的删除。 -
selectedKeys()
方法本质获取的就是publicSelectedKeys
, 而publicSelectedKeys
里面的所有调用又是指向了调用selectedKeys
的方法
8.3 NIO 非阻塞 网络编程原理分析图
NIO 非阻塞 网络编程相关的(Selector、SelectionKey、ServerScoketChannel 和 SocketChannel) 关系梳理图
- 当客户端连接时,会通过 ServerSocketChannel 得到 SocketChannel
- Selector 进行监听 select 方法, 返回有事件发生的通道的个数.
- 将 socketChannel 注册到 Selector 上, register(Selector sel, int ops), 一个 selector 上可以注册多个 SocketChannel
- 注册后返回一个 SelectionKey, 会和该 Selector 关联(集合), 进一步得到各个 SelectionKey (有事件发生)
- 在通过 SelectionKey 反向获取 SocketChannel , 方法 channel()
- 可以通过 得到的 channel , 完成业务处理
8.4 SelectionKey
SelectionKey,表示 Selector 和网络通道的注册关系(或者监听的事件), 共四种:
-
int OP_ACCEPT
:有新的网络连接可以 accept,值为 16 -
int OP_CONNECT
:代表连接已经建立,值为 8 -
int OP_READ
:代表读操作,值为 1 -
int OP_WRITE
:代表写操作,值为 4
8.4.1 SelectionKey 相关方法
8.5 ServerSocketChannel
ServerSocketChannel 在服务器端监听新的客户端 Socket 连接
相关方法如下:
8.6 SocketChannel
SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。
相关方法如下:
9.0 一个简单的NIO
非阻塞配合复用器的网络编程
9.1 NIO服务端
/**
* @Author: ZhiHao
* @Date: 2022/1/25 17:58
* @Description: NIO网络编程演示
* @Versions 1.0
**/
@Slf4j
public class NIOSelectorServiceDemo {
public static void main(String[] args) throws Exception {
// 1-打开多路复用器
Selector selector = Selector.open();
// 2-打开一个ServerSocketChannel通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 3-绑定到8888端口
serverSocketChannel = serverSocketChannel.bind(new InetSocketAddress(8888));
// 4-ServerSocketChannel通道设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 4-ServerSocketChannel通道注册到Selector多路复用器
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 5-接受请求
for (; ; ) {
// 6-若没有事件就绪, 就阻塞等待直到有事件发生
log.info("NIOServiceDemo-0, 准备阻塞直到等待事件发生");
int select = selector.select();
log.info("NIOServiceDemo-1, select:{}", select);
if (select <= 0) {
log.info("NIOServiceDemo-2, 没有事件发生!");
continue;
}
// 7-遍历复用器里面的所有事件key
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
// 获取事件key
SelectionKey key = iterator.next();
// 手动移除当前selectionKey, 防止重复操作
iterator.remove();
// 8-当key的事件绑定的是接收连接进行处理
SocketChannel clientSocketChannel = null;
if (key.isAcceptable()) {
log.info("NIOServiceDemo-3, 发生接受连接事件!");
ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) key.channel();
clientSocketChannel = serverSocketChannel1.accept();
clientSocketChannel.configureBlocking(false);
// 9-服务端通道注册读取事件到复用器, 并设置一个通道共享数据
// (这里设置容量10是为了演示一次性读不完进行扩展例子, 正常应该经过客户端协商定义大小)
clientSocketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(10));
}
// 10-当key的事件是读取事件时候进行处理
try {
// 关闭客户端时候总会触发一个客户端事件
if (key.isReadable()) {
log.info("NIOServiceDemo-4, 发生读取事件!");
clientSocketChannel = (SocketChannel) key.channel();
// 11-获取通道共享数据
ByteBuffer attachment = (ByteBuffer) Optional.ofNullable(key.attachment()).orElseGet(()-> ByteBuffer.allocate(1024));
int read;
while ((read = clientSocketChannel.read(attachment)) > 0) {
log.info("NIOServiceDemo-read, 读取到的数量:{}", read);
// position=limit, 说明ByteBuffer容量不足以一次性全部读完通道数据, 需要进行扩容
if (attachment.position() == attachment.limit()) {
ByteBuffer newByteBuffer = ByteBuffer.allocate(attachment.limit() * 2);
attachment.flip(); // 转换读取模式
newByteBuffer.put(attachment);
attachment = newByteBuffer;
key.attach(attachment); // 替换对应通道的共享数据
}
}
// 如果是客户端正常主动取消, read = -1
if (read == -1) {
log.error("NIOServiceDemo-cance, 客户端主动关闭了通道!");
cancelAndClose(key, clientSocketChannel);
continue;
}
attachment.flip();
String str = new String(attachment.array(), attachment.position(), attachment.limit());
System.out.println(str);
// 12-数据写回给客户端 (wrap方法会根据内容分配大小)
clientSocketChannel.write(ByteBuffer.wrap((str + "我写回数据啦").getBytes(StandardCharsets.UTF_8)));
}
} catch (IOException e) {
log.error("NIOServiceDemo-5, 客户端异常断开了!, 进行取消注册与关闭通道");
cancelAndClose(key, clientSocketChannel);
}
}
}
}
private static void cancelAndClose(SelectionKey key, SocketChannel clientSocketChannel) {
key.cancel(); // 取消注册
IoUtil.close(clientSocketChannel); // 关闭通道
}
}
9.2 NIO客户端
@Slf4j
public class NIOClientDemo {
public static void main(String[] args) throws Exception {
// 1-打开多路复用器 (思考这里是不是可以使用服务端相同的一个多路复用器)
Selector selector = Selector.open();
// 2-打开得到ClientSocketChannel
SocketChannel clientSocketChannel = SocketChannel.open();
// 3-设置非阻塞 (默认是true阻塞)
clientSocketChannel.configureBlocking(false);
// 4-客户端通道注册连接事件到复用器
SelectionKey clientSelectionKey = clientSocketChannel.register(selector, SelectionKey.OP_CONNECT);
// 5-连接服务端
boolean connect = clientSocketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
log.info("NIOClientDemo-1, 连接需要时间, 这里不会阻塞,结果connect:{}", connect);
for (; ; ) {
// 6-若没有事件就绪, 就阻塞等待直到有事件发生
log.info("NIOClientDemo-0, 准备阻塞直到等待事件发生");
int select = selector.select();
log.info("NIOClientDemo-1, 结果select:{}", select);
if (select == 0) {
log.info("NIOClientDemo-2, 没有事件发生!");
continue;
}
// 7-获取所有的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
// 8-是连接事件
if (key.isConnectable()) {
log.info("NIOClientDemo-3, 连接事件发生!");
SocketChannel socketChannel = (SocketChannel) key.channel();
// 如果没有成功则循环等待
while (!socketChannel.finishConnect()) {}
socketChannel.configureBlocking(false);
// 9-客户端通道注册一个读取服务器响应写回的数据事件 (注意如果通道没有设置为非阻塞,注册会抛出异常)
socketChannel.register(selector, SelectionKey.OP_READ);
// 10-响应数据给服务端
// 包装字节数组方式, 数组长度多长就分配多大buffer, 节约空间
ByteBuffer byteBuffer = ByteBuffer.wrap("测试发送参数666".getBytes(StandardCharsets.UTF_8));
socketChannel.write(byteBuffer);
// ByteBuffer allocate = ByteBuffer.allocate(1000);
// allocate.put("测试发送参数666".getBytes(StandardCharsets.UTF_8));
// allocate.flip();
// socketChannel.write(allocate);
}
// 11-是读取事件, 进行读取数据
if (key.isReadable()) {
log.info("NIOClientDemo-4, 读取服务端事件发生!");
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 1024);
socketChannel.read(byteBuffer);
byteBuffer.flip();
String str = new String(byteBuffer.array(), byteBuffer.position(), byteBuffer.limit());
System.out.println(str);
}
}
}
}
}
结果:
16:34:27.062 [main] INFO com.zhihao.nio.NIOServiceDemo - NIOServiceDemo-0, 准备阻塞直到等待事件发生
16:34:31.030 [main] INFO com.zhihao.nio.NIOServiceDemo - NIOServiceDemo-1, select:1
16:34:31.032 [main] INFO com.zhihao.nio.NIOServiceDemo - NIOServiceDemo-3, 发生接受连接事件!
16:34:31.032 [main] INFO com.zhihao.nio.NIOServiceDemo - NIOServiceDemo-0, 准备阻塞直到等待事件发生
16:34:31.037 [main] INFO com.zhihao.nio.NIOServiceDemo - NIOServiceDemo-1, select:1
16:34:31.037 [main] INFO com.zhihao.nio.NIOServiceDemo - NIOServiceDemo-4, 发生读取事件!
测试发送参数666
16:34:31.038 [main] INFO com.zhihao.nio.NIOServiceDemo - NIOServiceDemo-0, 准备阻塞直到等待事件发生
-------------------------------------------------------------------------------------
16:34:31.032 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-1, 连接需要时间, 这里不会阻塞,结果connect:false
16:34:31.036 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-0, 准备阻塞直到等待事件发生
16:34:31.037 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-1, 结果select:1
16:34:31.037 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-3, 连接事件发生!
16:34:31.038 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-0, 准备阻塞直到等待事件发生
16:34:31.038 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-1, 结果select:1
16:34:31.038 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-4, 读取服务端事件发生!
测试发送参数666我写回数据啦
16:34:31.039 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-0, 准备阻塞直到等待事件发生
10. 为什么处理完需要remove()
因为触发事件添加进
selectedKeys
容器中, 而selectedKeys
又不会主动删除, 下次迭代事件selectedKeys
的时候就会处理了这些已经处理过的通道事件, 但这些事件又没有真正发生事件, 操作就会报错。
扩展
非阻塞模式配合复用器
- 打开对应服务/客户端的通道, 设置异步模式
- 打开Selector, 并将对应通道监听那些事件注册到Selector
- 阻塞监听通道是否发生事件
- 发生事件后遍历所有的事件key
- 根据事件key类型执行对应操作类型
阻塞模式
- 打开对应服务/客户端的通道, 设置阻塞
- 当接收到新连接后, 开启一个新线程进行阻塞读
服务端
缺点是当有很多客户端连接时候, 就会有多少个线程进行处理! 线程资源很宝贵的
private static ExecutorService executors = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8888));
serverSocketChannel.configureBlocking(true); // 默认true
ByteBuffer allocate = ByteBuffer.allocate(10);
StringBuilder builder = new StringBuilder();
for (; ; ) {
log.info("线程名称:{}, 阻塞等待连接!",Thread.currentThread().getName());
// 这里阻塞当前线程等待新连接
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(true);
executors.execute(()->{
// 接收到的通道也设置是阻塞的情况, channel.read()读取完了之后, 在次读也会阻塞,
// 这个线程就被阻塞了, 所以需要新开线程处理, 这也是阻塞IO的缺点, 每个连接都需要开一个线程处理
try {
log.info("阻塞的连接成功!");
int read;
while ((read =channel.read(allocate)) > 0) {
log.info("读取到多少个字节:{}",read);
allocate.flip();
builder.append(new String(allocate.array(),allocate.position(),allocate.limit()));
allocate.clear();
log.info("读取的结果:{}",builder.toString());
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
客户端 (多个也是代码一样)
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(true);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
while (!socketChannel.finishConnect()) {
}
waitWrite(socketChannel);
LockSupport.park();
}
private static void waitWrite(SocketChannel channel) throws IOException {
Scanner scan = new Scanner(System.in);
log.info("GroupChatNIOClient1号, 求输入需要发送的信息, 格式 1-xxxxx");
System.out.print("输入数据:");
String str = null; // 接收数据
while (StrUtil.isNotBlank((str = scan.next()))) {
break;
}
String[] split = str.split("-");
JSONObject jsonObject = new JSONObject();
if (split.length > 1) {
jsonObject.putOnce("clientNum", split[0]);
jsonObject.putOnce("data", split[1]);
} else {
jsonObject.putOnce("data", split[0]);
}
log.info("GroupChatNIOClient1号, 输入需要发送的信息为:{}", jsonObject.toString());
channel.write(ByteBuffer.wrap(jsonObject.toString().getBytes(StandardCharsets.UTF_8)));
}
11:34:55.292 [main] INFO com.zhihao.nio.BlockNIOService - 线程名称:main, 阻塞等待连接!
11:34:59.061 [main] INFO com.zhihao.nio.BlockNIOService - 线程名称:main, 阻塞等待连接!
11:34:59.061 [pool-1-thread-1] INFO com.zhihao.nio.BlockNIOService - 阻塞的连接成功!
11:35:08.550 [pool-1-thread-1] INFO com.zhihao.nio.BlockNIOService - 读取到多少个字节:10
11:35:08.551 [pool-1-thread-1] INFO com.zhihao.nio.BlockNIOService - 读取的结果:{"data":"8
11:35:08.551 [pool-1-thread-1] INFO com.zhihao.nio.BlockNIOService - 读取到多少个字节:10
11:35:08.551 [pool-1-thread-1] INFO com.zhihao.nio.BlockNIOService - 读取的结果:{"data":"88888888888
11:35:08.551 [pool-1-thread-1] INFO com.zhihao.nio.BlockNIOService - 读取到多少个字节:7
11:35:08.551 [pool-1-thread-1] INFO com.zhihao.nio.BlockNIOService - 读取的结果:{"data":"8888888888888888"}
11:35:22.601 [main] INFO com.zhihao.nio.BlockNIOService - 线程名称:main, 阻塞等待连接!
11:35:22.601 [pool-1-thread-2] INFO com.zhihao.nio.BlockNIOService - 阻塞的连接成功!
11:35:33.533 [pool-1-thread-2] INFO com.zhihao.nio.BlockNIOService - 读取到多少个字节:10
11:35:33.533 [pool-1-thread-2] INFO com.zhihao.nio.BlockNIOService - 读取的结果:{"data":"8888888888888888"}{"data":"9
11:35:33.533 [pool-1-thread-2] INFO com.zhihao.nio.BlockNIOService - 读取到多少个字节:4
11:35:33.533 [pool-1-thread-2] INFO com.zhihao.nio.BlockNIOService - 读取的结果:{"data":"8888888888888888"}{"data":"998"}
// -----------------------------------------------------------------------------------
输入数据:8888888888888888
11:35:08.548 [main] INFO com.zhihao.nio.BlockNIOClient1 - , 输入需要发送的信息为:{"data":"8888888888888888"}
//-----------------------------------------------------------------------------------
输入数据:998
11:35:33.531 [main] INFO com.zhihao.nio.BlockNIOClient2 - , 输入需要发送的信息为:{"data":"998"}
非阻塞模式(不配合复用器)
- 打开对应服务/客户端的通道, 设置非阻塞
- 当接收到新连接后, 只开启一个新线程进行非阻塞循环读取所有通道里面内容
服务端
缺点, 有二个线程会不断的自旋, 大量消耗CPU资源!
private static int num = 0;
private static int num1 = 0;
private static int num2 = 0;
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8888));
serverSocketChannel.configureBlocking(false); // 默认true
ByteBuffer allocate = ByteBuffer.allocate(8);
StringBuilder builder = new StringBuilder();
List<SocketChannel> NotBlockSocketChannel = new CopyOnWriteArrayList<>();
for (; ; ) {
if (num1 < 3) {
log.info("不阻塞等待连接!, 只打印3次日志");
num1++;
}
// 这里不阻塞当前线程等待新连接, 但是没有新连接的情况下, 这里会一直疯狂每循环一次都返回null
SocketChannel channel = serverSocketChannel.accept();
if (Objects.isNull(channel)) {
if (num2 < 3) {
log.info("没有连接每循环一次都返回null! 只打印3次日志");
num2++;
}
continue;
}
channel.configureBlocking(false);
// 接收到的通道也设置是非阻塞的情况,
log.info("非阻塞的连接成功!, 非阻塞通道添加进通道集合, 不断循环读取集合中通道的内容");
NotBlockSocketChannel.add(channel);
// 只需要开启一个线程不断循环读取集合通道里面的内容
if (num == 0) {
new Thread(() -> {
for (; ; ) {
try {
for (SocketChannel socketChannel : NotBlockSocketChannel) {
int read;
while ((read = socketChannel.read(allocate)) > 0) {
log.info("读取到多少个字节:{}", read);
allocate.flip();
builder.append(new String(allocate.array(), allocate.position(), allocate.limit()));
allocate.clear();
log.info("读取的结果:{}", builder.toString());
}
builder.setLength(0);
}
} catch (IOException e) {
}
}
}).start();
num++;
}
}
}
客户端 (多个也是代码一样)
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
while (!socketChannel.finishConnect()) {
}
waitWrite(socketChannel);
LockSupport.park();
}
private static void waitWrite(SocketChannel channel) throws IOException {
Scanner scan = new Scanner(System.in);
log.info("NotBlockNIOClient1号, 求输入需要发送的信息, 格式 1-xxxxx");
System.out.print("输入数据:");
String str = null; // 接收数据
while (StrUtil.isNotBlank((str = scan.next()))) {
break;
}
String[] split = str.split("-");
JSONObject jsonObject = new JSONObject();
if (split.length > 1) {
jsonObject.putOnce("clientNum", split[0]);
jsonObject.putOnce("data", split[1]);
} else {
jsonObject.putOnce("data", split[0]);
}
log.info("NotBlockNIOClient1号, 输入需要发送的信息为:{}", jsonObject.toString());
channel.write(ByteBuffer.wrap(jsonObject.toString().getBytes(StandardCharsets.UTF_8)));
}
12:02:50.068 [main] INFO com.zhihao.nio.NotBlockNIOService - 不阻塞等待连接!, 只打印3次日志
12:02:50.072 [main] INFO com.zhihao.nio.NotBlockNIOService - 没有连接每循环一次都返回null! 只打印3次日志
12:02:50.072 [main] INFO com.zhihao.nio.NotBlockNIOService - 不阻塞等待连接!, 只打印3次日志
12:02:50.072 [main] INFO com.zhihao.nio.NotBlockNIOService - 没有连接每循环一次都返回null! 只打印3次日志
12:02:50.072 [main] INFO com.zhihao.nio.NotBlockNIOService - 不阻塞等待连接!, 只打印3次日志
12:02:50.072 [main] INFO com.zhihao.nio.NotBlockNIOService - 没有连接每循环一次都返回null! 只打印3次日志
12:02:54.680 [main] INFO com.zhihao.nio.NotBlockNIOService - 非阻塞的连接成功!, 非阻塞通道添加进通道集合, 不断循环读取集合中通道的内容
12:02:58.088 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取到多少个字节:8
12:02:58.091 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取的结果:{"data":
12:02:58.091 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取到多少个字节:8
12:02:58.091 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取的结果:{"data":"8978978
12:02:58.091 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取到多少个字节:4
12:02:58.091 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取的结果:{"data":"897897897"}
12:03:03.570 [main] INFO com.zhihao.nio.NotBlockNIOService - 非阻塞的连接成功!, 非阻塞通道添加进通道集合, 不断循环读取集合中通道的内容
12:03:08.330 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取到多少个字节:8
12:03:08.331 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取的结果:{"data":
12:03:08.331 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取到多少个字节:6
12:03:08.331 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取的结果:{"data":"998"}
// ---------------------------------------------
输入数据:897897897
12:02:58.086 [main] INFO com.zhihao.nio.NotBlockNIOClient1 - NotBlockNIOClient1号, 输入需要发送的信息为:{"data":"897897897"}
// ----------------------------------------------
输入数据:998
12:03:08.328 [main] INFO com.zhihao.nio.NotBlockNIOClient1 - NotBlockNIOClient1号, 输入需要发送的信息为:{"data":"998"}
直接缓冲区DirectBuffer
零拷贝指的是数据无需拷贝到 JVM 内存中,同时具有以下三个优点
- 更少的用户态与内核态的切换
- 不利用 cpu 计算,减少 cpu 缓存伪共享
- 零拷贝适合小文件传输
传统的 IO 将一个文件通过 socket 写出 内存流程如下
-
java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 Java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 CPU
DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO
-
从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 CPU 会参与拷贝,无法利用 DMA
-
调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,CPU 会参与拷贝
-
接下来要向网卡写数据,这项能力 Java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU
可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的
- 用户态与内核态的切换发生了 3 次,这个操作比较重量级
- 数据拷贝了共 4 次
NIO通过了DirectBuffer
进行了优化
NIO 的 Buffer 除了做了缓冲块优化之外,还提供了一个可以直接访问物理内存的类 DirectBuffer。普通的 Buffer 分配的是 JVM 堆内存,而 DirectBuffer 是直接分配物理内存 (非堆内存)。
DirectBuffer 则是直接将步骤简化为数据直接保存到非堆内存,从而减少了一次数据拷贝。
由于 DirectBuffer 申请的是非 JVM 的物理内存,所以创建和销毁的代价很高。DirectBuffer 申请的内存并不是直接由 JVM 负责垃圾回收,但在 DirectBuffer 包装类被回收时,会通过 Java Reference 机制来释放该内存块。
DirectBuffer 只优化了用户空间内部的拷贝,而之前我们是说优化用户空间和内核空间的拷贝,那 Java 的 NIO 中是否能做到减少用户空间和内核空间的拷贝优化呢?
答案是可以的,DirectBuffer 是通过 unsafe.allocateMemory(size) 方法分配内存,也就是基于本地类 Unsafe 类调用 native 方法进行内存分配的。而在 NIO 中,还存在另外一个 Buffer 类:MappedByteBuffer,跟 DirectBuffer 不同的是,MappedByteBuffer 是通过本地类调用 mmap 进行文件内存映射的,map() 系统调用方法会直接将文件从硬盘拷贝到用户空间,只进行一次数据拷贝,从而减少了传统的 read() 方法从硬盘拷贝到内核空间这一步。
ByteBuffer 粘包与半包
网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔
但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为
- Hello,world\n
- I’m Nyima\n
- How are you?\n
变成了下面的两个 byteBuffer (粘包,半包)
- Hello,world\nI’m Nyima\nHo
- w are you?\n
出现原因
- 粘包
发送方在发送数据时,并不是一条一条地发送数据,而是将数据整合在一起,当数据达到一定的数量后再一起发送。这就会导致多条信息被放在一个缓冲区中被一起发送出去
- 半包
接收方的缓冲区的大小是有限的,当接收方的缓冲区满了以后,就需要将信息截断,等缓冲区空了以后再继续放入数据。这就会发生一段完整的数据最后被截断的现象
暂时的解决办法
- 通过get(index)方法遍历ByteBuffer,遇到分隔符时进行处理。注意:get(index)不会改变position的值
- 记录该段数据长度,以便于申请对应大小的缓冲区
- 将缓冲区的数据通过get()方法写入到target中
- 调用compact方法切换模式,因为缓冲区中可能还有未读的数据
public class ByteBufferDemo {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(32);
// 模拟粘包+半包
buffer.put("Hello,world\nI'm Nyima\nHo".getBytes());
// 调用split函数处理
split(buffer);
buffer.put("w are you?\n".getBytes());
split(buffer);
}
private static void split(ByteBuffer buffer) {
// 切换为读模式
buffer.flip();
for(int i = 0; i < buffer.limit(); i++) {
// 遍历寻找分隔符
if (buffer.get(i) == '\n') {
// 缓冲区长度
int length = i+1-buffer.position();
ByteBuffer target = ByteBuffer.allocate(length);
// 将前面的内容写入target缓冲区
for(int j = 0; j < length; j++) {
// 将buffer中的数据写入target中
target.put(buffer.get());
}
// 打印查看结果
ByteBufferUtil.debugAll(target);
}
}
// 切换为写模式,但是缓冲区可能未读完,这里需要使用compact
buffer.compact();
}
}
ByteBuffer 大小分配
- 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
- ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
- 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html
- 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗
一次无法写完例子
- 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)
- 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略
- 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
- selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
- 如果不取消,会每次可写均会触发 write 事件
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8888));
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
// 1. 向客户端发送内容
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 30000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
int write = sc.write(buffer);
// 3. write 表示实际写了多少字节
System.out.println("实际写入字节:" + write);
// 4. 如果有剩余未读字节,才需要关注写事件
if (buffer.hasRemaining()) {
log.info("NIOServiceDemo-通道写了:{}缓冲池消耗跟不上了, 还有{}未完写的数据, 给当前通道事件key添加多个写事件, 当通道可写时候触发",write,buffer.limit()-write);
// read 1 write 4
// 在原有关注事件的基础上,多关注 写事件 (关注了读+写)
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
// 把 buffer 作为附件加入 sckey
sckey.attach(buffer);
}
} else if (key.isWritable()) {
// 取没写完的附件、通道
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
// 进行写
int write = sc.write(buffer);
log.info("NIOServiceDemo-进行再次进行写未写完的数据, 实际写入字节:{}",write);
if (!buffer.hasRemaining()) {
// 写完了, 取消关注写事件与附件
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
key.attach(null);
}
}
}
}
}
1