[redis设计与实现][9]复制

复制(Redis2.8)

设置主服务器的地址和端口(SLAVEOF命令)

SLAVEOF host port

Redis的主从复制设置非常方便,只需要在从服务器上设置主服务器的IP和端口即可。如果需要关闭主从同步,只需要执行SLAVEOF NO ONE即可。 该命令的具体描述见官方文档

/* SLAVEOF host port | SLAVEOF NO ONE
 *
 * With "NO ONE" the current replication setup (if any) is removed.
 * Otherwise the given host/port is recorded as the new master via
 * replicationSetMaster(); this function does not connect to the master
 * itself, it only records the target and replies to the client. */
void slaveofCommand(redisClient *c) {
    /* Handle "NO ONE" first: detach from the current master, if any. */
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            replicationUnsetMaster();
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
    } else {
        long port;

        /* On a non-numeric port the helper has already replied with an
         * error, so just return. */
        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
            return;

        /* Check if we are already attached to the specified master */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }

        /* There was no previous master or the user specified a different one,
         * we can continue. Only the new master is recorded here, then the
         * command returns. */
        replicationSetMaster(c->argv[1]->ptr, port);
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}
/* Record ip:port as the master of this instance.
 *
 * Stores the new master address, tears down the existing replication
 * state, and leaves server.repl_state at REDIS_REPL_CONNECT with the
 * replication offset reset to 0. No connection is attempted here. */
void replicationSetMaster(char *ip, int port) {
sdsfree(server.masterhost);
server.masterhost = sdsdup(ip);
server.masterport = port;
/* If we currently hold a client for a master, release it. */
if (server.master) freeClient(server.master);
disconnectSlaves(); /* Force our slaves to resync with us as well. */
replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
cancelReplicationHandshake();
server.repl_state = REDIS_REPL_CONNECT;
server.master_repl_offset = 0;
}

可以看到,slaveof命令是一个异步命令,执行的时候只是设置了新的主服务器,然后就立马返回结果了。真正执行连接等操作的, 是在定时器中执行的。

/* Replication cron function - used to reconnect to master and
* to detect transfer failures. Scheduled here with a period of 1000
* (once per second, per the surrounding text). */
run_with_period(1000) replicationCron();

建立套接字连接

定时器每隔1秒钟,会调用replicationCron函数,该函数会根据状态执行定时操作。当状态为REDIS_REPL_CONNECT的时候,执行逻辑为:

/* Replication cron (excerpt): when the replication state is
 * REDIS_REPL_CONNECT, try to open the connection to the configured
 * master. */
void replicationCron(void) {
    /* Check if we should connect to a MASTER */
    if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
}
/* Start a non-blocking connect to server.masterhost:server.masterport and
 * register syncWithMaster() as the handler for the resulting socket.
 *
 * Returns REDIS_OK when the connect was started and the handler installed
 * (state becomes REDIS_REPL_CONNECTING), REDIS_ERR otherwise. */
int connectWithMaster(void) {
    int fd;

    fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
    if (fd == -1) {
        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        /* Do not leak the socket if the event cannot be registered. */
        close(fd);
        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
        return REDIS_ERR;
    }

    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_s = fd;
    server.repl_state = REDIS_REPL_CONNECTING;
    return REDIS_OK;
}

如果发现当前主从状态是REDIS_REPL_CONNECT(刚执行slaveof的时候设置的),就会去连接主服务器。当socket连接建立之后, 会注册syncWithMaster这个回调,并且设置主从状态为REDIS_REPL_CONNECTING。

发送PING命令

PING命令都很熟悉了,jedis pool中用来检测当前连接是否有效,用的就是这个命令。手工执行PING命令,Redis会返回一个PONG作为响应。

这里发送PING命令,主要也是为了检测当前和master连接是否正常,master是否能够正常处理命令。

