[TOC]
NIO && Netty
文章目录 |
---|
基于《Netty源码剖析与应用》《Java高并发核心编程_卷1》记录 |
Netty面试题(2020最新版)_ThinkWon的博客-CSDN博客 |
一、NIO基础
1.1 IO读写的基本原理
为了避免用户进程直接操作内核,保证内核安全,操作系统将内存(虚拟内存)划分为两部分
- 内核空间(Kernel-Space)
- 用户空间(User-Space)
在用户态与内核态交互读写时,并不是每次IO操作都会直接去调用内核的读写,这样会加大IO的开销,并且减少硬盘的寿命
所以在这里引入了缓冲区
的概念,缓冲区分为两种
- 内核缓冲区(唯一)
- 用户缓冲区(多用户进程,不唯一)
接下来解读一下上述的交互过程
其实,IO操作本质上也是内核缓冲区
和用户缓冲区
的一种数据交互过程
1.2 主要的5种IO模型
同步阻塞IO
Blocking IO,字面意思,在进行IO操作的时候,该线程是同步并且是阻塞状态,传统的IO流库就是该模式
但是在这里要解释一下同步和异步的概念
-
同步
用户空间(进程或者线程)是主动发起IO请求的一方,系统内核是被动接收方
-
异步
系统内核是主动发起IO请求的一方,用户空间是被动接收方
1、用户进行IO操作,在内核缓冲区等待数据准备
2、准备好了,开始复制到用户缓冲区
3、复制完成,立即返回,这整个过程都是阻塞的
同步非阻塞IO
Non-Blocking IO
用户空间的程序不需要等待内核IO操作彻底完成,可以立即返回用户空间去执行后续的指令
与此同时,内核会立即返回给用户一个IO状态值
但是!这里的NIO不是指Java编程中的NIO!那个NIO是指多路复用模型
用户发起IO请求,询问内核缓冲区是否存在数据
- 没有数据就立刻返回,并下次再来问你(
不断轮询
) - 有数据,就进入阻塞模式,开始复制内核缓冲区数据到用户缓冲区,复制完成后返回
IO多路复用
IO Multiplexing
如何避免同步非阻塞IO模型中不断轮询等待的问题呢,真相只有一个,那就是IO多路复用
多路是指多个网络连接,复用指的是同一个线程意思说一个或一组线程处理多个TCP连接
最大优势是减少系统开销小,不必创建过多的进程/线程,也不必维护这些进程/线程
IO多路复用模型是建立在操作系统的基础设施之上,如果想要使用这种模型,操作系统的内核必须能够提供多路分离的系统调用select/epoll
信号驱动IO
Signal Drive IO
应用进程使用 sigaction 系统调用,内核立即返回,应用进程可以继续执行,也就是说等待数据阶段应用进程是非阻塞的。
内核在数据到达时向应用进程发送 SIGIO 信号,应用进程收到之后在信号处理程序中调用 recvfrom 将数据从内核复制到应用进程中。
异步IO
Asynchronous IO
这个模型是我认为比较给力的,为什么呢?
用户线程通过系统调用向内核注册某个IO操作,内核在整个IO操作(包括数据准备、数据复制)完成后通知用户程序,用户执行后续的业务操作
异步IO和信号驱动IO的本质区别
- 异步IO
read调用 ——— 用户缓冲区复制完成了,告诉线程一声
- 信号驱动IO
sigaction调用 — 数据准备好了,告诉线程一声 ——— 用户缓冲区复制完成了,告诉线程一声
1.3 Selector - Poll - ePoll
上面有提到,如果要使用多路复用技术,就需要系统内核提供多路分离的底层支持,Selector - Poll - ePoll 这三者都能提供多路I/O的解决方案,下面进行介绍
Select
Poll
本质与Select技术没太大区别,存储文件描述符
的集合原来是 BitsMap ,现在换成了动态数组+链表的形式,突破了 select 的文件描述符个数限制
ePoll
主要是两个方面
- epoll 在内核里使用
红黑树来跟踪进程所有待检测的文件描述字
- epoll 使用事件驱动的机制,内核里维护了一个
链表来记录就绪事件
select | poll | epoll | |
---|---|---|---|
操作方式 | 遍历 | 遍历 | 回调 |
底层实现 | 数组 | 链表 | 红黑树 |
IO效率 | 每次调用都进行线性遍历,时间复杂度为O(n) | 每次调用都进行线性遍历,时间复杂度为O(n) | 事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到readyList里面,时间复杂度O(1) |
最大连接数 | 1024(x86)或2048(x64) | 无上限 | 无上限 |
fd拷贝 | 每次调用select,都需要把fd集合从用户态拷贝到内核态,再拷贝回用户态 | 每次调用poll,都需要把fd集合从用户态拷贝到内核态,再拷贝回用户态 | 调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不拷贝 |
1.4 系统并发限制
默认情况下,在Linux系统中输入ulimit -n
可以查看当前系统所能承受的最大文件句柄数,这直接关系到能承受的一次性并发量的大小,所以我们需要通过修改,来让系统能够承受更大的并发空间
root@ubuntu:/home/ljm/Desktop# ulimit -n
1024
root@ubuntu:/home/ljm/Desktop# ulimit -n 10000
root@ubuntu:/home/ljm/Desktop# ulimit -n
10000
root@ubuntu:/home/ljm/Desktop#
文件句柄数不够,会导致什么后果呢?当单个进程打开的文件句柄数量超过了系统配置的上限值时会发出“Socket/File:Can't open so many files”的错误提示
上面这种操作是临时性的,我们要永久化就需要修改配置文件/etc/security/limits.conf
来进行硬性放开
你学废了吗
二、NIO详解
在1.4版本之前,Java IO类库是阻塞IO;从1.4版本开始,引进了新的异步IO库,被称为Java New IO类库,简称为Java NIO。New IO类库的目标就是要让Java支持非阻塞IO
2.1 BIO和NIO的区别
IO | NIO |
---|---|
面向流(Stream Orientend) | 面向缓冲区(Buffer Orientend) |
阻塞IO(Blocking IO ) | 非阻塞IO(Non Blocking IO) |
选择器(Selector) |
2.2 核心组件
NIO属于 —— I/O多路复用模型,所以讲解核心组件前我们看看这张图
Channel
NIO中,一个网络连接用一个通道表示,一个通道类似于 输出/输入流 的结合,可以写也可以读
通道也分为很多种类,主要实现类有这几种
- FileChannel 读写文件数据
- SocketChannel 通过TCP读写网络中的数据
- ServerSocketChannel 监听TCP连接,每一个新进来的连接都会创建一个SocketChannel
- DatagramChannel 通过UDP读写网络中的数据
如果再细致一些划分的话
- NioSocketChannel: 异步非阻塞TCP Socket传输通道
- NioServerSocketChannel:异步非阻塞TCP Socket服务端监听通道
- NioDatagramChannel: 异步非阻塞的UDP传输通道
- NioSctpChannel: 异步非阻塞Sctp传输通道
- NioSctpServerChannel: 异步非阻塞Sctp服务端监听通道
- OioSocketChannel: 同步阻塞式TCP Socket传输通道
- OioServerSocketChannel:同步阻塞式TCP Socket服务端监听通道
- OioDatagramChannel: 同步阻塞式UDP传输通道
- OioSctpChannel: 同步阻塞式Sctp传输通道
- OioSctpServerChannel: 同步阻塞式Sctp服务端监听通道
示例代码
public class Test01 {
public static void main(String[] args) throws IOException {
// 获取文件流
FileInputStream fis = new FileInputStream("D:\\###临时放一下\\tou.jpg");
FileOutputStream fos = new FileOutputStream("D:\\###临时放一下\\tou1.jpg");
// 获取通道
FileChannel inChannel = fis.getChannel();
FileChannel outChannel = fos.getChannel();
// 分配指定大小缓存区
// position 0 ,limit 1024
ByteBuffer buff = ByteBuffer.allocate(1024);
// 将通道的数据存入缓存区
while(inChannel.read(buff)!=-1){
// position 1024 ,limit 1024 ,相当于put
// 切换读模式
buff.flip();
// 将缓存去的数据写入通道
outChannel.write(buff);
// position 1024 ,limit 1024,相当于get
// 清空缓冲区
buff.clear();
}
/**
* 关闭输入输出通道,关闭输入输出流通道
*/
outChannel.close();
inChannel.close();
fis.close();
fos.close();
}
}
Buffer
缓冲区,是NIO非阻塞的重要前提和基础之一,通过把数据放在缓冲区,实现非阻塞的读写
常见的 Buffer 有
-
ByteBuffer
- MappedByteBuffer
- DirectByteBuffer
- HeapByteBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
- CharBuffer
Buffer类是一个非线程安全类
下面简述一下这个类的四个重要属性说明
mark 和 reset
- mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset 就能回到 mark 的位置
- rewind 和 flip 都会清除 mark 位置
使用Buffer类的基本步骤
(1)使用创建子类实例对象的allocate()
方法创建一个Buffer类的实例对象
(2)调用put()
方法将数据写入缓冲区中
(3)写入完成后,在开始读取数据前调用Buffer.flip()
方法,将缓冲区转换为读模式
(4)调用get()
方法,可以从缓冲区中读取数据
(5)读取完成后,调用Buffer.clear()
方法或Buffer.compact()
方法,将缓冲区转换为写模式,可以继续写入
Selector
IO多路复用指的是一个进程/线程可以同时监视多个文件描述符(含socket连接)
,一旦其中的一个或者多个文件描述符可读或者可写,该监听进程/线程就能够进行IO就绪事件的查询,这个监听就绪事件就由选择器来进行工作
可供选择器监控的通道IO事件类型包括以下四种
- 可读:SelectionKey.OP_READ
- 可写:SelectionKey.OP_WRITE
- 连接:SelectionKey.OP_CONNECT
- 接收:SelectionKey.OP_ACCEPT
以上事件类型常量定义在SelectionKey类中。如果选择器要监控通道的多种事件,可以用“按位或”运算符来实现。例如,同时监控可读和可写IO事件:
// 监控通道的多种事件,可以用“按位或”运算符来实现
int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE
实战案例(1)丢弃服务
NioDiscardServer案例
服务端
package com.ljm;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.logging.Logger;
/**
* @author 李家民
*/
public class NioDiscardServer {
public static void startServer() throws IOException {
// 1、获取Selector选择器
Selector selector = Selector.open();
// 2、获取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 3.设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 4、绑定端口连接
serverSocketChannel.bind(new InetSocketAddress(9000));
// 5、将通道注册到选择器上,并注册的IO事件为:“接收新连接”
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6、轮询感兴趣的I/O就绪事件(选择键集合)
while (selector.select() > 0) {
// 7、获取选择键集合
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
// 8、获取单个的选择键,并处理
SelectionKey selectedKey = selectedKeys.next();
// 9、判断key是具体的什么事件,不同的事件类型,做不同的处理
if (selectedKey.isAcceptable()) {
// 10-1、若选择键的IO事件是“连接就绪”事件,就获取客户端连接,此时新的客户端请求连接,需要获取链接通道,并将通道绑定到选择器上。
SocketChannel socketChannel = serverSocketChannel.accept();
// 10-1、切换为非阻塞模式
socketChannel.configureBlocking(false);
// 10-1、将该通道注册到selector选择器上
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (selectedKey.isReadable()) {
// 10-2、若选择键的IO事件是“可读”事件,读取数据
SocketChannel socketChannel = (SocketChannel) selectedKey.channel();
// 10-2、读取数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = 0;
while ((length = socketChannel.read(byteBuffer)) >0) {
// 反转模式,变成读模式
byteBuffer.flip();
// 展示数据
System.out.println(new String(byteBuffer.array(), 0, length));
byteBuffer.clear();
}
socketChannel.close();
}
// 15、移除选择键
selectedKeys.remove();
}
}
// 7、关闭连接
serverSocketChannel.close();
}
public static void main(String[] args) throws IOException {
startServer();
}
}
客户端
package com.ljm;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* @author 李家民
*/
public class NioDiscardClient {
public static void startClient() throws IOException {
// 1、获取通道(channel)
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(9000));
// 2、切换成非阻塞模式
socketChannel.configureBlocking(false);
//3.不断的自旋、等待连接完成,或者做一些其他的事情
while (!socketChannel.finishConnect()) {
// 这个过程是阻塞的,回想一下IO多路复用模型中的进程监听等待
}
// 4、分配指定大小的缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 写入数据到缓冲区
byteBuffer.put("hello world1".getBytes());
// 反转
byteBuffer.flip();
socketChannel.write(byteBuffer);
socketChannel.shutdownOutput();
socketChannel.close();
}
public static void main(String[] args) throws IOException {
startClient();
}
}
实战案例(2)文件接收
使用SocketChannel在服务端接收文件的实战案例
服务器
public class MyServer {
public static void main(String[] args) throws IOException {
//服务器通到打开
ServerSocketChannel server = ServerSocketChannel.open();
//绑定端口号
server.bind(new InetSocketAddress(9999)); //参数需要个客户端用他的类
System.out.println("已经连接客户端成功");
//TODO 接收文件
//创建文件输出流
FileOutputStream fos = new FileOutputStream("performance/MyFile/demo17.jpg");
FileChannel channel = fos.getChannel(); // 创建有内存读到硬盘的通道
//连接
server.configureBlocking(true);//阻塞连接
SocketChannel accept = server.accept();//服务端接收客户端之后一定要记得给返回值,后面都是靠返回值操作的
System.out.println("连接完成");
//创建ByteBuffer,来读取接收的数据
ByteBuffer bb = ByteBuffer.allocate(1024);
int a;
while ((a = accept.read(bb)) != -1) { //每读一次收到的内容就存进bb
bb.clear(); //将索引重新指会0
channel.write(bb);//将存进的ByteBuffer对象写进文件输出流
bb.flip();//翻转缓冲区
}
}
}
客户端
public class MyClient {
public static void main(String[] args) throws IOException {
System.out.println("创建客户端成功");
//1.创建客户端,连接服务器
/**
* 传统方法创建客户端对象
* Socket socket = new Socket("127.0.0.1",9999);
*
*/
//通到打开(客户端和服务端通道)
SocketChannel client = SocketChannel.open();//直接用方法,通到打开就可以(创建对象比较特殊)
//连接服务器
client.connect(new InetSocketAddress("127.0.0.1", 9999));
System.out.println("连接服务器成功");
//2.通过通道发数据
/**
* 需要通过ByteBuffer对象
*/
//创建文件输入流
FileInputStream fis = new FileInputStream("performance/MyFile/ab.jpg");
//创建文件流通道(硬盘到内存的通道)
FileChannel channel = fis.getChannel();
/**
* 循环读取到内存中
*/
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int a;
while ((a = channel.read(byteBuffer)) != -1){
byteBuffer.clear();
client.write(byteBuffer); //循环写入(客户端和服务端的通道)
byteBuffer.flip();
}
//3.关闭资源
client.close();
}
}
2.3 零拷贝
主要是为了更少的数据复制及上下文的状态切换,提升性能,内容偏底层了
传统IO拷贝
mmap
SendFile
三、Reactor
(3条消息) 面试官:什么是 Reactor 和 Proactor?_小林coding-CSDN博客
Reactor 翻译过来就是反应器,简单理解就是对接收到的请求进行高效的自反应处理,不过这么说还是太敷衍了,看看下面
3.1 简述
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.反应器设计模式 是一种事件处理模式,用于处理由一个或多个输入并发地交付给服务处理程序的服务请求。然后,服务处理程序将传入请求解复用,并将它们同步地分发到相关的请求处理程序。
Reactor是基于NIO模型实现的,为什么不用BIO模型理由也是非常容易说明,这是关于单线程阻塞IO
跟多路复用IO
两种模型的比较
由于 Reactor 是一个设计模型,所以有三种实现方案,方案具体使用进程还是线程,要看使用的编程语言以及平台有关
- Java 语言一般使用线程,比如 Netty
- C 语言使用进程和线程都可以,例如 Nginx 使用的是进程,Memcache 使用的是线程
3.2 单 Reactor 单进程 / 线程
首先看看单线程Reactor下的基本模型
应用程序中的对象的作用
- Reactor 对象的作用是监听和分发事件
- Acceptor 对象的作用是获取连接
- Handler 对象的作用是处理业务
接下来,介绍下「单 Reactor 单进程」这个方案:
- Reactor 对象通过
select (IO 多路复用接口)
监听事件,收到事件后通过 Dispatch 进行分发,具体分发给 Acceptor 对象还是 Handler 对象,还要看收到的事件类型; - 如果是连接建立的事件,则交由 Acceptor 对象进行处理,Acceptor 对象会通过 accept 方法 获取连接,并创建一个 Handler 对象来处理后续的响应事件;
- 如果不是连接建立事件, 则交由当前连接对应的 Handler 对象来进行响应;
- Handler 对象通过 read -> 业务处理 -> send 的流程来完成完整的业务流程。
3.3 单 Reactor 多线程 / 进程
详细说一下这个方案
- Reactor 对象通过 select (IO 多路复用接口) 监听事件,收到事件后通过 dispatch 进行分发,具体分发给 Acceptor 对象还是 Handler 对象,还要看收到的事件类型
- 如果是连接建立的事件,则交由 Acceptor 对象进行处理,Acceptor 对象会通过 accept 方法 获取连接,并创建一个 Handler 对象来处理后续的响应事件
- 如果不是连接建立事件, 则交由当前连接对应的 Handler 对象来进行响应
上面的三个步骤和单 Reactor 单线程方案是一样的,接下来的步骤就开始不一样了
- Handler 对象不再负责业务处理,只负责数据的接收和发送,Handler 对象通过 read 读取到数据后,会将数据发给子线程里的 Processor 对象进行业务处理
- 子线程里的 Processor 对象就进行业务处理,处理完后,将结果发给主线程中的 Handler 对象,接着由 Handler 通过 send 方法将响应结果发送给 client
3.4 多 Reactor 多进程 / 线程
主从模型
Reactor和Handler挤在单个线程中会造成非常严重的性能缺陷,可以使用多线程来对基础的Reactor模式进行改造和演进
方案详细说明如下
- 主线程中的 MainReactor 对象通过 select 监控连接建立事件,收到事件后通过 Acceptor 对象中的 accept 获取连接,将新的连接分配给某个子线程
- 子线程中的 SubReactor 对象将 MainReactor 对象分配的连接加入 select 继续进行监听,并创建一个 Handler 用于处理连接的响应事件
- 如果有新的事件发生时,SubReactor 对象会调用当前连接对应的 Handler 对象来进行响应
- Handler 对象通过 read -> 业务处理 -> send 的流程来完成完整的业务流程
3.5 Reactor模式的优缺点
- 与生产者消费者模式对比
二者的相似之处
:在一定程度上,Reactor模式有点类似生产者消费者模式。在生产者消费者模式中,一个或多个生产者将事件加入一个队列中,一个或多个消费者主动从这个队列中拉取(Pull)事件来处理。二者的不同之处
:Reactor模式是基于查询的,没有专门的队列去缓冲存储IO事件,查询到IO事件之后,反应器会根据不同IO选择键(事件)将其分发给对应的Handler来处理。
- 与观察者模式对比
二者的相似之处
:在Reactor模式中,当查询到IO事件后,服务处理程序使用单路/多路分发(Dispatch)策略,同步分发这些IO事件。观察者模式(Observer Pattern)也被称作发布/订阅模式,它定义了一种依赖关系,让多个观察者同时监听某一个主题(Topic)。这个主题对象在状态发生变化时会通知所有观察者,它们能够执行相应的处理。二者的不同之处
:在Reactor模式中,Handler实例和IO事件(选择键)的订阅关系基本上是一个事件绑定到一个Handler,每一个IO事件(选择键)被查询后,反应器会将事件分发给所绑定的Handler,也就是一个事件只能被一个Handler处理;在观察者模式中,同一时刻、同一主题可以被订阅过的多个观察者处理
四、Proactor(AIO)
Proactor是异步网络模型,它相较于多路复用IO,它的效率更高
但是,在Linux下的异步 I/O 是不完善的,aio 系列函数是由 POSIX 定义的异步操作接口,不是真正的操作系统级别支持的,而是在用户空间模拟出来的异步,并且仅仅支持基于本地文件的 aio 异步操作,网络编程中的 socket 是不支持的,这也使得基于 Linux 的高性能网络程序都是使用 Reactor 方案。
而 Windows 里实现了一套完整的支持 socket 的异步编程接口,这套接口就是 IOCP,是由操作系统级别实现的异步 I/O,真正意义上异步 I/O,因此在 Windows 里实现高性能网络程序可以使用效率更高的 Proactor 方案。
目前来说,Java的AIO模型还没得到广泛的应用,主要还是基于多路复用的NIO的Reactor的Netty模型
五、Netty的使用
5.1 基本概述
Netty 是一个 NIO 客户端服务器框架,可快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了网络编程,例如 TCP 和 UDP 套接字服务器,且在市面上也存在非常多使用Netty开发的程序,如Dubbo、Flink、Spark、Elasticsearch、HBase等流行的分布式框架
5.2 工作模型
5.3 Hello World
我们总体分为四个部分
- 服务端
- 服务端处理器
- 客户端
- 客户端处理器
最后是执行结果,所以,直接上代码先
Pom依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.68.Final</version>
</dependency>
NettyDemoServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author lijiamin
*/
public class NettyDemoServer {
public static void main(String[] args) throws Exception {
// 1. 启动类线程组配置
/**
* 创建两个线程组
* bossGroup 只处理连接请求
* workerGroup 处理I/O请求
*/
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(2);
/**
* 创建服务器端的启动对象,配置参数
* 通过链式编程,对serverBootstrap这个对象进行设置
* 1 设置两个线程组
* 2 TCP使用NioSocketChannel 作为服务器的通道实现
* 如果是UDP 则是NioDatagramChannel
* 3 设置线程队列等待连接个数
* 4 设置保持活动连接状态
*/
ServerBootstrap serverBootstrap =
new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 2. 客户端进来注册读写事件时,发配相应处理器
/**
* 这里进行了初始化
* 给 pipeline 设置处理器
* 给 workerGroup 的 EventLoop 对应的管道设置处理器
*/
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("此时客户端进来了,发配相应处理器");
System.out.println("客户 SocketChannel hashcode=" + ch.hashCode());
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyDemoServerHandler());
}
});
// 3. 服务器准备完成,开始启动并绑定端口
/**
* 启动服务器并绑定端口
*/
ChannelFuture channelFuture = serverBootstrap.bind(16668).sync();
/**
* 给 channelFuture 注册监听器,监控我们关心的事件
*/
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("身为服务器的我已经准备好了");
System.out.println("监听端口 16668 成功");
} else {
System.out.println("监听端口 16668 失败");
}
}
});
/**
* 关闭通道并监听
*/
channelFuture.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
NettyDemoServerHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;
/**
* 说明
* 1. 我们自定义一个Handler 需要继承netty 规定好的某个HandlerAdapter(规范)
* 2. 这时我们自定义一个Handler , 才能称为一个handler
*
* @author 李家民
*/
public class NettyDemoServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据事件 (可以在这里读取客户端发送的消息)
*
* @param ctx
* @param msg 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 "
+ Thread.currentThread().getName()
+ " Channel ="
+ ctx.channel());
System.out.println("server ctx =" + ctx);
System.out.println("看看channel 和 pipeline的关系");
// 本质是一个双向链表
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline();
// 将 msg 转成一个 ByteBuf
// ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + channel.remoteAddress());
}
/**
* 数据读取完成后的操作
* 1. writeAndFlush代表写入+刷新
* 2. 还需要对返回的数据进行编码 毕竟是中文
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
String ret = "你好客户端,我是服务器,我已经收到了你的消息";
ctx.writeAndFlush(Unpooled.copiedBuffer(ret, CharsetUtil.UTF_8));
}
/**
* 发生异常后, 一般是需要关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
}
}
NettyDemoClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author 李家民
*/
public class NettyDemoClient {
public static void main(String[] args) throws Exception {
/**
* 创建客户端 事件循环组 + 启动对象
* 注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
*/
EventLoopGroup eventGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
/**
* 设置相关参数
* 设置线程组
* 设置客户端通道的实现类(反射)
*/
bootstrap.group(eventGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyDemoClientHandler());
}
});
System.out.println("上面的代码 --- 客户端 ok...");
// 启动客户端去连接服务器端
ChannelFuture channelFuture
= bootstrap.connect("127.0.0.1", 16668).sync();
// 对关闭通道事件 进行监听
channelFuture.channel().closeFuture().sync();
eventGroup.shutdownGracefully();
}
}
NettyDemoClientHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* 客户端的处理器
*
* @author 李家民
*/
public class NettyDemoClientHandler extends ChannelInboundHandlerAdapter {
/**
* 通道就绪后触发
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是客户端", CharsetUtil.UTF_8));
}
/**
* 通道读事件触发
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址: " + ctx.channel().remoteAddress());
}
/**
* 关闭通道 异常处理
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("出现异常我就出现了");
cause.printStackTrace();
ctx.close();
}
}
5.4 Taskueue 任务队列
那么来思考一个问题,如果当服务器在上述的代码中,读取一个客户端的消息过久,是否就会造成阻塞了?
这样的后果那是不是就违背了我们同步非阻塞的NIO模型初衷
因此我们需要采取措施,来解决这种可能会发生阻塞的问题
普通/定时任务
修改 NettyDemoServerHandler
中的 channelRead
方法
这个方法的核心其实就是,new 出一个子线程进行操作
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 本质是一个双向链表
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline();
/**
* 自定义用户普通任务测试
*/
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
System.out.println("自定义用户普通任务测试1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
/**
* 定时任务测试
*/
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
ctx.writeAndFlush(Unpooled.copiedBuffer("用户定时任务---", CharsetUtil.UTF_8));
}
}, 5, TimeUnit.SECONDS);
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + channel.remoteAddress());
}
非当前reactor线程调用的各种方法
在设置参数的时候通过hashcode,例如在推送系统的业务线程里面,根据用户的标识,找到对应的channel引用,然后调用Write类方法向该用户推送消息,就会进入到这种场景。最终的Write会提交到任务队列中后被异步消费
5.5 编码与解码
简述
我们先看下面的效果
- 没加
编解码器
之前,服务器收到客户端的消息如下客户端发送消息是1:PooledUnsafeDirectByteBuf(ridx: 0, widx: 24, cap: 2048)
- 加了
编解码器
之后,服务器收到客户端的消息如下客户端发送消息是1:你好,我是客户端
这里涉及到一个概念,叫做粘包和拆包,原理就不详解了,解决方案就是在下面这些
然后再来看一张图
所以,编码与解码的流程大致如上
Netty本身有自带的编码/解码器,例如ObjectDecoder/ObjectEncoder
和StringDecoder/StringEncoder
,但是本身使用了Java底层的序列化,效率不高,所以引入了其他的解决方案(Protobuf、Kryo、JSON),这些都是序列化的框架,当然你可以自定义编码器,但是没必要,我也懒
Protobuf
文章目录 |
---|
GitHub - protocolbuffers/protobuf: Protocol Buffers - Google's data interchange format |
解决方案之一,跨平台、支持多种语言,这个东西不像JSON用起来这么简单,但是相较于JSON的数据解析,它更加快,效率更高,下面讲讲怎么用,首先引入依赖
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.19.4</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.19.4</version>
</dependency>
如果在Windows环境开发,请先将编译器从github上下载好,环境变量就不配置了,一会演示使用maven去附带编译
在IDEA添加porotbuf编译插件
由于我们是使用maven进行构建,所以这里要在pom文件将插件引入
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocExecutable>
E:\63_protoc-3.19.4-win64\bin\protoc.exe
</protocExecutable>
<pluginId>protoc-java</pluginId>
<!-- proto文件放置的目录 -->
<protoSourceRoot>${project.basedir}/src/main/resources/protobuf</protoSourceRoot>
<!-- 生成文件的目录 -->
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<!-- 生成文件前是否把目标目录清空,这个最好设置为false,以免误删项目文件 -->
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
</plugin>
</plugins>
</build>
将pom插件引入后,在后边会有这些东西,一会会用来构建
以上就是准备工作,下面开始写一个简单的Demo
首先从文件格式上了解,proto文件简单地说就是一个消息的协议文件,这个协议文件的后缀文件名为“.proto”
person-entity.proto
// 协议号
syntax = "proto3";
// 所在目录
package com.ljm.protobuf;
// 生成具体目录
option java_package = "com.ljm.protobuf";
// 该字段是option(可选)的,如果是true,那么每一个message文件都会有一个单独的class文件
option java_multiple_files = true;
// 该字段是option(可选)的,生成的数据访问类的类名
option java_outer_classname = "PersonEntity";
// 具体的消息体
message Person {
int32 id = 1;
string name = 2;//必须字段,在后面的使用中必须为该段设置值
optional string email = 3;//可选字段,在后面的使用中可以*决定是否为该字段设置值
optional string detail = 4;
}
然后通过maven生成了具体的包及类后,就正式进入使用阶段,Protobuf与netty结合
服务器上的变化的点是在管道的处理器以及参数接收上
目前发现这种处理方式比较死板,只能接收这一种类型的数据,以后有更好的我再补充
// protobuf 编码器
pipeline.addLast("encoder", new ProtobufEncoder());
// protobuf 解码器
pipeline.addLast("decoder", new ProtobufDecoder(Person.getDefaultInstance()));
接着在钩子方法的处理上就比较简单了
/**
* 读取数据事件 (可以在这里读取客户端发送的消息)
*
* @param ctx
* @param msg 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Person person = (Person)msg;
System.out.println("客户端发送消息是:" + person);
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
}
客户端就不重复去写了,方法是一样的,前面我也有说到,这么写太死板了,所以就需要我们去重写一下编解码器,使它更加的灵活、通用。
JSON
或者使用Json来解决
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
编解码器的结合
主要是在出入站的处理器上,平时我们需要放入编码器类和解码器类,如
// 处理器管道配置
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("此时客户端进来了:" + "客户 SocketChannel hashcode=" + ch.hashCode());
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyDemoServerHandler());
}
});
但是现在有一种新的处理方式,那种就是编码器和解码器的结合,Netty的新类型——Codec(编解码器)
书上介绍了两种编解码器
- ByteToMessageCodec
- CombinedChannelDuplexHandler
第一种同时包含了编解码的两个抽象方法 —— 编码encode()和解码decode(),都需要自己去实现;而第二种解决了第一种编解码器在逻辑上的缺陷
将编码器和解码器的逻辑强制性地放在同一个类中,在只需要编码或者解码单边操作的流水线上,逻辑上不大合适
自定义Protobuf编解码器
简单说明
- 通过继承Netty中基础的MessageToByteEncoder编码器类,实现其抽象的编码方法encode()
- 通过继承Netty中基础的ByteToMessageDecoder解码器类,实现其抽象的编码方法decode()
5.6 TCP链路背压问题
解决方案
- Apache Flink 大数据的分布式处理框架
- MQ应该也行
5.7 手写分布式RPC
逻辑梳理
个人理解
注册中心消费者 提供者
1.消费者/提供者 从注册中心获取端点地址信息
2.Netty与Spring整合,封装动态代理
3.根据从注册中心拿到的端点地址信息,传递参数给动态代理,实现RPC调用
代码实现
不写代码了,大概要的内容就是
- 代理类,封装Netty
- 远程调用,注册中心处获取微服务的地址和端口
- 发送数据时编码器的处理
5.8 Netty处理器中的方法介绍
方法名 | 描述 |
---|---|
channelRegistered(…) | Channel注册到EventLoop,并且可以处理IO请求 |
channelUnregistered(…) | Channel从EventLoop中被取消注册,并且不能处理任何IO请求 |
channelActive(…) | Channel已经连接到远程服务器,并准备好了接收数据 |
channelInactive(…) | Channel还没有连接到远程服务器 |
channelReadComplete(…) | Channel的读取操作已经完成 |
channelRead(…) | 有数据可以读取的时候触发 |
userEventTriggered(…) | 当用户调用Channel.fireUserEventTriggered方法的时候触发,用户可以传递一个自定义的对象当这个方法里 |
5.9 手撸一个简易的聊天系统
首先思路梳理,大致的结构
- 服务端上,首先明确,对端通信是通过通道Channel的,所以我发送消息的时候也是利用通道广播出去,但是我目前还没找到比较效率高的办法,具体可以看看代码;
- 客户端上,我刚才也说了,有了通道就可以发送消息,那在客户端与服务端建立连接的时候,利用通道就绪的钩子方法,将通道赋给静态的类变量上;
服务端
NettyDemoServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* @author 李家民
*/
public class NettyDemoServer {
public void ServerStart() throws InterruptedException {
// 连接请求处理
EventLoopGroup bossGroup = new NioEventLoopGroup(3);
// IO请求处理
EventLoopGroup workerGroup = new NioEventLoopGroup(5);
// 基本配置
ServerBootstrap serverBootstrap =
new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 处理器管道配置
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("此时客户端进来了:" + "客户 SocketChannel hashcode=" + ch.hashCode());
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyDemoServerHandler());
}
});
// 服务器端口配置及监听
ChannelFuture channelFuture = serverBootstrap.bind(16668).sync();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("监听端口 16668 成功");
} else {
System.out.println("监听端口 16668 失败");
}
}
});
// 关闭通道并监听
channelFuture.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
NettyDemoServerHandler
广播消息的做法太蠢了,但是我还没找到更好的方法
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* 说明
* 1. 我们自定义一个Handler 需要继承netty 规定好的某个HandlerAdapter(规范)
* 2. 这时我们自定义一个Handler , 才能称为一个handler
*
* @author 李家民
*/
public class NettyDemoServerHandler extends ChannelInboundHandlerAdapter {
private static CopyOnWriteArrayList<Channel> copyOnWriteArrayList = new CopyOnWriteArrayList<Channel>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("有客户端连接上来了:" + ctx.channel().remoteAddress());
copyOnWriteArrayList.add(ctx.channel());
}
/**
* 读取数据事件 (可以在这里读取客户端发送的消息)
*
* @param ctx
* @param msg 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String buf = (String) msg;
System.out.println("客户端发送消息是:" + buf);
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
// 把拿到的消息广播出去
for (Channel channel : copyOnWriteArrayList) {
channel.writeAndFlush("服务器广播消息:" + buf);
}
}
/**
* 数据读取完成后的操作
* 1. writeAndFlush代表写入+刷新
* 2. 还需要对返回的数据进行编码 毕竟是中文
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
String ret = "你好客户端,我是服务器,我已经收到了你的消息 = channelReadComplete";
ctx.writeAndFlush(Unpooled.copiedBuffer(ret, CharsetUtil.UTF_8));
}
/**
* 发生异常后, 一般是需要关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
}
}
客户端
DemoController
import com.ljm.netty.NettyDemoClientHandler;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @author 李家民
*/
@RestController
public class DemoController {
@Resource
private NettyDemoClientHandler nettyDemoClientHandler;
@RequestMapping("msgSend")
public String msgSend(String msg) {
nettyDemoClientHandler.msgSend("用户1:"+msg);
return "ok";
}
}
NettyDemoClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @author 李家民
*/
public class NettyDemoClient {
public void startClient() throws Exception {
// 创建客户端 事件循环组 + 启动对象
EventLoopGroup eventGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
// 参数配置
bootstrap.group(eventGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("连接到服务器那头了...");
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyDemoClientHandler());
}
});
// 启动客户端去连接服务器端
ChannelFuture channelFuture
= bootstrap.connect("127.0.0.1", 16668).sync();
// 对关闭通道事件进行监听
channelFuture.channel().closeFuture().sync();
eventGroup.shutdownGracefully();
}
}
NettyDemoClientHandler
其实最后我们都是在利用ChannelInboundHandlerAdapter的各种钩子方法
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.springframework.stereotype.Component;
/**
* 客户端的处理器
*
* @author 李家民
*/
@Component
public class NettyDemoClientHandler extends ChannelInboundHandlerAdapter {
private static Channel channel;
public void msgSend(String msg) {
channel.writeAndFlush(msg);
}
/**
* 通道就绪后触发
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("println:客户端连接就绪");
channel = ctx.channel();
}
/**
* 通道读事件触发
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String buf = (String) msg;
System.out.println("服务器回复的消息:" + buf);
System.out.println("服务器的地址: " + ctx.channel().remoteAddress());
}
/**
* 关闭通道 异常处理
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("出现异常我就出现了");
cause.printStackTrace();
ctx.close();
}
}
5.10 心跳检测
先列个大纲
- 事出有因 - 为什么这么做
- 解决方案
简介
为什么需要做心跳检测?一般来说,如果客户端服务端之间socket已经断开了,但是服务端还没关掉自己的ServerSocket,数量一多就会很占用服务端的性能,根据书中说明,这种连接假死的情况有三种
- 服务端上处于TCP_ESTABLISHED状态的“正常”连接
- TCP客户端显示连接已经断开
- 客户端无法进行断线重连,上一次的连接状态服务端没有释放
解决方案就是
- 服务端使用Netty自带的IdleStateHandler做空闲检测
- 客户端定时发送心跳包
Demo
下面我写一个简易版的示例
服务端,实现IdleStateHandler的抽象方法
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* 客户端的空闲检测
*
* @author 李家民
*/
public class HeartBeatServerHandler extends IdleStateHandler {
public HeartBeatServerHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds);
}
public HeartBeatServerHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
super(readerIdleTime, writerIdleTime, allIdleTime, unit);
}
public HeartBeatServerHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
// observeOutput:观察输出 - 在评估写入空闲时是否应考虑bytes的消耗.默认值为false
// readerIdleTime:表示入站(Inbound)空闲时长
// writerIdleTime:出站(Outbound)空闲时长
// allIdleTime:出/入站检测时长
// unit:上述的时间单位
// 如果参数为0代表禁用
super(observeOutput, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
// 接收不到心跳被调用了
System.out.println("接收不到心跳了 我出现了.... = " + new Date() + "__" + ctx.channel().id() + "__" + evt);
super.channelIdle(ctx, evt);
}
}
装配到管道上,记得装配的位置是个坑,要放在业务处理器前
接下来看客户端,客户端需要定时的发送数据给服务端,我的逻辑是通道建立就绪后,做一个异步执行的任务
/**
* 通道就绪后触发
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 异步客户端心跳监测
CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 6; i++) {
System.out.println("客户端循环心跳监测发送: " + new Date());
ctx.writeAndFlush("客户端循环心跳监测发送: " + new Date());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
最终的效果就是,当我停止发送数据后,就会回调channelIdle()方法
你学废了吗
六、Netty核心组件再分析
大致分为这几个方向
- Netty中的Reactor模式
- Bootstrap
- NioEventLoop
- Channel
- Handler
- Pipeline
- ByteBuf
好了,开始!
6.1 Netty中的Reactor模式
前情回顾,复习一下NIO下的Reactor模式中处理IO事件的方式
- 注册Channel通道
- 查询Selector选择器中的IO事件
- 将IO事件发配给相应的Handler处理器
而在Netty的中的Reactor模式
,对许多组件进行了重封装,例如Channel。通过对应不同的协议,实现对应的通道类型,在2.2章节上有对各个通道类型进行简介,但是实际上,我们用的最多的协议还是TCP,具体情况具体分析,下面简要的看一下这组Channel的UML图。
其他的....现在我还不会,就跳过吧
6.2 Bootstrap
首先,这是一个用于完成Netty的一些基本配置的便利工厂类,如果不用这个类去进行一些基本的配置也可以,但是十分繁琐,不利于新手学习,也比较拖效率,之前的Demo中我们也用了许多Bootstrap中的方法
而在netty中有两个引导类,分别用在服务端和客户端,但是本质上差不多,下面简要的看一下Bootstrap的启动流程
- 反应器轮询组创建
如果不需要分开监听新连接事件和IO事件,其实可以只用一个轮询组,但是那样会在同一线程下的阻塞问题导致效率很低,所以一般都是分两个组分别干一件事。 - 设置通道类型
上图中,我设置的通道类型是NioServerSocketChannel,还有许多其他类型的通道,具体情况具体分析。 - 监听端口配置
- 通道配置
例如像一些连接保持、连接数等。 - 装配Pipeline管道
在ChannelInitializer通道初始化类的实例中,有一个initChannel初始化方法,在子通道创建后会被执行,向子通道流水线增加业务处理器。 - 绑定服务器监听端口
- 把自己这个线程阻塞了,直到监听的通道关闭,最后关闭EventLoopGroup,释放所有资源
关于这个有一个很好的模拟现象,我通过将Netty的启动方法放入Bean后发现,我的主线程跑不起来了,其实原因就是因为我把当前自己的这个线程阻塞掉了,导致后续的SpringBoot无法继续下去。
另外再描述一下关于在ServerBootstrap上的一些ChannelOption可选通道配置
文章目录 |
---|
自顶向下深入分析Netty(六)--Channel总述 - 简书 (jianshu.com) |
6.3 NioEventLoop
首先从功能上了解,NioEventLoopGroup和NioEventLoop分别都做了哪些事
NioEventLoopGroup
- 根据设置的线程数创建NioEventLoop线程组并初始化
- 创建线程选择器,当获取线程时,通过选择器来获取
- 创建线程工厂并构建线程执行器
NioEventLoop
- 开启Selector并初始化
- 把ServerSocketChannel注册到Selector上
- 处理各类型的IO事件
- 执行定时调度任务
- 解决JDK空轮询问题
具体的源码层面的刨析看看《Netty源码剖析及应用》,现阶段俺不会。。。
6.4 Channel
很重要,是Netty最核心的组件,用于数据通信传输,下面看看里面的重要方法
- ChannelFuture connect(SocketAddress address)
- ChannelFuture bind(SocketAddress address)
- ChannelFuture close()
- Channel read()
- ChannelFuture write(Object o)
- Channel flush()
不想写了...感觉方法名上说明的很清楚,另外有一个单元测试通道EmbeddedChannel,是为了解决在每开发一个业务处理器都进行服务器和客户端的重复启动的痛点问题,它也叫嵌入式通道。
6.5 Handler
在前端由反应器做完基本的处理后,最终的IO业务会落在业务处理器上,这是由用户进行定义的,通常我们会对处理器类型进行这么区分
- 入站处理器 ChannelInboundHandler
- 出站处理器 ChannelOutboundHandler
- 通道初始化处理器 ChannelInitializer
当对端数据入站进入Netty通道时,Netty将触发入站处理器所对应的方法进行入站处理;当我方业务数据写入Netty通道的时候,就可以使用出站处理器进行一些列的底层通道操作。例如连接建立、连接断开、写入刷新等等;
然而,对于通道初始化处理器,我们可以先看看这段代码
// 处理器管道配置
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new ProtobufEncoder());
pipeline.addLast("decoder", new ProtobufDecoder(Person.getDefaultInstance()));
pipeline.addLast(new NettyDemoServerHandler());
}
});
Channel和Handler业务处理器之间的关系:一条通道有一条处理器流水线(Pipeline),如果我们需要自行装配自己的业务处理器,就会像上面这样利用通道的初始化处理器进行装配。
6.6 Pipeline
管道,也叫流水线,上文也提到Channel和Handler业务处理器之间的关系,我好懒,不想学了
文章目录 |
---|
netty源码分析之pipeline(一) - 简书 (jianshu.com) |
Netty职责链Pipeline详解_suvue-CSDN博客 |
6.7 ByteBuf
Netty中的ByteBuf是将NIO的ByteBuf进行重写过,它不再需要调用flip()方法去切换读/写模式,其内部是一个字节容器,分为了四个部分
- 废弃 - 已用字节
- 可读 - 有效数据
- 可写 - 数据写入的部分
- 可扩容 - 可扩容的字节数大小
文章目录 |
---|
ByteBuf(秒懂)- 图解Netty系列_架构师尼恩-CSDN博客 |
我先码后学
七、内存管理
文章目录 |
---|
jemalloc内存分配算法 - xiaojiesir - 博客园 (cnblogs.com) |
为了提高内存的使用效率,Netty引入了Jemalloc
内存分配算法,根据内存的大小分配至不同的内存区域,为了避免线程间锁的竞争和同步,每个I/O线程都对应一个PoolThreadCache
,负责当前线程使用非大内存的快速申请和释放。
大概总结下来就是,根据计算好的内存大小去合适的空间分配内存,结合上下图去理解
所以,它最终还是由几个组件去进行内存工作
- PoolChunk
- PoolSubpage - 内存分配和释放
- PoolArena - 内存管理
- RecvByteBufAllocator - 内存分配计算
文章目录 |
---|
Netty内存管理机制 - 简书 (jianshu.com) |
netty中的内存泄漏检测机制ResourceLeakDetector - 简书 (jianshu.com) |
八、时间轮
文章目录 | |
---|---|
[netty源码解读之时间轮算法实现-HashedWheelTimer \ | Zacard's Notes](https://zacard.net/2016/12/02/netty-hashedwheeltimer/) |
先来看一张图
大概理解就是,指针每指向一个刻度,就把那个刻度里面的任务拿出来执行,所以这个时间轮还是很形象的。
好难,跳过
九、问题分析/性能调优
9.1 生产问题分析
列举以下几个情况
- 内存溢出现象在服务器运行一段时间后一定会出现
- 内存溢出偶尔会出现,可能要运行很长一段时间才会发生
- 无内存溢出,但响应耗时比较长且TPS非常低
- 无内存溢出,但发现有些功能出现卡死现象
情况列出来了,开始问腿解决
- 内存溢出一定出现
使用jmap观察哪些对象内存增长速度过快,且没有释放
- 内存溢出偶尔会出现
服务启动时加上内存异常快照,问题出现后,通过内存分析工具进行分析
- 响应耗时比较长且TPS非常低
可以通过阿里巴巴的这个开源工具,找到那条很占CPU的线程
- 卡死
也可以通过阿里巴巴的工具查看,看看是否是因为消费速度跟不上导致的背压,或者是死锁了
9.2 性能调优
概述
我们主要从三大方向入手
- Linux系统参数调整
- TCP参数调整
- Netty服务器应用层优化
对于第一种情况,我们可以通过放开服务器的全局句柄数
来解决
TCP参数调整
文章目录 |
---|
Netty:option和childOption参数设置说明 - 简书 (jianshu.com) |
主要是针对option
和childOption
的调优配置
option:针对BOSS线程组childOption:针对worker线程组
Netty服务器应用层优化
- 线程调优问题
- JVM优化
在第一个问题上,也是担心会出现任务的消费速度跟不上,产生背压,所以需要把线程以及背压解决逻辑给写好
在第二个问题,初始堆内存和最大堆内存需要根据内存模型进行计算并得出相对合理的值,并不是内存分配得越大越好
- 在JVM内存小于32GB的时候,会采用内存对象指针压缩技术,此指针不再表示对象在内存中的精确位置,而是表示偏移量。这意味着32bit的指针可以引用40亿个对象,而不是40亿个字节
- JVM内存设置一旦超过30~32GB的边界,指针就会切回普通对象的指针,每个对象的指针都变长了,就会使用更多的CPU内存
- 如果你实在是有大内存的需求,则需要在垃圾回收器上的选择进行更多的一个考虑
另外,对于非实时性特别高的系统,Netty的写操作尽量减少直接往网络中写的次数,可减少系统调用的开销,提高带宽利用率,如Flink、JStorm等流式计算框架,并且业务接口可以上Redis缓存,减少磁盘I/O的动作
最终总结
文章目录 |
---|
通俗地讲,Netty 能做什么? - 知乎 (zhihu.com) |
1
1
1