接Redis Sentinel源码分析(一)
sentinelTimer函数周期性运行:第一次在服务启动后1ms执行,此后每隔1000/server.hz毫秒执行一次(sentinelTimer函数会修改server.hz的值)
sentinelTimer内部包含sentinel模式需要定期执行的操作,包括check master、slave、sentinel的状态,并根据配置的条件判断是否需要fail over。
/* Periodic entry point of Sentinel mode: performs the tilt check, the
 * per-instance work, the script housekeeping, and finally picks a new
 * value for server.hz. */
void sentinelTimer(void) {
    /* Check whether TILT mode has to be entered. */
    sentinelCheckTiltCondition();

    /* Periodic work on the monitored instances (state checks on the Redis
     * servers, interaction with the other sentinels, ...). */
    sentinelHandleDictOfRedisInstances(sentinel.masters);

    /* Script housekeeping: start the pending scripts, collect the ones
     * that terminated, kill the ones that ran past their timeout. */
    sentinelRunPendingScripts();
    sentinelCollectTerminatedScripts();
    sentinelKillTimedoutScripts();

    /* Re-pick hz with a random component. hz drives how often this timer
     * fires; the randomness is meant to keep the sentinels from all
     * issuing their vote requests at the very same moment over and over. */
    server.hz = rand() % REDIS_DEFAULT_HZ + REDIS_DEFAULT_HZ;
}
sentinelCheckTiltCondition函数会check是否进入TILT模式,所谓TILT模式即只收集数据,而不做fail-over
进入TILT模式的原因可能是:
1)sentinel的部分操作被阻塞(可能是系统负载导致)
2)系统时钟异常
进入条件:相邻两次进入sentinelCheckTiltCondition的时间差值<0或者>2s
进入TILT模式的目的是避免错误地进行fail-over
/* Enters TILT mode when the time elapsed since the previous invocation is
 * negative or larger than SENTINEL_TILT_TRIGGER, then records the time of
 * this invocation in sentinel.previous_time. */
void sentinelCheckTiltCondition(void) {
    mstime_t current = mstime();
    mstime_t elapsed = current - sentinel.previous_time;
    int clock_went_backwards = elapsed < 0;
    int timer_was_delayed = elapsed > SENTINEL_TILT_TRIGGER;

    if (clock_went_backwards || timer_was_delayed) {
        /* The interval between two runs is not trustworthy: flag TILT
         * mode, remember when it started and publish the event. */
        sentinel.tilt = 1;
        sentinel.tilt_start_time = mstime();
        sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
    }

    /* Always refresh the reference time used by the next invocation. */
    sentinel.previous_time = mstime();
}
sentinelHandleDictOfRedisInstances负责遍历字典中的所有instance,并对每个instance执行周期性操作
/* Runs the periodic handler on every instance stored in `instances`.
 * For entries flagged SRI_MASTER it also recurses into the master's
 * slaves and sentinels dictionaries. After the walk, a master found in
 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG is switched to its promoted
 * slave.
 *
 * instances: dictionary whose values are sentinelRedisInstance pointers. */
void sentinelHandleDictOfRedisInstances(dict *instances) {
    sentinelRedisInstance *pending_switch = NULL;
    dictIterator *iter = dictGetIterator(instances);
    dictEntry *entry;

    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *inst = dictGetVal(entry);

        /* Periodic work for this very instance. */
        sentinelHandleRedisInstance(inst);

        /* Only masters own further instances to visit. */
        if (!(inst->flags & SRI_MASTER)) continue;

        sentinelHandleDictOfRedisInstances(inst->slaves);
        sentinelHandleDictOfRedisInstances(inst->sentinels);

        /* Remember the master whose failover reached the UPDATE_CONFIG
         * state; the switch itself is deferred until the walk is over. */
        if (inst->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
            pending_switch = inst;
        }
    }

    /* Complete the failover for the remembered master, if any. */
    if (pending_switch != NULL) {
        sentinelFailoverSwitchToPromotedSlave(pending_switch);
    }
    dictReleaseIterator(iter);
}
sentinelHandleRedisInstance包含了具体的周期性操作,包括针对sentinel、slave、master实例的操作
/* Periodic work for a single instance (master, slave or sentinel).
 * The first part runs for every instance; the final block runs only for
 * instances flagged SRI_MASTER. While TILT mode is active and
 * SENTINEL_TILT_PERIOD has not elapsed, the function returns right after
 * the connection/ping steps. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
/* ---- Steps executed for every kind of instance ---- */
/* Connection and subscription management. */
sentinelReconnectInstance(ri);
/* Talk to the instance (PING / INFO / PUBLISH). */
sentinelPingInstance(ri);
/* Still inside the TILT period: do nothing else. Once the period is
 * over, clear the flag and publish the "-tilt" event. */