/* syncWithMaster (excerpt): first step of the handshake. When the
 * non-blocking connect fires, send PING and move to
 * REDIS_REPL_RECEIVE_PONG to wait for the reply. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        syncWrite(fd,"PING\r\n",6,100);
        return;
    }
}

这里当状态是REDIS_REPL_CONNECTING的时候,向master发送了PING命令,然后就等待master返回PONG的响应。

PONG响应也是在这个函数中进行处理的:

/* syncWithMaster (excerpt): handle the reply to PING, authenticate if
 * server.masterauth is set, and announce our listening port with REPLCONF.
 *
 * `err` is declared in a part of the function not shown in this excerpt.
 * On any failure the code jumps to `error`, which closes the socket and
 * resets the state to REDIS_REPL_CONNECT. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    /* Receive the PONG command. */
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];

        /* Delete the readable event, we no longer need it now that there is
         * the PING reply to read. */
        aeDeleteFileEvent(server.el,fd,AE_READABLE);

        /* Read the reply with explicit timeout. */
        buf[0] = '\0';
        if (syncReadLine(fd,buf,sizeof(buf),
            server.repl_syncio_timeout*1000) == -1)
        {
            redisLog(REDIS_WARNING,
                "I/O error reading PING reply from master: %s",
                strerror(errno));
            goto error;
        }

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        if (buf[0] != '+' &&
            strncmp(buf,"-NOAUTH",7) != 0 &&
            strncmp(buf,"-ERR operation not permitted",28) != 0)
        {
            redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
            goto error;
        } else {
            redisLog(REDIS_NOTICE,
                "Master replied to PING, replication can continue...");
        }
    }

    /* AUTH with the master if required. */
    if(server.masterauth) {
        err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
        if (err[0] == '-') {
            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    {
        sds port = sdsfromlonglong(server.port);
        err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
                                         NULL);
        sdsfree(port);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
    }

    /* ... the rest of the handshake (PSYNC/SYNC and bulk transfer setup) is
     * elided in this excerpt; the success path returns before the error
     * label below. */
    return;

error:
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}
  • 如果读取master返回值失败,直接跳转到error,关闭连接,重新将连接状态设置为REDIS_REPL_CONNECT(也就是SLAVEOF执行完成之后的状态), 等待下次定时器重连;
  • 读取响应成功,判断响应值是否为PONG,如果为PONG则表示连接检测完成,将发送当前slave端口信息,用于master同步数据
  • 如果判断是需要认证,且设置了masterauth,则发送AUTH命令,向master发起授权。
    • 如果授权成功,将继续后续的同步流程
    • 如果授权失败,则进入error流程,关闭连接,并等待下次重试

发送端口信息

前面的PONG响应流程里面已经提到了,当正确接收到了PONG响应,或者是完成了认证之后,slave会发起一个REPLCONF命令,将自己的端口发送给master。 master接收到这个命令之后,将slave的端口信息记录到这个slave对应的client对象的slave_listening_port属性中。

/* REPLCONF (excerpt): handling of the "listening-port" option.
 *
 * `j` indexes the option name in c->argv and j+1 its value; the loop that
 * drives `j` is not shown in this excerpt. */
void replconfCommand(redisClient *c) {
    if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
        long port;

        /* On a non-numeric value the helper has already replied. */
        if ((getLongFromObjectOrReply(c,c->argv[j+1],
                &port,NULL) != REDIS_OK))
            return;
        /* Remember the port the slave says it listens on. */
        c->slave_listening_port = port;
    }
}

这时,在master上通过INFO命令,就可以看见slave的端口信息:

INFO replication

同步

还是在syncWithMaster函数中。发送完端口信息之后,slave会尝试进行增量同步:

/* syncWithMaster (excerpt): try a partial resynchronization first and fall
 * back to the plain SYNC command when the master does not support PSYNC.
 *
 * `psync_result` and the `error` label belong to parts of the function not
 * shown in this excerpt. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    /* ... the function continues beyond this excerpt ... */
}

