复制(Redis2.8)
设置主服务器的地址和端口(SLAVEOF命令)
SLAVEOF host port
Redis的主从复制设置非常方便,只需要在从服务器上设置主服务器的IP和端口即可。如果需要关闭主从同步,只需要执行SLAVEOF NO ONE即可。 该命令的具体描述见官方文档
/* SLAVEOF host port | SLAVEOF NO ONE
 *
 * "SLAVEOF NO ONE" turns this server back into a master if it is currently
 * replicating. Otherwise argv[1]/argv[2] name the new master. No connection
 * is made here: replicationSetMaster() only records the target and sets the
 * replication state to REDIS_REPL_CONNECT, and replicationCron() performs the
 * connection later, so the command replies immediately. */
void slaveofCommand(redisClient *c) {
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            replicationUnsetMaster();
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
    } else {
        long port;

        /* Replies with an error to the client when argv[2] is not an
         * integer. */
        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
            return;

        /* Re-issuing SLAVEOF for the master we already replicate from would
         * discard the cached master and the backlog (see
         * replicationSetMaster), i.e. force a needless full resync: treat it
         * as a no-op instead. */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }

        /* Only records the new master; the connection happens
         * asynchronously from replicationCron(). */
        replicationSetMaster(c->argv[1]->ptr, port);
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}
void replicationSetMaster(char *ip, int port) {
sdsfree(server.masterhost);
server.masterhost = sdsdup(ip);
server.masterport = port;
if (server.master) freeClient(server.master);
disconnectSlaves();
replicationDiscardCachedMaster();
freeReplicationBacklog();
cancelReplicationHandshake();
server.repl_state = REDIS_REPL_CONNECT;
server.master_repl_offset = 0;
}
可以看到,SLAVEOF是一个异步命令:执行的时候只是记录了新的主服务器地址,并把复制状态设置为REDIS_REPL_CONNECT,然后就立即返回结果了。真正的连接等操作,是在定时器中执行的。
/* Replication cron function -- used to reconnect to master and to handle
 * the other periodic replication duties (sending ACKs to the master, timing
 * out slaves). Run at most once every 1000 ms. */
run_with_period(1000) replicationCron();
建立套接字连接
Redis每隔1秒钟会调用一次replicationCron函数,该函数会根据当前的复制状态执行相应的定时操作。当状态为REDIS_REPL_CONNECT的时候,执行逻辑为:
/* Periodic replication job (excerpt: only the connect step is shown).
 * When a master is configured but no link exists (repl_state ==
 * REDIS_REPL_CONNECT, set by SLAVEOF or after a lost / failed master link),
 * start a non-blocking connection to it. */
void replicationCron(void) {
…
    if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,“Connecting to MASTER %s:%d“,
            server.masterhost, server.masterport);
        /* connectWithMaster() only initiates the connect; the handshake
         * continues in the syncWithMaster() event handler. */
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,“MASTER <-> SLAVE sync started“);
        }
    }
…
}
/* Start a non-blocking TCP connection to server.masterhost:masterport.
 *
 * On success:
 * - the socket is registered with syncWithMaster() for both readable and
 *   writable events; the writable event signals that the non-blocking
 *   connect completed;
 * - the socket is stored in server.repl_transfer_s;
 * - repl_state becomes REDIS_REPL_CONNECTING;
 * - REDIS_OK is returned.
 *
 * On failure no descriptor is left open and repl_state is not modified, so
 * replicationCron() retries on a later run; REDIS_ERR is returned. */
int connectWithMaster(void) {
    int fd;

    fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
    if (fd == -1) {
        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        close(fd);
        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
        return REDIS_ERR;
    }

    /* Last I/O time on the transfer link (presumably used to detect a
     * stalled link). */
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_s = fd;
    server.repl_state = REDIS_REPL_CONNECTING;
    return REDIS_OK;
}
如果发现当前主从状态是REDIS_REPL_CONNECT(刚执行slaveof的时候设置的),就会去连接主服务器。这里发起的是非阻塞连接:发起连接后,会为该socket注册syncWithMaster这个回调,并且把主从状态设置为REDIS_REPL_CONNECTING;等连接真正建立、socket变为可写时,syncWithMaster就会被触发。
发送PING命令
PING命令都很熟悉了,jedis pool中用来检测当前连接是否有效,用的就是这个命令。手工执行PING命令,Redis会返回一个PONG作为响应。
这里发送PING命令,主要也是为了检测当前和master连接是否正常,master是否能够正常处理命令。
/* Event handler for the master socket (excerpt: CONNECTING state only).
 * The writable event means the non-blocking connect completed:
 * - stop watching for writability;
 * - switch to REDIS_REPL_RECEIVE_PONG;
 * - send a PING, whose reply arrives through the readable event that
 *   stays registered. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
…
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,“Non blocking connect for SYNC fired the event.“);
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        /* NOTE(review): the return value of syncWrite() is not checked here;
         * the last argument (100) is presumably a timeout in ms. */
        syncWrite(fd,“PING\r\n“,6,100);
        return;
    }
…
}
这里当状态是REDIS_REPL_CONNECTING的时候,向master发送了PING命令,然后就等待master返回PONG的响应。
PONG响应也是在这个函数中进行处理的:
/* Event handler for the master socket (excerpt: the RECEIVE_PONG state, the
 * AUTH / REPLCONF steps that follow it, and the common error path). */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
…
    /* Receive the reply to the PING sent in the CONNECTING state. */
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];

        /* Stop the readable event: the reply is read below with
         * syncReadLine(), and the following steps use
         * sendSynchronousCommand(). */
        aeDeleteFileEvent(server.el,fd,AE_READABLE);
        buf[0] = ‘\0‘;
        if (syncReadLine(fd,buf,sizeof(buf),
            server.repl_syncio_timeout*1000) == –1)
        {
            redisLog(REDIS_WARNING,
                “I/O error reading PING reply from master: %s“,
                strerror(errno));
            goto error;
        }

        /* We accept only two replies as valid: a positive reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * for both. */
        if (buf[0] != ‘+‘ &&
            strncmp(buf,“-NOAUTH“,7) != 0 &&
            strncmp(buf,“-ERR operation not permitted“,28) != 0)
        {
            redisLog(REDIS_WARNING,“Error reply to PING from master: ‘%s‘“,buf);
            goto error;
        } else {
            redisLog(REDIS_NOTICE,
                “Master replied to PING, replication can continue…“);
        }
    }

    /* Authenticate whenever a master password is configured (not only after
     * an auth error to PING). A failed AUTH aborts the handshake.
     * NOTE(review): err is declared in the elided code. The quote/dash
     * characters in this excerpt look typographically mangled; the intended
     * test is presumably err[0] == '-' (an error reply). */
    if(server.masterauth) {
        err = sendSynchronousCommand(fd,“AUTH“,server.masterauth,NULL);
        if (err[0] == ‘–‘) {
            redisLog(REDIS_WARNING,“Unable to AUTH to MASTER: %s“,err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
    }

    /* Tell the master which port this slave listens on. An error reply is
     * only logged: it is not fatal for replication. */
    {
        sds port = sdsfromlonglong(server.port);
        err = sendSynchronousCommand(fd,“REPLCONF“,“listening-port“,port,
            NULL);
        sdsfree(port);
        if (err[0] == ‘–‘) {
            redisLog(REDIS_NOTICE,“(Non critical) Master does not understand REPLCONF listening-port: %s“, err);
        }
        sdsfree(err);
    }
…
/* Common failure path: drop the connection and go back to
 * REDIS_REPL_CONNECT so that replicationCron() retries later. */
error:
    close(fd);
    server.repl_transfer_s = –1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}
- 如果读取master返回值失败,直接跳转到error,关闭连接,重新将连接状态设置为REDIS_REPL_CONNECT(也就是SLAVEOF执行完成之后的状态),等待下次定时器重连;
- 读取响应成功后,只要响应以"+"开头(例如+PONG),或者是认证相关的错误(-NOAUTH,或老版本返回的-ERR operation not permitted),就认为连接检测完成,其他错误响应同样进入error流程;之后(如需要,先完成认证)会发送当前slave的监听端口信息,供master记录
- 如果设置了masterauth,则发送AUTH命令,向master发起认证(只要配置了masterauth就会发送,并不依赖PING的响应是否为认证错误)。
- 如果认证成功,将继续后续的同步流程
- 如果认证失败,则进入error流程,关闭连接,并等待下次重试
发送端口信息
前面的PONG响应流程里面已经提到了,当正确接收到了PONG响应,并且完成了认证(如果配置了masterauth)之后,slave会发起一个REPLCONF命令,将自己的监听端口发送给master。master接收到这个命令之后,将slave的端口信息记录到这个slave对应的client对象的slave_listening_port属性中。
/* REPLCONF <option> <value> ... (excerpt: the "listening-port" option).
 * Records the port the slave says it listens on in c->slave_listening_port.
 * A non-integer value makes getLongFromObjectOrReply() reply with an error,
 * and the command returns.
 * NOTE(review): j presumably walks the option/value pairs in the elided
 * loop. */
void replconfCommand(redisClient *c) {
…
    if (!strcasecmp(c->argv[j]->ptr,“listening-port“)) {
        long port;

        if ((getLongFromObjectOrReply(c,c->argv[j+1],
                &port,NULL) != REDIS_OK))
            return;
        c->slave_listening_port = port;
    }
…
}
这时,在master上通过INFO命令,就可以看见slave的端口信息:
INFO replication
同步
还是在syncWithMaster函数中。发送完端口信息之后,slave会尝试进行增量同步:
/* Event handler for the master socket (excerpt: the PSYNC attempt and the
 * fallback to SYNC). */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
…
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        /* Partial resync accepted: slaveTryPartialResynchronization() has
         * already resurrected the cached master on fd. */
        redisLog(REDIS_NOTICE, “MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.“);
        return;
    }

    /* A full resync is needed from here on. With PSYNC_FULLRESYNC the
     * server.repl_master_runid and repl_master_initial_offset are
     * already set from the +FULLRESYNC reply. A master that does not
     * support PSYNC must be sent the legacy SYNC command instead. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,“Retrying with SYNC…“);
        if (syncWrite(fd,“SYNC\r\n“,6,server.repl_syncio_timeout*1000) == –1) {
            redisLog(REDIS_WARNING,“I/O error writing to MASTER: %s“,
                strerror(errno));
            goto error;
        }
    }
…
如果master不支持增量同步(PSYNC),slave会向master发送SYNC命令做全量同步。增量同步是Redis 2.8新增的功能,因此这里不再展开全量同步的细节。全量同步的大致流程是:master做一次BGSAVE,然后将保存的rdb文件通过TCP连接发送给slave,slave再加载这个rdb文件。
这里着重了解增量同步:
/* Possible results of slaveTryPartialResynchronization(). */
#define PSYNC_CONTINUE 0      /* +CONTINUE: partial resync accepted. */
#define PSYNC_FULLRESYNC 1    /* +FULLRESYNC: a full resync follows. */
#define PSYNC_NOT_SUPPORTED 2 /* Any other reply: fall back to SYNC. */
int slaveTryPartialResynchronization(int fd) {
char *psync_runid;
char psync_offset[32];
sds reply;
* master run_id and offset as not valid. Later if we’ll be able to do
* a FULL resync using the PSYNC command we’ll set the offset at the
* right value, so that this information will be propagated to the
server.repl_master_initial_offset = –1;
if (server.cached_master) {
psync_runid = server.cached_master->replrunid;
snprintf(psync_offset,sizeof(psync_offset),“%lld“, server.cached_master->reploff+1);
redisLog(REDIS_NOTICE,“Trying a partial resynchronization (request %s:%s).“, psync_runid, psync_offset);
} else {
redisLog(REDIS_NOTICE,“Partial resynchronization not possible (no cached master)“);
psync_runid = “?“;
memcpy(psync_offset,“-1“,3);
}
reply = sendSynchronousCommand(fd,“PSYNC“,psync_runid,psync_offset,NULL);
if (!strncmp(reply,“+FULLRESYNC“,11)) {
char *runid = NULL, *offset = NULL;
runid = strchr(reply,‘ ‘);
if (runid) {
runid++;
offset = strchr(runid,‘ ‘);
if (offset) offset++;
}
if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
redisLog(REDIS_WARNING,
“Master replied with wrong +FULLRESYNC syntax.“);
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
} else {
memcpy(server.repl_master_runid, runid, offset-runid-1);
server.repl_master_runid[REDIS_RUN_ID_SIZE] = ‘\0‘;
server.repl_master_initial_offset = strtoll(offset,NULL,10);
redisLog(REDIS_NOTICE,“Full resync from master: %s:%lld“,
server.repl_master_runid,
server.repl_master_initial_offset);
}
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC;
}
if (!strncmp(reply,“+CONTINUE“,9)) {
redisLog(REDIS_NOTICE,
“Successful partial resynchronization with master.“);
sdsfree(reply);
replicationResurrectCachedMaster(fd);
return PSYNC_CONTINUE;
}
* not understand PSYNC, or an unexpected reply from the master.
if (strncmp(reply,“-ERR“,4)) {
redisLog(REDIS_WARNING,
“Unexpected reply to PSYNC from master: %s“, reply);
} else {
redisLog(REDIS_NOTICE,
“Master does not support PSYNC or is in “
“error state (reply: %s)“, reply);
}
sdsfree(reply);
replicationDiscardCachedMaster();
return PSYNC_NOT_SUPPORTED;
}
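- 首先将repl_master_initial_offset设置为-1,表示当前记录的master运行id和偏移量无效。如果有缓存的master(之前同步过),PSYNC使用缓存的master运行id和偏移量(reploff+1);否则(例如第一次同步)使用"?"和-1,这实际上就是请求一次全量同步
- 向master发送PSYNC命令,告知master之前同步的master运行id和期望的同步偏移量
- 如果master返回全量同步(+FULLRESYNC),则保存master返回的运行id和偏移量,并清除之前缓存的master信息。确定需要全量同步之后,后续操作与原来的全量同步(SYNC)相同: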
/* Event handler for the master socket (excerpt: preparing to receive the
 * RDB payload of a full resync). */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
…
    /* Create a uniquely named temp file for the RDB payload. O_EXCL makes
     * open() fail if the name is already taken; in that case retry after
     * sleep(1), which blocks the whole process.
     * NOTE(review): maxtries is set in the elided code, and "maxtries–"
     * is presumably a typographically mangled "maxtries--". */
    while(maxtries–) {
        snprintf(tmpfile,256,
            “temp-%d.%ld.rdb“,(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != –1) break;
        sleep(1);
    }
    if (dfd == –1) {
        redisLog(REDIS_WARNING,“Opening the temp file needed for MASTER <-> SLAVE synchronization: %s“,strerror(errno));
        goto error;
    }

    /* The payload is received asynchronously by readSyncBulkPayload().
     * NOTE(review): if registering the handler fails, dfd stays open. The
     * error: path shown for this function earlier in this article closes
     * only fd. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            “Can’t create readable event for SYNC: %s (fd=%d)“,
            strerror(errno),fd);
        goto error;
    }

    /* Enter the TRANSFER state and reset the transfer bookkeeping. The
     * payload size is not known yet (-1). */
    server.repl_state = REDIS_REPL_TRANSFER;
    server.repl_transfer_size = –1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;
}
- 创建一个临时文件,用于保存master传回的rdb文件
- 开始读取master传输回来的rdb文件,注册readSyncBulkPayload回调函数来处理
- 设置当前的状态为REDIS_REPL_TRANSFER,并保存传输文件等中间内容
readSyncBulkPayload函数用于接收master传输的rdb文件,并加载到Redis中,大致流程:
- 读取文件长度
- 读取文件内容,并保存到本地rdb临时文件中
- 读取完成之后,清空Redis数据库
- 加载rdb文件
- 创建一个master -> slave的通道,将当前slave作为master的client,以继续执行master同步过来的命令
- 将同步状态改成REDIS_REPL_CONNECTED,并回写同步偏移量等
- 开启aof如果需要(server.aof_state != REDIS_AOF_OFF)
master对PSYNC命令的处理
/* SYNC and PSYNC command implementation (master side).
 *
 * PSYNC first attempts a partial resynchronization; if that is accepted the
 * command is done. Otherwise, and always for the legacy SYNC, a full resync
 * is prepared: the client becomes a slave waiting for an RDB snapshot. The
 * snapshot comes from one of two sources:
 * - a BGSAVE already in progress for another slave, which is reused;
 * - a new BGSAVE, which is started (or waited for) here. */
void syncCommand(redisClient *c) {
    /* Ignore SYNC/PSYNC from a client that is already a slave. */
    if (c->flags & REDIS_SLAVE) return;

    /* Refuse to feed slaves while our own link with our master is not up. */
    if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
        addReplyError(c,"Can't SYNC while not connected with my master");
        return;
    }

    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that it can be copied to other slaves if needed. */
    if (listLength(c->reply) != 0 || c->bufpos != 0) {
        addReplyError(c,"SYNC and PSYNC are invalid with pending output");
        return;
    }

    redisLog(REDIS_NOTICE,"Slave asks for synchronization");

    /* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with the usual full resynchronization.
     * In that case masterTryPartialResynchronization() has already replied
     * with:
     *
     * +FULLRESYNC <runid> <offset>
     *
     * so the slave knows the new runid and offset to try a PSYNC later. */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        if (masterTryPartialResynchronization(c) == REDIS_OK) {
            server.stat_sync_partial_ok++;
            return; /* No full resync needed. */
        } else {
            char *master_runid = c->argv[1]->ptr;

            /* Count failed PSYNCs, but only if the runid is not "?".
             * Slaves send "?" to force a full resync on purpose when they
             * are not able to partially resync. */
            if (master_runid[0] != '?') server.stat_sync_partial_err++;
        }
    } else {
        /* A slave using SYNC speaks an older version of the replication
         * protocol (like redis-cli --slave). Flag the client so that it is
         * not timed out for missing REPLCONF ACKs. */
        c->flags |= REDIS_PRE_PSYNC;
    }

    /* Full resynchronization. */
    server.stat_sync_full++;

    if (server.rdb_child_pid != -1) {
        /* A BGSAVE is in progress. It can be reused only if another slave
         * is waiting for it (REDIS_REPL_WAIT_BGSAVE_END), because that
         * slave has been accumulating the writes issued since the fork. */
        redisClient *slave;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
        if (ln) {
            /* Reuse it: copy the accumulated differences from that slave. */
            copyClientOutputBuffer(c,slave);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* The running BGSAVE cannot be used: wait for the next one. */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        replicationScriptCacheFlush();
    }

    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd);
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    /* Force replicationFeedSlaves() to emit a SELECT before the next
     * command it propagates. */
    server.slaveseldb = -1;
    listAddNodeTail(server.slaves,c);
    /* The backlog is created when the first slave attaches. */
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}
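- 首先,如果该client已经是slave,直接忽略;如果当前服务器自己是slave,但与它的master还没有连接好,则拒绝执行SYNC/PSYNC
- 如果是PSYNC命令,先判断能否进行增量同步;如果可以(不需要全量同步),直接返回
- 如果需要全量同步,就需要一个rdb文件
- 如果已经有BGSAVE在执行,并且有其他slave正在等待这次BGSAVE,就复用这次生成的rdb文件;否则等待下一次BGSAVE
- 如果当前没有BGSAVE在执行,则立即发起一次BGSAVE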
判断是否需要全量同步:
/* Master side of PSYNC. Decides whether slave c can continue from the offset
 * it requested (argv[1] = master run id, argv[2] = offset).
 *
 * Returns REDIS_OK when the caller must NOT perform a full resync. This
 * happens in two cases:
 * - the partial resync was accepted: the client became an online slave,
 *   +CONTINUE was sent and the backlog from the requested offset was queued;
 * - writing a reply to the socket failed: the client was scheduled for
 *   asynchronous release.
 *
 * Returns REDIS_ERR after replying "+FULLRESYNC <runid> <offset>"; the
 * caller must then perform a full resync. */
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    /* A partial resync is possible only if the slave was replicating from
     * this very instance: a different run id means a different data set
     * history. */
    if (strcasecmp(master_runid, server.runid)) {
        /* Run id "?" is sent by slaves that want to force a full resync. */
        if (master_runid[0] != '?') {
            redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for '%s', I'm '%s')",
                master_runid, server.runid);
        } else {
            redisLog(REDIS_NOTICE,"Full resync requested by slave.");
        }
        goto need_full_resync;
    }

    /* Parse the requested offset without queueing an error reply. A reply
     * left in the client buffer would later be streamed to the new slave
     * after +FULLRESYNC. A malformed offset simply forces a full resync. */
    if (getLongLongFromObject(c->argv[2],&psync_offset) != REDIS_OK)
        goto need_full_resync;

    /* The requested offset must fall inside the data held by the backlog. */
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        redisLog(REDIS_NOTICE,
            "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
        if (psync_offset > server.master_repl_offset) {
            redisLog(REDIS_WARNING,
                "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
        }
        goto need_full_resync;
    }

    /* Partial resync accepted:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE.
     * 3) Send the backlog data from the requested offset. */
    c->flags |= REDIS_SLAVE;
    c->replstate = REDIS_REPL_ONLINE;
    c->repl_ack_time = server.unixtime;
    listAddNodeTail(server.slaves,c);

    /* +CONTINUE is written straight to the socket rather than the client
     * output buffers, which from now on accumulate new replication
     * commands. */
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    redisLog(REDIS_NOTICE,
        "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);

    /* server.slaveseldb is intentionally not reset to -1 (which would force
     * a SELECT to be emitted): the slave already has this state from its
     * previous connection with the master. */
    refreshGoodSlavesCount();
    return REDIS_OK;

need_full_resync:
    /* Tell the slave which run id and offset to use for a future PSYNC. */
    psync_offset = server.master_repl_offset;
    /* NOTE(review): the +1 when no backlog exists yet presumably matches the
     * offset the backlog will start from once created -- confirm against
     * createReplicationBacklog(). */
    if (server.repl_backlog == NULL) psync_offset++;
    buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
        server.runid,psync_offset);
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    return REDIS_ERR;
}
需要全量同步的场景主要有两个:
- slave请求的master运行id和当前master的运行id不匹配(包括slave发送"?"主动请求全量同步的情况)
- 当前master保存的backlog无法满足slave要求的偏移量
- master还没有创建backlog
- slave请求的偏移量不在backlog保存的数据范围内(例如backlog长度不够,所需的数据已经被覆盖)
同时,每次rdb文件保存完毕的时候,都会调用updateSlavesWaitingBgsave函数,处理保存的rdb文件。
void updateSlavesWaitingBgsave(int bgsaveerr) {
listNode *ln;
int startbgsave = 0;
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
startbgsave = 1;
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
struct redis_stat buf;
if (bgsaveerr != REDIS_OK) {
freeClient(slave);
redisLog(REDIS_WARNING,“SYNC failed. BGSAVE child returned an error“);
continue;
}
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == –1 ||
redis_fstat(slave->repldbfd,&buf) == –1) {
freeClient(slave);
redisLog(REDIS_WARNING,“SYNC failed. Can’t open/stat DB after BGSAVE: %s“, strerror(errno));
continue;
}
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = REDIS_REPL_SEND_BULK;
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}
}
}
if (startbgsave) {
* we flush the Replication Script Cache to use EVAL to propagate every
* new EVALSHA for the first time, since all the new slaves don’t know
replicationScriptCacheFlush();
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
listIter li;
listRewind(server.slaves,&li);
redisLog(REDIS_WARNING,“SYNC failed. BGSAVE failed“);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
freeClient(slave);
}
}
}
}
命令传播
上面的流程结束,slave已经包含了master BGSAVE时所包含的所有数据。后续就需要master一直将自己的命令发送给slave。
/* Execute a command (excerpt: the propagation step only).
 * After the command ran, decide where it must be propagated:
 * - a command that changed the data set (dirty != 0) goes to both the AOF
 *   and the slaves;
 * - the REDIS_FORCE_REPL / REDIS_FORCE_AOF client flags force one of the
 *   two targets.
 * NOTE(review): dirty is computed in the elided code, presumably as the
 * number of changes made by the command. */
void call(redisClient *c, int flags) {
…
    if (flags & REDIS_CALL_PROPAGATE) {
        /* NOTE(review): this local 'flags' shadows the function parameter
         * of the same name; below, 'flags' means propagation flags. */
        int flags = REDIS_PROPAGATE_NONE;

        if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
        if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
        if (dirty)
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
…
}
/* Propagate a command to the AOF and/or to the slaves.
 *
 * flags is a bitmask of two bits:
 * - REDIS_PROPAGATE_AOF appends the command to the AOF, and only when AOF
 *   is not disabled;
 * - REDIS_PROPAGATE_REPL feeds it to the slaves and to the replication
 *   backlog through replicationFeedSlaves(). */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    int want_aof = (flags & REDIS_PROPAGATE_AOF) &&
                   server.aof_state != REDIS_AOF_OFF;
    int want_repl = (flags & REDIS_PROPAGATE_REPL) != 0;

    if (want_aof) {
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    }
    if (want_repl) {
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
    }
}
每条命令执行完之后,如果该命令修改了数据(dirty不为0),或者客户端被强制标记为需要传播(REDIS_FORCE_REPL/REDIS_FORCE_AOF),就会调用propagate,把命令写入AOF和/或分发到slave上去;没有修改数据的命令(例如读命令)则不会被传播。
replicationFeedSlaves函数主要作用有两个:
- 将命令发送给所有在线的slave
- 将命令写入到back log中,方便后续增量同步
/* Feed a command to the replication stream.
 *
 * The command (argv/argc, executed against database dictid) is:
 * - appended to the replication backlog, if one exists;
 * - queued on every slave, except those still waiting for a BGSAVE to
 *   start.
 * A SELECT is emitted first whenever dictid differs from the last database
 * sent (server.slaveseldb). */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[REDIS_LONGSTR_SIZE];

    /* Nothing to do without a backlog and without slaves. */
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

    /* Having slaves implies having a backlog. */
    redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

    /* Emit SELECT if the target database changed. */
    if (server.slaveseldb != dictid) {
        robj *selectcmd;

        /* Use a shared pre-built SELECT for small db ids, otherwise build
         * one (released below). */
        if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        } else {
            int dictid_len;

            dictid_len = ll2string(llstr,sizeof(llstr),dictid);
            selectcmd = createObject(REDIS_STRING,
                sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, llstr));
        }

        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;
            addReply(slave,selectcmd);
        }

        if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);
    }
    server.slaveseldb = dictid;

    /* Write the command to the backlog using the multi-bulk protocol:
     * "*<argc>\r\n", then "$<len>\r\n<arg>\r\n" for every argument. */
    if (server.repl_backlog) {
        char aux[REDIS_LONGSTR_SIZE+3];

        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);

        for (j = 0; j < argc; j++) {
            long objlen = stringObjectLen(argv[j]);

            /* Each argument is a bulk string, not a plain string: emit its
             * $<len>\r\n header, the payload, then the final CRLF. */
            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux,len+3);
            feedReplicationBacklogWithObject(argv[j]);
            /* Reuse the "\r\n" already stored at aux[len+1]. */
            feedReplicationBacklog(aux+len+1,2);
        }
    }

    /* Queue the command on the slaves. */
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        /* Don't feed slaves that are still waiting for BGSAVE to start.
         * For the others, the command is queued in the output buffer; for
         * slaves still doing the initial SYNC it is sent once that SYNC
         * completes. */
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

        addReplyMultiBulkLen(slave,argc);
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}
注:backlog的大小可以通过配置(repl-backlog-size)设置,默认大小为1MB;backlog写满之后,新写入的数据会覆盖最早写入的数据。
/* Default replication backlog size: 1 MB (1024*1024 bytes). */
#define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024)
心跳检测和命令丢失补偿
在命令传播阶段,slave每秒一次向master发送REPLCONF命令,发送当前的offset,让master检测是否有命令丢失。 这个也是在定时器中发送的。
/* Periodic replication job (excerpt: ACKs to the master).
 * While connected to a master that is not flagged REDIS_PRE_PSYNC, report
 * our replication offset to it on every run. */
void replicationCron(void) {
…
    if (server.masterhost && server.master &&
        !(server.master->flags & REDIS_PRE_PSYNC))
        replicationSendAck();
…
}
/* Send "REPLCONF ACK <offset>" to our master, reporting the replication
 * offset processed so far (server.master->reploff). Does nothing when there
 * is no master link.
 *
 * REDIS_MASTER_FORCE_REPLY is set only while the ACK is being queued.
 * NOTE(review): presumably replies to the master client are otherwise
 * suppressed -- confirm in the reply-buffer code. */
void replicationSendAck(void) {
    redisClient *master = server.master;

    if (master == NULL) return;

    master->flags |= REDIS_MASTER_FORCE_REPLY;
    addReplyMultiBulkLen(master,3);
    addReplyBulkCString(master,"REPLCONF");
    addReplyBulkCString(master,"ACK");
    addReplyBulkLongLong(master,master->reploff);
    master->flags &= ~REDIS_MASTER_FORCE_REPLY;
}
同时,master在接收到这个ACK包的时候,会记录slave的ack offset和ack时间:
/* REPLCONF (excerpt: the "ack" option). */
void replconfCommand(redisClient *c) {
…
    else if (!strcasecmp(c->argv[j]->ptr,“ack“)) {
        /* REPLCONF ACK <offset>: sent by a slave to report the amount of
         * replication stream it has processed so far. No reply is sent:
         * every path below returns without adding one. */
        long long offset;

        /* Only meaningful from clients that are already slaves. */
        if (!(c->flags & REDIS_SLAVE)) return;
        /* A malformed offset is silently ignored. */
        if ((getLongLongFromObject(c->argv[j+1], &offset) != REDIS_OK))
            return;
        /* The acknowledged offset only moves forward. */
        if (offset > c->repl_ack_off)
            c->repl_ack_off = offset;
        /* Refresh the timestamp that replicationCron() uses for slave
         * timeouts. */
        c->repl_ack_time = server.unixtime;
        return;
    }
…
}
还是在定时器中,每次调用的时候都会清理已经超时的slave:
/* Periodic replication job (excerpt: timing out slaves).
 * Disconnects online slaves that are not flagged REDIS_PRE_PSYNC and have
 * not sent a REPLCONF ACK for more than server.repl_timeout seconds. */
void replicationCron(void) {
…
    if (listLength(server.slaves)) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;

            if (slave->replstate != REDIS_REPL_ONLINE) continue;
            /* Slaves that used the old SYNC are not expected to send
             * REPLCONF ACK, so they cannot be timed out this way. */
            if (slave->flags & REDIS_PRE_PSYNC) continue;
            /* NOTE(review): the '–' below is presumably a typographically
             * mangled '-'. */
            if ((server.unixtime – slave->repl_ack_time) > server.repl_timeout)
            {
                char ip[REDIS_IP_STR_LEN];
                int port;

                /* NOTE(review): the peer port obtained here is unused; the
                 * log prints the slave's declared listening port instead. */
                if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != –1) {
                    redisLog(REDIS_WARNING,
                        “Disconnecting timedout slave: %s:%d“,
                        ip, slave->slave_listening_port);
                }
                freeClient(slave);
            }
        }
    }
…
}
这里的repl_ack_time由slave每次发送的ack包写入,server.repl_timeout默认值是60s:
/* Default replication timeout in seconds (server.repl_timeout). */
#define REDIS_REPL_TIMEOUT 60
增量同步
与master的连接断开之后,为了之后能够进行增量同步,slave在freeClient中对代表master的那个client做了特殊处理:
/* Release a client (excerpt: special handling of our master). */
void freeClient(redisClient *c) {
…
    /* If the client being freed is our master, cache its state instead of
     * freeing it, so that a partial resynchronization can be attempted later.
     * Caching is skipped when the client is flagged to be closed
     * (REDIS_CLOSE_AFTER_REPLY / REDIS_CLOSE_ASAP) or is involved in a
     * blocking operation (REDIS_BLOCKED / REDIS_UNBLOCKED); in that case
     * execution falls through to the regular free path in the elided code. */
    if (server.master && c->flags & REDIS_MASTER) {
        redisLog(REDIS_WARNING,“Connection with master lost.“);
        if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
                          REDIS_CLOSE_ASAP|
                          REDIS_BLOCKED|
                          REDIS_UNBLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }
…
}
/* Keep the state of a disconnected master in server.cached_master so that a
 * later PSYNC can resume replication from it.
 *
 * This runs in place of actually freeing the master client:
 * - the client is unlinked from server.clients;
 * - its event handlers and socket are released, and its fd is set to -1;
 * - the replication state is switched to "disconnected" through
 *   replicationHandleMasterDisconnection(), which also clears
 *   server.master. */
void replicationCacheMaster(redisClient *c) {
    listNode *ln;

    redisAssert(server.master != NULL && server.cached_master == NULL);
    redisLog(REDIS_NOTICE,"Caching the disconnected master state.");

    /* Remove from the list of clients: the cached master must not be
     * processed as a regular client any more. */
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);

    /* Save the master. server.master is cleared later by
     * replicationHandleMasterDisconnection(). */
    server.cached_master = server.master;

    /* Remove the event handlers and close the socket, then mark the fd as
     * closed. */
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    close(c->fd);
    c->fd = -1;

    /* The cached peer id describes the closed connection: drop it. */
    if (c->peerid) {
        sdsfree(c->peerid);
        c->peerid = NULL;
    }

    /* Caching the master happens instead of the actual freeClient() call,
     * so the replication state must be adjusted here. */
    replicationHandleMasterDisconnection();
}
/* Update the replication state after the link with the master is lost. It:
 * - clears server.master;
 * - goes back to REDIS_REPL_CONNECT, so that replicationCron() reconnects;
 * - records when the link went down. */
void replicationHandleMasterDisconnection(void) {
    server.master = NULL;
    server.repl_state = REDIS_REPL_CONNECT;
    server.repl_down_since = server.unixtime;

    /* Our own slaves must resync with us as well, since we are going to load
     * a new data set from the master. If server.masterhost is NULL the user
     * called SLAVEOF NO ONE, so our slaves are kept connected. */
    if (server.masterhost != NULL) {
        disconnectSlaves();
    }
}
经过这些处理,一个断开连接的slave,复制状态变成了REDIS_REPL_CONNECT。按照之前的流程,定时器会去尝试连接master, 发送PING命令,然后再发送PSYNC命令的时候,由于已经有了cached_master,会在PSYNC命令中带上之前master的id和偏移量。 相关slave和master的处理逻辑,前面代码中已经有了。
转载自:https://coolex.info/blog/463.html