分析完leveldb以后,接下来的时间准备队tair的源码进行阅读和分析。我们刚刚分析完了leveldb而在tair中leveldb是其几大存储引擎之一,所以我们这里首先从tair对leveldb的使用和修改来窥探在这个分布式的存储引擎中是如何将area和bucket持久化到存储,并且方便bucket和area的处理的。
我们首先来看tair中key的结构,我们以大致梳理存储和查询一个KV的流程来确认key的处理。tair_client_api::put直接调用tair_client_impl::put,impl中首先会对key和value(data)进行合法性判断,然后调用get_server_id取得存储当前key的server_id后将组装好的request_put发送到服务器端。服务端接收到消息后handlePacketQueue解码然后会调用对应的process函数,这里是 request_processor::process(request_put *, ……)这个函数。这个函数会进行一些plugin和是否存储在本地等一些处理和判断,我们这里只需关注其调用到了tair_manager::put,这个函数里面对可以进行了一定的处理
data_entry mkey = key; // key merged with area mkey.merge_area(area); int bucket_number = get_bucket_number(key); 而 void merge_area(int _area) { if(has_merged) { return; } if(size < 0) return; //now should check is alloc by me. I have 2 extra head. if(m_true_size==size+2) { m_true_data[0]=(_area & 0xFF); m_true_data[1]=((_area >> 8) & 0xFF); size=m_true_size; data=m_true_data; }else{ //一些重新分配空间然后和if分支类似的处理 } }
这里即将area编码后放到了key_entry的最前两位,接下来看LdbInstance::put,里面处理得到cdate、mdate、edate,然后获得一个
LdbKey ldb_key(key.get_data(), key.get_size(), bucket_number, edate); //再看LdbKey的处理 LdbKey(const char* key_data, int32_t key_size, int32_t bucket_number, uint32_t expired_time = 0) : data_(NULL), data_size_(0), alloc_(false) { set(key_data, key_size, bucket_number, expired_time); } inline void set(const char* key_data, int32_t key_size, int32_t bucket_number, uint32_t expired_time) { if (key_data != NULL && key_size > 0) { free(); data_size_ = key_size + LDB_KEY_META_SIZE; data_ = new char[data_size_]; build_key_meta(data_, bucket_number, expired_time); memcpy(data_ + LDB_KEY_META_SIZE, key_data, key_size); alloc_ = true; } }
static void build_key_meta(char* buf, int32_t bucket_number, uint32_t expired_time = 0) { encode_fixed32(buf, expired_time); encode_bucket_number(buf + LDB_EXPIRED_TIME_SIZE, bucket_number); }
static void encode_bucket_number(char* buf, int bucket_number) { for (int i = 0; i < LDB_KEY_BUCKET_NUM_SIZE; ++i)//LDB_KEY_BUCKET_NUM_SIZE = 3 { buf[LDB_KEY_BUCKET_NUM_SIZE - i - 1] = (bucket_number >> (i*8)) & 0xFF; } }
从代码可以看出,这次再往key的前面分别编码了4字节的expired_time和3字节的bucket_no,到此时整个可以的组成就变成了
expire(4B) | bucket_no(3B) | area (2B) | user_key
再往下可以看到会有db_version_care_和put的version_care的判断,这里不进行详细的描述,主要功能是如果设置的db_version_care_那么每次put的时候需要先查询是否已经有插入过数据将其取出来,在后在根据本次插入的version_care判断本次插入传入的version是否和当前数据库中存储的version是否相等,如果不等则本次插入失败,如果相等则可以插入(更新),并且将改key的version加1,进行完相应的这些设置以后就可以把对应的KV对插入leveldb中了。这里需要说明的是在tair的存储引擎中实现了一种纯内存存储的memdb,而tair的leveldb也对此进行了一些修改增加了一个db_cache的概念,这里你就可以选择将memdb作为leveldb的一个cache以加快查询的速度。
到此我们就将重新组装后的KV对插入到leveldb中了,然而你有没有发现有何问题呢?对,就是这里的key是以expire开头的,而我们回忆leveldb内部默认使用的Comparator是BytewiseComparator,如果是使用率这个Comparator的话leveldb内部就将首先按照expire的时间来排序。从数据过期的处理来说,按expire排序那么我们就可以很容易的将expire的key排出掉,但是我们知道leveldb是会进行迁移的,当我们需要将一个area或者一个bucket的数据迁移到另外的机器上时那么我们岂不是要扫描整个数据库?二期查找的时候我如何能够确定这个key的过期时间呢?如果不能确定就不能获取到相应的值了。别担心,我们的leveldb不是有Comparator这个东西么,使用默认的BytewiseComparator不能达到要求,那么tair自己实现了一个Comparator叫做LdbComparator的东西来满足我们的需要。我们看看其Compare的代码
int LdbComparatorImpl::Compare(const leveldb::Slice& a, const leveldb::Slice& b) const { // LDB_COMPARE_SKIP_SIZE = LDB_EXPIRED_TIME_SIZE = sizeof(uint32_t); assert(a.size() > LDB_COMPARE_SKIP_SIZE && b.size() > LDB_COMPARE_SKIP_SIZE); const int min_len = (a.size() < b.size()) ? a.size() - LDB_COMPARE_SKIP_SIZE : b.size() - LDB_COMPARE_SKIP_SIZE; int r = memcmp(a.data() + LDB_COMPARE_SKIP_SIZE, b.data() + LDB_COMPARE_SKIP_SIZE, min_len); if (r == 0) { if (a.size() < b.size()) { r = -1; } else if (a.size() > b.size()) { r = +1; } } return r; }
这里compare的时候首先跳过了4个字节再进行memcmp,而我们知道4个字节正是expire的长度。
所以我们最终得出的结论是tair在使用leveldb时对key进行了重构,传入重构后的key结构为
expire(4B) | bucket_no(3B) | area (2B) | user_key
而又使用了自己实现的Comparator跳过开始的expire4个字节,这样最后就得到了整个leveldb中存储的数据就先以bucket_no排序,然后是area,最后才是安装user_key,经过这样的处理后我们以bucket进行迁移时就只需安装bucket_no生成对应的区间扫描的key就可以了。
本文介绍leveldb的key存储结构,希望以此为起点理清tair内部存储然后再以此为脉络进一步分析tair的实现。