前面八篇博客,我们已经介绍了服务端的主Reactor
线程的启动流程,以及服务端的工作Reactor
线程的启动流程,以及工作的Reactor
线程注册感兴趣的事件的过程,今天这篇博客我们主要介绍一下工作线程的读取数据的流程,以及介绍几个常用的的pipeline
。就让我们直接进入对应的代码,走来直接看工作线程中的循环流程。具体的代码如下:
public final class NioEventLoop extends SingleThreadEventLoop {
//事件循环
@Override
protected void run() {
for (;;) {
try {
try {
//hasTasks() 若taskQueue or tailTasks任务队列中有任务 返回true 没有则返回false
//有任务返回selectNow的返回值 没任务返回-1
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
//首先轮询注册到reactor线程对应的selector上的所有的channel的IO事件
//wakenUp 表示是否应该唤醒正在阻塞的select操作,netty在每次进行新的loop之前,都会将wakeUp 被设置成false,标志新的一轮loop的开始
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
//2.处理产生网络IO事件的channel
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
//3.处理任务队列
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
这个时候假设我们任务队列中没有任务,也没有定时任务,这个时候,假设发生了读的事件,这个时候执行select(wakenUp.getAndSet(false));
方法,就会将发生事件的selectKey
记录下来,然后会执行processSelectedKeys();
方法处理这些事件,这些流程我们前面已经很熟悉了,这儿不做过多的介绍,我们直接跟进对应的代码就行了,具体的代码如下:
public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKeys() {
if (selectedKeys != null) {
//调用 selectedKeys 不为空
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//附加对象是AbstractNioChannel 继续调用
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// k.isValid()告知此键是否有效。
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
//获取此键的 ready 操作集合。
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
h.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//发生读的事件调用
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
}
上面的代码就是经过一系列的调用,然后遍历SelectionKey
,判断是什么事件,最后执行对应的代码,这儿会执行unsafe.read();
方法。我们继续跟进对应的代码,具体的代码如下:
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected class NioByteUnsafe extends AbstractNioUnsafe {
@Override
public final void read() {
final ChannelConfig config = config();
//这个方法默认返回的是false
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
//获取ByteBuf的分配器
final ByteBufAllocator allocator = config.getAllocator();
//接收字节的ByteBuf分配器
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
}
上面走来的判断shouldBreakReadReady(config)
默认是返回返回false
,然后就是获取ByteBuf
的分配器。为了后面分配对应的ByteBuf
做准备。这个时候我们需要看下doReadBytes(byteBuf)
方法,具体的代码如下:
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
}
上面的代码就是将从java
原生的通道的读取到的字节写入到byteBuf
中,返回读到的字节数,我们继续返回到原来的执行的代码地方,具体的代码如下:
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected class NioByteUnsafe extends AbstractNioUnsafe {
@Override
public final void read() {
try {
//省略一部分代码
do {
byteBuf = allocHandle.allocate(allocator);
//设置上次读取的字节数
allocHandle.lastBytesRead(doReadBytes(byteBuf));
//判断如果读到的是空的直接结束
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
}
上面的代码将上一次读取到的字节数通过allocHandle.lastBytesRead(doReadBytes(byteBuf));
设置进去,然后通过allocHandle.lastBytesRead() <= 0
判断上一次读取的数据是否为空,如果不为空,我们就继续执行pipeline.fireChannelRead(byteBuf);
方法,这个方法就是从头节点向下传播,并同时将读取到的byteBuf
传入进去,我们继续跟进对应的代码,具体的代码如下:
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
//在新连接接入时 msg是NioSocketChannel
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
}
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
//执行channelRead方法
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
//执行头节点的channelRead方法
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(this.getClass() + " HeadContext.channelRead");
//向下传播
ctx.fireChannelRead(msg);
}
}
}
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
//找头节点的下一个节点是Inbound的节点
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
//继续调用
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
//继续调用
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
}
上面的代码先是调用了头节点的channelRead
的方法,然后查找头节点的下一个节点是inbound
的节点,然后调用对应的channelRead
方法,我们再回到原来的地方,假设读取完了客户端发来的消息,后续的流程执行过程,具体的代码如下:
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected class NioByteUnsafe extends AbstractNioUnsafe {
@Override
public final void read() {
final ChannelConfig config = config();
//这个方法默认返回的是false
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
//获取ByteBuf的分配器
final ByteBufAllocator allocator = config.getAllocator();
//接收字节的ByteBuf分配器
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
//省略一部分代码
//执行读完成
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
}
这个时候会执行pipeline.fireChannelReadComplete();
方法,又是向下传播执行channelReadComplete()
方法,其实执行下去就是注册感兴趣的事件,由于这个时候我们的读的感兴趣的事件已经注册了,不会再注册了。所以到此整个服务端的读的流程已经结束了。这儿我们需要分析下解码器的源码。一个基于固定长度的解码器FixedLengthFrameDecoder
,我们会主要看对应的channelRead
方法。由于这个类没有实现channelRead
方法,而是父类ByteToMessageDecoder
中实现了。具体的代码如下:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//判断是否是ByteBuf很明显是的
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
//第一次直接赋值
cumulation = data;
} else {
//累加
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
}
走来先判断是否是ByteBuf
,很明显这儿是ByteBuf
,然后创建CodecOutputList
类,当我们打开这个类的时候发现继承的是AbstractList
,所以这儿我们就将这个类当成一个集合。然后将传进来的msg
转成ByteBuf
,cumulation
走来是null
的,所以这儿的判断是正确的,表示是第一次读,这个时候会直接将读到的ByteBuf
赋值给cumulation
,然后会执行callDecode(ctx, cumulation, out);
,这个时候我们继续跟进去查看对应的代码,具体的代码如下:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
//传进来的参数in表示读到的ByteBuf并且是赋值给cumulation
//out传进来的集合,原来的创建好的集合CodecOutputList
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
}
判断传进来的ByteBuf
是否还可以读,很明显是可以读的,然后进入循环,这个时候的outSize
是等于0,所以下面的判断是不会进入,会直接执行后面的代码,这个时候oldInputLength
是写指针减去读指针的长度。这个时候会直接执行decodeRemovalReentryProtection(ctx, in, out);
方法,我们继续跟进对应代码,具体代码如下:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
//调用子类的解码方法
decode(ctx, in, out);
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
handlerRemoved(ctx);
}
}
}
}
这个时候会直接调用子类的解码方法,这个时候我们继续跟进decode(ctx, in, out);
方法,具体的代码如下:
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//读到的字符串小于固定长度frameLength
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readRetainedSlice(frameLength);
}
}
}
上面的代码,如果读取到长度小于固定长度frameLength
,就直接返回null
,然后out
集合不会添加任何东西,这个时候我们假设读到的内容大于固定的字符长度frameLength
,然后会调用in.readRetainedSlice(frameLength);
方法,我们继续跟进对应的代码。具体的代码如下:
public abstract class AbstractByteBuf extends ByteBuf {
@Override
public ByteBuf readRetainedSlice(int length) {
//检查可读字节
checkReadableBytes(length);
//截取固定长度的字节数
ByteBuf slice = retainedSlice(readerIndex, length);
//改变读指针的位置
readerIndex += length;
return slice;
}
@Override
public ByteBuf retainedSlice(int index, int length) {
return slice(index, length).retain();
}
@Override
public ByteBuf slice(int index, int length) {
ensureAccessible();
return new UnpooledSlicedByteBuf(this, index, length);
}
}
上面的代码走来先检查可读的字节数,然后截取并创建一个新的ByteBuf
,并截取从读指针开始的固定长度的字节数,然后再改变对应的读指针的位置,最后将创建好的ByteBuf
返回出去。我们再回到原来执行代码的地方。具体的代码如下:
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//读到的字符串小于固定长度frameLength
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readRetainedSlice(frameLength);
}
}
}
这个时候会将对应的ByteBuf
添加到out
的集合中去,这个时候decode
方法就执行完成。我们继续往回返,具体的代码如下:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
//传进来的参数in表示读到的ByteBuf并且是赋值给cumulation
//out传进来的集合,原来的创建好的集合CodecOutputList
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
}
这个时候假设读到的字节数大于固定长度,那么outSize
是不等于out.size()
的值,因为这个时候的out.size()
是等于固定长度的,而outSize
是等于0的。oldInputLength
也是不等于in.readableBytes()
的因为前面的一个判断如果没有进的话,一定读到了数据,读指针一定改变了,这个时候就一定不会相等。继续循环读,总有一次是outSize
是等于out.size()
,同时oldInputLength == in.readableBytes()
也是相等的,这个时候表示这个客户端发来的数据分片成功了。由于我们的字节的长度是大于固定长度,所以这儿第二次进入这个循环的时候是outSize
是大于0的,就执行执行向下传播的方法。
如果这个时候读到的字节数小于固定长度,那么outSize
就是等于out.size()
,因为这个时候out
集合中没有添加任何字节,这个时候out.size()
会等于0,而oldInputLength == in.readableBytes()
也是相等的这个时候直接结束这个循环。表示没有读到固定长度的数据。我们再返回到原来的代码的调用的地方,具体代码如下:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//判断是否是ByteBuf很明显是的
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
//第一次直接赋值
cumulation = data;
} else {
//累加
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
}
这个时候会执行向下传播的方法,然后把out
的值清空。记住这儿的cumulation
并没有清空,而是等待下次客户端发来数据继续按刚才的一样处理,只不过,第二次发来的数据,会调用cumulator.cumulate(ctx.alloc(), cumulation, data);
方法。我们继续跟进对应的代码,具体的代码如下:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
/**
* @param alloc 内存分配器
* @param cumulation 老数据
* @param in 本次数据
* @return
*/
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
try {
final ByteBuf buffer;
//如果因为空间满了写不了本次的新数据 就扩容
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
//将新数据写入
buffer.writeBytes(in);
return buffer;
} finally {
in.release();
}
}
};
走来是如果因为原来的空间满了写不了本次的新数据,就直接扩容,然后将本次读取到的数据写入cumulation
,并返回出去,就形成了一个累加的效果了。
这个解码器实现的效果就是,如果固定长度为5的话,客户端第一次发来的是“1234”这个时候服务端是接受不到这个数据的,如果第二次发送的是“5678”这个时候服务端接收的数据是“12345”剩下的“678”还是在原来的ByteToMessageDecoder
中存着,当满足发送的长度的时候,会直接发送。到此整个基于固定长度的解码器FixedLengthFrameDecoder
就讲完了。下一篇博客我会讲讲剩下的几个解码器。