ssdb的高可用,源码分析

ssdb,一个高性能的支持丰富数据结构的 NoSQL 数据库, 用于替代 Redis。——这是其官网的自我介绍。

ssdb在leveldb存储库的基础上进行改造和丰富,添加了类似redis操作的接口,实现了数据的高可用。所以ssdb是基于leveldb实现了redis功能的nosql数据库,可以直接使用redis的客户端访问ssdb。

安装

wget --no-check-certificate https://github.com/ideawu/ssdb/archive/master.zip
unzip master
cd ssdb-master
make
# optional, install ssdb in /usr/local/ssdb
sudo make install

启动

# start master
./ssdb-server ssdb.conf # or start as daemon
./ssdb-server -d ssdb.conf

本文主要是基于ssdb的高可用功能,对照源码(版本:1.9.2)进行简要分析。好,进入主题。

入口代码,在ssdb-server.cpp文件中:
 int main(int argc, char **argv){
  MyApplication app;
  return app.main(argc, argv);
} void MyApplication::run(){
Options option;
option.load(*conf); std::string data_db_dir = app_args.work_dir + "/data"; // 这个db是存放用户数据的
std::string meta_db_dir = app_args.work_dir + "/meta"; // 这个db是ssdb内部使用的,存放一些属性数据,为同步数据提供服务 // 省略日志 SSDB *data_db = NULL;
SSDB *meta_db = NULL;
data_db = SSDB::open(option, data_db_dir);
if(!data_db){
log_fatal("could not open data db: %s", data_db_dir.c_str());
fprintf(stderr, "could not open data db: %s\n", data_db_dir.c_str());
exit();
} meta_db = SSDB::open(Options(), meta_db_dir);
if(!meta_db){
log_fatal("could not open meta db: %s", meta_db_dir.c_str());
fprintf(stderr, "could not open meta db: %s\n", meta_db_dir.c_str());
exit();
}
  // 由此可见ssdb会打开两个db,一个名叫data,另一个名叫meta NetworkServer *net = NULL;
SSDBServer *server;
net = NetworkServer::init(*conf);
server = new SSDBServer(data_db, meta_db, *conf, net); log_info("pidfile: %s, pid: %d", app_args.pidfile.c_str(), (int)getpid());
log_info("ssdb server started.");
net->serve(); delete net;
delete server;
delete meta_db;
delete data_db; log_info("%s exit.", APP_NAME);
}

ssdb的所有数据先通过Binlog进行封装,再通过Binlog操作leveldb引擎,而Binlog的操作又通过BinlogQueue管理。先看看ssdb的binlog数据格式:

Binlog里面只保存key,没有value,数据格式为: head + key,其中head的组成是(uint64_t)seq + (char)type + (char)cmd。

// binlog.h
/**
 * One binlog record.
 *
 * A Binlog stores only the key, never the value. The byte layout of `buf` is
 *   header + key
 * where header = (uint64_t)seq + (char)type + (char)cmd.
 */
class Binlog{
private:
    // Raw record bytes: header followed by the key.
    std::string buf;
    // Header size: the 8-byte seq plus one byte for type and one for cmd.
    static const unsigned int HEADER_LEN = sizeof(uint64_t) + 2;
public:
    Binlog(){}
    // Builds a record from its four parts (see the layout above).
    Binlog(uint64_t seq, char type, char cmd, const leveldb::Slice &key);

    // Load the raw record bytes from a Bytes / Slice / string.
    int load(const Bytes &s);
    int load(const leveldb::Slice &s);
    int load(const std::string &s);

    uint64_t seq() const;    // seq decoded from the header
    char type() const;       // type byte from the header
    char cmd() const;        // cmd byte from the header
    const Bytes key() const; // the key bytes that follow the header

