tair是一个分布式KV存储引擎,当新增机器或者有机器down掉的时候,tair的dataserver会根据ConfigServer生成的新的对照表进行数据的迁移和清理。在数据清理的过程中就用到了在tair中新增的Compaction方式——CompactRangeSelfLevel,顾名思义,这个CompactRangeSelfLevel就是对自己所在(指定)的Level进行一定Key范围的Compaction然后将生成的输出文件也写入到自己所在的Level而不是父层(L + 1)。下面我们来对这个CompactRangeSelfLevel进行分析。
// compact filenumber 小于limit的key 在 [begin, end)范围内的 sstable // compact的时候只有level 0 SSTable会输出到 level 1, 其他的level都是输入输出在同一个level Status DBImpl::CompactRangeSelfLevel( uint64_t limit_filenumber, const Slice* begin, const Slice* end) { // 初始化一个MannualCompaction对象 manual.limit_filenumber = limit_filenumber; manual.bg_compaction_func = &DBImpl::BackgroundCompactionSelfLevel; // use TimedCond() 防止丢失唤醒信号 // 每个层级逐次schedule manualcompaction for (int level = 0; level < config::kNumLevels && manual.compaction_status.ok(); ++level) { ManualCompaction each_manual = manual; each_manual.level = level; while (each_manual.compaction_status.ok() && !each_manual.done) { // still have compaction running 就等待 while (bg_compaction_scheduled_) { bg_cv_.TimedWait(timed_us); } manual_compaction_ = &each_manual; MaybeScheduleCompaction(); while (manual_compaction_ == &each_manual) { bg_cv_.TimedWait(timed_us); } } manual.compaction_status = each_manual.compaction_status; } return manual.compaction_status; }
MaybeScheduleCompaction主要是判断是否新启动一个后台的Compaction线程,主要是以是否有Compaction的任务和是否已经有Compaction线程已经在运行为依据,这个函数已经在前面专门解释Compaction的文章中进行了分析,这里不再介绍。我们详细分析一下真正进行工作的BackgroundCompactionSelfLevel函数
void DBImpl::BackgroundCompactionSelfLevel() { do { // level-0 不对 filenumber 进行限制 /* CompactRangeOneLevel故名思议将该level中的所有符合filenumber 小于limit key 在 [begin, end)范围内的 sstable找出来 */ c = versions_-> CompactRangeOneLevel(m->level, m->level > 0 ? m->limit_filenumber : ~(static_cast<uint64_t>(0)), m->begin, m->end); if (NULL == c) { // no compact for this level m->done = true; // done all. break; } // 记录下manual Compaction的结束key manual_end = c->input(0, c->num_input_files(0) - 1)->largest; CompactionState* compact = new CompactionState(c); status = DoCompactionWorkSelfLevel(compact); // 真正进行Compaction的函数 CleanupCompaction(compact); c->ReleaseInputs(); DeleteObsoleteFiles(); delete c; if (shutting_down_.Acquire_Load()) { // Ignore compaction errors found during shutting down } else if (!status.ok()) { m->compaction_status = status; // save error if (bg_error_.ok()) { // no matter paranoid_checks bg_error_ = status; } break; // exit once fail. } } while (false); if (!m->done) { // We only compacted part of the requested range. Update *m // to the range that is left to be compacted. m->tmp_storage = manual_end; m->begin = &m->tmp_storage; } // Mark it as done manual_compaction_ = NULL; }
这里DoCompactionWorkSelfLevel是真正进行KV读取和Compaction的地方,然而我们这里并不打算对其进行详细的分析,因为通过对比我们知道其主题过程和DoCompactionWork相同,只是在一些细微的判断方式和处理方式上稍微有所不同。具体的DoCompactionWork的过程请参考《leveldb源码分析--SSTable之Compaction》,我们下面通过对比 差异之处的方式让大家理解DoCompactionWorkSelfLevel的实际处理过程。
DoCompactionWorkSelfLevel 和 DoCompactionWork基本相同,只是流程上少了几个判断:
1. DoCompactionWorkSelfLevel 遍历到key以后不需要进行ShouldStopBefore的判断,因为这个是判断是否跟L + 2层有过多的重叠,这里selflevel是输出到当前层,所以必然不会影响跟L+ 2层的重叠情况;
Slice key = input->key(); // if (compact->compaction->ShouldStopBefore(key) && // compact->builder != NULL) { // status = FinishCompactionOutputFile(compact, input); // if (!status.ok()) { // break; // } // } // 注释掉的地方即是CompactionWorkSelfLevel减少的部分
2. 是否drop的时候少了seq<= smallest_snapshot && (type == Deletion || ShouldDrop) && IsBaseLevelForKey(ikey)) 为drop,也是因为当前层的Compaction,而IsBaseLevelForKey是判断的L + 2层以上有无该key的相关值,这里如果要加上判断就应该是将 L+1 层也包含在判断范围内。
} else if (ikey.sequence <= compact->smallest_snapshot && (ikey.type == kTypeDeletion || // deleted or .. user_comparator()->ShouldDropMaybe(ikey.user_key.data(), ikey.sequence, expired_end_time)) && // .. user-defined should drop(maybe), // based on some condition(eg. this key only has this update.). compact->compaction->IsBaseLevelForKey(ikey.user_key)) { // For this user key: // (1) there is no data in higher levels // (2) data in lower levels will have larger sequence numbers // (3) data in layers that are being compacted here and have // smaller sequence numbers will be dropped in the next // few iterations of this loop (by rule (A) above). // Therefore this deletion marker is obsolete and can be dropped. drop = true; }
3. 在InstallCompactionResults 时第二个参数传入的是false,这样这个函数将新生成的SSTable放入当前层而不是L+ 1层
status = InstallCompactionResults(compact); // 修改为: status = InstallCompactionResults(compact, false); // output files is in current level, not level + 1
另外这里顺便提一下tair中的leveldb是对google开源的leveldb也有了一定的修改,比如添加expire等特性,在tair中comparator就添加了三个接口函数。而在tair中实现了两个这样的comparator分别是NumericalComparatorImpl和BitcmpLdbComparatorImpl,我们这里以BitcmpLdbComparatorImpl为例进行一下简单的介绍其功能。
// 判断这个key是否在需要回收的bucket中,如果是就返回true,那么Compaction的时候直接删除(即回收掉) virtual bool ShouldDrop(const char* key, int64_t sequence, uint32_t now = 0) const { return false;} // 根据expire_time判断这个key是否已经过期,如过期则返回true virtual bool ShouldDropMaybe(const char* key, int64_t sequence, uint32_t now = 0) const { return false;} // start_key和key是否依旧属于同一个bucket,是的放回false,否则返回true virtual bool ShouldStopBefore(const Slice& start_key, const Slice& key) const { return false;}
有了这三个函数以后tair的ldb引擎就可以在Compaction的时候对key进行回收和判断是否写入同一个SSTable中,比如最直接的Compaction的时候如果ShouldDrop返回true那么直接标记这个key为drop不写入到新的SSTable中;而ShouldStopBefore则被用在是否生成新的SSTable文件,如果返回true则结束当前文件的写入生成下一个SSTable,这样就可以将不同的bucket写入到不同的SSTable文件中了。