if (sentinel.tilt) {
if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
sentinel.tilt = 0;
sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
}
/* Check whether the instance is subjectively down (sdown). */
sentinelCheckSubjectivelyDown(ri);
/* (Part of the function is elided in this excerpt.) */
......
/* ---- Steps executed only for master instances ---- */
if (ri->flags & SRI_MASTER) {
/* Check whether the master is objectively down (odown), i.e. whether
 * the configured quorum of sentinels considers it down. */
sentinelCheckObjectivelyDown(ri);
/* Check whether a failover is needed; when it is,
 * sentinelStartFailoverIfNeeded() calls sentinelStartFailover() to
 * update the master's failover state and returns non-zero. */
if (sentinelStartFailoverIfNeeded(ri))
/* A failover was just started: send SENTINEL is-master-down-by-addr to
 * the other sentinels with the FORCED flag, registering the reply
 * callback. Only this single call is governed by the `if` above. */
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
/* Advance the failover state machine. */
sentinelFailoverStateMachine(ri);
/* Regular (non-forced) query of the master state to other sentinels. */
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}
}
sentinelReconnectInstance函数负责建立连接、重连,包括和各个instance建立连接;针对master/slave instance,还会订阅其“__sentinel__:hello”频道
/* (Re)establishes the links to an instance. Does nothing unless the
 * instance is flagged SRI_DISCONNECTED. Large parts of the body are
 * elided ("......") in this excerpt. */
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
/* Nothing to do when the instance is not marked as disconnected. */
if (!(ri->flags & SRI_DISCONNECTED)) return;
/* Connect to the master/slave/sentinel instance when the ri->cc link is
 * missing (presumably the commands connection - confirm against the
 * struct definition). */
if (ri->cc == NULL) {
......
}
/* Masters and slaves only: when the ri->pc link is missing, set it up
 * and subscribe to SENTINEL_HELLO_CHANNEL (the "__sentinel__:hello"
 * channel according to the article), with sentinelReceiveHelloMessages
 * as the handler for incoming messages. */
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && ri->pc == NULL) {
......
retval = redisAsyncCommand(ri->pc,
sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
SENTINEL_HELLO_CHANNEL);
......
}
......
}
sentinelPingInstance会根据instance状况,向其发送命令,可能是INFO/PING/PUBLISH
/* Sends at most one periodic command to the instance: INFO, PING or a
 * hello PUBLISH, chosen from the instance type and from the timestamps
 * stored in it (info_refresh, last_pong_time, last_pub_time).
 *
 * ri: instance to talk to. Nothing is sent when it is flagged
 *     SRI_DISCONNECTED or when SENTINEL_MAX_PENDING_COMMANDS commands are
 *     already pending on it. */