    const char* data() const{
        return buf.data();
    }
    int size() const{
        return (int)buf.size();
    }
    // Returns a copy of the raw record bytes.
    const std::string repr() const{
        return this->buf;
    }
    // Formats the record as a printable string.
    std::string dumps() const;
};
// binlog.cpp
Binlog::Binlog(uint64_t seq, char type, char cmd, const leveldb::Slice &key){
buf.append((char *)(&seq), sizeof(uint64_t));
buf.push_back(type);
buf.push_back(cmd);
buf.append(key.data(), key.size());
} uint64_t Binlog::seq() const{
return *((uint64_t *)(buf.data()));
} char Binlog::type() const{
return buf[sizeof(uint64_t)];
} char Binlog::cmd() const{
return buf[sizeof(uint64_t) + ];
} const Bytes Binlog::key() const{
return Bytes(buf.data() + HEADER_LEN, buf.size() - HEADER_LEN);
} int Binlog::load(const Bytes &s){
if(s.size() < HEADER_LEN){
return -;
}
buf.assign(s.data(), s.size());
return ;
} int Binlog::load(const leveldb::Slice &s){
if(s.size() < HEADER_LEN){
return -;
}
buf.assign(s.data(), s.size());
return ;
} int Binlog::load(const std::string &s){
if(s.size() < HEADER_LEN){
return -;
}
buf.assign(s.data(), s.size());
return ;
} std::string Binlog::dumps() const{
std::string str;
if(buf.size() < HEADER_LEN){
return str;
}
char buf[];
snprintf(buf, sizeof(buf), "%" PRIu64 " ", this->seq());
str.append(buf); switch(this->type()){
case BinlogType::NOOP:
str.append("noop ");
break;
case BinlogType::SYNC:
str.append("sync ");
break;
case BinlogType::MIRROR:
str.append("mirror ");
break;
case BinlogType::COPY:
str.append("copy ");
break;
}
switch(this->cmd()){
case BinlogCommand::NONE:
str.append("none ");
break;
case BinlogCommand::KSET:
str.append("set ");
break;
case BinlogCommand::KDEL:
str.append("del ");
break;
case BinlogCommand::HSET:
str.append("hset ");
break;
case BinlogCommand::HDEL:
str.append("hdel ");
break;
case BinlogCommand::ZSET:
str.append("zset ");
break;
case BinlogCommand::ZDEL:
str.append("zdel ");
break;
case BinlogCommand::BEGIN:
str.append("begin ");
break;
case BinlogCommand::END:
str.append("end ");
break;
case BinlogCommand::QPUSH_BACK:
str.append("qpush_back ");
break;
case BinlogCommand::QPUSH_FRONT:
str.append("qpush_front ");
break;
case BinlogCommand::QPOP_BACK:
str.append("qpop_back ");
break;
case BinlogCommand::QPOP_FRONT:
str.append("qpop_front ");
case BinlogCommand::QSET:
str.append("qset ");
break;
}
Bytes b = this->key();
str.append(hexmem(b.data(), b.size()));
return str;
}

再看看数据是如何写入到db的,主要看BinlogQueue。BinlogQueue是一个循环队列,封装了对数据的操作接口,上层业务通过BinlogQueue提供的接口操作db。需要注意的是,在数据同步时,BinlogQueue只管理db中最新的一段Binlog,充当缓冲区的作用:如果slave请求同步的seq在[min_seq, last_seq]范围内,就执行SYNC,否则执行COPY。先看BinlogQueue代码:

// circular queue
/**
 * Wraps the leveldb handle and records a Binlog entry alongside each write.
 * Writes are staged in a WriteBatch and applied to the db by commit().
 * [min_seq, last_seq] is the window of binlogs this queue keeps track of.
 */
class BinlogQueue{
private:
    // NOTE(review): both capacities were stripped from the excerpt and are
    // restored from upstream ssdb; confirm against the 1.9.2 sources.
#ifdef NDEBUG
    static const int LOG_QUEUE_SIZE = 20 * 1000 * 1000;
#else
    static const int LOG_QUEUE_SIZE = 10000;
#endif
    leveldb::DB *db;     // the db that stores the data
    uint64_t min_seq;    // smallest seq inside the queue window; the starting
                         // point for replication. NOT the smallest seq in db.
    uint64_t last_seq;   // newest (largest) seq in the db
    uint64_t tran_seq;
    int capacity;
    leveldb::WriteBatch batch; // leveldb batched write