如果不支持增量同步,会向master发送SYNC命令做全量同步。增量同步是在Redis2.8中支持的,所以全量同步就不管了。大致的操作流程就是 master做一次BGSAVE,然后将保存的rdb文件通过TCP连接发送给slave,slave加载这个rdb文件。

这里着重了解增量同步:

/* Possible outcomes of slaveTryPartialResynchronization(). */
#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2

/* Send PSYNC to the master over `fd` and interpret the reply.
 *
 * If a cached master exists, its run id and offset+1 are sent; otherwise
 * "?" and "-1" are sent, which asks for a full resync.
 *
 * Returns:
 *   PSYNC_CONTINUE      - master replied +CONTINUE; the cached master was
 *                         resurrected on `fd`.
 *   PSYNC_FULLRESYNC    - master replied +FULLRESYNC; run id and initial
 *                         offset are stored in `server` when the reply
 *                         parses correctly.
 *   PSYNC_NOT_SUPPORTED - any other reply (error or unexpected). */
int slaveTryPartialResynchronization(int fd) {
    char *psync_runid;
    char psync_offset[32];
    sds reply;

    /* Initially set repl_master_initial_offset to -1 to mark the current
     * master run_id and offset as not valid. Later if we'll be able to do
     * a FULL resync using the PSYNC command we'll set the offset at the
     * right value, so that this information will be propagated to the
     * client structure representing the master into server.master. */
    server.repl_master_initial_offset = -1;

    if (server.cached_master) {
        psync_runid = server.cached_master->replrunid;
        snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
        redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
    } else {
        redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
        psync_runid = "?";
        memcpy(psync_offset,"-1",3);
    }

    /* Issue the PSYNC command */
    reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);

    if (!strncmp(reply,"+FULLRESYNC",11)) {
        char *runid = NULL, *offset = NULL;

        /* FULL RESYNC, parse the reply in order to extract the run id
         * and the replication offset. */
        runid = strchr(reply,' ');
        if (runid) {
            runid++;
            offset = strchr(runid,' ');
            if (offset) offset++;
        }
        if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
            redisLog(REDIS_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * runid to make sure next PSYNCs will fail. */
            memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
        } else {
            memcpy(server.repl_master_runid, runid, offset-runid-1);
            server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
            server.repl_master_initial_offset = strtoll(offset,NULL,10);
            redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
                server.repl_master_runid,
                server.repl_master_initial_offset);
        }
        /* We are going to full resync, discard the cached master structure. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }

    if (!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted, set the replication state accordingly */
        redisLog(REDIS_NOTICE,
            "Successful partial resynchronization with master.");
        sdsfree(reply);
        replicationResurrectCachedMaster(fd);
        return PSYNC_CONTINUE;
    }

    /* If we reach this point we received either an error since the master does
     * not understand PSYNC, or an unexpected reply from the master.
     * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log the unexpected event. */
        redisLog(REDIS_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        redisLog(REDIS_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}
  1. 首先设置同步偏移量为-1,表示第一次增量更新(其实也就是个全量更新)
  2. 向master发送PSYNC命令,告知master自己的id和同步偏移量
  3. master返回全量更新(FULLRESYNC),保存master返回的偏移量和运行id,清除之前缓存的master信息。确认可以增量同步后,由于第一次是全量同步,因此操作和原全量同步相同:
    /* syncWithMaster (excerpt): prepare to receive the RDB payload.
     *
     * Opens a fresh temp file, registers readSyncBulkPayload() as the read
     * handler for the master socket and moves to REDIS_REPL_TRANSFER.
     * `tmpfile`, `dfd`, `maxtries` and the `error` label belong to parts of
     * the function not shown in this excerpt. */
    void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
        /* Prepare a suitable temp file for bulk transfer */
        while(maxtries--) {
            snprintf(tmpfile,256,
                "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
            /* O_EXCL: fail instead of reusing an existing file. */
            dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
            if (dfd != -1) break;
            /* The name embeds the time in seconds, so wait for a new one. */
            sleep(1);
        }
        if (dfd == -1) {
            redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
            goto error;
        }

        /* Setup the non blocking download of the bulk file. */
        if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
                == AE_ERR)
        {
            redisLog(REDIS_WARNING,
                "Can't create readable event for SYNC: %s (fd=%d)",
                strerror(errno),fd);
            goto error;
        }

        server.repl_state = REDIS_REPL_TRANSFER;
        server.repl_transfer_size = -1;
        server.repl_transfer_read = 0;
        server.repl_transfer_last_fsync_off = 0;
        server.repl_transfer_fd = dfd;
        server.repl_transfer_lastio = server.unixtime;
        server.repl_transfer_tmpfile = zstrdup(tmpfile);
        return;
    }
  4. 创建一个临时文件,用于保存master传回的rdb文件
  5. 开始读取master传输回来的rdb文件,注册readSyncBulkPayload回调函数来处理
  6. 设置当前的状态为REDIS_REPL_TRANSFER,并保存传输文件等中间内容

