redis 主从同步-master端

redis 主从同步master端处理

 redis 主从同步的过程始于一系列类似tcp三次握手的过程,归于"sync/psync"命令。分析redis主从同步master端的处理逻辑需要从syncCommand的函数开始进行分析。

 redis 主从同步过程中master的执行内容包括:

  • 接收slave的sync/psync命令
  • 执行bgsave命令异步启动rdb生成
  • crontab定时检查rdb是否生成完毕
  • 发送rdb文件到slave
  • 发送rdb文件生成过程中缓存的redis执行命令
{"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
{"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},


redis syncCommand处理逻辑

 整个处理逻辑主要分为处理部分同步命令psync全量同步命令sync,整个交互过程如下

  • 先尝试部分同步psync操作,成功则直接同步数据到slave
  • 部分同步psync操作失败,尝试全量同步sync操作
  • 全量同步操作区分是否已有在执行中的bgsave命令,有则共享没有则重新开启线程异步执行
  • 定时任务负责检查异步任务是否完成,完成则发送rdb数据到slave
  • 发送rdb生成过程中缓存的redis执行命令到slave

需要针对触发bgsave命令的部分作下详细说明,因为这个是核心的关键点:

  • 首先判断master是否正在执行bgsave命令,通过是否有启动bgsave的线程(server.rdb_child_pid)进行判断。
  • 如果正在执行bgsave命令,那么我们就等待前一个bgsave生成的rdb文件
  • 需要重点指出复用的过程中需要把rdb过程累计的redis命令也复制一份通过copyClientOutputBuffer。因为rdb复用了,所以这些累计的命令也需要复用。
  • 如果没有执行bgsave命令,那么就需要启动bgsave任务
/* SYNC ad PSYNC command implemenation. */
void syncCommand(redisClient *c) {

    // 已经是 SLAVE ,或者处于 MONITOR 模式,返回
    if (c->flags & REDIS_SLAVE) return;

    // 如果这是一个从服务器,但与主服务器的连接仍未就绪,那么拒绝 SYNC
    if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
        addReplyError(c,"Can't SYNC while not connected with my master");
        return;
    }

    // 在客户端仍有输出数据等待输出,不能 SYNC
    if (listLength(c->reply) != 0 || c->bufpos != 0) {
        addReplyError(c,"SYNC and PSYNC are invalid with pending output");
        return;
    }

    redisLog(REDIS_NOTICE,"Slave asks for synchronization");

    /* 
     * 如果这是一个 PSYNC 命令,那么尝试 partial resynchronization 。
     * 如果失败,那么使用 full resynchronization ,
     * 在这种情况下, masterTryPartialResynchronization() 返回以下内容:
     * +FULLRESYNC <runid> <offset>
     * 这样的话,之后如果主服务器断开,那么从服务器就可以尝试 PSYNC 了。
     */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        // 尝试进行 PSYNC
        if (masterTryPartialResynchronization(c) == REDIS_OK) {
            // 可执行 PSYNC
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
            // 不可执行 PSYNC
            char *master_runid = c->argv[1]->ptr;
            if (master_runid[0] != '?') server.stat_sync_partial_err++;
        }
    } else {
        // 旧版实现,设置标识,避免接收 REPLCONF ACK 
        c->flags |= REDIS_PRE_PSYNC;
    }

    // 以下是完整重同步的情况。。。
    // 执行 full resynchronization ,增加计数
    server.stat_sync_full++;

    // 检查是否有 BGSAVE 在执行
    if (server.rdb_child_pid != -1) {
        redisClient *slave;
        listNode *ln;
        listIter li;

        // 如果有至少一个 slave 在等待这个 BGSAVE 完成
        // 那么说明正在进行的 BGSAVE 所产生的 RDB 也可以为其他 slave 所用
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }

        if (ln) {
            // 幸运的情况,可以使用目前 BGSAVE 所生成的 RDB
            copyClientOutputBuffer(c,slave);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            // 不好运的情况,必须等待下个 BGSAVE
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
        // 没有 BGSAVE 在进行,开始一个新的 BGSAVE
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        // 设置状态
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        /* Flush the script cache for the new slave. */
        // 因为新 slave 进入,刷新复制脚本缓存
        replicationScriptCacheFlush();
    }

    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */

    c->repldbfd = -1;

    c->flags |= REDIS_SLAVE;

    server.slaveseldb = -1; /* Force to re-emit the SELECT command. */

    // 添加到 slave 列表中
    listAddNodeTail(server.slaves,c);
    // 如果是第一个 slave ,那么初始化 backlog
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}


