redis 源码系列(17):分身术 --- replication

单点服务在生产环境是绝对无法接受的,但是数据库服务,要实现多节点或者说分布式部署,面临的问题比 stateless 服务要多的多。数据的同步方式、一致性和可用性的妥协诸多限制,必须都加以考虑。

今天我们来学习一下 redis 主从同步相关内容,本节内容是 redis 实现高可用、数据安全、数据分区的基石。如果在节点之间没有一个可靠的数据同步方法,那么上述的一切都成为空中阁楼。

主节点在任意时刻只有一个,从节点可以有若干个。主从节点需要保持链接,主节点异步的将数据同步到从节点。

Master

主从同步,就是将主节点的数据同步到从节点。同步的大体流程如下:

  1. 主节点在接受到同步请求后,与从节点进行全量同步,启动 BGSAVE (如果之前已经有可用 BGSAVE 在执行,即不需要启动)
  2. 主节点需要将后续导致数据变化的命令(或者数据过期)发送到从节点,持续同步数据

全量同步

主节点在启动的时候,并不知道从节点的任何信息。当收到 sync 或者 psync 命令后,与从节点进行开始同步:

void syncCommand(redisClient *c) {
    // 已经是 SLAVE ,或者处于 MONITOR 模式,返回
    if (c->flags & REDIS_SLAVE) return;
    // 如果这是一个从服务器,但与主服务器的连接仍未就绪,那么拒绝 SYNC
    // redis 从节点也可以有自己的从节点,这里不展开讨论
    if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
        addReplyError(c,"Can't SYNC while not connected with my master");
        return;
    }
    // 要进行 sync 或者 psync 的从节点的发送缓冲区必须是空的。因为我们可能会将
    // 这个从节点的发送缓冲区(存储的是 BGSAVE 以后的脏数据)拷贝到其他从节点的输出缓冲区
    if (listLength(c->reply) != 0 || c->bufpos != 0) {
        addReplyError(c,"SYNC and PSYNC are invalid with pending output");
        return;
    }
    redisLog(REDIS_NOTICE,"Slave asks for synchronization");
		/*
			解释一下 psync:
			psync 代表的是 partial sync,即部分同步。这个机制是为了当已经同步过的主从节点之间
			因为某些原因断开链接后,当链接重新建立以后,需要重新开始同步数据时,可以避免不必要的
			全量同步。
		*/
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        // 尝试进行 PSYNC
        if (masterTryPartialResynchronization(c) == REDIS_OK) {
            // 可执行 PSYNC
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
            // 不可执行 PSYNC
            char *master_runid = c->argv[1]->ptr;
          	// replication id 为 ? 是强制要求全量同步的意思,所以不需要统计到错误里面
            if (master_runid[0] != '?') server.stat_sync_partial_err++;
        }
    } else {
        // 旧版实现,设置标识,避免接收 REPLCONF ACK 
        c->flags |= REDIS_PRE_PSYNC;
    }
    // 执行 full resynchronization ,增加计数
    server.stat_sync_full++;
    // 检查是否有 BGSAVE 在执行
    if (server.rdb_child_pid != -1) {
				// 如果有正在进行的 BGSAVE,我们还需要检查这个 BGSAVE 的数据是不是可以用于与从节点同步数据
        // 为什么会不适合呢?如果住节点的 BGSAVE 发生在有从节点与其链接前,主节点不会把 BGSAVE 启动
        // 以后的数据保存下来(写入到从节点的输出缓冲区),这样会导致 BGSAVE 启动以后的增量写丢失,
        // 这种 BGSAVE 产生的 rdb 文件是无法用于与从节点同步数据使用的
        redisClient *slave;
        listNode *ln;
        listIter li;
				// 只要之前已经有处于 REDIS_REPL_WAIT_BGSAVE_END 的从节点,说明这个 BGSAVE 启动以后的
        // 增量写数据已经保存下来了,所以不需要重新启动 BGSAVE
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }

        if (ln) {
            // 幸运的情况,可以使用目前 BGSAVE 所生成的 RDB,我们直接把这个等待 BGSAVE 完成的
            // 从节点的输出缓冲区数据拷贝到当前节点的输出缓冲区(里面存储的就是同步数据,这也是上
            // 面不允许进行 SYNC 的从节点输出缓冲区内有其他内容的原因
            copyClientOutputBuffer(c,slave);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            // 启动这个 BGSAVE 的时候,还没有从节点,所以需要等待下一个 BGSAVE 启动
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
        // 没有 BGSAVE 在进行,开始一个新的 BGSAVE
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        // 设置状态
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        // 刷新脚本缓存
        replicationScriptCacheFlush();
    }
    // 启用 Nagle 算法,失败也无所谓,所以不检查错误
    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    // 确保后续传播到从节点的数据会发送 SELECT 命令
    server.slaveseldb = -1; 
    // 添加到 slave 列表中
    listAddNodeTail(server.slaves,c);
    // 如果是第一个 slave ,那么初始化 backlog,只有初始化了 backlog,主节点才会传播命令到从节点
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}
/*
	当 BGSAVE 完成时,会调用这个函数
*/
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
    if (!bysignal && exitcode == 0) {
         // BGSAVE 成功
        redisLog(REDIS_NOTICE,
            "Background saving terminated with success");
        // dirty 更新为 BGSAVE 以后的脏数据数
        server.dirty = server.dirty - server.dirty_before_bgsave;
        server.lastsave = time(NULL);
        server.lastbgsave_status = REDIS_OK;
    } else if (!bysignal && exitcode != 0) {
        // BGSAVE 出错
        redisLog(REDIS_WARNING, "Background saving error");
        server.lastbgsave_status = REDIS_ERR;
    } else {
      	// BGSAVE 被中断
        redisLog(REDIS_WARNING,
            "Background saving terminated by signal %d", bysignal);
        // 移除临时文件
        rdbRemoveTempFile(server.rdb_child_pid);
        /* SIGUSR1 is whitelisted, so we have a way to kill a child without
         * tirggering an error conditon. */
        if (bysignal != SIGUSR1)
            server.lastbgsave_status = REDIS_ERR;
    }
    // 更新服务器状态
    server.rdb_child_pid = -1;
    server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
    server.rdb_save_time_start = -1;
    // 处理正在等待 BGSAVE 完成的从节点
    updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}

