写入索引项
下面这段代码是OffsetIndex的append方法,用于向索引文件中写入新索引项。
def append(offset: Long, position: Int): Unit = { inLock(lock) { // 第1步:判断索引文件未写满 require(!isFull, "Attempt to append to a full index (size = " + _entries + ").") // 第2步:必须满足以下条件之一才允许写入索引项: // 条件1:当前索引文件为空 // 条件2:要写入的位移大于当前所有已写入的索引项的位移——Kafka规定索引项中的位移值必须是单调增加的 if (_entries == 0 || offset > _lastOffset) { trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}") mmap.putInt(relativeOffset(offset)) // 第3步A:向mmap中写入相对位移值 mmap.putInt(position) // 第3步B:向mmap中写入物理位置信息 // 第4步:更新其他元数据统计信息,如当前索引项计数器_entries和当前索引项最新位移值_lastOffset _entries += 1 _lastOffset = offset // 第5步:执行校验。写入的索引项格式必须符合要求,即索引项个数*单个索引项占用字节数匹配当前文件物理大小,否则说明文件已损坏 require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".") } else { // 如果第2步中两个条件都不满足,不能执行写入索引项操作,抛出异常 throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" + s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.") } } }
append方法的执行流程
查找索引项
索引项的写入逻辑并不复杂,难点在于如何查找索引项。AbstractIndex定义了抽象方法parseEntry用于查找给定的索引项,如下所示:
protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry
“n”表示要查找给定ByteBuffer中保存的第n个索引项(在Kafka中也称第n个槽)。IndexEntry是源码定义的一个接口,里面有两个方法:indexKey和indexValue,分别返回不同类型索引的<Key,Value>对。
OffsetIndex实现parseEntry的逻辑如下:
override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = { OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n)) }
OffsetPosition是实现IndexEntry的实现类,Key就是之前说的位移值,而Value就是物理磁盘位置值。所以,这里你能看到代码调用了relativeOffset(buffer, n) + baseOffset计算出绝对位移值,之后调用physical(buffer, n)计算物理磁盘位置,最后将它们封装到一起作为一个独立的索引项返回。
我建议你去看下relativeOffset和physical方法的实现,看看它们是如何计算相对位移值和物理磁盘位置信息的。
有了parseEntry方法,我们就能够根据给定的n来查找索引项了。但是,这里还有个问题需要解决,那就是,我们如何确定要找的索引项在第n个槽中呢?其实本质上,这是一个算法问题,也就是如何从一组已排序的数中快速定位符合条件的那个数。
二分查找算法
到目前为止,从已排序数组中寻找某个数字最快速的算法就是二分查找了,它能做到O(lgN)的时间复杂度。Kafka的索引组件就应用了二分查找算法。
private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = { // 第1步:如果当前索引为空,直接返回<-1,-1>对 if(_entries == 0) return (-1, -1) // 第2步:要查找的位移值不能小于当前最小位移值 if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) return (-1, 0) // binary search for the entry // 第3步:执行二分查找算法 var lo = 0 var hi = _entries - 1 while(lo < hi) { val mid = ceil(hi/2.0 + lo/2.0).toInt val found = parseEntry(idx, mid) val compareResult = compareIndexEntry(found, target, searchEntity) if(compareResult > 0) hi = mid - 1 else if(compareResult < 0) lo = mid else return (mid, mid) } (lo, if (lo == _entries - 1) -1 else lo + 1)
Kafka索引应用二分查找算法快速定位待查找索引项位置,之后调用parseEntry来读取索引项。不过,这真的就是无懈可击的解决方案了吗?