readSyncBulkPayload函数用于接收master传输的rdb文件,并加载到Redis中,大致流程:

  1. 读取文件长度
  2. 读取文件内容,并保存到本地rdb临时文件中
  3. 读取完成之后,清空Redis数据库
  4. 加载rdb文件
  5. 创建一个master -> slave的通道,将当前slave作为master的client,以继续执行master同步过来的命令
  6. 将同步状态改成REDIS_REPL_CONNECTED,并回写同步偏移量等
  7. 开启aof如果需要(server.aof_state != REDIS_AOF_OFF)

master对PSYNC命令的处理

/* SYNC and PSYNC command implementation (master side).
 *
 * For PSYNC a partial resynchronization is attempted first. When that is
 * not possible, or the command is SYNC, the client is set up for a full
 * resynchronization: it either joins a BGSAVE already in progress, waits
 * for the next one, or triggers a new BGSAVE. It is then flagged as a slave
 * and appended to server.slaves. */
void syncCommand(redisClient *c) {
    /* ignore SYNC if already slave or in monitor mode */
    if (c->flags & REDIS_SLAVE) return;

    /* Refuse SYNC requests if we are a slave but the link with our master
     * is not ok... */
    if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
        addReplyError(c,"Can't SYNC while not connected with my master");
        return;
    }

    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that we can copy to other slaves if needed. */
    if (listLength(c->reply) != 0 || c->bufpos != 0) {
        addReplyError(c,"SYNC and PSYNC are invalid with pending output");
        return;
    }

    redisLog(REDIS_NOTICE,"Slave asks for synchronization");

    /* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with usual full resynchronization, however
     * when this happens masterTryPartialResynchronization() already
     * replied with:
     *
     * +FULLRESYNC <runid> <offset>
     *
     * So the slave knows the new runid and offset to try a PSYNC later
     * if the connection with the master is lost. */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        if (masterTryPartialResynchronization(c) == REDIS_OK) {
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
            char *master_runid = c->argv[1]->ptr;

            /* Increment stats for failed PSYNCs, but only if the
             * runid is not "?", as this is used by slaves to force a full
             * resync on purpose when they are not able to partially
             * resync. */
            if (master_runid[0] != '?') server.stat_sync_partial_err++;
        }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
         * of the replication protocol (like redis-cli --slave). Flag the client
         * so that we don't expect to receive REPLCONF ACK feedbacks. */
        c->flags |= REDIS_PRE_PSYNC;
    }

    /* Full resynchronization. */
    server.stat_sync_full++;

    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.rdb_child_pid != -1) {
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
        if (ln) {
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            copyClientOutputBuffer(c,slave);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        /* Flush the script cache for the new slave. */
        replicationScriptCacheFlush();
    }

    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
    listAddNodeTail(server.slaves,c);
    /* The first attached slave triggers creation of the backlog. */
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}
  1. 首先判断自己是slave的时候不能执行psync
  2. 判断是否需要全量同步,如果不需要,直接退出
  3. 如果需要全量同步,创建一个rdb文件
    • 如果已经在写rdb文件,尽量复用当前的文件
    • 如果没有,则发起一个bgsave