redis 部分同步处理逻辑

 判断能够通过部分数据同步实现的逻辑很简单,主要从两个维度进行判断:

  • 判断slave发送过来的master_runid是否等于master的runid
  • 判断master是否存在backlog缓存部分同步命令并且偏移量符合要求
    如果不满足上述两个条件那么就需要进行全量同步,否则进行部分同步
// 尝试进行部分 resync ,成功返回 REDIS_OK ,失败返回 REDIS_ERR 。
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    // 检查 master id 是否和 runid 一致,只有一致的情况下才有 PSYNC 的可能
    if (strcasecmp(master_runid, server.runid)) {
        // 从服务器提供的 run id 和服务器的 run id 不一致
        if (master_runid[0] != '?') {
            redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
                master_runid, server.runid);
        // 从服务器提供的 run id 为 '?' ,表示强制 FULL RESYNC
        } else {
            redisLog(REDIS_NOTICE,"Full resync requested by slave.");
        }
        // 需要 full resync
        goto need_full_resync;
    }

    // 取出 psync_offset 参数
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
       REDIS_OK) goto need_full_resync;

        // 如果没有 backlog
    if (!server.repl_backlog ||
        // 或者 psync_offset 小于 server.repl_backlog_off
        // (想要恢复的那部分数据已经被覆盖)
        psync_offset < server.repl_backlog_off ||
        // psync offset 大于 backlog 所保存的数据的偏移量
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        // 执行 FULL RESYNC
        redisLog(REDIS_NOTICE,
            "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
        if (psync_offset > server.master_repl_offset) {
            redisLog(REDIS_WARNING,
                "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
        }
        goto need_full_resync;
    }

    /* 
     * 程序运行到这里,说明可以执行 partial resync
     *
     * 1) Set client state to make it a slave.
     *    将客户端状态设为 salve  
     *
     * 2) Inform the client we can continue with +CONTINUE
     *    向 slave 发送 +CONTINUE ,表示 partial resync 的请求被接受
     *
     * 3) Send the backlog data (from the offset to the end) to the slave. 
     *    发送 backlog 中,客户端所需要的数据
     */
    c->flags |= REDIS_SLAVE;
    c->replstate = REDIS_REPL_ONLINE;
    c->repl_ack_time = server.unixtime;
    listAddNodeTail(server.slaves,c);

    // 向从服务器发送一个同步 +CONTINUE ,表示 PSYNC 可以执行
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    // 发送 backlog 中的内容(也即是从服务器缺失的那些内容)到从服务器
    psync_len = addReplyReplicationBacklog(c,psync_offset);

    // 刷新低延迟从服务器的数量
    refreshGoodSlavesCount();
    return REDIS_OK; 

need_full_resync:
   
    // 刷新 psync_offset
    psync_offset = server.master_repl_offset;
    // 刷新 psync_offset
    if (server.repl_backlog == NULL) psync_offset++;
    // 发送 +FULLRESYNC ,表示需要完整重同步
    buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
                      server.runid,psync_offset);
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    return REDIS_ERR;
}


redis 全量同步的rdb生成过程

 众所周知rdb文件生成是内部fork新的线程去执行rdb生成过程的,通过rdbSaveBackground的函数可以看出来内部通过fork()去实现rdb文件的生成过程。
在fork的线程当中执行rdbSave实现rdb文件的生成过程。

