Kafka源码分析之MemoryRecords

        MemoryRecords是Kakfa中Record在内存中的实现形式,它基于Java NIO中ByteBuffer来实现。MemoryRecords中成员变量如下:

    private final static int WRITE_LIMIT_FOR_READABLE_ONLY = -1;

    // the compressor used for appends-only
    // 仅仅用于appends的压缩器Compressor实例compressor
    private final Compressor compressor;

    // the write limit for writable buffer, which may be smaller than the buffer capacity
    // 可写缓冲区的写限制writeLimit,可能小于缓存区的装载能力
    private final int writeLimit;

    // the capacity of the initial buffer, which is only used for de-allocation of writable records
    // 最初的缓冲区的装载能力initialCapacity,仅用于重新分配可写的记录
    private final int initialCapacity;

    // the underlying buffer used for read; while the records are still writable it is null
    // 用于读的底层缓冲区buffer,java NIO的ByteBuffer类,此时记录仍可写
    private ByteBuffer buffer;

    // indicate if the memory records is writable or not (i.e. used for appends or read-only)
    // 标志内存记录是否可写的状态位writable
    private boolean writable;
        其中,compressor是仅仅用于appends的压缩器Compressor实例,writeLimit是可写缓冲区的写限制,它可能小于缓存区的装载能力,initialCapacity是最初的缓冲区的装载能力,仅用于重新分配可写的记录,buffer是java NIO的ByteBuffer实例,用于读的底层缓冲区,而writable是一个标志内存记录是否可写的状态位。

        再来看下MemoryRecords的构造函数,如下:

    // Construct a writable memory records
    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) {
        
    	// 根据入参赋值writable、writeLimit
    	this.writable = writable;
        this.writeLimit = writeLimit;
        
        // initialCapacity取Java NIO中ByteBuffer的capacity
        this.initialCapacity = buffer.capacity();
        
        if (this.writable) {// 如果writable为true,即处于可写状态
        	
        	// buffer设置为null
            this.buffer = null;
            
            // 利用入参ByteBuffer类型的buffer和CompressionType类型的type构造Compressor实例compressor
            this.compressor = new Compressor(buffer, type);
        } else {
        	
        	// buffer设置为入参ByteBuffer类型的buffer
            this.buffer = buffer;
            
            // compressor设置为null
            this.compressor = null;
        }
    }
        通过MemoryRecords的构造函数我们可以知道,MemoryRecords有两种基本的状态,一个是只写,一个是只读,当为只写时,标志位writable为true,此时MemoryRecords中ByteBuffer类型的成员变量buffer被设置为null,同时利用入参ByteBuffer类型的buffer和CompressionType类型的type构造Compressor实例compressor;当为只读时,buffer设置为入参ByteBuffer类型的buffer,compressor设置为null。

        MemoryRecords最主要的一个功能就是添加记录,而实现这一功能的方法就是append()方法,代码如下:

    /**
     * Append a new record and offset to the buffer
     * 添加一条新的记录,并且在缓冲区buffer中记录偏移量offset
     */
    public void append(long offset, byte[] key, byte[] value) {
    	
    	// 首先判断MemoryRecords的可写标志位writable
    	if (!writable)
            throw new IllegalStateException("Memory records is not writable");

    	// 根据key、value通过Record的recordSize()方法计算记录大小size
        int size = Record.recordSize(key, value);
        
        // 压缩器compressor中记录偏移量offset、记录大小size、记录key和value
        compressor.putLong(offset);
        compressor.putInt(size);
        compressor.putRecord(key, value);
        
        // 压缩器compressor中通过recordWritten()方法累加记录数numRecords和已写入未压缩总大小writtenUncompressed,
        // 记录数numRecords为加1,
        // 而已写入未压缩总大小writtenUncompresse是记录大小size再加上size所占大小和offset所占大小之和LOG_OVERHEAD,也就是额外的Int+Long为12
        compressor.recordWritten(size + Records.LOG_OVERHEAD);
    }
        上面为key、value形式的append,而还有一种Record形式的append,代码如下:

    /**
     * Append the given record and offset to the buffer
     */
    public void append(long offset, Record record) {
        
    	// 首先判断MemoryRecords的可写标志位writable
    	if (!writable)
            throw new IllegalStateException("Memory records is not writable");

    	// 获取记录大小size
        int size = record.size();
        
        // 压缩器compressor中记录偏移量offset、记录大小size、记录record的buffer
        compressor.putLong(offset);
        compressor.putInt(size);
        compressor.put(record.buffer());
        
        // 压缩器compressor中通过recordWritten()方法累加记录数numRecords和已写入未压缩总大小writtenUncompressed,
        // 记录数numRecords为加1,
        // 而已写入未压缩总大小writtenUncompresse是记录大小size再加上size所占大小和offset所占大小之和LOG_OVERHEAD,也就是额外的Int+Long为12
        compressor.recordWritten(size + Records.LOG_OVERHEAD);
        
        // 重绕此缓冲区,将位置设置为零并丢弃标记
        record.buffer().rewind();
    }
        MemoryRecords中还有一个针对指定Record的key、value来判断是否尚有余地的hasRoomFor()方法,代码如下:

    /**
     * Check if we have room for a new record containing the given key/value pair
     *
     * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
     * accurate if compression is really used. When this happens, the following append may cause dynamic buffer
     * re-allocation in the underlying byte buffer stream.
     *
     * There is an exceptional case when appending a single message whose size is larger than the batch size, the
     * capacity will be the message size which is larger than the write limit, i.e. the batch size. In this case
     * the checking should be based on the capacity of the initialized buffer rather than the write limit in order
     * to accept this single record.
     */
    public boolean hasRoomFor(byte[] key, byte[] value) {
        return this.writable && this.compressor.numRecordsWritten() == 0 ?
            this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
            this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
    }
        hasRoomFor()方法中,需要首先判断writable是否为true,writable为false的话会直接返回false,直接通知调用者MemoryRecords已无余地存储Record。writable为true的话,还需要判断compressor的已写入数据大小numRecordsWritten是否为0,为0 的话,根据MemoryRecords的最初的缓冲区的装载能力initialCapacity是否大于key、value、offset所占大小、size所占大小之和来确定是否尚有足够空间容纳一个Record,不为0 的话,则根据可写缓冲区的写限制writeLimit是否大于compressor预估算的大小与key、value、offset所占大小、size所占大小之和来确定是否尚有足够空间容纳一个Record。compressor预估算的大小通过如下方法来判断:

    public long estimatedBytesWritten() {
        if (type == CompressionType.NONE) {
            return bufferStream.buffer().position();
        } else {
            // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
            return (long) (writtenUncompressed * TYPE_TO_RATE[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
        }
    }
        







上一篇:十一、装饰器


下一篇:十、函数进阶