前言
小编上次分享了netty的线程模型以及简单使用,不知道的小伙伴可以看看Netty框架之线程模型与基础用法,小编承认网络编程还是非常复杂的,一不小心就掉坑里了,即使是小编简单示例的代码,很多小伙伴也问小编这个为什么这么写啊,尤其是pipeline.addLast或addFirst方法里面的参数,然后什么时候使用addLast什么时候使用addFirst方法。带着这些疑问,小编今天为大家带来netty的核心组件,相信学习完并吃透这个的话,在netty的应用方面就可以得心应手了。废话不多说,咱们进入主题。
Netty核心组件 - Netty Channel
说明netty channel之前,小编先带大家回顾一下nio的channel,因为netty的底层肯定用到nio了。如果不熟悉的小伙伴可以看小编之前的博客。
在java 原生NIO操作中Channel 是一个非常核心的组件,它可以用于连接传输的两端,并提供传输和读写相关操作比如:绑定端口、建立连接、读写消息、以及关闭管道等。如果是SelectableChannel还可以注册到选择器,由选择器监听读写,从而实现非阻塞的功能。
Netty 的Channel 是在原生基础之进行封装,也就是原有的功能Netty中同样可以实现。同时引入了Pipeline与异步执行机制。
- Pipeline会组装串联若干个ChannelHandler(管道处理器),所有针对管道主动或被动发生的事件都会在Pipeline上进行流转处理(这个小编在下面pipeline的结构中着重说明)。
- 异步机制是指在原生 NIO Channel 中异步进行IO操作是一种不安全行为,它会触发阻塞甚至会导致死锁(即原先java 的nio是线程不安全的,用的不好会导致现场阻塞或死锁,线程的安全得靠自己实现)。所以一般情况下在非 IO线程中操作Channel 都会以任务的形式进行封装,并提交到IO线程执行(线程安全实现的方式)。而这些在Netty中都已经内部封装实现,即使异步调用Netty Channel都是安全的。
JAVA 原生NIO中不同Channel实现会有不同的功能,在Netty中也是类似的它不同的子类会包含对应原生NIO中的Channel 。常见如下图:
Netty Channel结构
netty channel本质上是对原生nio进行了封装让我们使用起来根据简单,netty channel的结构示意图如下:
简要说明(以UDP绑定端口为例,因为这个比较简单):
- Netty的Channel为接口,AbstractChannel实现Channel接口,AbstractNioChannel继承了AbstractChannel类,NioDatagramChannel为具体的AbstractNioChannel的实现。
- 以建立连接为例,当Channel建立连接,首先通过pipleline调用到eventloop的unsafe。(unsafe是线程不安全的,netty是可以直接提供出来给调用方使用的,不过在eventloop中的话帮我们实现了线程安全)
- unsafe调用具体的实现类NioDatagramChannel,然后NioDatagramChannel调用到java原生的channel进行端口绑定。
示例代码:
@Test
public void bindPortTest(){
//构建线程组 IO
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
//创建管道并注册
NioDatagramChannel nioDatagramChannel = new NioDatagramChannel();
eventLoopGroup.register(nioDatagramChannel);
//初始化pipeline,为其添加Channel Handler处理器
// (没有这个pipeline也可以绑定只是不能处理读取事件)
nioDatagramChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(msg);
}});
//绑定端口
nioDatagramChannel.bind(new InetSocketAddress(8080));
//因为是异步线程防止主线程停止
while (true);
}
绑定端口是一个IO操作,所以实际执行是会被以任务的形式提交到IO线程(NioEventLoop)。但其最终还是调用的JAVA 原生NIO 的绑定方法。这部分内容就在NioDatagramChannel.doBind() 方法中体现。
断点调试堆栈信息结果:
通过上面的堆栈信息希望大家已经理解了调用的整个过程。
以此类推调用Channel.write()操作,同样会被异步封装后,然后调用doWrite(),最终调到java nio Channel 中的Write或send。类似方法还有很多:
bind==>doBind==>java.nio.channels.Channel.bind()
write==>doWrite==> …
connect==>doConnect==>…
close==>doClose==>…
disconnect==>doDisconnect==>…
read==>doReadBytes==> …
由此可见doXXX方法即是直接对原生 Channel的IO调用。在非IO线程调用这是一种不安全的形为,所以所有do开头的方法都是不开放的(protected)。不过Netty 在Channel 中还提供了一个Unsafe 可以直接调用这些方法。接下来小编使用Unsafe直接调用方法以及一些讲解。
Unsafe
Unsafe 是Channel当中一个内部类,可以用于直接操作Channel 中的IO方法。而不必经过异步和管道。所以在Unsafe中调用IO方法它会立即返回。 不过和它的名字一样,这不是一种不安全的行为,需要调用者去确保当前对Unsafe的调用是在IO线程下,否则就会报异常。当然不是所有的方法都是线程不安全的,以下方法是线程安全的:
localAddress() //本地地址
remoteAddress() //远程地址
closeForcibly() //强行关闭
ChannelPromiseregister(EventLoop, ChannelPromise) //这是一个异步方法不会立马返回,而是完成后通知
deregister(ChannelPromise)voidPromise() //注销并且 完成后通知
Unsafe 是Channel的内部类, 不同的Channel 会对应不同的Unsafe 所提供的功能也不一样。如,其结构与继承关系如下图:
另外要说明的是 在Unsafe并不只是作为中介把调用转发到Channel,其还提供如下作用:
- 线程检测:当前调用是否为IO线程
- 状态检测:写入前判断是否已注册
- 写入缓存:Write时把数据写入临时缓存中,当flush时才真正提交
- 触发读取:EventLoop 会基于读取事件通知Unsafe ,在由unsafe读取后 发送到pipeline
所以Unsafe中最核心作用并不是给开发者调用而是其内部的组件调用。他在Channel、Eventloop、Pipeline这个三组件间启动了一个桥梁作用。
如在一个次读取场景中流程是这样的:
- EventLoop 触发读取并通知unsafe //unsafe.read()
- unsafe调用channel 读取消息 // channel.doReadBytes(ByteBuf)
- unsafe将消息传入pipeline // pipeline.fireChannelRead(msg)
写入过程:
- 业务开发调用channel写入消息 //channel.write(msg)
- channel将消息写入 pipeline // pipeline.write(msg)
- pipeline 中的Handler异步处理消息 //ChannelOutboundHandler.write()
- pipeline调用unsafe写入消息 //unsafe.write(msg);
- unsafe调用Channel 完成写入 // channel.doWrite(msg)
示例代码:
@Test
public void unsafeTest(){
NioDatagramChannel nioDatagramChannel = new NioDatagramChannel();
nioDatagramChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("注册成功");
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("管道激活,绑定端口号");
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DatagramPacket datagramPacket = (DatagramPacket)msg;
ByteBuf content = datagramPacket.content();
System.out.println(content.toString(Charset.defaultCharset()));
}
});
AbstractNioChannel.NioUnsafe unsafe = nioDatagramChannel.unsafe();
unsafe.register(eventLoopGroup.next(),nioDatagramChannel.newPromise() );
nioDatagramChannel.eventLoop().submit(()->{
unsafe.bind(new InetSocketAddress(8080),nioDatagramChannel.newPromise());
});
while (true);
}
@Test
public void unsafeWriteTest(){
NioDatagramChannel nioDatagramChannel = new NioDatagramChannel();
eventLoopGroup.register(nioDatagramChannel);
nioDatagramChannel.bind(new InetSocketAddress(9999));
AbstractNioChannel.NioUnsafe unsafe = nioDatagramChannel.unsafe();
eventLoopGroup.submit(() -> {
// 写入消息到 unsafe 缓冲区
ByteBuf byteBuf = Unpooled.wrappedBuffer("hello world".getBytes());
DatagramPacket datagramPacket = new DatagramPacket(byteBuf,new InetSocketAddress("127.0.0.1",8080));
unsafe.write(datagramPacket, nioDatagramChannel.newPromise());
// 刷新消息到 java channel
unsafe.flush();
});
}
ChannelPipeline
每个管道中都会有一条唯一的Pipeline 其用于流转的方式处理Channel中发生的事件比如注册、绑定端口、读写消息等。这些事件会在pipeline流中的各个节点轮转并依次处理,而每个节点就可以处理相对应的功能,这是一种责任链式的设计模式,其目的是为让各个节点处理理聚焦的业务。
Pipeline结构
事件是如何在pipeline中轮转的呢?其实内部采用双向链表结构,通过ChannelHandlerContext 包装唯一的Handler,并通过prev与next属性分别链接节点上下的Context,从而组成链条。pipeline中有Head与Tail 两个Context对应链条的首尾。
ChannelHandler
ChannelHandler是指pipeline当中的节点,共有三种类型:
- 入站处理器:即ChannelInboundHandler的实现,可用于处理如消息读取等入站事件(如上图绿色的Handler)
- 出站处理器:即ChannelOutboundHandler的实现,可用于处理消息写入、端口绑定入出站事件(如上图红色色的Handler)
- 出入站处理器:ChannelDuplexHandler的实现,可以处理所有出入站事件。某些协义的编解码操作想写在一个类里面,即可使用该处理器实现。
出入站事件
Channel 中的事件可分为出站与入站两种。
- 入站事件是指站内发生的事件,如已读取的消息处理、管道注册、管道激活(绑定端口或已连接)这些都是由EventLoop基于IO事件被动开始发起的。请注意所有入站事件触发必须由ChannelInBoundInvoker的子类执行。
- 出站事件出站事件是指向Channel的另一端发起请求或写入消息。如:bind、connect、close、write、flush 等。其均由ChannelOutboundInvoker触发并由ChannelOutboundHandler处理。与入站事件不同其都由开发者自己发起。
- 事件的触发下图可以看出 pipeline 与Context分别实现了出入站接口,说明其可触发所有出入站事件,而Channel只承继出站口,只能触发出站事件。
ChannelHandlerContext
Context主要作用如下:
- 结构上链接上下节点
- 传递出入站事件,所有的事件都可以由Context进行上下传递
- 保证处理在IO线程上,前面所说所有的IO操作都需要异步提交到IO线程处理,这个逻辑就是由Context实现的。如下面的绑定操作就是保证了IO线程执行:(io.netty.channel.AbstractChannelHandlerContext#bind)
链条梳理流程
出入站事件都是Channel 或pipeline 发起,并由Context进行上下传递。如果是入站事件将会从头部向下传递到尾部并跳过 OutboundHandler,而出站与之相反,从尾部往上传递,并跳过InboundHandler处理器。接下来小编用代码示例来说明一下:
UDP代码示例
@Test
public void inAndOutEventTest(){
//构建线程组 IO
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
//创建管道并注册
NioDatagramChannel nioDatagramChannel = new NioDatagramChannel();
eventLoopGroup.register(nioDatagramChannel);
//初始化pipeline,为其添加Channel Handler处理器
ChannelPipeline pipeline = nioDatagramChannel.pipeline();
pipeline.addLast(new ChannelInboundHandlerAdapter() {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String message = (String)msg;
System.out.println("入站事件1:"+ message);
//入站事件处理后的消息
message += ", I'm your farther";
ctx.fireChannelRead(message);
}});
pipeline.addFirst(new ChannelOutboundHandlerAdapter() {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("出站事件1:"+ msg.toString());
ctx.write(msg.toString()+", hi Netty");
}});
pipeline.addFirst(new ChannelOutboundHandlerAdapter() {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("出站事件2:"+ msg.toString());
DatagramPacket packet = new DatagramPacket(Unpooled.
wrappedBuffer(msg.toString().getBytes()),
new InetSocketAddress("127.0.0.1", 8080));
ctx.write(packet);
}});
pipeline.addLast(new ChannelInboundHandlerAdapter() {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String message = (String)msg;
System.out.println("入站事件2:"+ message);
ctx.writeAndFlush(msg);
}});
//绑定端口
nioDatagramChannel.bind(new InetSocketAddress(8083));
pipeline.fireChannelRead("hello world");
// pipeline.write("123");
// pipeline.flush();
while (true);
}
执行流程说明:
- 基于pipeline 触发入站处理,其首由头部开始处理,并向下传递
- 入站事件1接收消息 ,并改写消息后通过ctx.fireChannelRead();往下传递
- 入站事件2接收消息,并打印。此时入站事件将msg写出去
- 出站事件1接受消息并打印,继续向下一个出站事件写出去ctx.writeAndFlush(msg)
- 出站事件2接受消息打印和,写出一个DatagramPacket 发往服务端
注意各个顺序的调用。有很多种方法,如果大家理解了,那基本对pipeline的调用顺序完全理解了。
总结
这篇文章比较长可能需要一定时间的消化,请大家耐心读取,本次分享小编认为主要的核心是ChannelPipeline,搞清楚这个的话对使用netty应该没问题,其他的大家能够理解多少或有疑问希望和小编讨论一下。接下来小编还会继续分析netty相关内容,望大家继续支持啊。