int rdbSaveBackground(char *filename) {
    pid_t childpid;
    long long start;

    // 如果 BGSAVE 已经在执行,那么出错
    if (server.rdb_child_pid != -1) return REDIS_ERR;

    // 记录 BGSAVE 执行前的数据库被修改次数
    server.dirty_before_bgsave = server.dirty;

    // 最近一次尝试执行 BGSAVE 的时间
    server.lastbgsave_try = time(NULL);

    // fork() 开始前的时间,记录 fork() 返回耗时用
    start = ustime();

    if ((childpid = fork()) == 0) {
        int retval;

        /* Child */

        // 关闭网络连接 fd
        closeListeningSockets(0);

        // 设置进程的标题,方便识别
        redisSetProcTitle("redis-rdb-bgsave");

        // 执行保存操作
        retval = rdbSave(filename);

        // 打印 copy-on-write 时使用的内存数
        if (retval == REDIS_OK) {
            size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
        }

        // 向父进程发送信号
        exitFromChild((retval == REDIS_OK) ? 0 : 1);

    } else {

        /* Parent */

        // 计算 fork() 执行的时间
        server.stat_fork_time = ustime()-start;

        // 如果 fork() 出错,那么报告错误
        if (childpid == -1) {
            server.lastbgsave_status = REDIS_ERR;
            redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }

        // 打印 BGSAVE 开始的日志
        redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);

        // 记录数据库开始 BGSAVE 的时间
        server.rdb_save_time_start = time(NULL);

        // 记录负责执行 BGSAVE 的子进程 ID
        server.rdb_child_pid = childpid;

        // 关闭自动 rehash
        updateDictResizePolicy();

        return REDIS_OK;
    }

    return REDIS_OK; /* unreached */
}



rdb文件的生成过程其实挺简单的,大概流程如下:

  • 创建临时的rdb文件保存数据,fork的子线程拷贝了夫线程的所有db数据
  • 写入各类前置数据类似版本号之类
  • 遍历所有db写入内存数据
  • 写入各类后者数据类似校验码等
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success 
 *
 * 将数据库保存到磁盘上。
 *
 * 保存成功返回 REDIS_OK ,出错/失败返回 REDIS_ERR 。
 */
int rdbSave(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    char tmpfile[256];
    char magic[10];
    int j;
    long long now = mstime();
    FILE *fp;
    rio rdb;
    uint64_t cksum;

    // 创建临时文件
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    // 初始化 I/O
    rioInitWithFile(&rdb,fp);

    // 设置校验和函数
    if (server.rdb_checksum)
        rdb.update_cksum = rioGenericUpdateChecksum;

    // 写入 RDB 版本号
    snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
    if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;

    // 遍历所有数据库
    for (j = 0; j < server.dbnum; j++) {

        // 指向数据库
        redisDb *db = server.db+j;

        // 指向数据库键空间
        dict *d = db->dict;

        // 跳过空数据库
        if (dictSize(d) == 0) continue;

        // 创建键空间迭代器
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }

        /* Write the SELECT DB opcode 
         *
         * 写入 DB 选择器
         */
        if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
        if (rdbSaveLen(&rdb,j) == -1) goto werr;

        /* Iterate this DB writing every entry 
         *
         * 遍历数据库,并写入每个键值对的数据
         */
        while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;
            
            // 根据 keystr ,在栈中创建一个 key 对象
            initStaticStringObject(key,keystr);

            // 获取键的过期时间
            expire = getExpire(db,&key);

            // 保存键值对数据
            if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
        }
        dictReleaseIterator(di);
    }
    di = NULL; /* So that we don't release it again on error. */

    /* EOF opcode 
     *
     * 写入 EOF 代码
     */
    if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;

    /* CRC64 checksum. It will be zero if checksum computation is disabled, the
     * loading code skips the check in this case. 
     *
     * CRC64 校验和。
     *
     * 如果校验和功能已关闭,那么 rdb.cksum 将为 0 ,
     * 在这种情况下, RDB 载入时会跳过校验和检查。
     */
    cksum = rdb.cksum;
    memrev64ifbe(&cksum);
    rioWrite(&rdb,&cksum,8);

    /* Make sure data will not remain on the OS's output buffers */
    // 冲洗缓存,确保数据已写入磁盘
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. 
     *
     * 使用 RENAME ,原子性地对临时文件进行改名,覆盖原来的 RDB 文件。
     */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }

    // 写入完成,打印日志
    redisLog(REDIS_NOTICE,"DB saved on disk");

    // 清零数据库脏状态
    server.dirty = 0;

    // 记录最后一次完成 SAVE 的时间
    server.lastsave = time(NULL);

    // 记录最后一次执行 SAVE 的状态
    server.lastbgsave_status = REDIS_OK;

    return REDIS_OK;

