redis的数据恢复过程
redis的数据载入主要是指redis重启时候恢复数据的过程,恢复的数据总共有两种:
- aof 数据文件
- rdb 数据文件
数据恢复的过程是二选一的过程,也就是如果开启aof持久化那么就会使用aof文件进行恢复,如果没有才会选择rdb文件进行恢复。
void loadDataFromDisk(void) {
// 记录开始时间
long long start = ustime();
// AOF 持久化已打开?
if (server.aof_state == REDIS_AOF_ON) {
// 尝试载入 AOF 文件
if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
// 打印载入信息,并计算载入耗时长度
redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
// AOF 持久化未打开
} else {
// 尝试载入 RDB 文件
if (rdbLoad(server.rdb_filename) == REDIS_OK) {
// 打印载入信息,并计算载入耗时长度
redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);
} else if (errno != ENOENT) {
redisLog(REDIS_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
exit(1);
}
}
}
redis aof数据恢复过程
整个aof文件载入的过程其实是非常简单,整体步骤如下:
- 打开aof文件开始循环读取
- 根据aof写入的命令解析redis 命令行
- 通过伪命令行客户端执行解析的命令行
- redis接收到伪客户端发送的命令行以后找到命令对应的函数负责执行数据写入
aof保存的命令行格式类似"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n", 所以解析到*字符就知道是一个命令的开始,然后就知道命令涉及的参数个数,每个参数都以$字符开始标记字符串长度,知道字符串长度就可以解析出命令字符串了。
int loadAppendOnlyFile(char *filename) {
// 为客户端
struct redisClient *fakeClient;
// 打开 AOF 文件
FILE *fp = fopen(filename,"r");
struct redis_stat sb;
int old_aof_state = server.aof_state;
long loops = 0;
// 检查文件的正确性
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
server.aof_current_size = 0;
fclose(fp);
return REDIS_ERR;
}
// 检查文件是否正常打开
if (fp == NULL) {
redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
exit(1);
}
/*
*
* 暂时性地关闭 AOF ,防止在执行 MULTI 时,
* EXEC 命令被传播到正在打开的 AOF 文件中。
*/
server.aof_state = REDIS_AOF_OFF;
fakeClient = createFakeClient();
// 设置服务器的状态为:正在载入
// startLoading 定义于 rdb.c
startLoading(fp);
while(1) {
int argc, j;
unsigned long len;
robj **argv;
char buf[128];
sds argsds;
struct redisCommand *cmd;
/*
* 间隔性地处理客户端发送来的请求
* 因为服务器正处于载入状态,所以能正常执行的只有 PUBSUB 等模块
*/
if (!(loops++ % 1000)) {
loadingProgress(ftello(fp));
processEventsWhileBlocked();
}
// 读入文件内容到缓存
if (fgets(buf,sizeof(buf),fp) == NULL) {
if (feof(fp))
// 文件已经读完,跳出
break;
else
goto readerr;
}
// 确认协议格式,比如 *3\r\n
if (buf[0] != '*') goto fmterr;
// 取出命令参数,比如 *3\r\n 中的 3
argc = atoi(buf+1);
// 至少要有一个参数(被调用的命令)
if (argc < 1) goto fmterr;
// 从文本中创建字符串对象:包括命令,以及命令参数
// 例如 $3\r\nSET\r\n$3\r\nKEY\r\n$5\r\nVALUE\r\n
// 将创建三个包含以下内容的字符串对象:
// SET 、 KEY 、 VALUE
argv = zmalloc(sizeof(robj*)*argc);
for (j = 0; j < argc; j++) {
if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
if (buf[0] != '$') goto fmterr;
// 读取参数值的长度
len = strtol(buf+1,NULL,10);
// 读取参数值
argsds = sdsnewlen(NULL,len);
if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
// 为参数创建对象
argv[j] = createObject(REDIS_STRING,argsds);
if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
}
/* Command lookup
*
* 查找命令
*/
cmd = lookupCommand(argv[0]->ptr);
if (!cmd) {
redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr);
exit(1);
}
/*
* 调用伪客户端,执行命令
*/
fakeClient->argc = argc;
fakeClient->argv = argv;
cmd->proc(fakeClient);
/* The fake client should not have a reply */
redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
/* The fake client should never get blocked */
redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);
/*
* 清理命令和命令参数对象
*/
for (j = 0; j < fakeClient->argc; j++)
decrRefCount(fakeClient->argv[j]);
zfree(fakeClient->argv);
}
/*
* 如果能执行到这里,说明 AOF 文件的全部内容都可以正确地读取,
* 但是,还要检查 AOF 是否包含未正确结束的事务
*/
if (fakeClient->flags & REDIS_MULTI) goto readerr;
// 关闭 AOF 文件
fclose(fp);
// 释放伪客户端
freeFakeClient(fakeClient);
// 复原 AOF 状态
server.aof_state = old_aof_state;
// 停止载入
stopLoading();
// 更新服务器状态中, AOF 文件的当前大小
aofUpdateCurrentSize();
// 记录前一次重写时的大小
server.aof_rewrite_base_size = server.aof_current_size;
return REDIS_OK;
// 读入错误
readerr:
// 非预期的末尾,可能是 AOF 文件在写入的中途遭遇了停机
if (feof(fp)) {
redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
// 文件内容出错
} else {
redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
}
exit(1);
// 内容格式错误
fmterr:
redisLog(REDIS_WARNING");
exit(1);
}
redis rdb数据恢复过程
整个rdb文件载入的过程其实是非常简单,不过和aof有些许差别:
rdb文件的数据恢复直接写入内存而不是通过伪装命令行执行命令生成的
rdb文件的读取过程和aof不一样,rdb文件存储按照type+key+value的格式存储所以读取也是这样读取的
整体恢复步骤如下:
- 打开rdb文件开始恢复数据
- 读取type用于判断读取value的格式
- 读取key且key的第一个字节标明了key的长度所以可以读取准确长度的key
- 读取value对象,读取过程根据type进行读取以及恢复
/*
* 将给定 rdb 中保存的数据载入到数据库中。
*/
int rdbLoad(char *filename) {
uint32_t dbid;
int type, rdbver;
redisDb *db = server.db+0;
char buf[1024];
long long expiretime, now = mstime();
FILE *fp;
rio rdb;
// 打开 rdb 文件
if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR;
// 初始化写入流
rioInitWithFile(&rdb,fp);
rdb.update_cksum = rdbLoadProgressCallback;
rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
if (rioRead(&rdb,buf,9) == 0) goto eoferr;
buf[9] = '\0';
// 检查版本号
if (memcmp(buf,"REDIS",5) != 0) {
fclose(fp);
redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
errno = EINVAL;
return REDIS_ERR;
}
rdbver = atoi(buf+5);
if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) {
fclose(fp);
redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
errno = EINVAL;
return REDIS_ERR;
}
// 将服务器状态调整到开始载入状态
startLoading(fp);
while(1) {
robj *key, *val;
expiretime = -1;
/* Read type.
*
* 读入类型指示,决定该如何读入之后跟着的数据。
*
* 这个指示可以是 rdb.h 中定义的所有以
* REDIS_RDB_TYPE_* 为前缀的常量的其中一个
* 或者所有以 REDIS_RDB_OPCODE_* 为前缀的常量的其中一个
*/
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
// 读入过期时间值
if (type == REDIS_RDB_OPCODE_EXPIRETIME) {
// 以秒计算的过期时间
if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
/* We read the time so we need to read the object type again.
*
* 在过期时间之后会跟着一个键值对,我们要读入这个键值对的类型
*/
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
/* the EXPIRETIME opcode specifies time in seconds, so convert
* into milliseconds.
*
* 将格式转换为毫秒*/
expiretime *= 1000;
} else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
// 以毫秒计算的过期时间
/* Milliseconds precision expire times introduced with RDB
* version 3. */
if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
/* We read the time so we need to read the object type again.
*
* 在过期时间之后会跟着一个键值对,我们要读入这个键值对的类型
*/
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
}
// 读入数据 EOF (不是 rdb 文件的 EOF)
if (type == REDIS_RDB_OPCODE_EOF)
break;
/*
* 读入切换数据库指示
*/
if (type == REDIS_RDB_OPCODE_SELECTDB) {
// 读入数据库号码
if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR)
goto eoferr;
// 检查数据库号码的正确性
if (dbid >= (unsigned)server.dbnum) {
redisLog(REDIS_WARNING,"FATAL: ", server.dbnum);
exit(1);
}
// 在程序内容切换数据库
db = server.db+dbid;
// 跳过
continue;
}
/* Read key
*
* 读入键
*/
if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
/* Read value
*
* 读入值
*/
if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
/*
*
* 如果服务器为主节点的话,
* 那么在键已经过期的时候,不再将它们关联到数据库中去
*/
if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
decrRefCount(key);
decrRefCount(val);
// 跳过
continue;
}
/* Add the new object in the hash table
*
* 将键值对关联到数据库中
*/
dbAdd(db,key,val);
/* Set the expire time if needed
*
* 设置过期时间
*/
if (expiretime != -1) setExpire(db,key,expiretime);
decrRefCount(key);
}
/* Verify the checksum if RDB version is >= 5
*
* 如果 RDB 版本 >= 5 ,那么比对校验和
*/
if (rdbver >= 5 && server.rdb_checksum) {
uint64_t cksum, expected = rdb.cksum;
// 读入文件的校验和
if (rioRead(&rdb,&cksum,8) == 0) goto eoferr;
memrev64ifbe(&cksum);
// 比对校验和
if (cksum == 0) {
redisLog(REDIS_WARNING,"RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
redisLog(REDIS_WARNING,"Wrong RDB checksum. Aborting now.");
exit(1);
}
}
// 关闭 RDB
fclose(fp);
// 服务器从载入状态中退出
stopLoading();
return REDIS_OK;
eoferr: /* unexpected end of file is handled here with a fatal exit */
redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
exit(1);
return REDIS_ERR; /* Just to avoid warning */
}