组合模式1(netty CompositeByteBuf)

一.组合模式作用以及场景

  1.当对象之间具有部分和整体结构时,比如目录与子目录,总类和子类,父节点和子节点。

  2.当子对象操作可以统一处理时

二.复合缓冲区 Composite Buffer

  Composite Buffer是Netty特有的缓冲区。本质上类似于提供一个或多个ByteBuf的组合视图,

可以根据需要添加和删除不同类型的ByteBuf。它是一个组合视图。它提供一种访问方式让使用者自

由的组合多个ByteBuf,避免了拷贝和分配新的缓冲区。

Composite Buffer不支持访问其支撑数组。因此如果要访问,需要先将内容拷贝到堆内存
中,再进行访问
下图是将多个ByteBuf组合在一起,没有进行任何复制过程。仅仅创建了一个视


组合模式1(netty CompositeByteBuf)

 

三.用法

private static ByteBuf newCompositeBuffer(ByteBufAllocator alloc) {
        CompositeByteBuf compositeByteBuf = alloc.compositeBuffer();
        compositeByteBuf.addComponent(true, alloc.directBuffer(4).writeInt(100));
        compositeByteBuf.addComponent(true, alloc.directBuffer(8).writeLong(123));
        compositeByteBuf.addComponent(true, alloc.directBuffer(8).writeLong(456));
        assertEquals(EXPECTED_BYTES, compositeByteBuf.readableBytes());
        return compositeByteBuf;
    }
public static void testSingleCompositeBufferWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable {
Channel serverChannel = null;
Channel clientChannel = null;
try {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Object> clientReceived = new AtomicReference<Object>();
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(newCompositeBuffer(ctx.alloc()))
.addListener(ChannelFutureListener.CLOSE);
}
});
}
});
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
private ByteBuf aggregator;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
aggregator = ctx.alloc().buffer(EXPECTED_BYTES);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
if (msg instanceof ByteBuf) {
aggregator.writeBytes((ByteBuf) msg);
}
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// IOException is fine as it will also close the channel and may just be a connection reset.
if (!(cause instanceof IOException)) {
clientReceived.set(cause);
latch.countDown();
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (clientReceived.compareAndSet(null, aggregator)) {
try {
assertEquals(EXPECTED_BYTES, aggregator.readableBytes());
} catch (Throwable cause) {
aggregator.release();
aggregator = null;
clientReceived.set(cause);
} finally {
latch.countDown();
}
}
}
});
}
});

serverChannel = sb.bind().syncUninterruptibly().channel();
clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();

ByteBuf expected = newCompositeBuffer(clientChannel.alloc());
latch.await();
Object received = clientReceived.get();
if (received instanceof ByteBuf) {
ByteBuf actual = (ByteBuf) received;
assertEquals(expected, actual);
expected.release();
actual.release();
} else {
expected.release();
throw (Throwable) received;
}
} finally {
if (clientChannel != null) {
clientChannel.close().sync();
}
if (serverChannel != null) {
serverChannel.close().sync();
}
}
}

  

四.源码

内部类   CompositeByteBuf.Component 

 private static final class Component {
        final ByteBuf srcBuf; // the originally added buffer
        final ByteBuf buf; // srcBuf unwrapped zero or more times

        int srcAdjustment; // index of the start of this CompositeByteBuf relative to srcBuf
        int adjustment; // index of the start of this CompositeByteBuf relative to buf

        int offset; // offset of this component within this CompositeByteBuf
        int endOffset; // end offset of this component within this CompositeByteBuf

        private ByteBuf slice; // cached slice, may be null

        Component(ByteBuf srcBuf, int srcOffset, ByteBuf buf, int bufOffset,
                int offset, int len, ByteBuf slice) {
            this.srcBuf = srcBuf;
            this.srcAdjustment = srcOffset - offset;
            this.buf = buf;
            this.adjustment = bufOffset - offset;
            this.offset = offset;
            this.endOffset = offset + len;
            this.slice = slice;
        }

        int srcIdx(int index) {
            return index + srcAdjustment;
        }

        int idx(int index) {
            return index + adjustment;
        }

        int length() {
            return endOffset - offset;
        }

        void reposition(int newOffset) {
            int move = newOffset - offset;
            endOffset += move;
            srcAdjustment -= move;
            adjustment -= move;
            offset = newOffset;
        }

        // copy then release
        void transferTo(ByteBuf dst) {
            dst.writeBytes(buf, idx(offset), length());
            free();
        }

        ByteBuf slice() {
            ByteBuf s = slice;
            if (s == null) {
                slice = s = srcBuf.slice(srcIdx(offset), length());
            }
            return s;
        }

        ByteBuf duplicate() {
            return srcBuf.duplicate();
        }

        ByteBuffer internalNioBuffer(int index, int length) {
            // Some buffers override this so we must use srcBuf
            return srcBuf.internalNioBuffer(srcIdx(index), length);
        }

        void free() {
            slice = null;
            // Release the original buffer since it may have a different
            // refcount to the unwrapped buf (e.g. if PooledSlicedByteBuf)
            srcBuf.release();
        }
    }

  

