[Redis Design and Implementation][11] Sentinel: Communication

Communication

Once initialization is complete, Sentinel actively communicates with the master and its slaves to collect information about them.

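Obtaining master information

First, Sentinel opens two connections to the master: a command connection and a subscription connection (stored in the cc and pc fields of sentinelRedisInstance, respectively).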

void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    sentinelReconnectInstance(ri);
    ...
}
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"

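The command connection is used later to fetch information about the master (including information about its slaves). The subscription connection (to the __sentinel__:hello channel) receives pushed messages, such as the hello announcements that Sentinels publish about the master and its state.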

Once the connections are established, Sentinel sends commands periodically:

void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;

    /* Return ASAP if we have already a PING or INFO already pending, or
     * in the case the instance is not properly connected. */
    if (ri->flags & SRI_DISCONNECTED) return;

    /* For INFO, PING, PUBLISH that are not critical commands to send we
     * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
     * want to use a lot of memory just because a link is not working
     * properly (note that anyway there is a redundant protection about this,
     * that is, the link will be disconnected and reconnected if a long
     * timeout condition is detected. */
    if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;

    /* If this is a slave of a master in O_DOWN condition we start sending
     * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
     * period. In this state we want to closely monitor slaves in case they
     * are turned into masters by another Sentinel, or by the sysadmin. */
    // INFO is sent every 10 s by default: #define SENTINEL_INFO_PERIOD 10000
    // For a slave whose master is already down, the interval drops to 1 s
    if ((ri->flags & SRI_SLAVE) &&
        (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }

    /* We ping instances every time the last received pong is older than
     * the configured 'down-after-milliseconds' time, but every second
     * anyway if 'down-after-milliseconds' is greater than 1 second. */
    // PING is sent every 1 s by default: #define SENTINEL_PING_PERIOD 1000
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        /* Send INFO to masters and slaves, not sentinels. */
        retval = redisAsyncCommand(ri->cc,
            sentinelInfoReplyCallback, NULL, "INFO");
        if (retval == REDIS_OK) ri->pending_commands++;
    } else if ((now - ri->last_pong_time) > ping_period) {
        /* Send PING to all the three kinds of instances. */
        sentinelSendPing(ri);
    } else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
        /* PUBLISH hello messages to all the three kinds of instances. */
        sentinelSendHello(ri);
    }
}
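
The if/else chain above sends at most one command per call: INFO takes precedence over PING, and PING over the hello PUBLISH. The sketch below isolates that decision as a pure function so the priority order is easier to see. The enum, struct, and function names are hypothetical and do not appear in sentinel.c; the SRI_DISCONNECTED and pending-command checks are left out, and the 2000 ms publish period is an assumption taken from the Redis source of that era.

```c
#include <stdint.h>

/* Hypothetical names for illustration; not part of sentinel.c. */
typedef enum { CMD_NONE, CMD_INFO, CMD_PING, CMD_HELLO } periodic_cmd;

typedef struct {
    int is_sentinel;        /* instance is another Sentinel */
    int is_slave;           /* instance is a slave */
    int master_down;        /* its master is O_DOWN or in failover */
    int64_t down_after_ms;  /* configured down-after-milliseconds */
    int64_t info_refresh;   /* time of the last INFO reply, 0 if never */
    int64_t last_pong;      /* time of the last PONG */
    int64_t last_pub;       /* time of the last hello PUBLISH */
} instance_state;

enum {
    INFO_PERIOD_MS = 10000,
    PING_PERIOD_MS = 1000,
    PUBLISH_PERIOD_MS = 2000  /* assumed value of SENTINEL_PUBLISH_PERIOD */
};

/* Decide which single command, if any, is due at time `now` (ms). */
periodic_cmd choose_periodic_command(const instance_state *s, int64_t now) {
    /* Slaves of a failing master are polled with INFO every second. */
    int64_t info_period =
        (s->is_slave && s->master_down) ? 1000 : INFO_PERIOD_MS;

    /* PING period is down-after-milliseconds, capped at one second. */
    int64_t ping_period = s->down_after_ms;
    if (ping_period > PING_PERIOD_MS) ping_period = PING_PERIOD_MS;

    if (!s->is_sentinel &&
        (s->info_refresh == 0 || now - s->info_refresh > info_period))
        return CMD_INFO;
    if (now - s->last_pong > ping_period) return CMD_PING;
    if (now - s->last_pub > PUBLISH_PERIOD_MS) return CMD_HELLO;
    return CMD_NONE;
}
```

Because the branches are mutually exclusive, an instance whose INFO is overdue will not be pinged in the same call; the PING goes out on a later timer tick.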

Handling the INFO reply:

void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = c->data;
    redisReply *r;
    REDIS_NOTUSED(privdata);

    if (ri) ri->pending_commands--;
    if (!reply || !ri) return;
    r = reply;

    if (r->type == REDIS_REPLY_STRING) {
        sentinelRefreshInstanceInfo(ri,r->str);
    }
}

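For an INFO reply from a master, Sentinel does the following:
1. Reads and records the master's run_id, checks the master's role, and updates the master entry it maintains
2. Reads the replication section to obtain the master's slave list, and updates the slave list it maintains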
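
As a rough illustration of step 2, the sketch below pulls the address out of one slave line of the replication section. It assumes the key=value line format used by Redis 2.8 and later (older versions use a different layout); the function name and the sample line are hypothetical and not taken from sentinel.c.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper, not from sentinel.c: extract ip and port from a
 * replication line such as
 *   slave0:ip=127.0.0.1,port=6380,state=online,offset=1234,lag=0
 * Returns 1 on success, 0 if the line is not in that form. */
int parse_slave_line(const char *line, char *ip, size_t iplen, int *port) {
    const char *p = strstr(line, ":ip=");
    const char *q;
    size_t n;

    if (strncmp(line, "slave", 5) != 0 || p == NULL) return 0;
    p += 4;                      /* skip ":ip=" */
    q = strchr(p, ',');          /* end of the ip value */
    if (q == NULL) return 0;
    n = (size_t)(q - p);
    if (n == 0 || n >= iplen) return 0;
    memcpy(ip, p, n);
    ip[n] = '\0';
    /* The port field is expected to follow the ip field directly. */
    return sscanf(q, ",port=%d", port) == 1;
}
```

Each address recovered this way would then be looked up in the master's slave table and added if it is not already present.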

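Obtaining slave information

Likewise, Sentinel sends INFO to each slave on the same schedule (every 10 s by default, or every 1 s while the slave's master is in the O_DOWN state or a failover is in progress) and extracts the following fields:
* run_id
* role
* the master's host and port (master_host, master_port)
* the state of the master-slave link (master_link_status)
* the slave's priority (slave_priority)
* the slave's replication offset (slave_repl_offset)
It then updates the sentinelRedisInstance structure it maintains for that slave.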
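
An INFO reply is plain text made of key:value lines separated by CRLF, so the fields listed above can be looked up by key. The following is a minimal sketch of such a lookup; info_field is a hypothetical helper, not the parsing code that sentinelRefreshInstanceInfo actually uses.

```c
#include <string.h>

/* Hypothetical helper: copy the value of "key:" from an INFO reply into
 * out. Returns 1 if the key was found and fits in out, 0 otherwise. */
int info_field(const char *info, const char *key, char *out, size_t outlen) {
    size_t klen = strlen(key);
    const char *line = info;

    while (line != NULL && *line != '\0') {
        /* The key must match the whole name up to the colon. */
        if (strncmp(line, key, klen) == 0 && line[klen] == ':') {
            const char *v = line + klen + 1;
            size_t n = strcspn(v, "\r\n");   /* value ends at CR or LF */
            if (n >= outlen) return 0;
            memcpy(out, v, n);
            out[n] = '\0';
            return 1;
        }
        line = strchr(line, '\n');
        if (line != NULL) line++;            /* start of the next line */
    }
    return 0;
}
```

For example, calling info_field(reply, "master_link_status", buf, sizeof(buf)) on a slave's reply would fill buf with the link state string reported by that slave.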

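Sending and receiving subscription messages

Every SENTINEL_PUBLISH_PERIOD (2 seconds in the Redis source), Sentinel publishes a hello message over the command connection to every master and slave it monitors.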

int sentinelSendHello(sentinelRedisInstance *ri) {
    char ip[REDIS_IP_STR_LEN];
    char payload[REDIS_IP_STR_LEN+1024];
    int retval;
    char *announce_ip;
    int announce_port;
    sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
    sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);

    if (ri->flags & SRI_DISCONNECTED) return REDIS_ERR;

    /* Use the specified announce address if specified, otherwise try to
     * obtain our own IP address. */
    if (sentinel.announce_ip) {
        announce_ip = sentinel.announce_ip;
    } else {
        if (anetSockName(ri->cc->c.fd,ip,sizeof(ip),NULL) == -1)
            return REDIS_ERR;
        announce_ip = ip;
    }
    announce_port = sentinel.announce_port ?
                    sentinel.announce_port : server.port;

    /* Format and send the Hello message. */
    snprintf(payload,sizeof(payload),
        "%s,%d,%s,%llu," /* Info about this sentinel. */
        "%s,%s,%d,%llu", /* Info about current master. */
        announce_ip, announce_port, server.runid,
        (unsigned long long) sentinel.current_epoch,
        /* --- */
        master->name,master_addr->ip,master_addr->port,
        (unsigned long long) master->config_epoch);
    retval = redisAsyncCommand(ri->cc,
        sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
            SENTINEL_HELLO_CHANNEL,payload);
    if (retval != REDIS_OK) return REDIS_ERR;
    ri->pending_commands++;
    return REDIS_OK;
}

The message contains:
* Sentinel IP (announce_ip)
* Sentinel port (announce_port)
* Sentinel run ID (server.runid)
* Sentinel current epoch (sentinel.current_epoch)
* master name (master->name)
* master IP (master_addr->ip)
* master port (master_addr->port)
* master configuration epoch (master->config_epoch)
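
On the receiving side, the eight comma-separated fields have to be split apart again. The sketch below does this with a single sscanf call that mirrors the snprintf format in sentinelSendHello. The hello_msg struct and parse_hello function are hypothetical names for illustration, and the fixed 64-byte buffers are an assumption.

```c
#include <stdio.h>

/* Hypothetical struct for illustration; sentinel.c does not define it. */
typedef struct {
    char sentinel_ip[64];
    int sentinel_port;
    char sentinel_runid[64];
    unsigned long long current_epoch;
    char master_name[64];
    char master_ip[64];
    int master_port;
    unsigned long long config_epoch;
} hello_msg;

/* Split "ip,port,runid,epoch,name,ip,port,epoch" into its fields.
 * Returns 1 only if all eight fields were parsed, 0 otherwise. */
int parse_hello(const char *payload, hello_msg *h) {
    int n = sscanf(payload,
        "%63[^,],%d,%63[^,],%llu,"   /* info about the sending sentinel */
        "%63[^,],%63[^,],%d,%llu",   /* info about the current master */
        h->sentinel_ip, &h->sentinel_port, h->sentinel_runid,
        &h->current_epoch,
        h->master_name, h->master_ip, &h->master_port,
        &h->config_epoch);
    return n == 8;
}
```

The %63[^,] conversions stop at the next comma and never write more than 64 bytes including the terminator, so an oversized field makes the parse fail instead of overflowing a buffer.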

Every Sentinel subscribed to this master receives the message as well, and reacts by:
* updating its sentinels dictionary
* creating a command connection to the other Sentinel
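
The bookkeeping described above can be sketched as a small table keyed by run ID. This is a simplified stand-in, not the dictionary code in sentinel.c: the peer and peer_table types, the fixed capacity, and the on_hello function are all hypothetical, and the sketch assumes that a Sentinel ignores hello messages carrying its own run ID. Opening the command connection to a newly discovered Sentinel is omitted.

```c
#include <string.h>

#define MAX_PEERS 16

/* Hypothetical stand-in for the per-master sentinels dictionary. */
typedef struct {
    char runid[41];
    char ip[64];
    int port;
} peer;

typedef struct {
    peer peers[MAX_PEERS];
    int count;
} peer_table;

/* Returns 1 if a new peer was added, 0 if the message came from this
 * Sentinel or from a known peer (whose address is refreshed), and -1 if
 * the table is full or a field is too long. */
int on_hello(peer_table *t, const char *my_runid,
             const char *runid, const char *ip, int port) {
    int i;

    if (strcmp(runid, my_runid) == 0) return 0;   /* our own message */
    if (strlen(runid) >= sizeof(t->peers[0].runid) ||
        strlen(ip) >= sizeof(t->peers[0].ip)) return -1;

    for (i = 0; i < t->count; i++) {
        if (strcmp(t->peers[i].runid, runid) == 0) {
            strcpy(t->peers[i].ip, ip);           /* refresh the address */
            t->peers[i].port = port;
            return 0;
        }
    }
    if (t->count == MAX_PEERS) return -1;
    strcpy(t->peers[t->count].runid, runid);
    strcpy(t->peers[t->count].ip, ip);
    t->peers[t->count].port = port;
    t->count++;
    return 1;                                     /* newly discovered */
}
```

A return value of 1 marks the point at which a real Sentinel would go on to open a command connection to the new peer.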


Reposted from: https://coolex.info/blog/471.html