void sentinelPingInstance(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period;
    int retval;

    /* Not connected, or too many commands still waiting for a reply. */
    if (ri->flags & SRI_DISCONNECTED) return;
    if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;

    /* A slave whose master is objectively down or under failover is
     * polled with INFO every 1000 ms instead of every
     * SENTINEL_INFO_PERIOD, so that changes are noticed sooner. */
    if ((ri->flags & SRI_SLAVE) &&
        (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }

    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        /* Send INFO to masters and slaves, not sentinels. The reply is
         * handled by sentinelInfoReplyCallback, which (per the article)
         * parses it and refreshes the state stored in the instance. */
        retval = redisAsyncCommand(ri->cc,
            sentinelInfoReplyCallback, NULL, "INFO");
        if (retval != REDIS_OK) return;
        ri->pending_commands++;
    } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
        /* Send PING to any kind of instance. The reply is handled by
         * sentinelPingReplyCallback. */
        retval = redisAsyncCommand(ri->cc,
            sentinelPingReplyCallback, NULL, "PING");
        if (retval != REDIS_OK) return;
        ri->pending_commands++;
    } else if ((ri->flags & SRI_SENTINEL) == 0 &&
               (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
    {
        /* Publish a hello message through masters and slaves. According
         * to the article the payload is: ip, port, runid, current_epoch,
         * master->name, master->ip, master->port. */
        sentinelSendHello(ri);
    }
}
sentinelCheckObjectivelyDown函数确认是否将master状态从sdown改为odown
/* Decides whether a master that is subjectively down (sdown) should be
 * considered objectively down (odown). The declarations and the code that
 * acts on `odown` are elided ("......") in this excerpt. */
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
......
/* Only when this sentinel itself sees the master as sdown does it start
 * evaluating the odown condition. */
if (master->flags & SRI_S_DOWN) {
/* The count starts at 1 - presumably this sentinel's own sdown view. */
quorum = 1;
di = dictGetIterator(master->sentinels);
/* Walk the sentinels dictionary and count the ones flagged
 * SRI_MASTER_DOWN, i.e. the ones that reported the master as down. */
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
if (ri->flags & SRI_MASTER_DOWN) quorum++;
}
dictReleaseIterator(di);
/* Enough sentinels agree (count >= configured master->quorum): the
 * master is objectively down. */
if (quorum >= master->quorum) odown = 1;
}
......
}
sentinelStartFailoverIfNeeded函数check是否需要做fail over,如果确认需要,则调用sentinelStartFailover修改自身状态
/* Starts a failover for `master` when all the preconditions hold.
 *
 * Returns 1 when sentinelStartFailover() was called, 0 otherwise. */
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
    /* The master must be objectively down and must not already have a
     * failover in progress. */
    if ((master->flags & SRI_O_DOWN) == 0 ||
        (master->flags & SRI_FAILOVER_IN_PROGRESS) != 0)
    {
        return 0;
    }

    /* The previous failover attempt must have started at least twice the
     * failover timeout ago. */
    if (mstime() - master->failover_start_time <
        master->failover_timeout*2)
    {
        return 0;
    }

    sentinelStartFailover(master);
    return 1;
}
在确认要进行failover后,调用sentinelStartFailover修改相关状态数据
/* Puts `master` into the initial failover state.
 *
 * master: instance to fail over; it must be flagged SRI_MASTER (asserted).
 *
 * Side effects: increments sentinel.current_epoch and stores the new value
 * as the master's failover_epoch. */
void sentinelStartFailover(sentinelRedisInstance *master) {
    redisAssert(master->flags & SRI_MASTER);

    /* Failover state machine starts in WAIT_START. */
    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
    /* Mark the master as being under failover. */
    master->flags |= SRI_FAILOVER_IN_PROGRESS;
    /* This failover runs in a brand new epoch. */
    master->failover_epoch = ++sentinel.current_epoch;
    /* Record the start time plus a random offset below
     * SENTINEL_MAX_DESYNC, so that different sentinels do not all use the
     * same reference time. */
    master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    master->failover_state_change_time = mstime();
}
sentinelAskMasterStateToOtherSentinels函数在检测到master状态为sdown后,向其它sentinel节点发送SENTINEL is-master-down-by-addr消息
/* Asks the other sentinels monitoring `master` whether they also consider
 * it down, by sending each of them
 *
 *   SENTINEL is-master-down-by-addr <ip> <port> <current_epoch> <runid|*>
 *
 * The last argument is this sentinel's runid when a failover has already
 * been started here (which asks the peer for a vote), "*" otherwise.
 * Replies are handled by sentinelReceiveIsMasterDownReply.
 *
 * master: the master being checked.
 * flags:  SENTINEL_ASK_FORCED or SENTINEL_NO_FLAGS.
 *         NOTE(review): `flags` is not used in this abridged version;
 *         presumably the full function uses it to bypass a rate limit on
 *         the requests - confirm against the complete source. */
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    dictIterator *di;
    dictEntry *de;

    /* Walk the sentinels that monitor this master. */
    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        char port[32];
        int retval;

        /* Only ask when we believe the master is down ourselves. */
        if ((master->flags & SRI_S_DOWN) == 0) continue;
        /* No usable link to this sentinel: ri->cc cannot be written to. */
        if (ri->flags & SRI_DISCONNECTED) continue;

        /* The port is sent as a string argument. */
        ll2string(port,sizeof(port),master->addr->port);
        retval = redisAsyncCommand(ri->cc,
                    sentinelReceiveIsMasterDownReply, NULL,
                    "SENTINEL is-master-down-by-addr %s %s %llu %s",
                    master->addr->ip, port,
                    sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    server.runid : "*");
        if (retval == REDIS_OK) ri->pending_commands++;
    }
    dictReleaseIterator(di);
}
其他sentinel节点接收到sentinel is-master-down-by-addr消息后,调用sentinelCommand处理
/* Handler of the SENTINEL command. Only the branch serving the
 * "is-master-down-by-addr" subcommand is shown; the rest of the function
 * is elided ("......") in this excerpt. */