werr:
    // 关闭文件
    fclose(fp);
    // 删除文件
    unlink(tmpfile);

    redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));

    if (di) dictReleaseIterator(di);

    return REDIS_ERR;
}


redis 检查rdb是否完成

 通过检查server.rdb_child_pid或者server.aof_child_pid确认是否执行rdb文件生成或者aof文件写入。

  • rdb生成完成执行backgroundSaveDoneHandler函数
  • aof生成完成执行backgroundRewriteDoneHandler函数
    暂时我们只关心rdb文件生成也就是跟进backgroundSaveDoneHandler过程
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    
    // 检查 BGSAVE 或者 BGREWRITEAOF 是否已经执行完毕
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
        int statloc;
        pid_t pid;

        // 接收子进程发来的信号,非阻塞
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;
            
            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            // BGSAVE 执行完毕
            if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);

            // BGREWRITEAOF 执行完毕
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);

            } else {
                redisLog(REDIS_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
            updateDictResizePolicy();
        }
    } else {
        // 既然没有 BGSAVE 或者 BGREWRITEAOF 在执行,那么检查是否需要执行它们

        // 遍历所有保存条件,看是否需要执行 BGSAVE 命令
         for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            // 检查是否有某个保存条件已经满足了
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 REDIS_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == REDIS_OK))
            {
                redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                // 执行 BGSAVE
                rdbSaveBackground(server.rdb_filename);
                break;
            }
         }

         /* Trigger an AOF rewrite if needed */
        // 出发 BGREWRITEAOF
         if (server.rdb_child_pid == -1 &&
             server.aof_child_pid == -1 &&
             server.aof_rewrite_perc &&
             // AOF 文件的当前大小大于执行 BGREWRITEAOF 所需的最小大小
             server.aof_current_size > server.aof_rewrite_min_size)
         {
            // 上一次完成 AOF 写入之后,AOF 文件的大小
            long long base = server.aof_rewrite_base_size ?
                            server.aof_rewrite_base_size : 1;

            // AOF 文件当前的体积相对于 base 的体积的百分比
            long long growth = (server.aof_current_size*100/base) - 100;

            // 如果增长体积的百分比超过了 growth ,那么执行 BGREWRITEAOF
            if (growth >= server.aof_rewrite_perc) {
                redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                // 执行 BGREWRITEAOF
                rewriteAppendOnlyFileBackground();
            }
         }
    }
}



 bgsave完成后我们执行updateSlavesWaitingBgsave来实现rdb数据的同步。

/* 
 * 处理 BGSAVE 完成时发送的信号
 */
