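I. Purpose and use cases of the composite pattern
1. When objects form a part-whole structure, such as directories and subdirectories, categories and subcategories, or parent and child nodes.
2. When operations on the child objects can be handled uniformly.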
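The two conditions above can be illustrated with the directory example. This is a minimal sketch, not taken from any library: the names FsNode, FileNode, and DirectoryNode are hypothetical and chosen only for illustration. A directory (the whole) and a file (the part) share one interface, so a caller asks either for its size in the same way.

```java
import java.util.ArrayList;
import java.util.List;

// Common interface: callers treat a single file and a whole directory the same way.
interface FsNode {
    String name();
    long sizeInBytes();
}

// Leaf: has no children and reports its own size.
final class FileNode implements FsNode {
    private final String name;
    private final long size;

    FileNode(String name, long size) {
        this.name = name;
        this.size = size;
    }

    @Override public String name() { return name; }
    @Override public long sizeInBytes() { return size; }
}

// Composite: holds children and answers by delegating to each of them.
final class DirectoryNode implements FsNode {
    private final String name;
    private final List<FsNode> children = new ArrayList<>();

    DirectoryNode(String name) { this.name = name; }

    // Returns this so that calls can be chained while building a tree.
    DirectoryNode add(FsNode child) {
        children.add(child);
        return this;
    }

    @Override public String name() { return name; }

    @Override public long sizeInBytes() {
        long total = 0;
        for (FsNode child : children) {
            total += child.sizeInBytes(); // works for files and nested directories alike
        }
        return total;
    }
}
```

The caller never needs to check whether a node is a file or a directory, which is the "uniform handling" that item 2 refers to.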
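II. Composite Buffer
The composite buffer (CompositeByteBuf) is a buffer type specific to Netty. It presents one or more ByteBuf instances as a single combined view, and ByteBuf instances of different types (for example heap and direct buffers) can be added or removed as needed. Because it is only a view, callers can combine multiple ByteBufs freely without copying data or allocating a new buffer to hold it.
A composite buffer generally does not expose a backing array: once it holds more than one component, hasArray() returns false. To work with the contents as an array, first copy them into a heap byte array and then access that copy.
Figure: several ByteBuf instances combined into one composite buffer. No data is copied; only a view is created.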
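To make the "view, not copy" idea concrete, here is a simplified model built on java.nio.ByteBuffer. It is not Netty's CompositeByteBuf: the class CompositeViewSketch and its methods are hypothetical, and the sketch leaves out reader/writer indexes and reference counting. It only shows that the view stores references to its components, and that array access requires an explicit copy onto the heap.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a composite view; NOT Netty's CompositeByteBuf.
final class CompositeViewSketch {
    private final List<ByteBuffer> components = new ArrayList<>();

    // Adds a component without copying: only the reference is stored.
    CompositeViewSketch addComponent(ByteBuffer buffer) {
        components.add(buffer);
        return this;
    }

    // Total number of bytes visible through the view.
    int capacity() {
        int total = 0;
        for (ByteBuffer b : components) {
            total += b.capacity();
        }
        return total;
    }

    // Maps a view-wide index to a component and a local index, then reads one byte.
    byte getByte(int index) {
        if (index < 0) {
            throw new IndexOutOfBoundsException("index: " + index);
        }
        int remaining = index;
        for (ByteBuffer b : components) {
            if (remaining < b.capacity()) {
                return b.get(remaining); // absolute get, does not move the position
            }
            remaining -= b.capacity();
        }
        throw new IndexOutOfBoundsException("index: " + index);
    }

    // There is no single backing array, so array access means copying onto the heap.
    byte[] toByteArray() {
        byte[] out = new byte[capacity()];
        for (int i = 0; i < out.length; i++) {
            out[i] = getByte(i);
        }
        return out;
    }
}
```

Because the view holds references, a later write to one of the underlying buffers is visible through the view, while an array returned earlier by toByteArray() is an independent snapshot.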
III. Usage
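// EXPECTED_BYTES is a constant defined elsewhere in the test class;
// the three components below add up to 4 + 8 + 8 = 20 bytes.
private static ByteBuf newCompositeBuffer(ByteBufAllocator alloc) {
    CompositeByteBuf compositeByteBuf = alloc.compositeBuffer();
    // Passing true as the first argument advances writerIndex, so the added bytes become readable.
    compositeByteBuf.addComponent(true, alloc.directBuffer(4).writeInt(100));
    compositeByteBuf.addComponent(true, alloc.directBuffer(8).writeLong(123));
    compositeByteBuf.addComponent(true, alloc.directBuffer(8).writeLong(456));
    assertEquals(EXPECTED_BYTES, compositeByteBuf.readableBytes());
    return compositeByteBuf;
}
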
public static void testSingleCompositeBufferWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable {
    Channel serverChannel = null;
    Channel clientChannel = null;
    try {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<Object> clientReceived = new AtomicReference<Object>();
        sb.childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        ctx.writeAndFlush(newCompositeBuffer(ctx.alloc()))
                                .addListener(ChannelFutureListener.CLOSE);
                    }
                });
            }
        });
        cb.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    private ByteBuf aggregator;

                    @Override
                    public void handlerAdded(ChannelHandlerContext ctx) {
                        aggregator = ctx.alloc().buffer(EXPECTED_BYTES);
                    }

                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                        try {
                            if (msg instanceof ByteBuf) {
                                aggregator.writeBytes((ByteBuf) msg);
                            }
                        } finally {
                            ReferenceCountUtil.release(msg);
                        }
                    }

                    @Override
                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                        // IOException is fine as it will also close the channel and may just be a connection reset.
                        if (!(cause instanceof IOException)) {
                            clientReceived.set(cause);
                            latch.countDown();
                        }
                    }

                    @Override
                    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                        if (clientReceived.compareAndSet(null, aggregator)) {
                            try {
                                assertEquals(EXPECTED_BYTES, aggregator.readableBytes());
                            } catch (Throwable cause) {
                                aggregator.release();
                                aggregator = null;
                                clientReceived.set(cause);
                            } finally {
                                latch.countDown();
                            }
                        }
                    }
                });
            }
        });

        serverChannel = sb.bind().syncUninterruptibly().channel();
        clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();

        ByteBuf expected = newCompositeBuffer(clientChannel.alloc());
        latch.await();
        Object received = clientReceived.get();
        if (received instanceof ByteBuf) {
            ByteBuf actual = (ByteBuf) received;
            assertEquals(expected, actual);
            expected.release();
            actual.release();
        } else {
            expected.release();
            throw (Throwable) received;
        }
    } finally {
        if (clientChannel != null) {
            clientChannel.close().sync();
        }
        if (serverChannel != null) {
            serverChannel.close().sync();
        }
    }
}
IV. Source code
The inner class CompositeByteBuf.Component
private static final class Component {
    final ByteBuf srcBuf; // the originally added buffer
    final ByteBuf buf; // srcBuf unwrapped zero or more times

    int srcAdjustment; // index of the start of this CompositeByteBuf relative to srcBuf
    int adjustment; // index of the start of this CompositeByteBuf relative to buf

    int offset; // offset of this component within this CompositeByteBuf
    int endOffset; // end offset of this component within this CompositeByteBuf

    private ByteBuf slice; // cached slice, may be null

    Component(ByteBuf srcBuf, int srcOffset, ByteBuf buf, int bufOffset,
            int offset, int len, ByteBuf slice) {
        this.srcBuf = srcBuf;
        this.srcAdjustment = srcOffset - offset;
        this.buf = buf;
        this.adjustment = bufOffset - offset;
        this.offset = offset;
        this.endOffset = offset + len;
        this.slice = slice;
    }

    int srcIdx(int index) {
        return index + srcAdjustment;
    }

    int idx(int index) {
        return index + adjustment;
    }

    int length() {
        return endOffset - offset;
    }

    void reposition(int newOffset) {
        int move = newOffset - offset;
        endOffset += move;
        srcAdjustment -= move;
        adjustment -= move;
        offset = newOffset;
    }

    // copy then release
    void transferTo(ByteBuf dst) {
        dst.writeBytes(buf, idx(offset), length());
        free();
    }

    ByteBuf slice() {
        ByteBuf s = slice;
        if (s == null) {
            slice = s = srcBuf.slice(srcIdx(offset), length());
        }
        return s;
    }

    ByteBuf duplicate() {
        return srcBuf.duplicate();
    }

    ByteBuffer internalNioBuffer(int index, int length) {
        // Some buffers override this so we must use srcBuf
        return srcBuf.internalNioBuffer(srcIdx(index), length);
    }

    void free() {
        slice = null;
        // Release the original buffer since it may have a different
        // refcount to the unwrapped buf (e.g. if PooledSlicedByteBuf)
        srcBuf.release();
    }
}
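The index arithmetic in Component is easier to follow with concrete numbers. The sketch below copies only the adjustment, offset, and endOffset fields and the idx(), length(), and reposition() methods from the class above. The class name ComponentOffsets is hypothetical, and the buffers themselves are left out, so this is a worked example of the arithmetic rather than a usable component.

```java
// Simplified copy of the index arithmetic in CompositeByteBuf.Component.
// The name ComponentOffsets is hypothetical; the wrapped buffers are omitted.
final class ComponentOffsets {
    int adjustment; // added to a composite index to get an index into the wrapped buffer
    int offset;     // start of this component within the composite
    int endOffset;  // end of this component within the composite (offset + length)

    ComponentOffsets(int bufOffset, int offset, int len) {
        this.adjustment = bufOffset - offset;
        this.offset = offset;
        this.endOffset = offset + len;
    }

    // Translates a composite-wide index into an index of the wrapped buffer.
    int idx(int index) {
        return index + adjustment;
    }

    int length() {
        return endOffset - offset;
    }

    // Moves the component to a new start offset; the bytes it maps to stay the same.
    void reposition(int newOffset) {
        int move = newOffset - offset;
        endOffset += move;
        adjustment -= move;
        offset = newOffset;
    }
}
```

Take a component whose readable bytes start at index 2 of its buffer and that is 4 bytes long. Placed at composite offset 0, adjustment is 2, so composite index 0 maps to buffer index 2. After reposition(10), adjustment becomes -8, so composite index 10 still maps to buffer index 2 and the length is unchanged.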
public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> {

    private static final ByteBuffer EMPTY_NIO_BUFFER = Unpooled.EMPTY_BUFFER.nioBuffer();
    private static final Iterator<ByteBuf> EMPTY_ITERATOR = Collections.<ByteBuf>emptyList().iterator();

    private final ByteBufAllocator alloc;
    private final boolean direct;
    private final int maxNumComponents;

    private int componentCount;
    private Component[] components; // resized when needed

    private boolean freed;

    private CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents, int initSize) {
        super(AbstractByteBufAllocator.DEFAULT_MAX_CAPACITY);

        this.alloc = ObjectUtil.checkNotNull(alloc, "alloc");
        if (maxNumComponents < 1) {
            throw new IllegalArgumentException(
                    "maxNumComponents: " + maxNumComponents + " (expected: >= 1)");
        }

        this.direct = direct;
        this.maxNumComponents = maxNumComponents;
        components = newCompArray(initSize, maxNumComponents);
    }

    public CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents) {
        this(alloc, direct, maxNumComponents, 0);
    }

    public CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents, ByteBuf... buffers) {
        this(alloc, direct, maxNumComponents, buffers, 0);
    }

    CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents,
            ByteBuf[] buffers, int offset) {
        this(alloc, direct, maxNumComponents, buffers.length - offset);

        addComponents0(false, 0, buffers, offset);
        consolidateIfNeeded();
        setIndex0(0, capacity());
    }

    public CompositeByteBuf(
            ByteBufAllocator alloc, boolean direct, int maxNumComponents, Iterable<ByteBuf> buffers) {
        this(alloc, direct, maxNumComponents,
                buffers instanceof Collection ? ((Collection<ByteBuf>) buffers).size() : 0);

        addComponents(false, 0, buffers);
        setIndex(0, capacity());
    }

    // ......

    public CompositeByteBuf addComponent(ByteBuf buffer) {
        return addComponent(false, buffer);
    }

    public CompositeByteBuf addComponents(ByteBuf... buffers) {
        return addComponents(false, buffers);
    }

    public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
        return addComponent(increaseWriterIndex, componentCount, buffer);
    }

    public CompositeByteBuf addComponent(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
        checkNotNull(buffer, "buffer");
        addComponent0(increaseWriterIndex, cIndex, buffer);
        consolidateIfNeeded();
        return this;
    }

    private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
        assert buffer != null;
        boolean wasAdded = false;
        try {
            checkComponentIndex(cIndex);

            // No need to consolidate - just add a component to the list.
            Component c = newComponent(ensureAccessible(buffer), 0);
            int readableBytes = c.length();

            // Check if we would overflow.
            // See https://github.com/netty/netty/issues/10194
            checkForOverflow(capacity(), readableBytes);

            addComp(cIndex, c);
            wasAdded = true;
            if (readableBytes > 0 && cIndex < componentCount - 1) {
                updateComponentOffsets(cIndex);
            } else if (cIndex > 0) {
                c.reposition(components[cIndex - 1].endOffset);
            }
            if (increaseWriterIndex) {
                writerIndex += readableBytes;
            }
            return cIndex;
        } finally {
            if (!wasAdded) {
                buffer.release();
            }
        }
    }

    // ......
}
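In addComponent0, a component inserted before existing ones triggers updateComponentOffsets(cIndex), while one appended at the end is simply repositioned after its predecessor. The body of updateComponentOffsets is not part of the excerpt, so the sketch below is an assumption about what that bookkeeping has to achieve, not Netty's code. The class OffsetTableSketch is hypothetical and tracks only offset ranges. The binary search in componentIndexFor is likewise one plausible way to map a composite-wide index to a component.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: tracks only [offset, endOffset) ranges, not the buffers themselves.
final class OffsetTableSketch {
    private final List<int[]> ranges = new ArrayList<>(); // each entry: {offset, endOffset}

    // Inserts a component of the given length at cIndex,
    // then re-bases it and every component after it.
    void insert(int cIndex, int length) {
        ranges.add(cIndex, new int[] {0, length});
        int next = cIndex > 0 ? ranges.get(cIndex - 1)[1] : 0;
        for (int i = cIndex; i < ranges.size(); i++) {
            int[] r = ranges.get(i);
            int len = r[1] - r[0];
            r[0] = next;
            r[1] = next + len;
            next = r[1];
        }
    }

    int offsetOf(int cIndex) {
        return ranges.get(cIndex)[0];
    }

    // The capacity of the composite is the end offset of the last component.
    int capacity() {
        return ranges.isEmpty() ? 0 : ranges.get(ranges.size() - 1)[1];
    }

    // Binary search for the component whose range contains the composite-wide index.
    int componentIndexFor(int index) {
        if (index < 0 || index >= capacity()) {
            throw new IndexOutOfBoundsException("index: " + index);
        }
        int low = 0;
        int high = ranges.size() - 1;
        while (low <= high) {
            int mid = (low + high) >>> 1;
            int[] r = ranges.get(mid);
            if (index >= r[1]) {
                low = mid + 1;
            } else if (index < r[0]) {
                high = mid - 1;
            } else {
                return mid;
            }
        }
        throw new IllegalStateException("offset table is inconsistent");
    }
}
```

With the 4, 8, and 8 byte components from the usage section appended in order, the ranges are [0, 4), [4, 12), and [12, 20). Inserting a 2 byte component at position 0 shifts every later range by 2, which is why an insert in the middle has to touch all following components while an append does not.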
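V. Summary
Using the composite pattern, CompositeByteBuf merges different ByteBuf instances into a single ByteBuf without copying their contents into a new buffer. The merging is encapsulated behind the ordinary ByteBuf interface, so the combined data can be handled uniformly, which simplifies management and use.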