前言
如果说http对应着《Jetty概述》,那为tcp就对应着netty.Netty对于程序员的重要性可想而知
Netty架构
netty主要有以下三部份构成:
- ByteBuf内存
- Channel管道
- Pipeline事件模式
ByteBuf内存篇
《内核概述与NIO》与《JAVA直接内存(堆外内存)》中详细介绍了buff操作。分别对应Netty中的
- UnpooledHeapByteBuf(堆内)
- UnpooledDirectByteBuf (堆外)
- UnpooledUnsafeHeapByteBuf(unsafe堆内)
- UnpooledUnsafeDirectByteBuf(unsafe堆外)
- Netty对ByteBuf增加了基本控制,诸如 readerIndex, writerIndex, markedReaderIndex, markedWriterIndex, maxCapacity。
- 使用虚拟机提但的UNSAFE类可以直接跳过虚拟机的检测,可提交内存分配数据
- 使用ResourceLeakDetector封装RefQueue来检测堆外内存的回收
通过阅读AbstractNioByteChannel中的代码,read()方法发生在nio的op_read事件时触发
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
//每个channel对应一个PPLine
final ChannelPipeline pipeline = pipeline();
//ByteBuf分配器
final ByteBufAllocator allocator = config.getAllocator();
//容量计算器
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
//重置,把之前计数的值全部清空
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
//分配内存,关键在于计算分配内存的大小(小了不够,大了浪费)
byteBuf = allocHandle.allocate(allocator);
//doReadBytes,从socket读取字节到byteBuf,返回真实读取数量
//更新容量计算器
allocHandle.lastBytesRead(doReadBytes(byteBuf));
//如果小于0 则socket关闭,如果等于0则没读取到数据
if (allocHandle.lastBytesRead() <= 0) {
//释放资源
byteBuf.release();
byteBuf = null;
//如果小于0则意味着socket关闭
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
//增加循环计数器
allocHandle.incMessagesRead(1);
readPending = false;
//把读取到的数据,交给管道去处理
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
//判断是否继续从socket读取数据
} while (allocHandle.continueReading());
//读取完成后调用readComplete,重新估算内存分配容量
allocHandle.readComplete();
//事件激发
pipeline.fireChannelReadComplete();
//如果需要关闭,则处理关闭
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
//根据情况移除OP_READ事件
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
- ByteBufAllocator 是内存分配者,用于申请内存
- RecvByteBufAllocator.Handle 内存读取策略,不可能一次性读完所有的数据
- allocHandle.lastBytesRead方法标记已读取的内容大下
- doReadBytes是真正的从管道读取数据
- 从上述代码可知,如果读取数据过大,会多次触发pipeline.fireChannelRead
ByteToMessageDecoder解决多次触发问题
从上述源码分析得,如果NIO一次性传取的数据过大会多次触发fireChannelRead。可在代码编写的时候感觉不出来,Netty在ByteToMessageDecoder做处理. ByteToMessageDecoder往往是头部pipeline.
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//只处理字节缓冲区类型的
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
first = cumulation == null;
// 如果是第一次读取则创建一个,不是则累加
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
// 具体解析数据,到数据符合规则解析到out对象中。
// 如果不合规定,不操作,等待后面的数据累加
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
//当数据不为空也不可读,要释放。证明数据有丢失不处理
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
//读取数据的次数大于阈值,则尝试丢弃已读的,避免占着内存
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
//有被添加或者设置,表是有读过了
firedChannelRead |= out.insertSinceRecycled();
// 如果size > 0 尝试向下面Handler传递数据
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);//其他类型继续传递
}
}
Channel通道
《网络IO模型》中epoll框架.Netty框架运行例子如下
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 1024);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer(){
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// http加解码器
p.addLast(new HttpServerCodec());
// 自定义handler
p.addLast(new HttpHelloWorldServerHandler());
}
});
Channel ch = b.bind(PORT).sync().channel();
System.err.println("Open your web browser and navigate to http://127.0.0.1:" + PORT + '/');
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
其中bind调用过程如下所示
- initAndRegister创建的管道并调用init做初始化
- init对channel进行了pipeline的ServerBootstrapAcceptor设置及bossGroup的使用
- register0将channel事件注册并获取 selectionKey
- run 循环遍历监听nio事件,并分配pipeline处理
- 通过pipeline事件,ServerBootstrapAcceptor获取连接进来的childChannel,对其注册并配置workerGroup
- childChannel注册流程和channel相似
其中bossGroup,workerGroup的设计为传统Reactor反应堆设计模式。
Pipeline事件模型
在Netty 中每个Channel 都有且仅有一个ChannelPipeline 与之对应。
Pipeline的元素包括2部份:
- Context 标记Handler的位置,状态及提供上下文的依赖,事件传播
- Handler 专门处理事件
Pipeline 的事件传播机制
Netty 中的传播事件可以分为两种:Inbound 事件和Outbound 事件。
从上图可以看出,inbound 事件和outbound 事件的流向是不一样的,inbound 事件的流行是从下至上,而outbound刚好相反,是从上到下。
通过 AbstractChannelHandlerContext 中 executionMask属性来标识MASK_ALL_INBOUND,MASK_ALL_OUTBOUND 所支持的事务。
// 向下链路查找符合当前事件传播规则的Context
// mask 标识当前要传递的事件
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
链表中head 是一个ChannelOutboundHandler,而tail则是一个ChannelInboundHandler.表示当Inbound事件没人处理tail空处理,反之Outbound没人处理head空处理
主要参考
《Netty4源码分析——框架总结》
《源码分析Netty系列》
《Netty之大动脉Pipeline》
《Netty 启动过程源码分析》