    volatile bool thread_quit;
    // Thread function that trims old binlogs; runs in its own thread.
    static void* log_clean_thread_func(void *arg);
    int del(uint64_t seq); // delete one binlog by seq
    // [start, end] inclusive
    int del_range(uint64_t start, uint64_t end); // delete binlogs by seq range

    void merge();
    bool enabled;
public:
    Mutex mutex;

    BinlogQueue(leveldb::DB *db, bool enabled=true);
    ~BinlogQueue();
    void begin();
    void rollback();
    leveldb::Status commit(); // write the contents of batch to the db
    // leveldb put
    void Put(const leveldb::Slice& key, const leveldb::Slice& value); // stage a write in batch
    // leveldb delete
    void Delete(const leveldb::Slice& key); // stage a delete in batch
    void add_log(char type, char cmd, const leveldb::Slice &key); // stage one Binlog entry in batch
    void add_log(char type, char cmd, const std::string &key);    // stage one Binlog entry in batch

    int get(uint64_t seq, Binlog *log) const; // read one Binlog entry from the db
    // Writes one Binlog entry straight to the db, bypassing batch.
    int update(uint64_t seq, char type, char cmd, const std::string &key);

    void flush(); // remove the Binlog entries currently managed in the db

    /** @returns
     1 : log.seq greater than or equal to seq
     0 : not found
     -1: error
    */
    int find_next(uint64_t seq, Binlog *log) const; // find the next Binlog entry at or after seq
    int find_last(Binlog *log) const;               // find the newest Binlog entry

    std::string stats() const;
};

/**
 * Attaches to db and works out the [min_seq, last_seq] window from the
 * binlogs already stored there. When enabled, also starts the background
 * thread that trims old binlogs.
 */
BinlogQueue::BinlogQueue(leveldb::DB *db, bool enabled){
    this->db = db;
    this->min_seq = 0;
    this->last_seq = 0;
    this->tran_seq = 0;
    this->capacity = LOG_QUEUE_SIZE;
    this->enabled = enabled;
    // Initialised unconditionally so the flag is never indeterminate, even
    // when the queue is disabled and no cleaner thread is started.
    this->thread_quit = false;

    // The logic below shows that min_seq is not the smallest seq in the db,
    // only the smallest seq inside the queue window.
    Binlog log;
    if(this->find_last(&log) == 1){
        this->last_seq = log.seq();
    }
    if(this->last_seq > LOG_QUEUE_SIZE){
        this->min_seq = this->last_seq - LOG_QUEUE_SIZE;
    }else{
        this->min_seq = 0;
    }
    // TODO: use binary search to find out min_seq
    if(this->find_next(this->min_seq, &log) == 1){
        this->min_seq = log.seq();
    }
    if(this->enabled){
        log_info("binlogs capacity: %d, min: %" PRIu64 ", max: %" PRIu64 ",", capacity, min_seq, last_seq);
    }

    // Start the thread that trims binlogs.
    if(this->enabled){
        pthread_t tid;
        int err = pthread_create(&tid, NULL, &BinlogQueue::log_clean_thread_func, this);
        if(err != 0){
            log_fatal("can't create thread: %s", strerror(err));
            // NOTE(review): the exit status digit was lost in the excerpt; a
            // non-zero status is used because this is a failure path.
            exit(1);
        }
    }
}

上述的BinlogQueue只保存LOG_QUEUE_SIZE条记录的范围,若同步的seq在此范围内,就执行SYNC,否则执行COPY。

SSDB 的主从同步策略非常简单, 就是把主(Master)上的所有写操作(Binlogs), 在从(Slave)上再执行一遍.
MySQL 的主从同步也是一样. 而多主可以理解为互为主从.

把 Master 上的所有操作(Binlogs)在 Slave 上执行一遍, 说来很简单, 但还是会遇到一些难题,
例如 Binlogs 不可能无限地永久保留. SSDB 只保留最新的 1000 万次写操作.
对于熟悉 MySQL 的同学可能也知道这样的例子: 在有 Binlogs 之前, 数据库内已经有了一部分数据.
也就是说, 这部分数据是无法通过 Binlog 来获得的.

