Kafka竟然也用二分搜索算法查找索引!(中)

写入索引项

下面这段代码是OffsetIndex的append方法,用于向索引文件中写入新索引项。

 def append(offset: Long, position: Int): Unit = {
        inLock(lock) {
          // 第1步:判断索引文件未写满
          require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
          // 第2步:必须满足以下条件之一才允许写入索引项:
          // 条件1:当前索引文件为空
          // 条件2:要写入的位移大于当前所有已写入的索引项的位移——Kafka规定索引项中的位移值必须是单调增加的
          if (_entries == 0 || offset > _lastOffset) {
            trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
            mmap.putInt(relativeOffset(offset)) // 第3步A:向mmap中写入相对位移值
            mmap.putInt(position) // 第3步B:向mmap中写入物理位置信息
            // 第4步:更新其他元数据统计信息,如当前索引项计数器_entries和当前索引项最新位移值_lastOffset
            _entries += 1
            _lastOffset = offset
            // 第5步:执行校验。写入的索引项格式必须符合要求,即索引项个数*单个索引项占用字节数匹配当前文件物理大小,否则说明文件已损坏
            require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")
          } else {
            // 如果第2步中两个条件都不满足,不能执行写入索引项操作,抛出异常
            throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
              s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
          }
        }
      }

append方法的执行流程

Kafka竟然也用二分搜索算法查找索引!(中)

查找索引项

索引项的写入逻辑并不复杂,难点在于如何查找索引项。AbstractIndex定义了抽象方法parseEntry用于查找给定的索引项,如下所示:

protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry

“n”表示要查找给定ByteBuffer中保存的第n个索引项(在Kafka中也称第n个槽)。IndexEntry是源码定义的一个接口,里面有两个方法:indexKey和indexValue,分别返回不同类型索引的<Key,Value>对。


OffsetIndex实现parseEntry的逻辑如下:

   override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = {
        OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
      }

OffsetPosition是实现IndexEntry的实现类,Key就是之前说的位移值,而Value就是物理磁盘位置值。所以,这里你能看到代码调用了relativeOffset(buffer, n) + baseOffset计算出绝对位移值,之后调用physical(buffer, n)计算物理磁盘位置,最后将它们封装到一起作为一个独立的索引项返回。


我建议你去看下relativeOffset和physical方法的实现,看看它们是如何计算相对位移值和物理磁盘位置信息的。


有了parseEntry方法,我们就能够根据给定的n来查找索引项了。但是,这里还有个问题需要解决,那就是,我们如何确定要找的索引项在第n个槽中呢?其实本质上,这是一个算法问题,也就是如何从一组已排序的数中快速定位符合条件的那个数。

二分查找算法

到目前为止,从已排序数组中寻找某个数字最快速的算法就是二分查找了,它能做到O(lgN)的时间复杂度。Kafka的索引组件就应用了二分查找算法。

private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
        // 第1步:如果当前索引为空,直接返回<-1,-1>对
        if(_entries == 0)
          return (-1, -1)
    
    
        // 第2步:要查找的位移值不能小于当前最小位移值
        if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
          return (-1, 0)
    
    
        // binary search for the entry
        // 第3步:执行二分查找算法
        var lo = 0
        var hi = _entries - 1
        while(lo < hi) {
          val mid = ceil(hi/2.0 + lo/2.0).toInt
          val found = parseEntry(idx, mid)
          val compareResult = compareIndexEntry(found, target, searchEntity)
          if(compareResult > 0)
            hi = mid - 1
          else if(compareResult < 0)
            lo = mid
          else
            return (mid, mid)
        }
    
    
        (lo, if (lo == _entries - 1) -1 else lo + 1)

Kafka索引应用二分查找算法快速定位待查找索引项位置,之后调用parseEntry来读取索引项。不过,这真的就是无懈可击的解决方案了吗?

上一篇:[原创]Fluent NHibernate之旅(四)-- 关系(中)


下一篇:== 和 equals 的区别