MemoryRecords is the in-memory implementation of Kafka's Record, built on top of Java NIO's ByteBuffer. Its member variables are as follows:
```java
// sentinel write limit used for read-only instances
private final static int WRITE_LIMIT_FOR_READABLE_ONLY = -1;

// the compressor used for appends-only
private final Compressor compressor;

// the write limit for the writable buffer, which may be smaller than the buffer capacity
private final int writeLimit;

// the capacity of the initial buffer, which is only used for de-allocation of writable records
private final int initialCapacity;

// the underlying buffer used for reads; while the records are still writable it is null
private ByteBuffer buffer;

// indicates whether the memory records are writable (i.e. used for appends) or read-only
private boolean writable;
```

Here, compressor is the Compressor instance used only for appends; writeLimit is the write limit of the writable buffer, which may be smaller than the buffer's capacity; initialCapacity is the capacity of the initial buffer, used only when de-allocating writable records; buffer is a Java NIO ByteBuffer serving as the underlying buffer for reads; and writable is the flag marking whether the memory records are writable.
Next, let's look at the MemoryRecords constructor:
```java
// Construct a writable memory records
private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) {
    // assign writable and writeLimit from the parameters
    this.writable = writable;
    this.writeLimit = writeLimit;
    // initialCapacity is taken from the ByteBuffer's capacity
    this.initialCapacity = buffer.capacity();
    if (this.writable) {
        // writable state: buffer is set to null
        this.buffer = null;
        // build a Compressor from the given buffer and compression type
        this.compressor = new Compressor(buffer, type);
    } else {
        // read-only state: keep the given buffer
        this.buffer = buffer;
        // no compressor is needed for reads
        this.compressor = null;
    }
}
```

From the constructor we can see that MemoryRecords has two basic states: writable and read-only. When writable is true, the ByteBuffer member buffer is set to null, and a Compressor instance is constructed from the given ByteBuffer and CompressionType; when read-only, buffer is set to the given ByteBuffer and compressor is set to null.
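The two mutually exclusive states can be illustrated with a minimal, self-contained sketch (the class and the compressor stand-in below are simplified illustrations, not the real Kafka types):

```java
import java.nio.ByteBuffer;

// Simplified stand-in for MemoryRecords: a writable instance owns a compressor
// and no buffer; a read-only instance owns the buffer and no compressor.
class SimpleMemoryRecords {
    final boolean writable;
    final ByteBuffer buffer;     // non-null only when read-only
    final Object compressor;     // non-null only when writable (stand-in for Compressor)
    final int initialCapacity;

    SimpleMemoryRecords(ByteBuffer buf, boolean writable) {
        this.writable = writable;
        this.initialCapacity = buf.capacity();
        if (writable) {
            this.buffer = null;
            this.compressor = new Object(); // real code: new Compressor(buf, type)
        } else {
            this.buffer = buf;
            this.compressor = null;
        }
    }
}
```

Keeping exactly one of buffer/compressor non-null makes the state unambiguous: any read path that touches buffer fails fast on a writable instance, and vice versa.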
The most important function of MemoryRecords is appending records, implemented by the append() method:
```java
/**
 * Append a new record and offset to the buffer
 */
public void append(long offset, byte[] key, byte[] value) {
    // first check the writable flag
    if (!writable)
        throw new IllegalStateException("Memory records is not writable");

    // compute the record size from key and value via Record.recordSize()
    int size = Record.recordSize(key, value);
    // write the offset, the record size, and the key/value pair through the compressor
    compressor.putLong(offset);
    compressor.putInt(size);
    compressor.putRecord(key, value);
    // recordWritten() increments the record count numRecords by one, and adds
    // size + Records.LOG_OVERHEAD to the uncompressed-bytes counter writtenUncompressed;
    // LOG_OVERHEAD covers the offset (Long) plus the size (Int), i.e. 12 extra bytes
    compressor.recordWritten(size + Records.LOG_OVERHEAD);
}
```

The above is the key/value form of append; there is also a form that takes a Record:
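The byte accounting in recordWritten() can be made concrete with a small sketch. It assumes the old (v0) message format, where a record carries a 14-byte header (crc 4 + magic 1 + attributes 1 + key length 4 + value length 4) in addition to the key and value payloads; the class below is illustrative, not the real Record/Records code:

```java
// Byte accounting for one log entry, assuming the old (v0) message format.
class LogEntrySize {
    // offset (long, 8 bytes) + size (int, 4 bytes) written before each record
    static final int LOG_OVERHEAD = 8 + 4;
    // crc 4 + magic 1 + attributes 1 + key length 4 + value length 4
    static final int RECORD_OVERHEAD = 4 + 1 + 1 + 4 + 4;

    // size of the record itself, as Record.recordSize(key, value) would report
    static int recordSize(byte[] key, byte[] value) {
        int keyLen = key == null ? 0 : key.length;
        int valLen = value == null ? 0 : value.length;
        return RECORD_OVERHEAD + keyLen + valLen;
    }

    // total uncompressed bytes accounted per append: record plus log overhead
    static int entrySize(byte[] key, byte[] value) {
        return LOG_OVERHEAD + recordSize(key, value);
    }
}
```

So for a 3-byte key and a 5-byte value, the record itself is 14 + 3 + 5 = 22 bytes and the full entry is 22 + 12 = 34 bytes, which is exactly the amount passed to recordWritten().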
```java
/**
 * Append the given record and offset to the buffer
 */
public void append(long offset, Record record) {
    // first check the writable flag
    if (!writable)
        throw new IllegalStateException("Memory records is not writable");

    // get the record size
    int size = record.size();
    // write the offset, the record size, and the record's buffer through the compressor
    compressor.putLong(offset);
    compressor.putInt(size);
    compressor.put(record.buffer());
    // recordWritten() increments numRecords and adds size + Records.LOG_OVERHEAD
    // (the extra Long + Int, i.e. 12 bytes) to writtenUncompressed
    compressor.recordWritten(size + Records.LOG_OVERHEAD);
    // rewind the record's buffer: set its position to zero and discard the mark
    record.buffer().rewind();
}
```

MemoryRecords also provides hasRoomFor(), which checks whether there is still room for a record with the given key and value:
```java
/**
 * Check if we have room for a new record containing the given key/value pair
 *
 * Note that the return value is based on the estimate of the bytes written to the compressor,
 * which may not be accurate if compression is really used. When this happens, the following
 * append may cause dynamic buffer re-allocation in the underlying byte buffer stream.
 *
 * There is an exceptional case when appending a single message whose size is larger than the
 * batch size, the capacity will be the message size which is larger than the write limit,
 * i.e. the batch size. In this case the checking should be based on the capacity of the
 * initialized buffer rather than the write limit in order to accept this single record.
 */
public boolean hasRoomFor(byte[] key, byte[] value) {
    if (!this.writable)
        return false;

    return this.compressor.numRecordsWritten() == 0 ?
            this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
            this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
}
```

In hasRoomFor(), writable is checked first; if it is false, the method returns false immediately, telling the caller that this MemoryRecords has no room for another Record. If writable is true, it then checks whether the compressor's record count, numRecordsWritten, is 0. If it is 0 (nothing appended yet), the check compares the initial buffer capacity initialCapacity against the sum of the record size (key, value, and record header) and the log overhead (the offset and size fields); this is what allows a single record larger than the batch size to be accepted. If it is not 0, the check compares the write limit writeLimit against the compressor's estimated bytes written plus that same sum. The estimate is computed as follows:
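The branching in hasRoomFor() can be mirrored in a small, self-contained sketch (the static method and its parameters are illustrative stand-ins for the instance fields in the real class):

```java
// Simplified room check mirroring hasRoomFor(): the first record is judged
// against the initial buffer capacity (so one oversized record is still
// accepted), later records against the write limit plus the bytes estimated
// to have been written already. entrySize = LOG_OVERHEAD + recordSize.
class RoomCheck {
    static boolean hasRoomFor(boolean writable, int numRecordsWritten,
                              long estimatedBytesWritten, int initialCapacity,
                              int writeLimit, int entrySize) {
        if (!writable)
            return false;
        return numRecordsWritten == 0
                ? initialCapacity >= entrySize           // first record: capacity check
                : writeLimit >= estimatedBytesWritten + entrySize; // later: limit check
    }
}
```

Note how an 80-byte first entry is accepted even when the write limit is only 50, because the first-record branch consults the capacity instead of the limit.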
```java
public long estimatedBytesWritten() {
    if (type == CompressionType.NONE) {
        return bufferStream.buffer().position();
    } else {
        // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
        return (long) (writtenUncompressed * TYPE_TO_RATE[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
    }
}
```
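The estimate's behavior can be sketched as follows. When no compression is used, the underlying buffer's position is already the exact byte count; otherwise the uncompressed byte count is scaled by a per-codec compression ratio (TYPE_TO_RATE in Kafka) times a safety factor. The ratio and factor values below are illustrative, not the ones hard-coded in Kafka:

```java
// Sketch of estimatedBytesWritten(): exact when uncompressed, otherwise a
// scaled guess based on the uncompressed byte count.
class WrittenEstimate {
    static final double ESTIMATION_FACTOR = 1.05; // illustrative safety margin

    static long estimate(boolean compressed, long bufferPosition,
                         long writtenUncompressed, double typeRate) {
        if (!compressed)
            return bufferPosition; // exact: bytes already in the buffer
        // guess: uncompressed bytes * expected ratio * safety factor
        return (long) (writtenUncompressed * typeRate * ESTIMATION_FACTOR);
    }
}
```

Because the compressed branch is only a guess, hasRoomFor() can approve an append that then overflows the buffer; as the Javadoc above notes, that case is handled by dynamically re-allocating the underlying byte buffer stream.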