判断是否需要全量同步:

/* Master-side handling of PSYNC <runid> <offset>.
 *
 * Returns REDIS_OK when the caller must not go on with a full resync:
 * either the partial resync was accepted, or writing the reply to the
 * client failed and the client was scheduled to be freed. Returns REDIS_ERR
 * after replying "+FULLRESYNC <runid> <offset>", in which case the caller
 * continues with a full resynchronization. */
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    /* Is the runid of this master the same advertised by the wannabe slave
     * via PSYNC? If runid changed this master is a different instance and
     * there is no way to continue. */
    if (strcasecmp(master_runid, server.runid)) {
        /* Run id "?" is used by slaves that want to force a full resync. */
        if (master_runid[0] != '?') {
            redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for '%s', I'm '%s')",
                master_runid, server.runid);
        } else {
            redisLog(REDIS_NOTICE,"Full resync requested by slave.");
        }
        goto need_full_resync;
    }

    /* We still have the data our slave is asking for? */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
       REDIS_OK) goto need_full_resync;
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        redisLog(REDIS_NOTICE,
            "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
        if (psync_offset > server.master_repl_offset) {
            redisLog(REDIS_WARNING,
                "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
        }
        goto need_full_resync;
    }

    /* If we reached this point, we are able to perform a partial resync:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE
     * 3) Send the backlog data (from the offset to the end) to the slave. */
    c->flags |= REDIS_SLAVE;
    c->replstate = REDIS_REPL_ONLINE;
    c->repl_ack_time = server.unixtime;
    listAddNodeTail(server.slaves,c);
    /* We can't use the connection buffers since they are used to accumulate
     * new commands at this stage. But we are sure the socket send buffer is
     * empty so this write will never fail actually. */
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    redisLog(REDIS_NOTICE,
        "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
    /* Note that we don't need to set the selected DB at server.slaveseldb
     * to -1 to force the master to emit SELECT, since the slave already
     * has this state from the previous connection with the master. */

    refreshGoodSlavesCount();
    return REDIS_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    /* We need a full resync for some reason... notify the client. */
    psync_offset = server.master_repl_offset;
    /* Add 1 to psync_offset if it the replication backlog does not exists
     * as when it will be created later we'll increment the offset by one. */
    if (server.repl_backlog == NULL) psync_offset++;
    /* Again, we can't use the connection buffers (see above). */
    buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
                      server.runid,psync_offset);
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    return REDIS_ERR;
}

主要场景有两个:

  1. 当前请求的id和server的id不匹配
  2. 当前Redis保存的日志无法满足slave要求的偏移量
    • master还没有back log
    • master back log长度不够

同时,每次rdb文件保存完毕的时候,都会调用updateSlavesWaitingBgsave函数,处理保存的rdb文件。

void updateSlavesWaitingBgsave(int bgsaveerr) {
listNode *ln;
int startbgsave = 0;
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
startbgsave = 1;
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
// rdb文件写入完毕
struct redis_stat buf;
if (bgsaveerr != REDIS_OK) {
freeClient(slave);
redisLog(REDIS_WARNING,SYNC failed. BGSAVE child returned an error);
continue;
}
// 打开刚写入的rdb文件
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == –1 ||
redis_fstat(slave->repldbfd,&buf) == –1) {
freeClient(slave);
redisLog(REDIS_WARNING,SYNC failed. Can’t open/stat DB after BGSAVE: %s, strerror(errno));
continue;
}
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = REDIS_REPL_SEND_BULK;
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
// 开始发送rdb文件
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}
}
}
if (startbgsave) {
/* Since we are starting a new background save for one or more slaves,
* we flush the Replication Script Cache to use EVAL to propagate every
* new EVALSHA for the first time, since all the new slaves don’t know
* about previous scripts. */
replicationScriptCacheFlush();
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
listIter li;
listRewind(server.slaves,&li);
redisLog(REDIS_WARNING,SYNC failed. BGSAVE failed);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
freeClient(slave);
}
}
}
}

