本打算接下来分析version相关的概念,但是在准备的过程中看到了VersionSet的table_cache_这个变量才想起还有这样一个模块尚未分析,经过权衡觉得leveldb的version相对Cache来说相对复杂,而且version虽然对整个leveldb来说实现上跟其他功能十分紧密,但是从概念上来说却相对弱很多,有点感觉是附加的功能的感觉。所以从介绍系统首先应该注意的是整个系统概念的完整性的角度说还是先分析Cache相关的功能。
我们先来看Cache的基本框架结构数据:
struct LRUHandle { void* value; //cache的对象句柄,table的时候为table&file, block时为(table&file)_offset void (*deleter)(const Slice&, void* value); //回调函数 LRUHandle* next_hash; //hash表冲突解决指针 LRUHandle* next; //LRU双链表指针 LRUHandle* prev; //LRU双链表指针 size_t charge; // TODO(opt): Only allow uint32_t? size_t key_length; uint32_t refs; uint32_t hash; //根据key计算出的hash值,实现在hash.cc中 char key_data[1]; // encode后的file_num };
HandleTable是一个简单的hashtable的链式实现,其成员如下:
HandleTable{ uint32_t length_; uint32_t elems_; LRUHandle** list_; };
LRUCache包含了一个HashTable(HandleTable)和一个双向链表头及容量、使用情况、互斥锁,每当insert的时候会同时插入HashTable中和双向链表中。
LRUCache{
size_t capacity_;
port::Mutex mutex_;
size_t usage_;
LRUHandle lru_;
HandleTable table_;
};
LRUCache的与HandleTable和LRUHandle(途中绿色框)之间的基本关系图可以描述如下:
为了图形的简化,其中的地址和对象的关系未完整展现,lru_是对象而其他的绿色框都应该是表示的地址,大致图形便于理解,具体的关系请参阅源码。
ShardedLRUCache结构就更为简单,就是一个LRUCahce的数组,这样可以简单的集合封装多个LRUCache来达到分片降低锁的粒度增加并发度的目的。
在了解了基本关系之后要去理解其中的代码就十分简单了,这里不再一一列举,唯一要说明的是HandleTable 的hash自动增长模式,当HandleTable中的element 大于其hash数组大小的时候就对数组进行resize为最小的大于当前element个数的4的倍数,再将旧的element迁移到新的hash数组中。
if (elems_ > length_) Resize();
void Resize() { uint32_t new_length = 4; while (new_length < elems_) { new_length *= 2; } LRUHandle** new_list = new LRUHandle*[new_length]; //新建hash数组 for (uint32_t i = 0; i < length_; i++) { //将原始的element迁移至新hash数组 LRUHandle* h = list_[i]; while (h != NULL) { LRUHandle* next = h->next_hash; uint32_t hash = h->hash; LRUHandle** ptr = &new_list[hash & (new_length - 1)]; h->next_hash = *ptr; *ptr = h; h = next; count++; } } delete[] list_;//删除就数组、使用新表 list_ = new_list; length_ = new_length; } };
以上就是leveldb中关于LRUCache的基本结构了,但是leveldb在使用过程中又进行了一些变异和封装,如TableCache和DBImpl中的一个block_cache。我们这里先来对这两个概念进行一下梳理:由代码可以看出TableCache缓存的只是一个Table对象和RandomAccessFile对象的引用。而从Table::Open函数可以知道这个Table对象只保存了基本的管理信息(具体包括的内容前面的文章已经阐述过,请仔细查证),所以Table中的实际数据并未缓存其中。那么其中的实际数据在什么地方缓存呢?这里leveldb用到了另外的一个option中的ShardedLRUCache,当然这个cache由于是在option中那么说明其实可以改变的,你可以根据你的业务目标自行设计一个。到此你可能有点晕了,leveldb我要在cache中获取一个KV对的流程是怎么样的呢?这么设计的原因是什么呢?
第一个问题:leveldb中的Cache其实也是实现了分层,先cache一个SSTable的基本信息,而不是将整个SSTable的读到内存;再用一个cache缓存SSTable下一层级的实际的Block数据。那么获取数据的时候就得先根据基本信息获取到大致的SSTable,得到SSTable的句柄,再根据SSTable缓存中的基本信息获得应该去哪个block的信息,然后再根据block的句柄得到(SSTable句柄+Block句柄)去block缓存中获取实际的数据。
第二个问题:由于分层而不是将整个SSTable一次性的缓存到内存,那么得到的好久就显而易见了,可以减少内存的占用量。
我们来浏览一下Table_cache中各个函数的功能,首先看Get
Status TableCache::Get(const ReadOptions& options, uint64_t file_number, //file句柄 uint64_t file_size, const Slice& k, //查找的key void* arg, void (*saver)(void*, const Slice&, const Slice&)) { Status s = FindTable(file_number, file_size, &handle); //查找到Cache handle if (s.ok()) { Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table; s = t->InternalGet(options, k, arg, saver); cache_->Release(handle); } return s; }
再看FindTable:
Status TableCache::FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle** handle) { EncodeFixed64(buf, file_number); //根据file_num组一个key Slice key(buf, sizeof(buf)); *handle = cache_->Lookup(key); //查找当前SSTable的信息是否已经在Table_cache中 if (*handle == NULL) { //如果不在则打开SSTable, std::string fname = TableFileName(dbname_, file_number); s = env_->NewRandomAccessFile(fname, &file);//尝试ldb后缀 if (!s.ok()) { std::string old_fname = SSTTableFileName(dbname_, file_number); //尝试sst后缀 if (env_->NewRandomAccessFile(old_fname, &file).ok()) { s = Status::OK(); } } if (s.ok()) { s = Table::Open(*options_, file, file_size, &table); //读取文件管理信息生成Table对象 } if (!s.ok()) { } else { //将内容缓存至TableCache中 TableAndFile* tf = new TableAndFile; tf->file = file; tf->table = table; *handle = cache_->Insert(key, tf, 1, &DeleteEntry); } } return s; }
再看Table的InternalGet,其功能是在SSTable中查找key
Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg, void (*saver)(void*, const Slice&, const Slice&)) { Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator); iiter->Seek(k); //查找key可能存在的block if (iiter->Valid()) { if (filter != NULL && handle.DecodeFrom(&handle_value).ok() && !filter->KeyMayMatch(handle.offset(), k)) { //根据bloomfilter判断是否在块中 // Not found } else { Iterator* block_iter = BlockReader(this, options, iiter->value()); block_iter->Seek(k); //读取block内容然后在block中查找 if (block_iter->Valid()) { (*saver)(arg, block_iter->key(), block_iter->value()); } s = block_iter->status(); delete block_iter; } } if (s.ok()) { s = iiter->status(); } delete iiter; return s; }
另外TableCache中还有一个NewIterator的函数,顾名思义他就是生成一个遍历Cache的SSTable的迭代器,他基本上也是简单的调用Table 的 NewIterator。leveldb的Iterator设计也比较精妙,比如NewTwoLevelIterator这个东西,我们稍后会有篇幅来介绍,这里你只需要知道他就是生成一个遍历SSTable的迭代器就可以了。
理清了基本数据关系,最后就该轮到介绍我们大家最关心的DB_Impl的Get函数了,由于DB_Impl是DB虚类的一个子类,所以用户调用DB的get的时候实际调用的是这个函数的实现。
Status DBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) { // 锁,Sequence,version,ref等一系列的一些设置 { //查找是否在memtable中,这是最新的数据 if (mem->Get(lkey, value, &s)) { // Done } else if (imm != NULL && imm->Get(lkey, value, &s)) { // 查找是否在imutable memtable中,这是次新的数据 } else { s = current->Get(options, lkey, value, &stats); //否则到当前版本中的SSTable中查找 have_stat_update = true; } } //其他一些处理,以及判断是否需要出发compaction return s; }
当前version中的查找:
Status Version::Get(const ReadOptions& options, const LookupKey& k, std::string* value, GetStats* stats) { // 从0级开始一级一级查找,0级最新,1级次新,依次更旧, //所以查找的时候不能跳跃级别 ,找到最新的数据以后旧的数据就不在需要了 for (int level = 0; level < config::kNumLevels; level++) { size_t num_files = files_[level].size(); if (num_files == 0) continue; //取得文件句柄 FileMetaData* const* files = &files_[level][0]; if (level == 0) { // Level-0 需要查找全部文件,因为level-0中可能存在重叠 for (uint32_t i = 0; i < num_files; i++) { FileMetaData* f = files[i]; if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 && ucmp->Compare(user_key, f->largest.user_key()) <= 0) { tmp.push_back(f); } } //按文件新旧程度排序,新的在最前面 std::sort(tmp.begin(), tmp.end(), NewestFirst); files = &tmp[0]; num_files = tmp.size(); } else { // 找到第一个 largest key >= ikey的文件(SSTable). uint32_t index = FindFile(vset_->icmp_, files_[level], ikey); tmp2 = files[index]; if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) { // 不在该文件中 files = NULL; num_files = 0; } else { files = &tmp2; num_files = 1; } } //找到具体的SSTable以后在该SSTable的缓存中进行具体查找 s = vset_->table_cache_->Get(options, f->number, f->file_size, ikey, &saver, SaveValue); switch (saver.state) { case kNotFound: break; // 继续查找,知道找出的所有文件都查找完 case kFound: return s; case kDeleted: s = Status::NotFound(Slice()); // 已经被删除,直接返回 return s; case kCorrupt: s = Status::Corruption("corrupted key for ", user_key); return s; } } } return Status::NotFound(Slice()); // Use an empty error message for speed }