void backgroundSaveDoneHandler(int exitcode, int bysignal) {

    // BGSAVE 成功
    if (!bysignal && exitcode == 0) {
        redisLog(REDIS_NOTICE,
            "Background saving terminated with success");
        server.dirty = server.dirty - server.dirty_before_bgsave;
        server.lastsave = time(NULL);
        server.lastbgsave_status = REDIS_OK;

    // BGSAVE 出错
    } else if (!bysignal && exitcode != 0) {
        redisLog(REDIS_WARNING, "Background saving error");
        server.lastbgsave_status = REDIS_ERR;

    // BGSAVE 被中断
    } else {
        redisLog(REDIS_WARNING,
            "Background saving terminated by signal %d", bysignal);
        // 移除临时文件
        rdbRemoveTempFile(server.rdb_child_pid);
        /* SIGUSR1 is whitelisted, so we have a way to kill a child without
         * tirggering an error conditon. */
        if (bysignal != SIGUSR1)
            server.lastbgsave_status = REDIS_ERR;
    }

    // 更新服务器状态
    server.rdb_child_pid = -1;
    server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
    server.rdb_save_time_start = -1;

    /* Possibly there are slaves waiting for a BGSAVE in order to be served
     * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    // 处理正在等待 BGSAVE 完成的那些 slave
    updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}



  在updateSlavesWaitingBgsave过程中我们把和slave连接的socket注册写事件到eventLoop当中且回调函数为sendBulkToSlave,通过该回调函数实现rdb文件的传输。
  可以看出来整个同步过程中我们会同步master到所有的slave节点,注意是所有的slave节点。

/* 
 * 在每次 BGSAVE 执行完毕之后使用
 * bgsaveerr 可能是 REDIS_OK 或者 REDIS_ERR ,显示 BGSAVE 的执行结果
 * 这个函数是在 BGSAVE 完成之后的异步回调函数,
 * 它指导该怎么执行和 slave 相关的 RDB 下一步工作。
 */
void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;

    // 遍历所有 slave
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            // 之前的 RDB 文件不能被 slave 使用,
            // 开始新的 BGSAVE
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {

            // 执行到这里,说明有 slave 在等待 BGSAVE 完成
            struct redis_stat buf;

            // 但是 BGSAVE 执行错误
            if (bgsaveerr != REDIS_OK) {
                // 释放 slave
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }

            // 打开 RDB 文件
            if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                continue;
            }

            // 设置偏移量,各种值
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            // 更新状态
            slave->replstate = REDIS_REPL_SEND_BULK;

            slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
                (unsigned long long) slave->repldbsize);

            // 清空之前的写事件处理器
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            // 将 sendBulkToSlave 安装为 slave 的写事件处理器
            // 它用于将 RDB 文件发送给 slave
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }

    // 需要执行新的 BGSAVE
    if (startbgsave) {
        // 开始行的 BGSAVE ,并清空脚本缓存
        replicationScriptCacheFlush();
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            listIter li;

            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;

                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}



  sendBulkToSlave内部主要实现两个事情,都是和数据传输有关:

  • master传递rdb文件内容给slave
  • master将slave的socket的写事件注册到eventLoop当中且回调函数为sendReplyToClient,在sendReplyToClient内部把缓存的redis操作命令同步到slave。
// master 将 RDB 文件发送给 slave 的写事件处理器
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *slave = privdata;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    char buf[REDIS_IOBUF_LEN];
    ssize_t nwritten, buflen;

    /* Before sending the RDB file, we send the preamble as configured by the
     * replication process. Currently the preamble is just the bulk count of
     * the file in the form "$<length>\r\n". */
    if (slave->replpreamble) {
        nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
        if (nwritten == -1) {
            redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s",
                strerror(errno));
            freeClient(slave);
            return;
        }
        sdsrange(slave->replpreamble,nwritten,-1);
        if (sdslen(slave->replpreamble) == 0) {
            sdsfree(slave->replpreamble);
            slave->replpreamble = NULL;
            /* fall through sending data. */
        } else {
            return;
        }
    }

    /* If the preamble was already transfered, send the RDB bulk data. */
    lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
    // 读取 RDB 数据
    buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
    if (buflen <= 0) {
        redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
            (buflen == 0) ? "premature EOF" : strerror(errno));
        freeClient(slave);
        return;
    }
    // 写入数据到 slave
    if ((nwritten = write(fd,buf,buflen)) == -1) {
        if (errno != EAGAIN) {
            redisLog(REDIS_WARNING,"Write error sending DB to slave: %s",
                strerror(errno));
            freeClient(slave);
        }
        return;
    }

    // 如果写入成功,那么更新写入字节数到 repldboff ,等待下次继续写入
    slave->repldboff += nwritten;

    // 如果写入已经完成
    if (slave->repldboff == slave->repldbsize) {
        // 关闭 RDB 文件描述符
        close(slave->repldbfd);
        slave->repldbfd = -1;
        // 删除之前绑定的写事件处理器
        aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
        // 将状态更新为 REDIS_REPL_ONLINE
        slave->replstate = REDIS_REPL_ONLINE;
        // 更新响应时间
        slave->repl_ack_time = server.unixtime;
        // 创建向从服务器发送命令的写事件处理器
        // 将保存并发送 RDB 期间的回复全部发送给从服务器
        if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
            sendReplyToClient, slave) == AE_ERR) {
            redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
            freeClient(slave);
            return;
        }
        // 刷新低延迟 slave 数量
        refreshGoodSlavesCount();
        redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
    }
}
上一篇:DataRabbit 轻量的数据访问框架(04) -- IEntityRelationLoader


下一篇:phalapi-入门篇4(国际化高可用和自动生成文档)