命令传播

上面的流程结束,slave已经包含了master BGSAVE时所包含的所有数据。后续就需要master一直将自己的命令发送给slave。

/* call() (excerpt): after a command has run, decide whether it has to be
 * propagated to the AOF and/or to the replication link.
 *
 * `dirty` is computed in a part of the function not shown in this excerpt. */
void call(redisClient *c, int flags) {
    /* Propagate the command into the AOF and replication link */
    if (flags & REDIS_CALL_PROPAGATE) {
        int targets = REDIS_PROPAGATE_NONE;

        /* Targets forced by flags on the client. */
        if (c->flags & REDIS_FORCE_REPL) {
            targets |= REDIS_PROPAGATE_REPL;
        }
        if (c->flags & REDIS_FORCE_AOF) {
            targets |= REDIS_PROPAGATE_AOF;
        }
        /* A non-zero dirty selects both targets. */
        if (dirty) {
            targets |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
        }
        if (targets != REDIS_PROPAGATE_NONE) {
            propagate(c->cmd,c->db->id,c->argv,c->argc,targets);
        }
    }
}
/* Send a command to the targets selected in `flags`: the append only file
 * (REDIS_PROPAGATE_AOF, only while AOF is not off) and the slaves
 * (REDIS_PROPAGATE_REPL). */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    int aof_active = (server.aof_state != REDIS_AOF_OFF);

    if (aof_active && (flags & REDIS_PROPAGATE_AOF)) {
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    }
    if (flags & REDIS_PROPAGATE_REPL) {
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
    }
}

在调用任何命令的时候,都会将命令分发到slave上去(除了AOF加载或者命令加了REDIS_CMD_SKIP_MONITOR标签)。

replicationFeedSlaves函数主要作用有两个:

  1. 将命令发送给所有在线的slave
  2. 将命令写入到back log中,方便后续增量同步
    /* Feed a command to the replication backlog and to the slaves.
     *
     * slaves - list of slave clients to feed.
     * dictid - id of the database the command applies to; a SELECT is
     *          emitted first when it differs from server.slaveseldb.
     * argv/argc - the command and its arguments. */
    void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
        listNode *ln;
        listIter li;
        int j, len;
        char llstr[REDIS_LONGSTR_SIZE];

        /* If there aren't slaves, and there is no backlog buffer to populate,
         * we can return ASAP. */
        if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

        /* We can't have slaves attached and no backlog. */
        redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

        /* Send SELECT command to every slave if needed, i.e. only when the
         * target DB differs from the last one selected on the link. */
        if (server.slaveseldb != dictid) {
            robj *selectcmd;

            /* For a few DBs we have pre-computed SELECT command. */
            if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
                selectcmd = shared.select[dictid];
            } else {
                int dictid_len;

                dictid_len = ll2string(llstr,sizeof(llstr),dictid);
                selectcmd = createObject(REDIS_STRING,
                    sdscatprintf(sdsempty(),
                    "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                    dictid_len, llstr));
            }

            /* Add the SELECT command into the backlog. */
            if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

            /* Send it to slaves. */
            listRewind(slaves,&li);
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;
                addReply(slave,selectcmd);
            }

            /* Only the object created above is released; the shared ones
             * are left alone. */
            if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
                decrRefCount(selectcmd);
        }
        server.slaveseldb = dictid;

        /* Write the command to the replication backlog if any. */
        if (server.repl_backlog) {
            char aux[REDIS_LONGSTR_SIZE+3];

            /* Add the multi bulk reply length. */
            aux[0] = '*';
            len = ll2string(aux+1,sizeof(aux)-1,argc);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux,len+3);

            for (j = 0; j < argc; j++) {
                long objlen = stringObjectLen(argv[j]);

                /* We need to feed the buffer with the object as a bulk reply
                 * not just as a plain string, so create the $..CRLF payload len
                 * and add the final CRLF */
                aux[0] = '$';
                len = ll2string(aux+1,sizeof(aux)-1,objlen);
                aux[len+1] = '\r';
                aux[len+2] = '\n';
                feedReplicationBacklog(aux,len+3);
                feedReplicationBacklogWithObject(argv[j]);
                /* Reuse the CRLF stored at the end of aux as terminator. */
                feedReplicationBacklog(aux+len+1,2);
            }
        }

        /* Write the command to every slave. */
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;

            /* Don't feed slaves that are still waiting for BGSAVE to start */
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

            /* Feed slaves that are waiting for the initial SYNC (so these commands
             * are queued in the output buffer until the initial SYNC completes),
             * or are already in sync with the master. */

            /* Add the multi bulk length. */
            addReplyMultiBulkLen(slave,argc);

            /* Finally any additional argument that was not stored inside the
             * static buffer if any (from j to argc). */
            for (j = 0; j < argc; j++)
                addReplyBulk(slave,argv[j]);
        }
    }

    注:backlog大小可以设置,默认的大小为1M,如果超过,覆盖最初的日志

    #define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024) /* 1mb: default replication backlog size, in bytes */