SSDB 数据库中的所有数据都是排好序的, 所以你可以把整个数据库理解为一个链表,
SSDB 从表头开始 Copy, 一次一个节点, 游标一直往后. 这时, 如果有新的 Binlog,
SSDB 会先判断这条 Binlog 对应的节点在链表中的什么位置, 是在游标的前面还是后面?

如果在游标的前面, 那么会把这条 Binlog 发给 Slave 执行.
如果在游标的后面, 就会直接忽略掉, 因为游标最终会移动到更新的位置.
从这个描述也可以知道, 处于 Copy 阶段的 Slave, 有可能无法立即知道 Master 上的更新.

当游标移动到了链表的末端之后, Copy 过程就结束了, 主从同步流程进入到 Sync 阶段, 也就是即时(毫秒级)更新阶段.

__引自 http://www.ideawu.net/blog/archives/849.html

master接收slave连接时,master启动线程处理同步逻辑。看看master的BackendSync行为:

/**
 * Thread entry point for one replication client on the master.
 *
 * arg is a heap-allocated run_arg carrying the BackendSync instance and the
 * client's Link; both the run_arg and, on exit, the Link are deleted here.
 * The loop keeps feeding binlogs (and, in COPY state, existing data) to the
 * client until backend->thread_quit is set or a flush fails.
 */
void* BackendSync::_run_thread(void *arg){
    pthread_detach(pthread_self());
    struct run_arg *p = (struct run_arg*)arg;
    BackendSync *backend = (BackendSync *)p->backend;
    Link *link = (Link *)p->link;
    delete p;

    // set Link non block
    // NOTE(review): the call passes false, which reads like "blocking mode";
    // the wording above is the original comment. Confirm Link::noblock().
    link->noblock(false);

    SSDBImpl *ssdb = (SSDBImpl *)backend->ssdb;
    BinlogQueue *logs = ssdb->binlogs;

    Client client(backend);
    client.link = link;
    client.init();

    {
        // Register this worker under the backend lock.
        pthread_t tid = pthread_self();
        Locking l(&backend->mutex);
        backend->workers[tid] = &client;
    }

// sleep longer to reduce logs.find
#define TICK_INTERVAL_MS 300
#define NOOP_IDLES (3000/TICK_INTERVAL_MS)

    int idle = 0;
    while(!backend->thread_quit){
        if(client.status == Client::OUT_OF_SYNC){
            client.reset();
            continue;
        }

        bool is_empty = true;
        // sync() runs first because it advances last_seq. While in COPY
        // state it drops a binlog whose key lies beyond the key copied so
        // far and leaves that key to the sequential copy() pass.
        if(client.sync(logs)){
            is_empty = false;
        }
        // In COPY state, also copy existing data in key order.
        if(client.status == Client::COPY){
            if(client.copy()){
                is_empty = false;
            }
        }
        if(is_empty){
            // Nothing to send: send a noop after NOOP_IDLES idle ticks,
            // otherwise sleep for one tick.
            if(idle >= NOOP_IDLES){
                idle = 0;
                client.noop();
            }else{
                idle ++;
                usleep(TICK_INTERVAL_MS * 1000);
            }
        }else{
            idle = 0;
        }

        // Measured before flush() so it reflects what is about to be sent.
        float data_size_mb = link->output->size() / 1024.0 / 1024.0;
        if(link->flush() == -1){
            log_info("%s:%d fd: %d, send error: %s", link->remote_ip, link->remote_port, link->fd(), strerror(errno));
            break;
        }
        if(backend->sync_speed > 0){
            // Throttle: sleep in proportion to the data just queued
            // (presumably sync_speed is in MB/s; confirm against the config docs).
            usleep((data_size_mb / backend->sync_speed) * 1000 * 1000);
        }
    }

    log_info("Sync Client quit, %s:%d fd: %d, delete link", link->remote_ip, link->remote_port, link->fd());
    delete link;

    Locking l(&backend->mutex);
    backend->workers.erase(pthread_self());
    return (void *)NULL;
}
// Client::init() picks the initial state from the seq and key sent by the slave.
// A slave connecting for the first time sends neither last_key nor last_seq:
// the client is reset() (presumably entering COPY from the start; confirm in reset()).
// An empty last_key with a non-zero last_seq means a reconnect: enter SYNC.
// Anything else resumes in COPY state.
/**
 * Reads last_seq, last_key and the optional "mirror" flag from the request
 * the slave just sent, then selects the initial replication state.
 */