void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;
    // 遍历所有 slave
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            // 之前的 RDB 文件不能被 slave 使用,立马开始一个新 BGSAVE
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            // 执行到这里,说明有 slave 在等待 BGSAVE 完成
            struct redis_stat buf;
            if (bgsaveerr != REDIS_OK) {
                // 但是 BGSAVE 执行错误 释放 slave
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }
            // 打开 RDB 文件
            if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                continue;
            }
            // 设置偏移量,各种值,准备发送 rdb 文件给从节点
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;
            slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
                (unsigned long long) slave->repldbsize);
            // 清空之前的写事件处理器,注册新的写事件处理器
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }

    // 需要执行新的 BGSAVE
    if (startbgsave) {
        // 开始行的 BGSAVE ,并清空脚本缓存
        replicationScriptCacheFlush();
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            // 启动 BGSAVE 失败的话,断开与从节点的链接
            listIter li;
            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;
                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}
// 向从节点发送 rdb 文件时的写回调函数
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *slave = privdata;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    char buf[REDIS_IOBUF_LEN];
    ssize_t nwritten, buflen;
		// 要发送 rdb 文件的长度到从节点
    if (slave->replpreamble) {
        nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
        if (nwritten == -1) {
            redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s",
                strerror(errno));
            freeClient(slave);
            return;
        }
        sdsrange(slave->replpreamble,nwritten,-1);
        if (sdslen(slave->replpreamble) == 0) {
            sdsfree(slave->replpreamble);
            slave->replpreamble = NULL;
            /* fall through sending data. */
        } else {
            return;
        }
    }
    // 开始发送 rdb 文件
    lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
    // 读取 RDB 数据
    buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
    if (buflen <= 0) {
        redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
            (buflen == 0) ? "premature EOF" : strerror(errno));
        freeClient(slave);
        return;
    }
    // 发送 rdb 数据到从节点
    if ((nwritten = write(fd,buf,buflen)) == -1) {
        if (errno != EAGAIN) {
            redisLog(REDIS_WARNING,"Write error sending DB to slave: %s",
                strerror(errno));
            freeClient(slave);
        }
        return;
    }
    // 更新 offset
    slave->repldboff += nwritten;
    // 如果写入已经完成
    if (slave->repldboff == slave->repldbsize) {
        // 关闭 RDB 文件描述符
        close(slave->repldbfd);
        slave->repldbfd = -1;
        // 删除之前绑定的写事件处理器
        aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
        // 只有在传输完 rdb 文件之后,从节点才算处于 online 状态
        slave->replstate = REDIS_REPL_ONLINE;
        // 更新响应时间
        slave->repl_ack_time = server.unixtime;
        // 在 BGSAVE 启动后的所有需要传播的数据都先保存在从节点的输出缓存中
        // 现在可以开始发送这些数据了,注册发送缓冲区数据的写回调,个人感觉也许应该先检查一下
        // 缓冲区非空再注册
        if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
            sendReplyToClient, slave) == AE_ERR) {
            redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
            freeClient(slave);
            return;
        }
        // 刷新低延迟从节点的数量(slave->repl_ack_time 修改后都需要重新计算)
        refreshGoodSlavesCount();
        redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
    }
}

增量同步

当有写命令需要传播的时候,主节点还需要将这些需要传播的数据发送到从节点:

/*
	backlog 是一个循环数组,用于存储 BGSAVE 开始以后的增量修改。当有命令需要传播到从节点
	会先写入到 backlog,然后再发送到从节点。因为循环数组大小有限,这也是是 PSYNC 对于超过
	范围的 offset 无法支持。
	这里再介绍一下 redis 中主节点用:
	<replication_id> <offset> 表示数据库状态,任意两个节点,只要拥有相同的 replication id 和
	offset,那么就可以说他们拥有相同的数据。
	replication id 是主节点随机生成的字符串,offset 是增量数据的字节偏移量
*/
void feedReplicationBacklog(void *ptr, size_t len) {
    unsigned char *p = ptr;
    // server.master_repl_offset 代表全局偏移量,每次传播命令都累加
    server.master_repl_offset += len;
    // 环形 buffer ,每次写尽可能多的数据,并在到达尾部时将 idx 重置到头部
    // 写满以后会覆盖之前的数据,但是没有关系,这个本来就是作为一个后备存储
    // 传播的命令都会立马发送到客户端
    while(len) {
        // 从 idx 到 backlog 尾部的字节数
        size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
        // 如果 idx 到 backlog 尾部这段空间足以容纳要写入的内容
        // 那么直接将写入数据长度设为 len
        // 在将这些 len 字节复制之后,这个 while 循环将跳出
        if (thislen > len) thislen = len;
        // 将 p 中的 thislen 字节内容复制到 backlog
        memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
        // 更新 idx ,指向新写入的数据之后
        server.repl_backlog_idx += thislen;
        // 如果写入达到尾部,那么将索引重置到头部
        if (server.repl_backlog_idx == server.repl_backlog_size)
            server.repl_backlog_idx = 0;
        // 减去已写入的字节数
        len -= thislen;
        // 将指针移动到已被写入数据的后面,指向未被复制数据的开头
        p += thislen;
        // 增加实际长度
        server.repl_backlog_histlen += thislen;
    }
		// server.repl_backlog_histlen 代表当前可用的数据长度,最大不能超过 backlog 的容量
    // 如果已经超过,代表写入的数据已经发生了覆盖
    if (server.repl_backlog_histlen > server.repl_backlog_size)
        server.repl_backlog_histlen = server.repl_backlog_size;
		// server.repl_backlog_off 是 backlog 中最小的全局偏移值,当从节点进行 PSYNC 时
    // 只有 offset 在 [server.repl_backlog_off,server.master_repl_offset) 内才
    // 可能进行 partial resync
    server.repl_backlog_off = server.master_repl_offset) -
                              server.repl_backlog_histlen + 1;
}
/*
	当启动主从同步以后,主从节点需要持续同步增量数据
*/
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[REDIS_LONGSTR_SIZE];
		// backlog 为空,且没有从服务器,直接返回
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
    // 如果有从节点,那么必然已经创建了 backlog 存储
    redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
    // 如果有需要的话,发送 SELECT 命令,指定数据库
    if (server.slaveseldb != dictid) {
        robj *selectcmd;
        if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        } else {
            int dictid_len;
            dictid_len = ll2string(llstr,sizeof(llstr),dictid);
            selectcmd = createObject(REDIS_STRING,
                sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, llstr));
        }
        // 将 SELECT 命令添加到 backlog
        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
        // 发送给所有从服务器
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;
            addReply(slave,selectcmd);
        }
        if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);
    }
    server.slaveseldb = dictid;
    // 将命令写入到 backlog
    if (server.repl_backlog) {
        // 构造传播数据
        char aux[REDIS_LONGSTR_SIZE+3];
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);
        for (j = 0; j < argc; j++) {
            long objlen = stringObjectLen(argv[j]);
            // 将参数从对象转换成协议格式
            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux,len+3);
            feedReplicationBacklogWithObject(argv[j]);
            feedReplicationBacklog(aux+len+1,2);
        }
    }
    // 将命令发送给所有从节点
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        // 指向从服务器
        redisClient *slave = ln->value;
        // 不要给正在等待 BGSAVE 开始的从服务器发送命令,因为这个数据对他们来说是无用非法的
        // 这种节点的缓冲区需要保存的是对他么合法的 BGSAVE 开始以后的增量数据
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
        // 添加数据到缓冲区,这些数据只有在已经完成 rdb 传输以后才会发送给从节点
        addReplyMultiBulkLen(slave,argc);
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}

Partial Resync

Partial Resync 是为了避优化已经同步过的主从节点,在链接短暂断开以后重新同步时的开销和效率。假设从节点与主节点同步以后,因为网络原因,与主节点短暂断开了链接,期间从节点会丢失一些主节点的增量更新数据。当主从再次链接以后,如果发现主节点还保存有从节点丢失的信息(在 backlog 中),这次同步就可以跳过刚刚介绍的全量同步的步骤,这无疑可以大大提升同步效率。

需要指出 PSYNC 在一些老版本的 redis 中是不支持的。笔者看的源码 3.0,对照了下现在 redis 5.x 的代码,其实相关部分改动有很多,redis 开发团队对 PSYNC 又做了相当多的优化。但是对基础想法的理解的重要性,要远高于优化,所以笔者还是以 3.0 版本代码为基础讲解:

/*
	收到 PSYNC 命令后,主节点会尝试看看可不可以进行 parital resync,如果不行的话
	需要重新进行全量+增量同步过程。PSYNC 命令格式如下:
	PSYNC replication_id offset
	代表从节点希望与 replication_id 主节点,从 offset 偏移量开始重新同步数据
*/
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;
    // 检查 master id 是否和 runid 一致,只有一致的情况下才有 PSYNC 的可能
    if (strcasecmp(master_runid, server.runid)) {
        if (master_runid[0] != '?') {
            // replication_id 不匹配,这种情况是因为这个主节点是由从节点提升而来或者主节点重启
            // 而这些要求 PSYNC 的节点保存的仍然是之前主节点的 replication_id 
            redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
                master_runid, server.runid);
        } else {
            // replication_id 是 ? 代表从节点自己指定了全量同步
            redisLog(REDIS_NOTICE,"Full resync requested by slave.");
        }
        // 需要 full resync
        goto need_full_resync;
    }
    // 取出 psync_offset 参数,从节点要求从这个 offset 开始同步数据
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
       REDIS_OK) goto need_full_resync;

    /*
    	backlog 保存一定 offset 范围内的增量数据,如果从节点要求的 offset
    	在 backlog 保存的 offset 范围内,就可以进行 partial resync
    */
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        // 执行 FULL RESYNC
        redisLog(REDIS_NOTICE,
            "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
        if (psync_offset > server.master_repl_offset) {
            redisLog(REDIS_WARNING,
                "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
        }
        goto need_full_resync;
    }
		// 可以进行 partial resync 的从节点,设置为 ONLINE 状态,加入到从节点列表中
    c->flags |= REDIS_SLAVE;
    c->replstate = REDIS_REPL_ONLINE;
    c->repl_ack_time = server.unixtime;
    listAddNodeTail(server.slaves,c);
    // 向从服务器发送一个同步 +CONTINUE ,表示 PSYNC 可以执行
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    // 发送 backlog 中的内容(也即是从服务器缺失的那些内容)到从服务器
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    redisLog(REDIS_NOTICE,
        "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
    // 刷新低延迟从服务器的数量
    refreshGoodSlavesCount();
    return REDIS_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    // psync 从最新 offset 开始
    psync_offset = server.master_repl_offset;
    // 如果还没有创建 repl_backlog,offset 再加1
    if (server.repl_backlog == NULL) psync_offset++;
    // 发送 +FULLRESYNC ,表示需要完整重同步
    buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
                      server.runid,psync_offset);
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    return REDIS_ERR;
}
// partial resync,发送从节点缺失数据
long long addReplyReplicationBacklog(redisClient *c, long long offset) {
    long long j, skip, len;

    redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset);

    if (server.repl_backlog_histlen == 0) {
        redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero");
        return 0;
    }
    redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld",
             server.repl_backlog_size);
    redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld",
             server.repl_backlog_off);
    redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld",
             server.repl_backlog_histlen);
    redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld",
             server.repl_backlog_idx);
		// server.repl_backlog_off 是 backlog 内保存的 olddest offset
    // offset - server.repl_backlog_off 即我们需要跳过的字节数
  	skip = offset - server.repl_backlog_off;
    redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip);
    // 将 j 指向 offset 对应在 backlog 内的地址
    j = (server.repl_backlog_idx +
        (server.repl_backlog_size-server.repl_backlog_histlen)) %
        server.repl_backlog_size;
    redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", j);
  	j = (j + skip) % server.repl_backlog_size;
		// 发送从节点缺失的数据
    len = server.repl_backlog_histlen - skip;
    redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", len);
    while(len) {
        long long thislen =
            ((server.repl_backlog_size - j) < len) ?
            (server.repl_backlog_size - j) : len;

        redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
        addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
        len -= thislen;
        j = 0;
    }
    return server.repl_backlog_histlen - skip;
}

Slave

不同于主节点,从节点既要作为服务端,在主从同步的过程中,还要作为一个客户端与主节点通信。通过在配置文件中添加 salveof HOST PORT 指令,节点启动以后即会成为指定主节点的从节点:

   // 解析配置,当 server.masterhost 非空则代表这个节点是从节点
				} else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
            slaveof_linenum = linenum;
            server.masterhost = sdsnew(argv[1]);
            server.masterport = atoi(argv[2]);
            server.repl_state = REDIS_REPL_CONNECT;
         }

链接主节点

主从同步相关的逻辑主要集中在 replicationCron 中,这个函数每 1s 调用一次:

/*
	取消链接,这里的链接不是指 TCP 链接,而是主从同步链接。
*/
void undoConnectWithMaster(void) {
    int fd = server.repl_transfer_s;
    // 连接必须处于正在连接状态
    redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||
                server.repl_state == REDIS_REPL_RECEIVE_PONG);
    aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
    close(fd);
    server.repl_transfer_s = -1;
    // 回到 CONNECT 状态
    server.repl_state = REDIS_REPL_CONNECT;
}

