Kafka竟然也用二分搜索算法查找索引!(下)

改进版二分查找算法

显然不是!我前面说过了,大多数操作系统使用页缓存来实现内存映射,而目前几乎所有的操作系统都使用LRU(Least Recently Used)或类似于LRU的机制来管理页缓存。

Kafka写入索引文件的方式是在文件末尾追加写入,而几乎所有的索引查询都集中在索引的尾部。这么来看的话,LRU机制是非常适合Kafka的索引访问场景的。


但,这里有个问题是,当Kafka在查询索引的时候,原版的二分查找算法并没有考虑到缓存的问题,因此很可能会导致一些不必要的缺页中断(Page Fault)。此时,Kafka线程会被阻塞,等待对应的索引项从物理磁盘中读出并放入到页缓存中。


下面我举个例子来说明一下这个情况。假设Kafka的某个索引占用了操作系统页缓存13个页(Page),如果待查找的位移值位于最后一个页上,也就是Page 12,那么标准的二分查找算法会依次读取页号0、6、9、11和12,具体的推演流程如下所示:


通常来说,一个页上保存了成百上千的索引项数据。随着索引文件不断被写入,Page #12不断地被填充新的索引项。如果此时索引查询方都来自ISR副本或Lag很小的消费者,那么这些查询大多集中在对Page #12的查询,因此,Page #0、6、9、11、12一定经常性地被源码访问。也就是说,这些页一定保存在页缓存上。后面当新的索引项填满了Page #12,页缓存就会申请一个新的Page来保存索引项,即Page #13。


现在,最新索引项保存在Page #13中。如果要查找最新索引项,原版二分查找算法将会依次访问Page #0、7、10、12和13。此时,问题来了:Page 7和10已经很久没有被访问过了,它们大概率不在页缓存中,因此,一旦索引开始征用Page #13,就会发生Page Fault,等待那些冷页数据从磁盘中加载到页缓存。根据国外用户的测试,这种加载过程可能长达1秒。


显然,这是一个普遍的问题,即每当索引文件占用Page数发生变化时,就会强行变更二分查找的搜索路径,从而出现不在页缓存的冷数据必须要加载到页缓存的情形,而这种加载过程是非常耗时的。


基于这个问题,社区提出了改进版的二分查找策略,也就是缓存友好的搜索算法。总体的思路是,代码将所有索引项分成两个部分:热区(Warm Area)和冷区(Cold Area),然后分别在这两个区域内执行二分查找算法,如下图所示:


乍一看,该算法并没有什么高大上的改进,仅仅是把搜寻区域分成了冷、热两个区域,然后有条件地在不同区域执行普通的二分查找算法罢了。实际上,这个改进版算法提供了一个重要的保证:它能保证那些经常需要被访问的Page组合是固定的。


想想刚才的例子,同样是查询最热的那部分数据,一旦索引占用了更多的Page,要遍历的Page组合就会发生变化。这是导致性能下降的主要原因。


这个改进版算法的最大好处在于,查询最热那部分数据所遍历的Page永远是固定的,因此大概率在页缓存中,从而避免无意义的Page Fault。

实际代码

请先了解冷区热区分割原理。

   private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
        // 第1步:如果索引为空,直接返回<-1,-1>对
        if(_entries == 0)
          return (-1, -1)
    
    
        // 封装原版的二分查找算法
        def binarySearch(begin: Int, end: Int) : (Int, Int) = {
          // binary search for the entry
          var lo = begin
          var hi = end
          while(lo < hi) {
            val mid = (lo + hi + 1) >>> 1
            val found = parseEntry(idx, mid)
            val compareResult = compareIndexEntry(found, target, searchEntity)
            if(compareResult > 0)
              hi = mid - 1
            else if(compareResult < 0)
              lo = mid
            else
              return (mid, mid)
          }
          (lo, if (lo == _entries - 1) -1 else lo + 1)
        }
    
    
        // 第3步:确认热区首个索引项位于哪个槽。_warmEntries就是所谓的分割线,目前固定为8192字节处
        // 如果是OffsetIndex,_warmEntries = 8192 / 8 = 1024,即第1024个槽
        // 如果是TimeIndex,_warmEntries = 8192 / 12 = 682,即第682个槽
        val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
        // 第4步:判断target位移值在热区还是冷区
        if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
          return binarySearch(firstHotEntry, _entries - 1) // 如果在热区,搜索热区
        }
    
    
        // 第5步:确保target位移值不能小于当前最小位移值
        if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
          return (-1, 0)
    
    
        // 第6步:如果在冷区,搜索冷区
        binarySearch(0, firstHotEntry)

为何是 8192?

就像源码注释里面写的那样,8192这个数字不大不小正合适。所谓不大不小是指它并不是太小,它足以确保大多数lagging很小的follower或consumer都只在热区查询;同时它也不会太大,对于主流4KB大小的page size而言, 热区大约也就只占用2~3个页面。

总结

AbstractIndex是Kafka所有类型索引的抽象父类,里面的mmap变量是实现索引机制的核心,你一定要掌握它。

改进版二分查找算法:社区在标准原版的基础上,对二分查找算法根据实际访问场景做了定制化的改进。你需要特别关注改进版在提升缓存性能方面做了哪些努力。改进版能够有效地提升页缓存的使用率,从而在整体上降低物理I/O,缓解系统负载瓶颈。你最好能够从索引这个维度去思考社区在这方面所做的工作。


实际上,无论是AbstractIndex还是它使用的二分查找算法,它们都属于Kafka索引共性的东西,即所有Kafka索引都具备这些特点或特性。


上一篇:oracle 函数 实现简单的加密解密


下一篇:什么是 "署名-非商业性使用-相同方式共享"