void sentinelCommand(redisClient *c) {
......
/* Branch handling SENTINEL is-master-down-by-addr. */
} else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
/* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>*/
......
/* Look up, among the monitored masters, the instance matching the
 * address sent by the other sentinel (no runid filter: NULL). */
ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
c->argv[2]->ptr,port,NULL);
/* It exists? Is actually a master? Is subjectively down? It's down.
* Note: if we are in tilt mode we always reply with "0". */
if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
(ri->flags & SRI_MASTER))
isdown = 1;
/* When the request carries the sender's runid (argv[5] is not "*"),
 * the sender is asking for a vote: run the leader vote. */
if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
c->argv[5]->ptr,
&leader_epoch);
}
/* Reply with three elements: isdown, leader ("*" when there is none)
 * and leader_epoch. */
addReplyMultiBulkLen(c,3);
addReply(c, isdown ? shared.cone : shared.czero);
addReplyBulkCString(c, leader ? leader : "*");
addReplyLongLong(c, (long long)leader_epoch);
/* The leader string is released here after being copied to the reply. */
if (leader) sdsfree(leader);
}
sentinelReceiveIsMasterDownReply函数处理其他sentinel对“SENTINEL is-master-down-by-addr”消息的回复
/* Callback for the reply to "SENTINEL is-master-down-by-addr" sent to
 * another sentinel. The code that obtains `ri` and `r` and validates the
 * reply is elided ("......") in this excerpt. */
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
......
/* First reply element: 1 means the replying sentinel considers the
 * master down; set or clear SRI_MASTER_DOWN on it accordingly. */
if (r->element[0]->integer == 1) {
ri->flags |= SRI_MASTER_DOWN;
} else {
ri->flags &= ~SRI_MASTER_DOWN;
}
/* Second element different from "*": the sentinel reported the leader
 * it voted for. Replace the stored leader and leader_epoch, logging
 * when the epoch differs from the one stored so far. */
if (strcmp(r->element[1]->str,"*")) {
sdsfree(ri->leader);
if (ri->leader_epoch != r->element[2]->integer)
redisLog(REDIS_WARNING,
"%s voted for %s %llu", ri->name,
r->element[1]->str,
(unsigned long long) r->element[2]->integer);
ri->leader = sdsnew(r->element[1]->str);
ri->leader_epoch = r->element[2]->integer;
}
}
sentinelFailoverStateMachine函数为故障转移状态机,其负责执行故障转移
/* Failover state machine: dispatches to the handler of the master's
 * current failover_state. Does nothing unless a failover is in progress.
 *
 * ri: must be flagged SRI_MASTER (asserted). */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    redisAssert(ri->flags & SRI_MASTER);

    /* No failover running for this master: nothing to drive. */
    if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) == 0) return;

    switch(ri->failover_state) {
    case SENTINEL_FAILOVER_STATE_WAIT_START:
        /* Waiting for the failover to start. Per the article: when this
         * sentinel is the leader the state moves on to SELECT_SLAVE,
         * otherwise it stays put until the failover ends or times out. */
        sentinelFailoverWaitStart(ri);
        break;
    case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
        /* Choose the slave to promote. */
        sentinelFailoverSelectSlave(ri);
        break;
    case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
        /* Promote the selected slave to master. */
        sentinelFailoverSendSlaveOfNoOne(ri);
        break;
    case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
        /* Wait for the promotion to take effect; per the article the INFO
         * reply processing moves the state on to RECONF_SLAVES. */
        sentinelFailoverWaitPromotion(ri);
        break;
    case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
        /* Make the remaining slaves replicate from the new master. */
        sentinelFailoverReconfNextSlave(ri);
        break;
    default:
        /* Any other state is not driven from here. */
        break;
    }
}
转载于:https://my.oschina.net/zipu888/blog/549570