public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> {

    private static final ByteBuffer EMPTY_NIO_BUFFER = Unpooled.EMPTY_BUFFER.nioBuffer();
    private static final Iterator<ByteBuf> EMPTY_ITERATOR = Collections.<ByteBuf>emptyList().iterator();

    private final ByteBufAllocator alloc;
    private final boolean direct;
    private final int maxNumComponents;

    private int componentCount;
    private Component[] components; // resized when needed

    private boolean freed;

    private CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents, int initSize) {
        super(AbstractByteBufAllocator.DEFAULT_MAX_CAPACITY);

        this.alloc = ObjectUtil.checkNotNull(alloc, "alloc");
        if (maxNumComponents < 1) {
            throw new IllegalArgumentException(
                    "maxNumComponents: " + maxNumComponents + " (expected: >= 1)");
        }

        this.direct = direct;
        this.maxNumComponents = maxNumComponents;
        components = newCompArray(initSize, maxNumComponents);
    }

    public CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents) {
        this(alloc, direct, maxNumComponents, 0);
    }

    public CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents, ByteBuf... buffers) {
        this(alloc, direct, maxNumComponents, buffers, 0);
    }

    CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents,
            ByteBuf[] buffers, int offset) {
        this(alloc, direct, maxNumComponents, buffers.length - offset);

        addComponents0(false, 0, buffers, offset);
        consolidateIfNeeded();
        setIndex0(0, capacity());
    }

    public CompositeByteBuf(
            ByteBufAllocator alloc, boolean direct, int maxNumComponents, Iterable<ByteBuf> buffers) {
        this(alloc, direct, maxNumComponents,
                buffers instanceof Collection ? ((Collection<ByteBuf>) buffers).size() : 0);

        addComponents(false, 0, buffers);
        setIndex(0, capacity());
    }
//......

 public CompositeByteBuf addComponent(ByteBuf buffer) {
        return addComponent(false, buffer);
    }

 public CompositeByteBuf addComponents(ByteBuf... buffers) {
        return addComponents(false, buffers);
    }

public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
        return addComponent(increaseWriterIndex, componentCount, buffer);
    }

public CompositeByteBuf addComponent(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
        checkNotNull(buffer, "buffer");
        addComponent0(increaseWriterIndex, cIndex, buffer);
        consolidateIfNeeded();
        return this;
    }

private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
        assert buffer != null;
        boolean wasAdded = false;
        try {
            checkComponentIndex(cIndex);

            // No need to consolidate - just add a component to the list.
            Component c = newComponent(ensureAccessible(buffer), 0);
            int readableBytes = c.length();

            // Check if we would overflow.
            // See https://github.com/netty/netty/issues/10194
            checkForOverflow(capacity(), readableBytes);

            addComp(cIndex, c);
            wasAdded = true;
            if (readableBytes > 0 && cIndex < componentCount - 1) {
                updateComponentOffsets(cIndex);
            } else if (cIndex > 0) {
                c.reposition(components[cIndex - 1].endOffset);
            }
            if (increaseWriterIndex) {
                writerIndex += readableBytes;
            }
            return cIndex;
        } finally {
            if (!wasAdded) {
                buffer.release();
            }
        }
    }

//......



}

五.总结

通过组合模式将不同的ByteBuf合并成一个ByteBuf,并且不产生额外空间,将数据的合并封装并统一处理,方便了管理和调用。

 

  

上一篇:AJPFX的内存管理小结


下一篇:智能聊天机器人实现 源码+解析