void BackendSync::Client::init(){
    const std::vector<Bytes> *req = this->link->last_recv();
    // Request layout implied by the indices: [0]=command, [1]=last_seq,
    // [2]=last_key, [3]=optional "mirror".
    // NOTE(review): the index digits were stripped from the excerpt; confirm
    // them against upstream.
    last_seq = 0;
    if(req->size() > 1){
        last_seq = req->at(1).Uint64();
    }
    last_key = "";
    if(req->size() > 2){
        last_key = req->at(2).String();
    }
    // is_mirror
    if(req->size() > 3){
        if(req->at(3).String() == "mirror"){
            is_mirror = true;
        }
    }
    const char *type = is_mirror? "mirror" : "sync";
    // a slave must reset its last_key when receiving 'copy_end' command
    if(last_key == "" && last_seq != 0){
        log_info("[%s] %s:%d fd: %d, sync recover, seq: %" PRIu64 ", key: '%s'",
            type,
            link->remote_ip, link->remote_port,
            link->fd(),
            last_seq, hexmem(last_key.data(), last_key.size()).c_str()
            );
        this->status = Client::SYNC;

        // Tell the slave straight away that the copy phase is over.
        Binlog log(this->last_seq, BinlogType::COPY, BinlogCommand::END, "");
        log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
        link->send(log.repr(), "copy_end");
    }else if(last_key == "" && last_seq == 0){
        log_info("[%s] %s:%d fd: %d, copy begin, seq: %" PRIu64 ", key: '%s'",
            type,
            link->remote_ip, link->remote_port,
            link->fd(),
            last_seq, hexmem(last_key.data(), last_key.size()).c_str()
            );
        this->reset();
    }else{
        log_info("[%s] %s:%d fd: %d, copy recover, seq: %" PRIu64 ", key: '%s'",
            type,
            link->remote_ip, link->remote_port,
            link->fd(),
            last_seq, hexmem(last_key.data(), last_key.size()).c_str()
            );
        this->status = Client::COPY;
    }
    // PRIu64 replaces %lu: last_seq is a uint64_t, which is not unsigned long
    // on every platform.
    log_debug("==last seq:%" PRIu64 ", last_key:%s, type:%s, status:%u", last_seq
        , hexmem(last_key.data(), last_key.size()).c_str(), type, (unsigned int)this->status);
}
// copy(): streams the existing data to the slave through an iterator.
int BackendSync::Client::copy(){
if(this->iter == NULL){
log_info("new iterator, last_key: '%s'", hexmem(last_key.data(), last_key.size()).c_str());
std::string key = this->last_key;
if(this->last_key.empty()){
key.push_back(DataType::MIN_PREFIX);
}
this->iter = backend->ssdb->iterator(key, "", -);
log_info("iterator created, last_key: '%s'", hexmem(last_key.data(), last_key.size()).c_str());
}
int ret = ;
int iterate_count = ;
int64_t stime = time_ms();
while(true){
// Prevent copy() from blocking too long
if(++iterate_count > || link->output->size() > * * ){
break;
}
if(time_ms() - stime > ){
log_info("copy blocks too long, flush");
break;
} if(!iter->next()){
goto copy_end;
}
Bytes key = iter->key();
if(key.size() == ){
continue;
}
// finish copying all valid data types
if(key.data()[] > DataType::MAX_PREFIX){
goto copy_end;
}
Bytes val = iter->val();
this->last_key = key.String(); char cmd = ;
char data_type = key.data()[];
if(data_type == DataType::KV){
cmd = BinlogCommand::KSET;
}else if(data_type == DataType::HASH){
cmd = BinlogCommand::HSET;
}else if(data_type == DataType::ZSET){
cmd = BinlogCommand::ZSET;
}else if(data_type == DataType::QUEUE){
cmd = BinlogCommand::QPUSH_BACK;
}else{
continue;
} ret = ; Binlog log(this->last_seq, BinlogType::COPY, cmd, slice(key));
log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
link->send(log.repr(), val);
}
return ret; copy_end:
log_info("%s:%d fd: %d, copy end", link->remote_ip, link->remote_port, link->fd());
this->status = Client::SYNC;
delete this->iter;
this->iter = NULL; Binlog log(this->last_seq, BinlogType::COPY, BinlogCommand::END, "");
log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
link->send(log.repr(), "copy_end");
return ;
} int BackendSync::Client::sync(BinlogQueue *logs){
Binlog log;
// 从BinlogQueue取一条数据进行同步
while(){
int ret = ;
uint64_t expect_seq = this->last_seq + ;
if(this->status == Client::COPY && this->last_seq == ){
ret = logs->find_last(&log);
}else{
ret = logs->find_next(expect_seq, &log);
}
if(ret == ){
return ;
}
if(this->status == Client::COPY && log.key() > this->last_key){
log_debug("fd: %d, last_key: '%s', drop: %s",
link->fd(),
hexmem(this->last_key.data(), this->last_key.size()).c_str(),
log.dumps().c_str());
this->last_seq = log.seq();
// WARN: When there are writes behind last_key, we MUST create
// a new iterator, because iterator will not know this key.
// Because iterator ONLY iterates throught keys written before
// iterator is created.
if(this->iter){
delete this->iter;
this->iter = NULL;
}
continue;
}
if(this->last_seq != && log.seq() != expect_seq){
log_warn("%s:%d fd: %d, OUT_OF_SYNC! log.seq: %" PRIu64 ", expect_seq: %" PRIu64 "",
link->remote_ip, link->remote_port,
link->fd(),
log.seq(),
expect_seq
);
this->status = Client::OUT_OF_SYNC;
return ;
} // update last_seq
log_debug("==last seq:%lu. now seq:%lu", this->last_seq, log.seq());
this->last_seq = log.seq(); char type = log.type();
if(type == BinlogType::MIRROR && this->is_mirror){
if(this->last_seq - this->last_noop_seq >= ){
this->noop();
return ;
}else{
continue;
}
} break;
} // 根据数据类型添加数据到待发送缓冲区
int ret = ;
std::string val;
switch(log.cmd()){
case BinlogCommand::KSET:
case BinlogCommand::HSET:
case BinlogCommand::ZSET:
case BinlogCommand::QSET:
case BinlogCommand::QPUSH_BACK:
case BinlogCommand::QPUSH_FRONT:
ret = backend->ssdb->raw_get(log.key(), &val);
if(ret == -){
log_error("fd: %d, raw_get error!", link->fd());
}else if(ret == ){
//log_debug("%s", hexmem(log.key().data(), log.key().size()).c_str());
log_trace("fd: %d, skip not found: %s", link->fd(), log.dumps().c_str());
}else{
log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
link->send(log.repr(), val);
}
break;
case BinlogCommand::KDEL:
case BinlogCommand::HDEL:
case BinlogCommand::ZDEL:
case BinlogCommand::QPOP_BACK:
case BinlogCommand::QPOP_FRONT:
log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
link->send(log.repr());
break;
}
return ;
}
对于slave发送过来的sync140请求,master启动单独线程BackendSync,进入数据同步逻辑。该线程在slave和master连接期间一直存活。slave何时发送sync140?在启动的时候。逻辑如下:
// serv.cpp
/**
 * Wires the server to the network layer, creates the backend helpers
 * (dump, sync, expiration, cluster), starts one Slave per configured
 * "slaveof" entry, and loads the kv key range.
 * Exits the process if cluster init or the key-range load fails.
 */
