MemTable(db/memtable.h db/memtable.cc db/skiplist.h)
LevelDB中存储在内存中的那部分KV数据都存储在memtable中,而memtable中的数据实际是用跳表来存储的。MemTable使用Arena进行内存管理,并提供了添加、查找、迭代器的接口,而实际上这些接口都是调用SkipList的添加和迭代器接口来实现的。
MemTable类
MemTable类的声明:
class MemTable
{
public:
// ...
// Increase reference count.
void Ref() { ++refs_; }
// Drop reference count. Delete if no more references exist.
void Unref()
{
--refs_;
assert(refs_ >= 0);
if (refs_ <= 0)
{
delete this;
}
}
// ...
// Return an iterator that yields the contents of the memtable.
//
// The caller must ensure that the underlying MemTable remains live
// while the returned iterator is live. The keys returned by this
// iterator are internal keys encoded by AppendInternalKey in the
// db/format.{h,cc} module.
Iterator *NewIterator();
// Add an entry into memtable that maps key to value at the
// specified sequence number and with the specified type.
// Typically value will be empty if type==kTypeDeletion.
void Add(SequenceNumber seq, ValueType type,
const Slice &key,
const Slice &value);
// If memtable contains a value for key, store it in *value and return true.
// If memtable contains a deletion for key, store a NotFound() error
// in *status and return true.
// Else, return false.
bool Get(const LookupKey &key, std::string *value, Status *s);
private:
// ...
struct KeyComparator
{
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator &c) : comparator(c) {}
int operator()(const char *a, const char *b) const;
};
friend class MemTableIterator;
friend class MemTableBackwardIterator;
typedef SkipList<const char *, KeyComparator> Table;
KeyComparator comparator_;
int refs_;
Arena arena_;
Table table_;
// ...
};
MemTable的成员变量
table_变量即为实际存储KV数据的跳表,arena_用于内存管理,refs_用于引用计数,comparator_用于key之间的比较。
其中最简单的是ref_,调用Ref()函数则增加引用计数,调用Unref()函数减少引用次数,并且当引用次数为0时,Unref()函数会将对象自身delete。
Arena类
然后是arena_。Arena类为:
class Arena
{
public:
// ...
// Return a pointer to a newly allocated memory block of "bytes" bytes.
char *Allocate(size_t bytes);
// Allocate memory with the normal alignment guarantees provided by malloc
char *AllocateAligned(size_t bytes);
// ...
private:
char *AllocateFallback(size_t bytes);
char *AllocateNewBlock(size_t block_bytes);
// Allocation state
char *alloc_ptr_;
size_t alloc_bytes_remaining_;
// Array of new[] allocated memory blocks
std::vector<char *> blocks_;
// Total memory usage of the arena.
port::AtomicPointer memory_usage_;
// ...
};
alloc_ptr_指向当前可以被分配的内存起始位置(在某一个block中),alloc_bytes_remaining_为当前可以被分配的内存的大小,blocks_中包含了指向每一个block的指针,memory_usage_为使用的内存总量,并且使用了AtomicPointer,可以通过内存屏障或者c++的atomic接口实现原子操作。
Arena类提供的方法主要有两个,一个是Allocate(size_t bytes):
inline char *Arena::Allocate(size_t bytes)
char *Arena::AllocateFallback(size_t bytes)
char *Arena::AllocateNewBlock(size_t block_bytes)
{
char *result = new char[block_bytes];
blocks_.push_back(result);
memory_usage_.NoBarrier_Store(
reinterpret_cast<void *>(MemoryUsage() + block_bytes + sizeof(char *)));
return result;
}
Allocate(size_t bytes)函数分配的内存不保证对齐。首先检查当前可以被分配的内存空间是否足够,如果够的话,直接将从alloc_ptr_指向的位置开始的bytes个字节分配给调用者,并移动alloc_ptr_指向新的位置,调整alloc_bytes_remaining_:
// The semantics of what to return are a bit messy if we allow
// 0-byte allocations, so we disallow them here (we don't need
// them for our internal use).
assert(bytes > 0);
if (bytes <= alloc_bytes_remaining_)
{
char *result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}
如果不够,就调用AllocateFallback(size_t bytes)函数:
return AllocateFallback(bytes);
AllocateFallback(size_t bytes)函数首先检查bytes大小是否超过1/4个block,如果超过,则直接调用AllocateNewBlock(size_t block_bytes)函数分配一个新的block给调用者,并且这个新的block中除了当前的bytes个字节,其余部分将不会被使用,而alloc_ptr_的指向却不变,也就是说当前可以被分配的内存保持不变,因为此时剩余空间至少为1/4个block,这样设计就可以减少大块的内存碎片:
if (bytes > kBlockSize / 4)
{
// Object is more than a quarter of our block size. Allocate it separately
// to avoid wasting too much space in leftover bytes.
char *result = AllocateNewBlock(bytes);
return result;
}
如果没有超过1/4个block,就分配一个新的block,并且将alloc_ptr_指向新的block中的剩余空间:
// We waste the remaining space in the current block.
alloc_ptr_ = AllocateNewBlock(kBlockSize);
alloc_bytes_remaining_ = kBlockSize;
char *result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
还有一个内存分配函数是AllocateAligned(size_t bytes):
char *Arena::AllocateAligned(size_t bytes)
这个函数保证内存对齐。首先,以下代码会调整alloc_ptr_的位置使其对齐:
const int align = (sizeof(void *) > 8) ? sizeof(void *) : 8;
assert((align & (align - 1)) == 0); // Pointer size should be a power of 2
size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align - 1);
size_t slop = (current_mod == 0 ? 0 : align - current_mod);
size_t needed = bytes + slop;
char *result;
剩下的代码和Allocate(size_t bytes)中实现一致:
if (needed <= alloc_bytes_remaining_)
{
result = alloc_ptr_ + slop;
alloc_ptr_ += needed;
alloc_bytes_remaining_ -= needed;
}
else
{
// AllocateFallback always returned aligned memory
result = AllocateFallback(bytes);
}
assert((reinterpret_cast<uintptr_t>(result) & (align - 1)) == 0);
return result;
SkipList类
最后是最重要的跳表,跳表由SkipList类实现:
template <typename Key, class Comparator>
class SkipList
{
private:
struct Node;
public:
// ...
// Insert key into the list.
// REQUIRES: nothing that compares equal to key is currently in the list.
void Insert(const Key &key);
// Returns true iff an entry that compares equal to key is in the list.
bool Contains(const Key &key) const;
// Iteration over the contents of a skip list
class Iterator
{
// ...
};
private:
// ...
// Immutable after construction
Comparator const compare_;
Arena *const arena_; // Arena used for allocations of nodes
Node *const head_;
// Modified only by Insert(). Read racily by readers, but stale
// values are ok.
port::AtomicPointer max_height_; // Height of the entire list
// ...
Node *NewNode(const Key &key, int height);
// ...
// Return true if key is greater than the data stored in "n"
bool KeyIsAfterNode(const Key &key, Node *n) const;
// Return the earliest node that comes at or after key.
// Return nullptr if there is no such node.
//
// If prev is non-null, fills prev[level] with pointer to previous
// node at "level" for every level in [0..max_height_-1].
Node *FindGreaterOrEqual(const Key &key, Node **prev) const;
// Return the latest node with a key < key.
// Return head_ if there is no such node.
Node *FindLessThan(const Key &key) const;
// Return the last node in the list.
// Return head_ if list is empty.
Node *FindLast() const;
// ...
};
跳表中主要有四个成员,一个是compare_用于key的比较,一个是arena_用于内存管理,一个是head_指向跳表开头的节点,最后一个是max_height_为跳表最大的高度。
当然跳表中还需要两个类和结构,一个是Iterator类,用于跳表的遍历,另一个是Node结构,用于表示跳表中的节点。
Node结构
首先是Node结构:
template <typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node
{
// ...
Key const key;
// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
Node *Next(int n)
// ...
void SetNext(int n, Node *x)
// ...
// No-barrier variants that can be safely used in a few locations.
Node *NoBarrier_Next(int n)
// ...
void NoBarrier_SetNext(int n, Node *x)
// ...
private:
// Array of length equal to the node height. next_[0] is lowest level link.
port::AtomicPointer next_[1];
};
Node中key存储了节点中实际存储的KV值,next_为一个变长数组(struct最后一个元素如果是数组,则长度可变),用于存储该节点在不同高度的链表上指向下一个节点的指针。
创建Node使用了一个很神奇的函数:
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node *
SkipList<Key, Comparator>::NewNode(const Key &key, int height)
{
char *mem = arena_->AllocateAligned(
sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
return new (mem) Node(key);
}
这个函数首先分配了一块对齐的内存,然后通过new (mem) Node(key)
这个语句创建一个节点,我也不是很懂。
Iterator类
其次是iterator类:
// Iteration over the contents of a skip list
class Iterator
{
public:
// ...
// Returns true iff the iterator is positioned at a valid node.
bool Valid() const;
// Returns the key at the current position.
// REQUIRES: Valid()
const Key &key() const;
// Advances to the next position.
// REQUIRES: Valid()
void Next();
// Advances to the previous position.
// REQUIRES: Valid()
void Prev();
// Advance to the first entry with a key >= target
void Seek(const Key &target);
// Position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToFirst();
// Position at the last entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToLast();
private:
const SkipList *list_;
Node *node_;
// ...
};
这个类主要有两个成员,一个list_指向迭代的跳表,node_指向当前迭代的节点。
对跳表的遍历操作需要获取一个Iterator类型的迭代器,然后通过调用Iterator类提供的接口函数对跳表进行遍历。而Iterator类提供的接口函数实际上通过调用SkipList类提供的私有接口函数实现。
SkipList类的私有接口函数如下:
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node *SkipList<Key, Comparator>::FindGreaterOrEqual(const Key &key, Node **prev)
const
{
Node *x = head_;
int level = GetMaxHeight() - 1;
while (true)
{
Node *next = x->Next(level);
if (KeyIsAfterNode(key, next))
{
// Keep searching in this list
x = next;
}
else
{
if (prev != nullptr)
prev[level] = x;
if (level == 0)
{
return next;
}
else
{
// Switch to next list
level--;
}
}
}
}
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node *
SkipList<Key, Comparator>::FindLessThan(const Key &key) const
{
Node *x = head_;
int level = GetMaxHeight() - 1;
while (true)
{
assert(x == head_ || compare_(x->key, key) < 0);
Node *next = x->Next(level);
if (next == nullptr || compare_(next->key, key) >= 0)
{
if (level == 0)
{
return x;
}
else
{
// Switch to next list
level--;
}
}
else
{
x = next;
}
}
}
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node *SkipList<Key, Comparator>::FindLast()
const
{
Node *x = head_;
int level = GetMaxHeight() - 1;
while (true)
{
Node *next = x->Next(level);
if (next == nullptr)
{
if (level == 0)
{
return x;
}
else
{
// Switch to next list
level--;
}
}
else
{
x = next;
}
}
}
这三个函数的实现比较简单,类似于链表的遍历,就不做具体分析了。
另外,跳表还有两个接口函数用于插入和检查是否包含某个key,这两个函数的实现也使用了跳表的私有接口函数:
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key &key)
template <typename Key, class Comparator>
bool SkipList<Key, Comparator>::Contains(const Key &key) const
MemTable成员函数
分析完以上这些,我们开始分析MemTable提供的接口函数:
void MemTable::Add(SequenceNumber s, ValueType type,
const Slice &key,
const Slice &value)
{
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
const size_t encoded_len =
VarintLength(internal_key_size) + internal_key_size +
VarintLength(val_size) + val_size;
char *buf = arena_.Allocate(encoded_len);
char *p = EncodeVarint32(buf, internal_key_size);
memcpy(p, key.data(), key_size);
p += key_size;
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
memcpy(p, value.data(), val_size);
assert(p + val_size == buf + encoded_len);
table_.Insert(buf);
}
Add函数将key和value组成key_size | sequencenumber | type | key | value_size | value这样的形式,然后调用SkipList类提供的Insert函数插入跳表中。
bool MemTable::Get(const LookupKey &key, std::string *value, Status *s)
Get函数先用跳表的迭代器找到包含key的节点:
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
然后将节点中的数据解码后封装为Slice存入value:
if (iter.Valid())
{
// entry format is:
// klength varint32
// userkey char[klength]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char *entry = iter.key();
uint32_t key_length;
const char *key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8),
key.user_key()) == 0)
{
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff))
{
case kTypeValue:
{
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
}
case kTypeDeletion:
*s = Status::NotFound(Slice());
return true;
}
}
}
return false;
MemTableIterator类
MemTable还封装了一个MemTableIterator类用于MemTable的迭代:
class MemTableIterator : public Iterator
{
public:
explicit MemTableIterator(MemTable::Table *table) : iter_(table) {}
virtual bool Valid() const { return iter_.Valid(); }
virtual void Seek(const Slice &k) { iter_.Seek(EncodeKey(&tmp_, k)); }
virtual void SeekToFirst() { iter_.SeekToFirst(); }
virtual void SeekToLast() { iter_.SeekToLast(); }
virtual void Next() { iter_.Next(); }
virtual void Prev() { iter_.Prev(); }
virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); }
virtual Slice value() const
{
Slice key_slice = GetLengthPrefixedSlice(iter_.key());
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
}
virtual Status status() const { return Status::OK(); }
private:
MemTable::Table::Iterator iter_;
std::string tmp_; // For passing to EncodeKey
// No copying allowed
MemTableIterator(const MemTableIterator &);
void operator=(const MemTableIterator &);
};
遍历MemTable就需要获得一个MemTableIterator类的迭代器,然后通过调用MemTableIterator类提供的接口函数进行遍历。MemTableIterator类其实就是实现了对跳表迭代器的封装,成员为一个跳表的迭代器iter_,接口函数通过调用跳表迭代器的接口函数实现。
227 Love u