一、简介
一、概述
Netty是JBOSS提供的一个开源框架, 本质为网络应用程序框架, 提供了非阻塞的、事件驱动的网络应用程序框架和工具,便于快速开发出高性能、高可靠性的网络服务端以及客户端。
二、核心架构
上图是netty的核心架构,从官网上截取的, 有兴趣的可以去看看
核心:
- 可拓展的事件模型
- 统一的通信API,简化了通信编码
- 零拷贝和丰富的字节缓冲区
传输服务:
- 支持Socket和Datagram(数据报)
- HTTP传输
- In-VM Pipe(管道协议,JVM的一种进程)
协议支持:
- HTTP以及WebSocket
- SSL安全套接字协议支持
- Google Protobuf(序列化框架)
- 支持zlib、gzip压缩
- 支持大文件的传输
- RTSP(实时流传输协议, 是TCP/IP协议体系中的一个应用层协议)
- 支持二进制协议并且提供完整的单元测试
三、为什么使用Netty不使用Java原生NIO
Netty主要采用的也是NIO,并且是基于JDK中的NIO做了一些实现以及升级
主要有四点:
- Netty的API对开发者更友好,JDK中的API功能薄弱且复杂(如:ByteBuffer改为ByteBuf)
- Netty中采用了Reactor线程模型,能够保证自身的线程安全
- Netty能实现高可用,解决了一些传输的问题,如粘包、半包、断路重连等
- 解决了Bug,如JDK的NIO中epoll bug
四、在使用Netty的项目
- 数据库: Cassandra
- 大数据处理: Spark、Hadoop
- Message Queue:RocketMQ
- 检索: Elasticsearch
- 框架:gRPC、Apache Dubbo、Spring5(响应式编程WebFlux)
- 分布式协调器:ZooKeeper
- 工具类: async-http-client
二、Reactor模型
Reactor线程模型是一种思想,不属于Java,也不属于Netty,其定义了三种角色
- Reactor:用于监听和分配事件,将I/O事件分给对应的Handler。新的事件包括建立连接就绪、读就绪、写就绪。
- Acceptor:处理客户端连接,并分配到处理器链中(可暂时简单理解为ServerSocketChannel)
- Handler:将自身与事件绑定,执行非阻塞的读和写任务,从channel中读入,完成处理业务逻辑, 再将结果写入channel
通过这三种角色定义出三种NIO的模式
单Reactor-单线程模式
所有的接收请求以及处理数据都是有一个线程执行的,所以在一定的数量之后,性能就会下降。
单Reactor-多线程模式
基于刚刚的单Reactor单线程模式,我们将消耗时间较长的编解码、业务计算抽取出去,建立一个线程池进行处理,这样能提升性能,但还不是最优解。
主从Reactor-多线程模型
这次我们再次基于上面的模型进行调整,将专门负责接受连接的ServerSocketChannel另开一个Reactor进行调用,这个Reactor称为主;主Reactor将Channel建立在专门负责读写的从Reactor上,这就是所谓的1+N+M(1个监听线程,负责监听新的socket;N个IO线程,负责对socket进行读写;M个worker线程,负责处理数据)。
工作流程:
- Reactor主线程MainReactor对象通过selector监听到客户端的连接请求,通过Acceptor处理客户端连接事件。
- Acceptor跟客户端建立好socket连接之后,MainReactor会将连接分配给SubReactor。
- SubReactor将连接注册到自己的Selector的队列中进行监听,并创建对应的Handler对各种事件进行处理。
- 当连接上有新的事件发生时,SubReactor会调用对应的Handler进行处理。
- Handler通过read从channel和缓冲区上读取请求数据,然后将分发给Worker进行处理。
- Worker处理完数据会将结果再返还给Handler,Handler再通过send请求将数据发给客户端。
- 一个MainReactor可以对应多个SubReactor。
优势所在:
- 各个线程职责简单且明确,MainReactor只需要负责注册连接,SubReactor负责后续业务处理。
- MainReactor和SubReactor交互简单,主只需将连接交给从,从也无需返回数据。
- 多个SubReactor可以处理高并发业务。
三、Netty对Reactor的实现
在Netty这个部分当中可以看到用了很多的池化思想
工作流程:
- Netty提供了两个线程池,一个BossGroup、一个WorkerGroup,每个池中都有EventLoop(相当于一个线程,可以是NIO,BIO,AIO)
- 每个EventLoop中包含Selector和TaskQueue
- 每个BossEventLoop负责以下三件事
- ①select:轮询注册ServerSocketChannel上的accept事件
- ②processSeleckedKeys:与客户端进行连接,并创建SocketChannel,将它注册到某一WorkerEventLoop上
- ③runAllTasks:继续处理其他事件
- 每个WorkerEventLoop也负责一下三件事
- ①select:轮询注册SocketChannel上的read/write事件
- ②processSeleckedKeys:在对应的SocketChannel上的PipeLine上处理相对应的数据
- ③runAllTasks:继续处理队列中的其他事件
ChannelPipeline和ChannelHandler
如图所示,ChannelPipeline为ChannelHandler的容器,每个SocketChannel都会与一个ChannelPipeLine绑定。假如这是服务端程序,读取处理我们称数据是入站的,需要经过一系列Handler处理后;如果服务器想向客户端写回数据,也需要经过一系列的Handler处理,我们称之为出站。
ChannelHandler则分为出站和入站的处理器,还有混合型的既能处理出站也能处理入站的
四、Netty使用的示例代码
服务端
public class NettyServer {
public static void main(String[] args) {
NettyServer server = new NettyServer();
server.start(8888);
}
private void start(int port) {
// 创建主线程池
EventLoopGroup boss = new NioEventLoopGroup(1);
// 创建从线程池
EventLoopGroup work = new NioEventLoopGroup();
try {
// 创建服务端的引导类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, work)
// 配置服务端主的channel
.channel(NioServerSocketChannel.class)
// 配置服务端handler
.handler(new LoggingHandler(LogLevel.INFO))
// 配置从的handler,也就是服务端连接的
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ServerInboundHandler1());
}
});
ChannelFuture future = serverBootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}
客户端
public class NettyClient {
public static void main(String[] args) {
NettyClient client = new NettyClient();
client.start("127.0.0.1",8888);
}
private void start(String host, int port) {
EventLoopGroup loopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(loopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
}
});
// 连接服务器
ChannelFuture future = bootstrap.connect(host, port).sync();
// 客户端向服务端发送数据
Channel channel = future.channel();
String msg= "我是Netty客户端, 你收到了吗?";
ByteBuf buffer = channel.alloc().buffer();
buffer.writeBytes(msg.getBytes(StandardCharsets.UTF_8));
channel.writeAndFlush(buffer);
// 等待关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
loopGroup.shutdownGracefully();
}
}
}
入站处理器
@Slf4j
public class ServerInboundHandler1 extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("ServerInboundHandler1 channelActive 执行了");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] dst=new byte[buf.readableBytes()];
buf.readBytes(dst);
String s = new String(dst, Charset.defaultCharset());
System.out.println(s);
log.info("读取到从客户端的数据"+s);
super.channelRead(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}