/*repl_state 状态转移
	REDIS_REPL_CONNECT -> REDIS_REPL_CONNECTING -> REDIS_REPL_RECEIVE_PONG 
																												|
														REDIS_REPL_CONNECTED <- REDIS_REPL_TRANSFER
*/
void replicationCron(void) {
    // 链接到主节点超时,取消链接
    if (server.masterhost &&
        (server.repl_state == REDIS_REPL_CONNECTING ||
         server.repl_state == REDIS_REPL_RECEIVE_PONG) &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
        // 取消连接
        undoConnectWithMaster();
    }

    // RDB 文件的传送已超时
    if (server.masterhost && server.repl_state ==  &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
        // 停止传送,并删除临时文件
        replicationAbortSyncTransfer();
    }

    // 从服务器曾经连接上主服务器,但现在超时(当重新链接的时候,就会触发 PSYNC)
    if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED &&
        (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
    {
        redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received...");
        // 释放主服务器
        freeClient(server.master);
    }
    // 还没有与主节点建立连接
    if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        // 非阻塞的链接主节点
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
		// 如果已经建立了链接,而且主节点支持 PSYNC,发送 ACK 给主节点
    if (server.masterhost && server.master &&
        !(server.master->flags & REDIS_PRE_PSYNC))
        replicationSendAck();
    // 如果有从服务器,那么隔一段时间发送一个 PING 指令作为心跳,让对方知道我们没有掉线
    if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) {
        listIter li;
        listNode *ln;
        robj *ping_argv[1];
        // 向从节点传播 PING
        ping_argv[0] = createStringObject("PING",4);
        replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
        decrRefCount(ping_argv[0]);
        // 因为对于 REDIS_REPL_WAIT_BGSAVE_START 和 REDIS_REPL_WAIT_BGSAVE_END 状态
        // 的从节点,其不会相应 PING 命令,所以我们发送一个 \n 给从节点,这个没有实际作用,只是
        // 让对方知道我们没有掉线
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;

            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START ||
                slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
                if (write(slave->fd, "\n", 1) == -1) {
                    /* Don't worry, it's just a ping. */
                }
            }
        }
    }
    // 断开超时从服务器
    if (listLength(server.slaves)) {
        listIter li;
        listNode *ln;
        // 遍历所有从服务器
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;
            // 略过未 ONLINE 的从服务器
            if (slave->replstate != REDIS_REPL_ONLINE) continue;
            // 不检查旧版的从服务器
            if (slave->flags & REDIS_PRE_PSYNC) continue;
            // 释放超时从服务器
            if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
            {
                char ip[REDIS_IP_STR_LEN];
                int port;
                if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1) {
                    redisLog(REDIS_WARNING,
                        "Disconnecting timedout slave: %s:%d",
                        ip, slave->slave_listening_port);
                } 
                // 释放
                freeClient(slave);
            }
        }
    }
		// 没有任何从服务器,等待一定时间后就释放 backlog 资源(等待一段时间是为了防止从节点只是暂时掉线)
    if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
        server.repl_backlog)
    {
        time_t idle = server.unixtime - server.repl_no_slaves_since;
        if (idle > server.repl_backlog_time_limit) {
            // 释放
            freeReplicationBacklog();
            redisLog(REDIS_NOTICE,
                "Replication backlog freed after %d seconds "
                "without connected slaves.",
                (int) server.repl_backlog_time_limit);
        }
    }
    // 在没有任何从服务器,AOF 关闭的情况下,清空 script 缓存
    if (listLength(server.slaves) == 0 &&
        server.aof_state == REDIS_AOF_OFF &&
        listLength(server.repl_scriptcache_fifo) != 0)
    {
        replicationScriptCacheFlush();
    }
    // 更新符合给定延迟值的从服务器的数量
    refreshGoodSlavesCount();
}

int connectWithMaster(void) {
    int fd;
    // 非阻塞地连接主服务器,注意如果 connect 的套接字是非阻塞的,那么 connect 返回的时候
    // 不代表已经完成了 TCP 三次握手
    fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
    if (fd == -1) {
        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    // 监听主服务器 fd 的读和写事件,并绑定文件事件处理器,当完成 3 次握手,fd 应该立马变为可写
    // 就会调用 syncWithMaster
    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        close(fd);
        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
        return REDIS_ERR;
    }
    // 初始化统计变量
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_s = fd;
    // 将状态改为 REDIS_REPL_CONNECTING
    server.repl_state = REDIS_REPL_CONNECTING;
    return REDIS_OK;
}