心跳检测和命令丢失补偿

在命令传播阶段,slave每秒一次向master发送REPLCONF ACK命令,发送当前的offset,让master检测是否有命令丢失。 这个也是在定时器中发送的。

/* Replication cron (excerpt): when this instance is a slave with a live
 * master client that is not flagged REDIS_PRE_PSYNC, send it a
 * REPLCONF ACK. */
void replicationCron(void) {
    int has_master_link = server.masterhost && server.master;

    if (has_master_link && !(server.master->flags & REDIS_PRE_PSYNC)) {
        replicationSendAck();
    }
}
/* Queue "REPLCONF ACK <reploff>" on the master client, if there is one.
 *
 * REDIS_MASTER_FORCE_REPLY is set only for the duration of the three
 * addReply* calls and cleared again afterwards. */
void replicationSendAck(void) {
    redisClient *c = server.master;

    if (c != NULL) {
        c->flags |= REDIS_MASTER_FORCE_REPLY;
        addReplyMultiBulkLen(c,3);
        addReplyBulkCString(c,"REPLCONF");
        addReplyBulkCString(c,"ACK");
        addReplyBulkLongLong(c,c->reploff);
        c->flags &= ~REDIS_MASTER_FORCE_REPLY;
    }
}

同时,master在接收到这个ACK包的时候,会记录slave的ack offset和ack时间:

/* REPLCONF (excerpt): handling of the "ack" option.
 *
 * `j` indexes the option name in c->argv and j+1 its value; the loop that
 * drives `j` is not shown in this excerpt. */
void replconfCommand(redisClient *c) {
    if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
        /* ... handling of this option is elided in this excerpt ... */
    } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
        /* REPLCONF ACK is used by slave to inform the master the amount
         * of replication stream that it processed so far. It is an
         * internal only command that normal clients should never use. */
        long long offset;

        if (!(c->flags & REDIS_SLAVE)) return;
        if ((getLongLongFromObject(c->argv[j+1], &offset) != REDIS_OK))
            return;
        /* The acknowledged offset only ever moves forward. */
        if (offset > c->repl_ack_off)
            c->repl_ack_off = offset;
        c->repl_ack_time = server.unixtime;
        /* Note: this command does not reply anything! */
        return;
    }
}

还是在定时器中,每次调用的时候都会清理已经超时的slave:

void replicationCron(void) {
/* Disconnect timedout slaves. */
if (listLength(server.slaves)) {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate != REDIS_REPL_ONLINE) continue;
if (slave->flags & REDIS_PRE_PSYNC) continue;
if ((server.unixtime – slave->repl_ack_time) > server.repl_timeout)
{
char ip[REDIS_IP_STR_LEN];
int port;
if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != –1) {
redisLog(REDIS_WARNING,
Disconnecting timedout slave: %s:%d,
ip, slave->slave_listening_port);
}
freeClient(slave);
}
}
}
}

这里的repl_ack_time由slave每次发送的ack包写入,server.repl_timeout默认值是60s:

#define REDIS_REPL_TIMEOUT 60 /* default for server.repl_timeout, in seconds */

增量同步

slave与master的连接断开之后,为了后续能够进行增量同步,freeClient在释放代表master的client时,会做不同的处理:

/* freeClient (excerpt): special handling when the client being freed is
 * our master. Unless the client carries one of the flags checked below,
 * its state is cached via replicationCacheMaster() and the function
 * returns without going any further. */
void freeClient(redisClient *c) {
    /* If it is our master that's being disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & REDIS_MASTER) {
        redisLog(REDIS_WARNING,"Connection with master lost.");
        if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
                          REDIS_CLOSE_ASAP|
                          REDIS_BLOCKED|
                          REDIS_UNBLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }
}
/* Keep the master client `c` as server.cached_master instead of freeing it.
 *
 * The client is removed from server.clients, its socket is closed and its
 * fd set to -1, then replicationHandleMasterDisconnection() updates the
 * replication state. Requires server.master to be set and no master to be
 * cached already. */
void replicationCacheMaster(redisClient *c) {
    listNode *ln;

    redisAssert(server.master != NULL && server.cached_master == NULL);
    redisLog(REDIS_NOTICE,"Caching the disconnected master state.");

    /* Remove from the list of clients, we don't want this client to be
     * listed by CLIENT LIST or processed in any way by batch operations. */
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);

    /* Save the master. Server.master will be set to null later by
     * replicationHandleMasterDisconnection(). */
    server.cached_master = server.master;

    /* Remove the event handlers and close the socket. We'll later reuse
     * the socket of the new connection with the master during PSYNC. */
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    close(c->fd);

    /* Set fd to -1 so that we can safely call freeClient(c) later. */
    c->fd = -1;

    /* Invalidate the Peer ID cache. */
    if (c->peerid) {
        sdsfree(c->peerid);
        c->peerid = NULL;
    }

    /* Caching the master happens instead of the actual freeClient() call,
     * so make sure to adjust the replication state. This function will
     * also set server.master to NULL. */
    replicationHandleMasterDisconnection();
}
/* Update the replication state after the link with the master is gone:
 * clear server.master, go back to REDIS_REPL_CONNECT and record the time
 * at which the link went down. */
void replicationHandleMasterDisconnection(void) {
    server.repl_down_since = server.unixtime;
    server.repl_state = REDIS_REPL_CONNECT;
    server.master = NULL;

    /* We lost connection with our master, force our slaves to resync
     * with us as well to load the new data set.
     *
     * If server.masterhost is NULL the user called SLAVEOF NO ONE so
     * slave resync is not needed. */
    if (server.masterhost) {
        disconnectSlaves();
    }
}

经过这些处理,一个断开连接的slave,复制状态变成了REDIS_REPL_CONNECT。按照之前的流程,定时器会去尝试连接master, 发送PING命令,然后再发送PSYNC命令的时候,由于已经有了cached_master,会在PSYNC命令中带上之前master的id和偏移量。 相关slave和master的处理逻辑,前面代码中已经有了。


转载自:https://coolex.info/blog/463.html

上一篇:C# 通过form表单下载文本文件


下一篇:修复52项安全漏洞:请用户尽快升级Flash Player