单点服务在生产环境是绝对无法接受的,但是数据库服务,要实现多节点或者说分布式部署,面临的问题比 stateless 服务要多的多。数据的同步方式、一致性和可用性的妥协诸多限制,必须都加以考虑。
今天我们来学习一下 redis 主从同步相关内容,本节内容是 redis 实现高可用、数据安全、数据分区的基石。如果在节点之间没有一个可靠的数据同步方法,那么上述的一切都成为空中阁楼。
主节点在任意时刻只有一个,从节点可以有若干个。主从节点需要保持链接,主节点异步的将数据同步到从节点。
Master
主从同步,就是将主节点的数据同步到从节点。同步的大体流程如下:
- 主节点在接受到同步请求后,与从节点进行全量同步,启动 BGSAVE (如果之前已经有可用 BGSAVE 在执行,即不需要启动)
- 主节点需要将后续导致数据变化的命令(或者数据过期)发送到从节点,持续同步数据
全量同步
主节点在启动的时候,并不知道从节点的任何信息。当收到 sync 或者 psync 命令后,与从节点进行开始同步:
void syncCommand(redisClient *c) {
// 已经是 SLAVE ,或者处于 MONITOR 模式,返回
if (c->flags & REDIS_SLAVE) return;
// 如果这是一个从服务器,但与主服务器的连接仍未就绪,那么拒绝 SYNC
// redis 从节点也可以有自己的从节点,这里不展开讨论
if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
addReplyError(c,"Can't SYNC while not connected with my master");
return;
}
// 要进行 sync 或者 psync 的从节点的发送缓冲区必须是空的。因为我们可能会将
// 这个从节点的发送缓冲区(存储的是 BGSAVE 以后的脏数据)拷贝到其他从节点的输出缓冲区
if (listLength(c->reply) != 0 || c->bufpos != 0) {
addReplyError(c,"SYNC and PSYNC are invalid with pending output");
return;
}
redisLog(REDIS_NOTICE,"Slave asks for synchronization");
/*
解释一下 psync:
psync 代表的是 partial sync,即部分同步。这个机制是为了当已经同步过的主从节点之间
因为某些原因断开链接后,当链接重新建立以后,需要重新开始同步数据时,可以避免不必要的
全量同步。
*/
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
// 尝试进行 PSYNC
if (masterTryPartialResynchronization(c) == REDIS_OK) {
// 可执行 PSYNC
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
// 不可执行 PSYNC
char *master_runid = c->argv[1]->ptr;
// replication id 为 ? 是强制要求全量同步的意思,所以不需要统计到错误里面
if (master_runid[0] != '?') server.stat_sync_partial_err++;
}
} else {
// 旧版实现,设置标识,避免接收 REPLCONF ACK
c->flags |= REDIS_PRE_PSYNC;
}
// 执行 full resynchronization ,增加计数
server.stat_sync_full++;
// 检查是否有 BGSAVE 在执行
if (server.rdb_child_pid != -1) {
// 如果有正在进行的 BGSAVE,我们还需要检查这个 BGSAVE 的数据是不是可以用于与从节点同步数据
// 为什么会不适合呢?如果住节点的 BGSAVE 发生在有从节点与其链接前,主节点不会把 BGSAVE 启动
// 以后的数据保存下来(写入到从节点的输出缓冲区),这样会导致 BGSAVE 启动以后的增量写丢失,
// 这种 BGSAVE 产生的 rdb 文件是无法用于与从节点同步数据使用的
redisClient *slave;
listNode *ln;
listIter li;
// 只要之前已经有处于 REDIS_REPL_WAIT_BGSAVE_END 的从节点,说明这个 BGSAVE 启动以后的
// 增量写数据已经保存下来了,所以不需要重新启动 BGSAVE
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
}
if (ln) {
// 幸运的情况,可以使用目前 BGSAVE 所生成的 RDB,我们直接把这个等待 BGSAVE 完成的
// 从节点的输出缓冲区数据拷贝到当前节点的输出缓冲区(里面存储的就是同步数据,这也是上
// 面不允许进行 SYNC 的从节点输出缓冲区内有其他内容的原因
copyClientOutputBuffer(c,slave);
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
// 启动这个 BGSAVE 的时候,还没有从节点,所以需要等待下一个 BGSAVE 启动
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
}
} else {
// 没有 BGSAVE 在进行,开始一个新的 BGSAVE
redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
addReplyError(c,"Unable to perform background save");
return;
}
// 设置状态
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
// 刷新脚本缓存
replicationScriptCacheFlush();
}
// 启用 Nagle 算法,失败也无所谓,所以不检查错误
if (server.repl_disable_tcp_nodelay)
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= REDIS_SLAVE;
// 确保后续传播到从节点的数据会发送 SELECT 命令
server.slaveseldb = -1;
// 添加到 slave 列表中
listAddNodeTail(server.slaves,c);
// 如果是第一个 slave ,那么初始化 backlog,只有初始化了 backlog,主节点才会传播命令到从节点
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
createReplicationBacklog();
return;
}
/*
当 BGSAVE 完成时,会调用这个函数
*/
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {
// BGSAVE 成功
redisLog(REDIS_NOTICE,
"Background saving terminated with success");
// dirty 更新为 BGSAVE 以后的脏数据数
server.dirty = server.dirty - server.dirty_before_bgsave;
server.lastsave = time(NULL);
server.lastbgsave_status = REDIS_OK;
} else if (!bysignal && exitcode != 0) {
// BGSAVE 出错
redisLog(REDIS_WARNING, "Background saving error");
server.lastbgsave_status = REDIS_ERR;
} else {
// BGSAVE 被中断
redisLog(REDIS_WARNING,
"Background saving terminated by signal %d", bysignal);
// 移除临时文件
rdbRemoveTempFile(server.rdb_child_pid);
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
* tirggering an error conditon. */
if (bysignal != SIGUSR1)
server.lastbgsave_status = REDIS_ERR;
}
// 更新服务器状态
server.rdb_child_pid = -1;
server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
server.rdb_save_time_start = -1;
// 处理正在等待 BGSAVE 完成的从节点
updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}
void updateSlavesWaitingBgsave(int bgsaveerr) {
listNode *ln;
int startbgsave = 0;
listIter li;
// 遍历所有 slave
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
// 之前的 RDB 文件不能被 slave 使用,立马开始一个新 BGSAVE
startbgsave = 1;
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
// 执行到这里,说明有 slave 在等待 BGSAVE 完成
struct redis_stat buf;
if (bgsaveerr != REDIS_OK) {
// 但是 BGSAVE 执行错误 释放 slave
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
continue;
}
// 打开 RDB 文件
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue;
}
// 设置偏移量,各种值,准备发送 rdb 文件给从节点
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = REDIS_REPL_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
(unsigned long long) slave->repldbsize);
// 清空之前的写事件处理器,注册新的写事件处理器
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}
}
}
// 需要执行新的 BGSAVE
if (startbgsave) {
// 开始行的 BGSAVE ,并清空脚本缓存
replicationScriptCacheFlush();
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
// 启动 BGSAVE 失败的话,断开与从节点的链接
listIter li;
listRewind(server.slaves,&li);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
freeClient(slave);
}
}
}
}
// 向从节点发送 rdb 文件时的写回调函数
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *slave = privdata;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
char buf[REDIS_IOBUF_LEN];
ssize_t nwritten, buflen;
// 要发送 rdb 文件的长度到从节点
if (slave->replpreamble) {
nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
if (nwritten == -1) {
redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s",
strerror(errno));
freeClient(slave);
return;
}
sdsrange(slave->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
slave->replpreamble = NULL;
/* fall through sending data. */
} else {
return;
}
}
// 开始发送 rdb 文件
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
// 读取 RDB 数据
buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
if (buflen <= 0) {
redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
(buflen == 0) ? "premature EOF" : strerror(errno));
freeClient(slave);
return;
}
// 发送 rdb 数据到从节点
if ((nwritten = write(fd,buf,buflen)) == -1) {
if (errno != EAGAIN) {
redisLog(REDIS_WARNING,"Write error sending DB to slave: %s",
strerror(errno));
freeClient(slave);
}
return;
}
// 更新 offset
slave->repldboff += nwritten;
// 如果写入已经完成
if (slave->repldboff == slave->repldbsize) {
// 关闭 RDB 文件描述符
close(slave->repldbfd);
slave->repldbfd = -1;
// 删除之前绑定的写事件处理器
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
// 只有在传输完 rdb 文件之后,从节点才算处于 online 状态
slave->replstate = REDIS_REPL_ONLINE;
// 更新响应时间
slave->repl_ack_time = server.unixtime;
// 在 BGSAVE 启动后的所有需要传播的数据都先保存在从节点的输出缓存中
// 现在可以开始发送这些数据了,注册发送缓冲区数据的写回调,个人感觉也许应该先检查一下
// 缓冲区非空再注册
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
sendReplyToClient, slave) == AE_ERR) {
redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
freeClient(slave);
return;
}
// 刷新低延迟从节点的数量(slave->repl_ack_time 修改后都需要重新计算)
refreshGoodSlavesCount();
redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
}
}
增量同步
当有写命令需要传播的时候,主节点还需要将这些需要传播的数据发送到从节点:
/*
backlog 是一个循环数组,用于存储 BGSAVE 开始以后的增量修改。当有命令需要传播到从节点
会先写入到 backlog,然后再发送到从节点。因为循环数组大小有限,这也是是 PSYNC 对于超过
范围的 offset 无法支持。
这里再介绍一下 redis 中主节点用:
<replication_id> <offset> 表示数据库状态,任意两个节点,只要拥有相同的 replication id 和
offset,那么就可以说他们拥有相同的数据。
replication id 是主节点随机生成的字符串,offset 是增量数据的字节偏移量
*/
void feedReplicationBacklog(void *ptr, size_t len) {
unsigned char *p = ptr;
// server.master_repl_offset 代表全局偏移量,每次传播命令都累加
server.master_repl_offset += len;
// 环形 buffer ,每次写尽可能多的数据,并在到达尾部时将 idx 重置到头部
// 写满以后会覆盖之前的数据,但是没有关系,这个本来就是作为一个后备存储
// 传播的命令都会立马发送到客户端
while(len) {
// 从 idx 到 backlog 尾部的字节数
size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
// 如果 idx 到 backlog 尾部这段空间足以容纳要写入的内容
// 那么直接将写入数据长度设为 len
// 在将这些 len 字节复制之后,这个 while 循环将跳出
if (thislen > len) thislen = len;
// 将 p 中的 thislen 字节内容复制到 backlog
memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
// 更新 idx ,指向新写入的数据之后
server.repl_backlog_idx += thislen;
// 如果写入达到尾部,那么将索引重置到头部
if (server.repl_backlog_idx == server.repl_backlog_size)
server.repl_backlog_idx = 0;
// 减去已写入的字节数
len -= thislen;
// 将指针移动到已被写入数据的后面,指向未被复制数据的开头
p += thislen;
// 增加实际长度
server.repl_backlog_histlen += thislen;
}
// server.repl_backlog_histlen 代表当前可用的数据长度,最大不能超过 backlog 的容量
// 如果已经超过,代表写入的数据已经发生了覆盖
if (server.repl_backlog_histlen > server.repl_backlog_size)
server.repl_backlog_histlen = server.repl_backlog_size;
// server.repl_backlog_off 是 backlog 中最小的全局偏移值,当从节点进行 PSYNC 时
// 只有 offset 在 [server.repl_backlog_off,server.master_repl_offset) 内才
// 可能进行 partial resync
server.repl_backlog_off = server.master_repl_offset) -
server.repl_backlog_histlen + 1;
}
/*
当启动主从同步以后,主从节点需要持续同步增量数据
*/
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[REDIS_LONGSTR_SIZE];
// backlog 为空,且没有从服务器,直接返回
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
// 如果有从节点,那么必然已经创建了 backlog 存储
redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
// 如果有需要的话,发送 SELECT 命令,指定数据库
if (server.slaveseldb != dictid) {
robj *selectcmd;
if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid];
} else {
int dictid_len;
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
selectcmd = createObject(REDIS_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr));
}
// 将 SELECT 命令添加到 backlog
if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
// 发送给所有从服务器
listRewind(slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
addReply(slave,selectcmd);
}
if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
server.slaveseldb = dictid;
// 将命令写入到 backlog
if (server.repl_backlog) {
// 构造传播数据
char aux[REDIS_LONGSTR_SIZE+3];
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);
// 将参数从对象转换成协议格式
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
feedReplicationBacklogWithObject(argv[j]);
feedReplicationBacklog(aux+len+1,2);
}
}
// 将命令发送给所有从节点
listRewind(slaves,&li);
while((ln = listNext(&li))) {
// 指向从服务器
redisClient *slave = ln->value;
// 不要给正在等待 BGSAVE 开始的从服务器发送命令,因为这个数据对他们来说是无用非法的
// 这种节点的缓冲区需要保存的是对他么合法的 BGSAVE 开始以后的增量数据
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
// 添加数据到缓冲区,这些数据只有在已经完成 rdb 传输以后才会发送给从节点
addReplyMultiBulkLen(slave,argc);
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}
Partial Resync
Partial Resync 是为了避优化已经同步过的主从节点,在链接短暂断开以后重新同步时的开销和效率。假设从节点与主节点同步以后,因为网络原因,与主节点短暂断开了链接,期间从节点会丢失一些主节点的增量更新数据。当主从再次链接以后,如果发现主节点还保存有从节点丢失的信息(在 backlog 中),这次同步就可以跳过刚刚介绍的全量同步的步骤,这无疑可以大大提升同步效率。
需要指出 PSYNC 在一些老版本的 redis 中是不支持的。笔者看的源码 3.0,对照了下现在 redis 5.x 的代码,其实相关部分改动有很多,redis 开发团队对 PSYNC 又做了相当多的优化。但是对基础想法的理解的重要性,要远高于优化,所以笔者还是以 3.0 版本代码为基础讲解:
/*
收到 PSYNC 命令后,主节点会尝试看看可不可以进行 parital resync,如果不行的话
需要重新进行全量+增量同步过程。PSYNC 命令格式如下:
PSYNC replication_id offset
代表从节点希望与 replication_id 主节点,从 offset 偏移量开始重新同步数据
*/
int masterTryPartialResynchronization(redisClient *c) {
long long psync_offset, psync_len;
char *master_runid = c->argv[1]->ptr;
char buf[128];
int buflen;
// 检查 master id 是否和 runid 一致,只有一致的情况下才有 PSYNC 的可能
if (strcasecmp(master_runid, server.runid)) {
if (master_runid[0] != '?') {
// replication_id 不匹配,这种情况是因为这个主节点是由从节点提升而来或者主节点重启
// 而这些要求 PSYNC 的节点保存的仍然是之前主节点的 replication_id
redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
"Runid mismatch (Client asked for runid '%s', my runid is '%s')",
master_runid, server.runid);
} else {
// replication_id 是 ? 代表从节点自己指定了全量同步
redisLog(REDIS_NOTICE,"Full resync requested by slave.");
}
// 需要 full resync
goto need_full_resync;
}
// 取出 psync_offset 参数,从节点要求从这个 offset 开始同步数据
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
REDIS_OK) goto need_full_resync;
/*
backlog 保存一定 offset 范围内的增量数据,如果从节点要求的 offset
在 backlog 保存的 offset 范围内,就可以进行 partial resync
*/
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
// 执行 FULL RESYNC
redisLog(REDIS_NOTICE,
"Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
if (psync_offset > server.master_repl_offset) {
redisLog(REDIS_WARNING,
"Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
}
goto need_full_resync;
}
// 可以进行 partial resync 的从节点,设置为 ONLINE 状态,加入到从节点列表中
c->flags |= REDIS_SLAVE;
c->replstate = REDIS_REPL_ONLINE;
c->repl_ack_time = server.unixtime;
listAddNodeTail(server.slaves,c);
// 向从服务器发送一个同步 +CONTINUE ,表示 PSYNC 可以执行
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return REDIS_OK;
}
// 发送 backlog 中的内容(也即是从服务器缺失的那些内容)到从服务器
psync_len = addReplyReplicationBacklog(c,psync_offset);
redisLog(REDIS_NOTICE,
"Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
// 刷新低延迟从服务器的数量
refreshGoodSlavesCount();
return REDIS_OK; /* The caller can return, no full resync needed. */
need_full_resync:
// psync 从最新 offset 开始
psync_offset = server.master_repl_offset;
// 如果还没有创建 repl_backlog,offset 再加1
if (server.repl_backlog == NULL) psync_offset++;
// 发送 +FULLRESYNC ,表示需要完整重同步
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
server.runid,psync_offset);
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return REDIS_OK;
}
return REDIS_ERR;
}
// partial resync,发送从节点缺失数据
long long addReplyReplicationBacklog(redisClient *c, long long offset) {
long long j, skip, len;
redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset);
if (server.repl_backlog_histlen == 0) {
redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero");
return 0;
}
redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld",
server.repl_backlog_size);
redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld",
server.repl_backlog_off);
redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld",
server.repl_backlog_histlen);
redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld",
server.repl_backlog_idx);
// server.repl_backlog_off 是 backlog 内保存的 olddest offset
// offset - server.repl_backlog_off 即我们需要跳过的字节数
skip = offset - server.repl_backlog_off;
redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip);
// 将 j 指向 offset 对应在 backlog 内的地址
j = (server.repl_backlog_idx +
(server.repl_backlog_size-server.repl_backlog_histlen)) %
server.repl_backlog_size;
redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", j);
j = (j + skip) % server.repl_backlog_size;
// 发送从节点缺失的数据
len = server.repl_backlog_histlen - skip;
redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", len);
while(len) {
long long thislen =
((server.repl_backlog_size - j) < len) ?
(server.repl_backlog_size - j) : len;
redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
len -= thislen;
j = 0;
}
return server.repl_backlog_histlen - skip;
}
Slave
不同于主节点,从节点既要作为服务端,在主从同步的过程中,还要作为一个客户端与主节点通信。通过在配置文件中添加 salveof HOST PORT
指令,节点启动以后即会成为指定主节点的从节点:
// 解析配置,当 server.masterhost 非空则代表这个节点是从节点
} else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
slaveof_linenum = linenum;
server.masterhost = sdsnew(argv[1]);
server.masterport = atoi(argv[2]);
server.repl_state = REDIS_REPL_CONNECT;
}
链接主节点
主从同步相关的逻辑主要集中在 replicationCron 中,这个函数每 1s 调用一次:
/*
取消链接,这里的链接不是指 TCP 链接,而是主从同步链接。
*/
void undoConnectWithMaster(void) {
int fd = server.repl_transfer_s;
// 连接必须处于正在连接状态
redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||
server.repl_state == REDIS_REPL_RECEIVE_PONG);
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
close(fd);
server.repl_transfer_s = -1;
// 回到 CONNECT 状态
server.repl_state = REDIS_REPL_CONNECT;
}
/*repl_state 状态转移
REDIS_REPL_CONNECT -> REDIS_REPL_CONNECTING -> REDIS_REPL_RECEIVE_PONG
|
REDIS_REPL_CONNECTED <- REDIS_REPL_TRANSFER
*/
void replicationCron(void) {
// 链接到主节点超时,取消链接
if (server.masterhost &&
(server.repl_state == REDIS_REPL_CONNECTING ||
server.repl_state == REDIS_REPL_RECEIVE_PONG) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
// 取消连接
undoConnectWithMaster();
}
// RDB 文件的传送已超时
if (server.masterhost && server.repl_state == &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
// 停止传送,并删除临时文件
replicationAbortSyncTransfer();
}
// 从服务器曾经连接上主服务器,但现在超时(当重新链接的时候,就会触发 PSYNC)
if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED &&
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
{
redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received...");
// 释放主服务器
freeClient(server.master);
}
// 还没有与主节点建立连接
if (server.repl_state == REDIS_REPL_CONNECT) {
redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
// 非阻塞的链接主节点
if (connectWithMaster() == REDIS_OK) {
redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
}
}
// 如果已经建立了链接,而且主节点支持 PSYNC,发送 ACK 给主节点
if (server.masterhost && server.master &&
!(server.master->flags & REDIS_PRE_PSYNC))
replicationSendAck();
// 如果有从服务器,那么隔一段时间发送一个 PING 指令作为心跳,让对方知道我们没有掉线
if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) {
listIter li;
listNode *ln;
robj *ping_argv[1];
// 向从节点传播 PING
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
decrRefCount(ping_argv[0]);
// 因为对于 REDIS_REPL_WAIT_BGSAVE_START 和 REDIS_REPL_WAIT_BGSAVE_END 状态
// 的从节点,其不会相应 PING 命令,所以我们发送一个 \n 给从节点,这个没有实际作用,只是
// 让对方知道我们没有掉线
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START ||
slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
if (write(slave->fd, "\n", 1) == -1) {
/* Don't worry, it's just a ping. */
}
}
}
}
// 断开超时从服务器
if (listLength(server.slaves)) {
listIter li;
listNode *ln;
// 遍历所有从服务器
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
// 略过未 ONLINE 的从服务器
if (slave->replstate != REDIS_REPL_ONLINE) continue;
// 不检查旧版的从服务器
if (slave->flags & REDIS_PRE_PSYNC) continue;
// 释放超时从服务器
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
{
char ip[REDIS_IP_STR_LEN];
int port;
if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1) {
redisLog(REDIS_WARNING,
"Disconnecting timedout slave: %s:%d",
ip, slave->slave_listening_port);
}
// 释放
freeClient(slave);
}
}
}
// 没有任何从服务器,等待一定时间后就释放 backlog 资源(等待一段时间是为了防止从节点只是暂时掉线)
if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
server.repl_backlog)
{
time_t idle = server.unixtime - server.repl_no_slaves_since;
if (idle > server.repl_backlog_time_limit) {
// 释放
freeReplicationBacklog();
redisLog(REDIS_NOTICE,
"Replication backlog freed after %d seconds "
"without connected slaves.",
(int) server.repl_backlog_time_limit);
}
}
// 在没有任何从服务器,AOF 关闭的情况下,清空 script 缓存
if (listLength(server.slaves) == 0 &&
server.aof_state == REDIS_AOF_OFF &&
listLength(server.repl_scriptcache_fifo) != 0)
{
replicationScriptCacheFlush();
}
// 更新符合给定延迟值的从服务器的数量
refreshGoodSlavesCount();
}
int connectWithMaster(void) {
int fd;
// 非阻塞地连接主服务器,注意如果 connect 的套接字是非阻塞的,那么 connect 返回的时候
// 不代表已经完成了 TCP 三次握手
fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
if (fd == -1) {
redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
strerror(errno));
return REDIS_ERR;
}
// 监听主服务器 fd 的读和写事件,并绑定文件事件处理器,当完成 3 次握手,fd 应该立马变为可写
// 就会调用 syncWithMaster
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
AE_ERR)
{
close(fd);
redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
return REDIS_ERR;
}
// 初始化统计变量
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_s = fd;
// 将状态改为 REDIS_REPL_CONNECTING
server.repl_state = REDIS_REPL_CONNECTING;
return REDIS_OK;
}
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char tmpfile[256], *err;
int dfd, maxtries = 5;
int sockerr = 0, psync_result;
socklen_t errlen = sizeof(sockerr);
REDIS_NOTUSED(el);
REDIS_NOTUSED(privdata);
REDIS_NOTUSED(mask);
// 如果本从节点在与之前主节点建立连接以后,被提升为主节点,那么 server.repl_state 被设置为 REDIS_REPL_NONE
// 这种情况下,我们立刻关闭套接字并且退出,因为我们已经不再是从节点
if (server.repl_state == REDIS_REPL_NONE) {
close(fd);
return;
}
// 检查套接字错误,非阻塞的 connect 需要检查错误
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
sockerr = errno;
if (sockerr) {
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
strerror(sockerr));
goto error;
}
// 如果处于 REDIS_REPL_CONNECTING,代表我们需要进行一次同步(不管是全量还是 partial resync)
// 在此之前,先阻塞的发送一个 PING,这个 PING 没有额外的含义,只是希望确定与主节点的网路情况良好
if (server.repl_state == REDIS_REPL_CONNECTING) {
redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
// 删除写事件监听,但是仍然保留读事件,因为我们要在这个函数里面读取主节点的 PONG 相应
aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
// 更新状态
server.repl_state = REDIS_REPL_RECEIVE_PONG;
// 同步发送 PING,用 poll 实现了一个支持超时的阻塞写,具体代码不分析了,比较简单
syncWrite(fd,"PING\r\n",6,100);
// 返回,下次这个函数被调用的时候是主节点发回相应的时候
return;
}
// 主节点对之前的 PING 进行了相应
if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
char buf[1024];
// 读事件也删除,后续的读事件回调不会用到这个函数
aeDeleteFileEvent(server.el,fd,AE_READABLE);
// 阻塞的读取主节点响应
buf[0] = '\0';
if (syncReadLine(fd,buf,sizeof(buf),
server.repl_syncio_timeout*1000) == -1)
{
redisLog(REDIS_WARNING,
"I/O error reading PING reply from master: %s",
strerror(errno));
goto error;
}
/* 合法响应只有3种:
1. +PONG
2. —NOAUTH
3. -ERR operation not permitted
后面两种需要进行鉴权后面会处理,其余响应均为非法
*/
if (buf[0] != '+' &&
strncmp(buf,"-NOAUTH",7) != 0 &&
strncmp(buf,"-ERR operation not permitted",28) != 0)
{
// 接收到未验证错误
redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
goto error;
} else {
redisLog(REDIS_NOTICE,
"Master replied to PING, replication can continue...");
}
}
// 进行身份验证,阻塞的发送 AUTH 命令,并且等待对方回应
if(server.masterauth) {
err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
if (err[0] == '-') {
// AUTH 失败
redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
sdsfree(err);
}
// 将从节点作为 redis 服务器的节点通知给主节点,从节点也是可以服务客户端的
{
sds port = sdsfromlonglong(server.port);
err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
NULL);
sdsfree(port);
// 老版本的 redis 不支持这个命令,所以不是致命错误,记录一下即可
if (err[0] == '-') {
redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
}
sdsfree(err);
}
// 尝试进行 PSYNC
psync_result = slaveTryPartialResynchronization(fd);
// 可以执行 partial resync,退出(新的事件处理函数已经绑定好了)
if (psync_result == PSYNC_CONTINUE) {
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
return;
}
// 如果主服务器并不支持 PSYNC,我们还需要额外发送一个 SYNC 命令来请求同步
if (psync_result == PSYNC_NOT_SUPPORTED) {
redisLog(REDIS_NOTICE,"Retrying with SYNC...");
if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
strerror(errno));
goto error;
}
}
// 不管是不支持 PSYNC 还是无法 partial resync,这里都需要进行全量同步,我们需要准备
// 接收主节点的 rdb 文件,大家一个临时文件用于保存 rdb
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
if (dfd == -1) {
redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
goto error;
}
// 设置一个读事件处理器,来读取主服务器的 RDB 文件
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
redisLog(REDIS_WARNING,
"Can't create readable event for SYNC: %s (fd=%d)",
strerror(errno),fd);
goto error;
}
// 设置状态
server.repl_state = REDIS_REPL_TRANSFER;
// 更新统计信息
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;
error:
close(fd);
server.repl_transfer_s = -1;
server.repl_state = REDIS_REPL_CONNECT;
return;
}
全量同步
我们在分析主节点代码时就知道全量同步就是主节点发送 rdb 文件的过程。而从节点在需要全量同步以后会注册读回调函数来处理即将收到的 rdb 文件:
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[4096];
ssize_t nread, readlen;
off_t left;
REDIS_NOTUSED(el);
REDIS_NOTUSED(privdata);
REDIS_NOTUSED(mask);
// 全量同步最开始发送的是 rdb 文件的大小,如果 server.repl_transfer_size == -1
// 我们需要先解析 rdb 文件大小
if (server.repl_transfer_size == -1) {
// 调用读函数
if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
redisLog(REDIS_WARNING,
"I/O error reading bulk count from MASTER: %s",
strerror(errno));
goto error;
}
// 出错?
if (buf[0] == '-') {
redisLog(REDIS_WARNING,
"MASTER aborted replication with an error: %s",
buf+1);
goto error;
} else if (buf[0] == '\0') {
// 还记得在 replicationCron 中对与 BGSAVE_START 和 BGSAVE_END 的从节点发送的 \n 吗
server.repl_transfer_lastio = server.unixtime;
return;
} else if (buf[0] != '$') {
// 读入的内容出错,和协议格式不符
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
goto error;
}
// 分析 RDB 文件大小
server.repl_transfer_size = strtol(buf+1,NULL,10);
redisLog(REDIS_NOTICE,
"MASTER <-> SLAVE sync: receiving %lld bytes from master",
(long long) server.repl_transfer_size);
// 不明白为什么这里就要退出了,其实可以继续读吧
return;
}
// 开始正式读取 rdb 文件的内容(这里不太明白了,按说应该处理 EAGAIN 等情况才对)
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
nread = read(fd,buf,readlen);
if (nread <= 0) {
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
replicationAbortSyncTransfer();
return;
}
// 写入到临时 rdb 文件
server.repl_transfer_lastio = server.unixtime;
if (write(server.repl_transfer_fd,buf,nread) != nread) {
redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
goto error;
}
// 加上刚读取好的字节数
server.repl_transfer_read += nread;
// 定期将读入的文件 fsync 到磁盘,以免 buffer 太多,一下子写入时撑爆 IO
if (server.repl_transfer_read >=
server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
off_t sync_size = server.repl_transfer_read -
server.repl_transfer_last_fsync_off;
rdb_fsync_range(server.repl_transfer_fd,
server.repl_transfer_last_fsync_off, sync_size);
server.repl_transfer_last_fsync_off += sync_size;
}
// 检查 RDB 是否已经传送完毕
if (server.repl_transfer_read == server.repl_transfer_size) {
// 完毕,将临时文件改名为 dump.rdb
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
replicationAbortSyncTransfer();
return;
}
// 先清空旧数据库
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
signalFlushedDb(-1);
emptyDb(replicationEmptyDbCallback);
// 先删除主服务器的读事件监听,因为 rdbLoad() 会调用 rdbLoadProgressCallback
// 这个函数会调用 eventLoop 处理事件,如果不删除,会导致递归调用
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
// 载入 RDB
if (rdbLoad(server.rdb_filename) != REDIS_OK) {
redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
replicationAbortSyncTransfer();
return;
}
// 关闭临时文件
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
// 为主节点构造客户端资源,createClient 中会绑定好读事件
server.master = createClient(server.repl_transfer_s);
// 标记这个客户端为主服务器
server.master->flags |= REDIS_MASTER;
// 标记它为已验证身份
server.master->authenticated = 1;
// 更新复制状态
server.repl_state = REDIS_REPL_CONNECTED;
// 设置主服务器的复制偏移量
server.master->reploff = server.repl_master_initial_offset;
// 保存主服务器的 RUN ID
memcpy(server.master->replrunid, server.repl_master_runid,
sizeof(server.repl_master_runid));
// 如果 offset 被设置为 -1 ,那么表示主服务器的版本低于 2.8
// 无法使用 PSYNC ,所以需要设置相应的标识值
if (server.master->reploff == -1)
server.master->flags |= REDIS_PRE_PSYNC;
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
// 重启 AOF,这会导致进行一次 AOF 重写
if (server.aof_state != REDIS_AOF_OFF) {
int retry = 10;
// 关闭
stopAppendOnly();
// 再重启
while (retry-- && startAppendOnly() == REDIS_ERR) {
redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
sleep(1);
}
if (!retry) {
redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
exit(1);
}
}
}
return;
error:
replicationAbortSyncTransfer();
return;
}
增量同步
增量同步其实就是从节点不断接收主节点传播的写指令的过程
Partial Resync
当与主节点进行同步的时候,从节点会优先尝试进行 partial resync:
#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {
char *psync_runid;
char psync_offset[32];
sds reply;
// 如果主节点支持 PSYNC,那么我们不管是进行了 FULLRESYNC 还是 partial resync
// repl_master_initial_offset 都不会是 -1.只有对于不支持 PSYNC 的主节点
// repl_master_initial_offset 才会保持1
server.repl_master_initial_offset = -1;
if (server.cached_master) {
/*
之前就提到了,resync 是发生在曾经同步过的主从之间,如果有曾经重新同步过的主节点
我们将之前的 replication id 和 offset 发送过去请求 PSYNC
*/
psync_runid = server.cached_master->replrunid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
} else {
// 缓存不存在,replication id 设置为 ? 请求全量同步
redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_runid = "?";
memcpy(psync_offset,"-1",3);
}
// 向主服务器发送 PSYNC 命令,读取其响应
reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
if (!strncmp(reply,"+FULLRESYNC",11)) {
// 虽然支持 PSYNC 命令,但是无法进行 partial resync,执行全量同步
char *runid = NULL, *offset = NULL;
// 分析并记录主服务器的 run id
runid = strchr(reply,' ');
if (runid) {
runid++;
offset = strchr(runid,' ');
if (offset) offset++;
}
// 检查 run id 的合法性
if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
redisLog(REDIS_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
// 主服务器支持 PSYNC ,但是却发来了异常的 run id
// 只好将 run id 设为 0 ,让下次 PSYNC 时失败
memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
} else {
// 保存 run id
memcpy(server.repl_master_runid, runid, offset-runid-1);
server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
// 以及 initial offset
server.repl_master_initial_offset = strtoll(offset,NULL,10);
// 打印日志,这是一个 FULL resync
redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
server.repl_master_runid,
server.repl_master_initial_offset);
}
// 要开始完整重同步,缓存中的 master 已经没用了,清除它
replicationDiscardCachedMaster();
sdsfree(reply);
// 返回状态
return PSYNC_FULLRESYNC;
}
if (!strncmp(reply,"+CONTINUE",9)) {
// 可以进行 partial resync,我们会收到主节点从 offset 之后的数据
redisLog(REDIS_NOTICE,
"Successful partial resynchronization with master.");
sdsfree(reply);
// 将缓存中的 master 设为当前 master
replicationResurrectCachedMaster(fd);
// 返回状态
return PSYNC_CONTINUE;
}
// 主节点不支持 PSYNC 命令
if (strncmp(reply,"-ERR",4)) {
/* If it's not an error, log the unexpected event. */
redisLog(REDIS_WARNING,
"Unexpected reply to PSYNC from master: %s", reply);
} else {
redisLog(REDIS_NOTICE,
"Master does not support PSYNC or is in "
"error state (reply: %s)", reply);
}
sdsfree(reply);
// 清楚缓存的主服务器
replicationDiscardCachedMaster();
// 主服务器不支持 PSYNC
return PSYNC_NOT_SUPPORTED;
}
void replicationResurrectCachedMaster(int newfd) {
// 设置 master
server.master = server.cached_master;
server.cached_master = NULL;
server.master->fd = newfd;
server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);
server.master->authenticated = 1;
server.master->lastinteraction = server.unixtime;
// 回到已连接状态
server.repl_state = REDIS_REPL_CONNECTED;
// 将主节点加入到客户端列表中
listAddNodeTail(server.clients,server.master);
// 后续只需要想处理其他客户端一样处理主节点传播的消息即可
if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
readQueryFromClient, server.master)) {
redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
freeClientAsync(server.master); /* Close ASAP. */
}
// 如果有需要发送给主节点的消息,注册写回调
if (server.master->bufpos || listLength(server.master->reply)) {
if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
sendReplyToClient, server.master)) {
redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
freeClientAsync(server.master); /* Close ASAP. */
}
}
}
当我们要释放某个客户端时候,会检查这个客户端是不是主节点,如果是的话,就会将其缓存下来,用于后续 PSYNC:
void freeClient(redisClient *c) {
// 略
if (server.master && c->flags & REDIS_MASTER) {
redisLog(REDIS_WARNING,"Connection with master lost.");
if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
REDIS_CLOSE_ASAP|
REDIS_BLOCKED|
REDIS_UNBLOCKED)))
{
replicationCacheMaster(c);
return;
}
}
}
void replicationCacheMaster(redisClient *c) {
listNode *ln;
redisAssert(server.master != NULL && server.cached_master == NULL);
redisLog(REDIS_NOTICE,"Caching the disconnected master state.");
// 从客户端链表中移除主服务器
ln = listSearchKey(server.clients,c);
redisAssert(ln != NULL);
listDelNode(server.clients,ln);
// 缓存 master
server.cached_master = server.master;
// 删除事件监视,关闭 socket
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
close(c->fd);
c->fd = -1;
// 删除 peerid
if (c->peerid) {
sdsfree(c->peerid);
c->peerid = NULL;
}
replicationHandleMasterDisconnection();
}
void replicationHandleMasterDisconnection(void) {
// 设置 replication 状态,重置 master
server.master = NULL;
server.repl_state = REDIS_REPL_CONNECT;
server.repl_down_since = server.unixtime;
// 如果这个从节点本身有从节点,断开所有与从节点的链接
if (server.masterhost != NULL) disconnectSlaves();
}
我们要注意,数据的同步是单向的,只会从主节点到从节点,任何发生在从节点的写入,最终都会丢失。redis 并不禁止从节点处理写请求。
后续优化
在最新的 redis 代码中针对主从同步还做了很多优化,比如:
- 如果主节点是由从节点提升而来,那么它会保存之前主节点的信息,当从节点使用之前主节点的 replication id 进行 PSYNC 的时候,partial resync 仍然可能进行
- 全量同步时,主节点将 rdb 落盘再发送给从节点是没有必要的,后面的 redis 支持 diskless replication
总结
- 主从同步时数据库服务分布式部署必须面对的问题,redis 使用异步同步,达到最终一致性,但是这也导致了永远存在一个丢失数据的时间窗口(单点也有这个问题)
- redis 使用 PSYNC 机制来降低 resync 的成本
- 从节点本身也可以有自己的从节点,但是使用场景很少
- 通过添加从节点,我们可以扩展 redis 的服务能力,比如添加 read-only 从节点,将读请求分发到从节点,来减轻主节点压力。但是数据同步是单向的,从节点上发生的写入,最终都会丢失