SSDBServer::SSDBServer(SSDB *ssdb, SSDB *meta, const Config &conf, NetworkServer *net){
    this->ssdb = (SSDBImpl *)ssdb;
    this->meta = meta;

    net->data = this;
    this->reg_procs(net);

    int sync_speed = conf.get_num("replication.sync_speed");

    backend_dump = new BackendDump(this->ssdb);
    backend_sync = new BackendSync(this->ssdb, sync_speed);
    expiration = new ExpirationHandler(this->ssdb);

    cluster = new Cluster(this->ssdb);
    if(cluster->init() == -1){
        log_fatal("cluster init failed!");
        // NOTE(review): the exit status digit was lost in the excerpt; a
        // non-zero status is used because this is a failure path.
        exit(1);
    }

    { // slaves
        const Config *repl_conf = conf.get("replication");
        if(repl_conf != NULL){
            // Bound by reference: the original copied the whole vector just
            // to iterate over it.
            const std::vector<Config *> &children = repl_conf->children;
            for(std::vector<Config *>::const_iterator it = children.begin(); it != children.end(); ++it){
                Config *c = *it;
                if(c->key != "slaveof"){
                    continue;
                }
                std::string ip = c->get_str("ip");
                int port = c->get_num("port");
                // Skip entries without an ip or with a port outside 1..65535.
                if(ip == "" || port <= 0 || port > 65535){
                    continue;
                }
                // Any type other than "mirror" is treated as "sync".
                bool is_mirror = false;
                std::string type = c->get_str("type");
                if(type == "mirror"){
                    is_mirror = true;
                }else{
                    type = "sync";
                    is_mirror = false;
                }

                std::string id = c->get_str("id");

                log_info("slaveof: %s:%d, type: %s", ip.c_str(), port, type.c_str());
                Slave *slave = new Slave(ssdb, meta, ip.c_str(), port, is_mirror);
                if(!id.empty()){
                    slave->set_id(id);
                }
                slave->auth = c->get_str("auth");
                // Per the article: start() launches a thread running
                // Slave::_run_thread, which sends sync140 after connecting to
                // the master and persists last_key / last_seq.
                slave->start();
                slaves.push_back(slave);
            }
        }
    }

    // load kv_range
    int ret = this->get_kv_range(&this->kv_range_s, &this->kv_range_e);
    if(ret == -1){
        log_fatal("load key_range failed!");
        exit(1);
    }
    log_info("key_range.kv: \"%s\", \"%s\"",
        str_escape(this->kv_range_s).c_str(),
        str_escape(this->kv_range_e).c_str()
        );
}
// slave.cpp
void* Slave::_run_thread(void *arg){
Slave *slave = (Slave *)arg;
const std::vector<Bytes> *req;
Fdevents select;
const Fdevents::events_t *events;
int idle = ;
bool reconnect = false; #define RECV_TIMEOUT 200
#define MAX_RECV_TIMEOUT 300 * 1000
#define MAX_RECV_IDLE MAX_RECV_TIMEOUT/RECV_TIMEOUT while(!slave->thread_quit){
if(reconnect){
slave->status = DISCONNECTED;
reconnect = false;
select.del(slave->link->fd());
delete slave->link;
slave->link = NULL;
sleep();
}
if(!slave->connected()){
if(slave->connect() != ){
usleep( * );
}else{
select.set(slave->link->fd(), FDEVENT_IN, , NULL);
}
continue;
} events = select.wait(RECV_TIMEOUT);
if(events == NULL){
log_error("events.wait error: %s", strerror(errno));
sleep();
continue;
}else if(events->empty()){
if(idle++ >= MAX_RECV_IDLE){
log_error("the master hasn't responsed for awhile, reconnect...");
idle = ;
reconnect = true;
}
continue;
}
idle = ; if(slave->link->read() <= ){
log_error("link.read error: %s, reconnecting to master", strerror(errno));
reconnect = true;
continue;
} while(){
req = slave->link->recv();
if(req == NULL){
log_error("link.recv error: %s, reconnecting to master", strerror(errno));
reconnect = true;
break;
}else if(req->empty()){
break;
}else if(req->at() == "noauth"){
log_error("authentication required");
reconnect = true;
sleep();
break;
}else{
if(slave->proc(*req) == -){
goto err;
}
}
}
} // end while
log_info("Slave thread quit");
return (void *)NULL; err:
log_fatal("Slave thread exit unexpectedly");
exit();
return (void *)NULL;;
}
上一篇:.Net Core中使用RabbitMQ


下一篇:下拉框上移、下移、添加、移除demo