void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);
    // 如果本从节点在与之前主节点建立连接以后,被提升为主节点,那么 server.repl_state 被设置为 REDIS_REPL_NONE
    // 这种情况下,我们立刻关闭套接字并且退出,因为我们已经不再是从节点
    if (server.repl_state == REDIS_REPL_NONE) {
        close(fd);
        return;
    }
    // 检查套接字错误,非阻塞的 connect 需要检查错误
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
            strerror(sockerr));
        goto error;
    }
		// 如果处于 REDIS_REPL_CONNECTING,代表我们需要进行一次同步(不管是全量还是 partial resync)
    // 在此之前,先阻塞的发送一个 PING,这个 PING 没有额外的含义,只是希望确定与主节点的网路情况良好
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
        // 删除写事件监听,但是仍然保留读事件,因为我们要在这个函数里面读取主节点的 PONG 相应
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        // 更新状态
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        // 同步发送 PING,用 poll 实现了一个支持超时的阻塞写,具体代码不分析了,比较简单
        syncWrite(fd,"PING\r\n",6,100);
        // 返回,下次这个函数被调用的时候是主节点发回相应的时候
        return;
    }
    // 主节点对之前的 PING 进行了相应
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];
        // 读事件也删除,后续的读事件回调不会用到这个函数
        aeDeleteFileEvent(server.el,fd,AE_READABLE);
        // 阻塞的读取主节点响应
        buf[0] = '\0';
        if (syncReadLine(fd,buf,sizeof(buf),
            server.repl_syncio_timeout*1000) == -1)
        {
            redisLog(REDIS_WARNING,
                "I/O error reading PING reply from master: %s",
                strerror(errno));
            goto error;
        }
				/* 合法响应只有3种:
					1. +PONG
					2. —NOAUTH
					3. -ERR operation not permitted
					后面两种需要进行鉴权后面会处理,其余响应均为非法
  			*/
        if (buf[0] != '+' &&
            strncmp(buf,"-NOAUTH",7) != 0 &&
            strncmp(buf,"-ERR operation not permitted",28) != 0)
        {
            // 接收到未验证错误
            redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
            goto error;
        } else {
            redisLog(REDIS_NOTICE,
                "Master replied to PING, replication can continue...");
        }
    }
    // 进行身份验证,阻塞的发送 AUTH 命令,并且等待对方回应
    if(server.masterauth) {
        err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
        if (err[0] == '-') {
            // AUTH 失败
            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
    }
		// 将从节点作为 redis 服务器的节点通知给主节点,从节点也是可以服务客户端的
    {
        sds port = sdsfromlonglong(server.port);
        err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
                                         NULL);
        sdsfree(port);
				// 老版本的 redis 不支持这个命令,所以不是致命错误,记录一下即可
        if (err[0] == '-') {
            redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
    }
    // 尝试进行 PSYNC
    psync_result = slaveTryPartialResynchronization(fd);
    // 可以执行 partial resync,退出(新的事件处理函数已经绑定好了)
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }
    // 如果主服务器并不支持 PSYNC,我们还需要额外发送一个 SYNC 命令来请求同步
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }
		// 不管是不支持 PSYNC 还是无法 partial resync,这里都需要进行全量同步,我们需要准备
    // 接收主节点的 rdb 文件,大家一个临时文件用于保存 rdb
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    // 设置一个读事件处理器,来读取主服务器的 RDB 文件
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }
    // 设置状态
    server.repl_state = REDIS_REPL_TRANSFER;
    // 更新统计信息
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;
error:
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}

全量同步

我们在分析主节点代码时就知道全量同步就是主节点发送 rdb 文件的过程。而从节点在需要全量同步以后会注册读回调函数来处理即将收到的 rdb 文件:

#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
    char buf[4096];
    ssize_t nread, readlen;
    off_t left;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);
    // 全量同步最开始发送的是 rdb 文件的大小,如果 server.repl_transfer_size == -1
    // 我们需要先解析 rdb 文件大小
    if (server.repl_transfer_size == -1) {
        // 调用读函数
        if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,
                "I/O error reading bulk count from MASTER: %s",
                strerror(errno));
            goto error;
        }
        // 出错?
        if (buf[0] == '-') {
            redisLog(REDIS_WARNING,
                "MASTER aborted replication with an error: %s",
                buf+1);
            goto error;
        } else if (buf[0] == '\0') {
						// 还记得在 replicationCron 中对与 BGSAVE_START 和 BGSAVE_END 的从节点发送的 \n 吗
            server.repl_transfer_lastio = server.unixtime;
            return;
        } else if (buf[0] != '$') {
            // 读入的内容出错,和协议格式不符
            redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
            goto error;
        }
        // 分析 RDB 文件大小
        server.repl_transfer_size = strtol(buf+1,NULL,10);
        redisLog(REDIS_NOTICE,
            "MASTER <-> SLAVE sync: receiving %lld bytes from master",
            (long long) server.repl_transfer_size);
        // 不明白为什么这里就要退出了,其实可以继续读吧
        return;
    }
    // 开始正式读取 rdb 文件的内容(这里不太明白了,按说应该处理 EAGAIN 等情况才对)
    left = server.repl_transfer_size - server.repl_transfer_read;
    readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
    nread = read(fd,buf,readlen);
    if (nread <= 0) {
        redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
            (nread == -1) ? strerror(errno) : "connection lost");
        replicationAbortSyncTransfer();
        return;
    }
    // 写入到临时 rdb 文件
    server.repl_transfer_lastio = server.unixtime;
    if (write(server.repl_transfer_fd,buf,nread) != nread) {
        redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
        goto error;
    }
    // 加上刚读取好的字节数
    server.repl_transfer_read += nread;
    // 定期将读入的文件 fsync 到磁盘,以免 buffer 太多,一下子写入时撑爆 IO
    if (server.repl_transfer_read >=
        server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
    {
        off_t sync_size = server.repl_transfer_read -
                          server.repl_transfer_last_fsync_off;
        rdb_fsync_range(server.repl_transfer_fd,
            server.repl_transfer_last_fsync_off, sync_size);
        server.repl_transfer_last_fsync_off += sync_size;
    }
    // 检查 RDB 是否已经传送完毕
    if (server.repl_transfer_read == server.repl_transfer_size) {
        // 完毕,将临时文件改名为 dump.rdb
        if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
            redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
            replicationAbortSyncTransfer();
            return;
        }
        // 先清空旧数据库
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
        signalFlushedDb(-1);
        emptyDb(replicationEmptyDbCallback);
        // 先删除主服务器的读事件监听,因为 rdbLoad() 会调用 rdbLoadProgressCallback
        // 这个函数会调用 eventLoop 处理事件,如果不删除,会导致递归调用
        aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
        // 载入 RDB
        if (rdbLoad(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
            replicationAbortSyncTransfer();
            return;
        }
        // 关闭临时文件
        zfree(server.repl_transfer_tmpfile);
        close(server.repl_transfer_fd);
        // 为主节点构造客户端资源,createClient 中会绑定好读事件
        server.master = createClient(server.repl_transfer_s);
        // 标记这个客户端为主服务器
        server.master->flags |= REDIS_MASTER;
        // 标记它为已验证身份
        server.master->authenticated = 1;
        // 更新复制状态
        server.repl_state = REDIS_REPL_CONNECTED;
        // 设置主服务器的复制偏移量
        server.master->reploff = server.repl_master_initial_offset;
        // 保存主服务器的 RUN ID
        memcpy(server.master->replrunid, server.repl_master_runid,
            sizeof(server.repl_master_runid));
        // 如果 offset 被设置为 -1 ,那么表示主服务器的版本低于 2.8 
        // 无法使用 PSYNC ,所以需要设置相应的标识值
        if (server.master->reploff == -1)
            server.master->flags |= REDIS_PRE_PSYNC;
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
        // 重启 AOF,这会导致进行一次 AOF 重写
        if (server.aof_state != REDIS_AOF_OFF) {
            int retry = 10;
            // 关闭
            stopAppendOnly();
            // 再重启
            while (retry-- && startAppendOnly() == REDIS_ERR) {
                redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
                sleep(1);
            }
            if (!retry) {
                redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
                exit(1);
            }
        }
    }

    return;

error:
    replicationAbortSyncTransfer();
    return;
}

增量同步

增量同步其实就是从节点不断接收主节点传播的写指令的过程

Partial Resync

当与主节点进行同步的时候,从节点会优先尝试进行 partial resync:

#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {
    char *psync_runid;
    char psync_offset[32];
    sds reply;
		// 如果主节点支持 PSYNC,那么我们不管是进行了 FULLRESYNC 还是 partial resync
    // repl_master_initial_offset 都不会是 -1.只有对于不支持 PSYNC 的主节点
    // repl_master_initial_offset 才会保持1
    server.repl_master_initial_offset = -1;

    if (server.cached_master) {
 				/*
 					之前就提到了,resync 是发生在曾经同步过的主从之间,如果有曾经重新同步过的主节点
 					我们将之前的 replication id 和 offset 发送过去请求 PSYNC
 				*/
        psync_runid = server.cached_master->replrunid;
        snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
        redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
    } else {
        // 缓存不存在,replication id 设置为 ? 请求全量同步
        redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
        psync_runid = "?";
        memcpy(psync_offset,"-1",3);
    }
    // 向主服务器发送 PSYNC 命令,读取其响应
    reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
    if (!strncmp(reply,"+FULLRESYNC",11)) {
        // 虽然支持 PSYNC 命令,但是无法进行 partial resync,执行全量同步
        char *runid = NULL, *offset = NULL;
        // 分析并记录主服务器的 run id
        runid = strchr(reply,' ');
        if (runid) {
            runid++;
            offset = strchr(runid,' ');
            if (offset) offset++;
        }
        // 检查 run id 的合法性
        if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
            redisLog(REDIS_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            // 主服务器支持 PSYNC ,但是却发来了异常的 run id
            // 只好将 run id 设为 0 ,让下次 PSYNC 时失败
            memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
        } else {
            // 保存 run id
            memcpy(server.repl_master_runid, runid, offset-runid-1);
            server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
            // 以及 initial offset
            server.repl_master_initial_offset = strtoll(offset,NULL,10);
            // 打印日志,这是一个 FULL resync
            redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
                server.repl_master_runid,
                server.repl_master_initial_offset);
        }
        // 要开始完整重同步,缓存中的 master 已经没用了,清除它
        replicationDiscardCachedMaster();
        sdsfree(reply);
        // 返回状态
        return PSYNC_FULLRESYNC;
    }
    if (!strncmp(reply,"+CONTINUE",9)) {
        // 可以进行 partial resync,我们会收到主节点从 offset 之后的数据
        redisLog(REDIS_NOTICE,
            "Successful partial resynchronization with master.");
        sdsfree(reply);
        // 将缓存中的 master 设为当前 master
        replicationResurrectCachedMaster(fd);
        // 返回状态
        return PSYNC_CONTINUE;
    }
    // 主节点不支持 PSYNC 命令
    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log the unexpected event. */
        redisLog(REDIS_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        redisLog(REDIS_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    // 清楚缓存的主服务器
    replicationDiscardCachedMaster();
    // 主服务器不支持 PSYNC
    return PSYNC_NOT_SUPPORTED;
}

void replicationResurrectCachedMaster(int newfd) {
    // 设置 master
    server.master = server.cached_master;
    server.cached_master = NULL;
    server.master->fd = newfd;
    server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);
    server.master->authenticated = 1;
    server.master->lastinteraction = server.unixtime;

    // 回到已连接状态
    server.repl_state = REDIS_REPL_CONNECTED;
    // 将主节点加入到客户端列表中
    listAddNodeTail(server.clients,server.master);
    // 后续只需要想处理其他客户端一样处理主节点传播的消息即可
    if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
                          readQueryFromClient, server.master)) {
        redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
        freeClientAsync(server.master); /* Close ASAP. */
    }
		// 如果有需要发送给主节点的消息,注册写回调
    if (server.master->bufpos || listLength(server.master->reply)) {
        if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
                          sendReplyToClient, server.master)) {
            redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
            freeClientAsync(server.master); /* Close ASAP. */
        }
    }
}

当我们要释放某个客户端时候,会检查这个客户端是不是主节点,如果是的话,就会将其缓存下来,用于后续 PSYNC:

void freeClient(redisClient *c) {
 	// 略
      if (server.master && c->flags & REDIS_MASTER) {
        redisLog(REDIS_WARNING,"Connection with master lost.");
        if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
                          REDIS_CLOSE_ASAP|
                          REDIS_BLOCKED|
                          REDIS_UNBLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }
}

void replicationCacheMaster(redisClient *c) {
    listNode *ln;
    redisAssert(server.master != NULL && server.cached_master == NULL);
    redisLog(REDIS_NOTICE,"Caching the disconnected master state.");
    // 从客户端链表中移除主服务器
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);
    // 缓存 master
    server.cached_master = server.master;
    // 删除事件监视,关闭 socket
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    close(c->fd);
    c->fd = -1;
		// 删除 peerid
    if (c->peerid) {
        sdsfree(c->peerid);
        c->peerid = NULL;
    }
    replicationHandleMasterDisconnection();
}

void replicationHandleMasterDisconnection(void) {
    // 设置 replication 状态,重置 master
    server.master = NULL;
    server.repl_state = REDIS_REPL_CONNECT;
    server.repl_down_since = server.unixtime;
		// 如果这个从节点本身有从节点,断开所有与从节点的链接
    if (server.masterhost != NULL) disconnectSlaves();
}

我们要注意,数据的同步是单向的,只会从主节点到从节点,任何发生在从节点的写入,最终都会丢失。redis 并不禁止从节点处理写请求。

后续优化

在最新的 redis 代码中针对主从同步还做了很多优化,比如:

  1. 如果主节点是由从节点提升而来,那么它会保存之前主节点的信息,当从节点使用之前主节点的 replication id 进行 PSYNC 的时候,partial resync 仍然可能进行
  2. 全量同步时,主节点将 rdb 落盘再发送给从节点是没有必要的,后面的 redis 支持 diskless replication

总结

  1. 主从同步时数据库服务分布式部署必须面对的问题,redis 使用异步同步,达到最终一致性,但是这也导致了永远存在一个丢失数据的时间窗口(单点也有这个问题)
  2. redis 使用 PSYNC 机制来降低 resync 的成本
  3. 从节点本身也可以有自己的从节点,但是使用场景很少
  4. 通过添加从节点,我们可以扩展 redis 的服务能力,比如添加 read-only 从节点,将读请求分发到从节点,来减轻主节点压力。但是数据同步是单向的,从节点上发生的写入,最终都会丢失
redis 源码系列(17):分身术 --- replicationredis 源码系列(17):分身术 --- replication maohuazhu 发布了27 篇原创文章 · 获赞 10 · 访问量 2309 私信 关注
上一篇:【转帖】什么是REPL


下一篇:【sessions】Oracle中sessions和processes的大小关系(10g和11g不同)