redis之集群源码分析(一)

redis集群源码分为两个部分来讲,第一部分主要分析一个命令在集群模式下是如何定位到一个server来处理该命令。第二部分主要分析集群故障转移部分的源码。

本篇文章主要包括
1、一个命令请求在集群模式下是如何被处理的
2、集群模式下我们要注意哪些问题?
我们知道在redis集群模式下,每个实例分担了部分的命令请求。比如说我们要查找一个key,那么查找这个key的命令究竟要在哪个示例上被处理呢?说到这里我们有必要先给出redis集群的数据结构。


// 集群模式下节点数据结构
struct clusterNode {
    // 创建节点的时间
    mstime_t ctime; /* Node object creation time. */

    // 节点的名字,由 40 个十六进制字符组成
    // 例如 68eef66df23420a5862208ef5b1a7005b806f2ff
    char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */

    // 节点标识
    // 使用各种不同的标识值记录节点的角色(比如主节点或者从节点),
    // 以及节点目前所处的状态(比如在线或者下线)。
    int flags;      /* REDIS_NODE_... */

    // 节点当前的配置纪元,用于实现故障转移
    uint64_t configEpoch; /* Last configEpoch observed for this node */

    // 由这个节点负责处理的槽
    // 一共有 REDIS_CLUSTER_SLOTS / 8 个字节长
    // 每个字节的每个位记录了一个槽的保存状态
    // 位的值为 1 表示槽正由本节点处理,值为 0 则表示槽并非本节点处理
    // 比如 slots[0] 的第一个位保存了槽 0 的保存情况
    // slots[0] 的第二个位保存了槽 1 的保存情况,以此类推
    unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */

    // 该节点负责处理的槽数量
    int numslots;   /* Number of slots handled by this node */

    // 如果本节点是主节点,那么用这个属性记录从节点的数量
    int numslaves;  /* Number of slave nodes, if this is a master */

    // 指针数组,指向各个从节点
    struct clusterNode **slaves; /* pointers to slave nodes */

    // 如果这是一个从节点,那么指向主节点
    struct clusterNode *slaveof; /* pointer to the master node */

    // 最后一次发送 PING 命令的时间
    mstime_t ping_sent;      /* Unix time we sent latest ping */

    // 最后一次接收 PONG 回复的时间戳
    mstime_t pong_received;  /* Unix time we received the pong */

    // 最后一次被设置为 FAIL 状态的时间
    mstime_t fail_time;      /* Unix time when FAIL flag was set */

    // 最后一次给某个从节点投票的时间
    mstime_t voted_time;     /* Last time we voted for a slave of this master */

    // 最后一次从这个节点接收到复制偏移量的时间
    mstime_t repl_offset_time;  /* Unix time we received offset for this node */

    // 这个节点的复制偏移量
    long long repl_offset;      /* Last known repl offset for this node. */

    // 节点的 IP 地址
    char ip[REDIS_IP_STR_LEN];  /* Latest known IP address of this node */

    // 节点的端口号
    int port;                   /* Latest known port of this node */

    // 保存连接节点所需的有关信息
    clusterLink *link;          /* TCP/IP link with this node */

    // 一个链表,记录了所有其他节点对该节点的下线报告
    list *fail_reports;         /* List of nodes signaling this as failing */

};

我们需要注意的是clusterNode节点中的 slots数组,这个数组中保存了该节点要处理的槽位。如果该位置为1表示该节点负责该槽位的处理,否则该节点不负责该槽位的处理。
关于槽位的解释:在集群模式下,redis总共划分16384个槽位,每个集群中的节点(注:在本文中 节点=实例)负责部分槽位的处理。

下面再给出在集群模式下集群状态的数据结构

// 集群状态,每个节点都保存着一个这样的状态,记录了它们眼中的集群的样子。
// 另外,虽然这个结构主要用于记录集群的属性,但是为了节约资源,
// 有些与节点有关的属性,比如 slots_to_keys 、 failover_auth_count 
// 也被放到了这个结构里面。
typedef struct clusterState {

    // 指向当前节点的指针
    clusterNode *myself;  /* This node */

    // 集群当前的配置纪元,用于实现故障转移
    uint64_t currentEpoch;

    // 集群当前的状态:是在线还是下线
    int state;            /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */

    // 集群中至少处理着一个槽的节点的数量。
    int size;             /* Num of master nodes with at least one slot */

    // 集群节点名单(包括 myself 节点)
    // 字典的键为节点的名字,字典的值为 clusterNode 结构
    dict *nodes;          /* Hash table of name -> clusterNode structures */

    // 节点黑名单,用于 CLUSTER FORGET 命令
    // 防止被 FORGET 的命令重新被添加到集群里面
    // (不过现在似乎没有在使用的样子,已废弃?还是尚未实现?)
    dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */

    // 记录要从当前节点迁移到目标节点的槽,以及迁移的目标节点
    // migrating_slots_to[i] = NULL 表示槽 i 未被迁移
    // migrating_slots_to[i] = clusterNode_A 表示槽 i 要从本节点迁移至节点 A
    clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];

    // 记录要从源节点迁移到本节点的槽,以及进行迁移的源节点
    // importing_slots_from[i] = NULL 表示槽 i 未进行导入
    // importing_slots_from[i] = clusterNode_A 表示正从节点 A 中导入槽 i
    clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];

    // 负责处理各个槽的节点
    // 例如 slots[i] = clusterNode_A 表示槽 i 由节点 A 处理
    clusterNode *slots[REDIS_CLUSTER_SLOTS];

    // 跳跃表,表中以槽作为分值,键作为成员,对槽进行有序排序
    // 当需要对某些槽进行区间(range)操作时,这个跳跃表可以提供方便
    // 具体操作定义在 db.c 里面
    zskiplist *slots_to_keys;

    /* The following fields are used to take the slave state on elections. */
    // 以下这些域被用于进行故障转移选举

    // 上次执行选举或者下次执行选举的时间
    mstime_t failover_auth_time; /* Time of previous or next election. */

    // 节点获得的投票数量
    int failover_auth_count;    /* Number of votes received so far. */

    // 如果值为 1 ,表示本节点已经向其他节点发送了投票请求
    int failover_auth_sent;     /* True if we already asked for votes. */

    int failover_auth_rank;     /* This slave rank for current auth request. */

    uint64_t failover_auth_epoch; /* Epoch of the current election. */

    /* Manual failover state in common. */
    /* 共用的手动故障转移状态 */

    // 手动故障转移执行的时间限制
    mstime_t mf_end;            /* Manual failover time limit (ms unixtime).
                                   It is zero if there is no MF in progress. */
    /* Manual failover state of master. */
    /* 主服务器的手动故障转移状态 */
    clusterNode *mf_slave;      /* Slave performing the manual failover. */
    /* Manual failover state of slave. */
    /* 从服务器的手动故障转移状态 */
    long long mf_master_offset; /* Master offset the slave needs to start MF
                                   or zero if stil not received. */
    // 指示手动故障转移是否可以开始的标志值
    // 值为非 0 时表示各个主服务器可以开始投票
    int mf_can_start;           /* If non-zero signal that the manual failover
                                   can start requesting masters vote. */

    /* The followign fields are uesd by masters to take state on elections. */
    /* 以下这些域由主服务器使用,用于记录选举时的状态 */

    // 集群最后一次进行投票的纪元
    uint64_t lastVoteEpoch;     /* Epoch of the last vote granted. */

    // 在进入下个事件循环之前要做的事情,以各个 flag 来记录
    int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */

    // 通过 cluster 连接发送的消息数量
    long long stats_bus_messages_sent;  /* Num of msg sent via cluster bus. */

    // 通过 cluster 接收到的消息数量
    long long stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/

} clusterState;

我们需要特别注意的是 clusterState结构中引用了clusterNode数据结构。在clusterState结构中我们也看到了一个slots数组,这个数组记录了槽i被指派给了哪个节点处理。所以clusterNode中的slots与clusterState中的slots数组不同之处在于:clusterNode.slots只记录了本节点需要处理的槽位信息,而clusterState.slots记录了槽位i分配给了哪个节点,之所以用两个数组是为了效率上的考虑,我们知道这点就行了。
下面进入正题,当在集群模式下,一个请求命令到底要被哪个server进行处理呢?这里大家记住,每一个命令请求都是从函数 processCommand(redisClient *c) 开始的

在这里插入代码片int processCommand(redisClient *c) {

    // 这里就是有关集群的入口
    /* If cluster is enabled perform the cluster redirection here.
     *
     * 如果开启了集群模式,那么在这里进行转向操作。
     *
     * However we don't perform the redirection if:
     *
     * 不过,如果有以下情况出现,那么节点不进行转向:
     *
     * 1) The sender of this command is our master.
     *    命令的发送者是本节点的主节点
     *
     * 2) The command has no key arguments. 
     *    命令没有 key 参数
     */
    if (server.cluster_enabled &&
        !(c->flags & REDIS_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        // 集群已下线
        if (server.cluster->state != REDIS_CLUSTER_OK) {
            flagTransaction(c);
            addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
            return REDIS_OK;

        // 集群运作正常
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
            // 不能执行多键处理命令
            if (n == NULL) {
                flagTransaction(c);
                if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
                    addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
                } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
                    /* The request spawns mutliple keys in the same slot,
                     * but the slot is not "stable" currently as there is
                     * a migration or import in progress. */
                    addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
                } else {
                    redisPanic("getNodeByQuery() unknown error.");
                }
                return REDIS_OK;

            // 命令针对的槽和键不是本节点处理的,进行转向
            } else if (n != server.cluster->myself) {
                flagTransaction(c);
                // -<ASK or MOVED> <slot> <ip>:<port>
                // 例如 -ASK 10086 127.0.0.1:12345
                addReplySds(c,sdscatprintf(sdsempty(),
                    "-%s %d %s:%d\r\n",
                    (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
                    hashslot,n->ip,n->port));

                return REDIS_OK;
            }

            // 如果执行到这里,说明键 key 所在的槽由本节点处理
            // 或者客户端执行的是无参数命令
        }
    }

上面的代码段就是集群模式下处理命令请求的入口。其中getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code)这个函数就是找到处理命令对应的redis 实例。
我们看到入参有 error_code这个参数,这个参数会在此函数中被赋值。那么它有几个值呢?每个不同的code码,都是代表什么意思呢?下面给出几个枚举值

/* 由 getNodeByQuery() 函数返回的转向错误。 /
// 节点可以处理这个命令
#define REDIS_CLUSTER_REDIR_NONE 0 /
Node can serve the request. /
// 键在其他槽
#define REDIS_CLUSTER_REDIR_CROSS_SLOT 1 /
Keys in different slots. /
// 键所处的槽正在进行 reshard
#define REDIS_CLUSTER_REDIR_UNSTABLE 2 /
Keys in slot resharding. /
// 需要进行 ASK 转向
#define REDIS_CLUSTER_REDIR_ASK 3 /
-ASK redirection required. */
// 需要进行 MOVED 转向
#define REDIS_CLUSTER_REDIR_MOVED 4
我们这里先给出这几个值,后续源码中会有关于这几个值的解释。
我们开始分析getNodeByQuery()这个函数

```c
/* 
该函数返回处理命令的集群节点
Return the pointer to the cluster node that is able to serve the command.

  集群处理的命令只能是1.单个key 2.多个key,但是这些key对应的redis 实例是同一个,并且集群是稳定的【未在进行重新分片】
 * For the function to succeed the command should only target either:
 *
 * 1) A single key (even multiple times like LPOPRPUSH mylist mylist).
 * 2) Multiple keys in the same hash slot, while the slot is stable (no
 *    resharding in progress).
 *
  如果成功,该函数返回能够处理命令请求的redis 实例
 * On success the function returns the node that is able to serve the request.
 * 如果节点不是 'myself',会执行重定向
 * If the node is not 'myself' a redirection must be perfomed.
 *
 重定向的方式有 ask和remvoed两种
 The kind of
 * redirection is specified setting the integer passed by reference
 * 'error_code', which will be set to REDIS_CLUSTER_REDIR_ASK or
 * REDIS_CLUSTER_REDIR_MOVED.
 *
 当处理命令的节点是 `myself`, error_code被赋值为REDIS_CLUSTER_REDIR_NONE
 * When the node is 'myself' 'error_code' is set to REDIS_CLUSTER_REDIR_NONE.
 *当这个命令不能处理时,返回NULL,error_code被赋值为原因
 
 * If the command fails NULL is returned, and the reason of the failure is
 * provided via 'error_code', which will be set to:
 *当这个命令包含多个key并且这些key不在同一个slot中时,
 error_code被赋值为 REDIS_CLUSTER_REDIR_CROSS_SLOT
 * REDIS_CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
 * don't belong to the same hash slot.
 *
 当命令中含有多个key,并且这些key同属一个slot,但是集群正在resharding,error_code被赋值为RREDIS_CLUSTER_REDIR_UNSTABLE
 * REDIS_CLUSTER_REDIR_UNSTABLE if the request contains mutliple keys
 * belonging to the same slot, but the slot is not stable (in migration or
 * importing state, likely because a resharding is in progress). */
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {

    // 初始化为 NULL ,
    // 如果输入命令是无参数命令,那么 n 就会继续为 NULL
    clusterNode *n = NULL;

    robj *firstkey = NULL;
    int multiple_keys = 0;
    multiState *ms, _ms;
    multiCmd mc;
    int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;

    /* Set error code optimistically for the base case. */
    if (error_code) *error_code = REDIS_CLUSTER_REDIR_NONE;

    /* We handle all the cases as if they were EXEC commands, so we have
     * a common code path for everything */
    // 集群可以执行事务,
    // 但必须确保事务中的所有命令都是针对某个相同的键进行的
    // 这个 if 和接下来的 for 进行的就是这一合法性检测
    if (cmd->proc == execCommand) {
        /* If REDIS_MULTI flag is not set EXEC is just going to return an
         * error. */
        if (!(c->flags & REDIS_MULTI)) return myself;
        ms = &c->mstate;
    } else {
        /* In order to have a single codepath create a fake Multi State
         * structure if the client is not in MULTI/EXEC state, this way
         * we have a single codepath below. */
        ms = &_ms;
        _ms.commands = &mc;
        _ms.count = 1;
        mc.argv = argv;
        mc.argc = argc;
        mc.cmd = cmd;
    }

    /* Check that all the keys are in the same hash slot, and obtain this
     * slot and the node associated. */
    for (i = 0; i < ms->count; i++) {
        struct redisCommand *mcmd;
        robj **margv;
        int margc, *keyindex, numkeys, j;

        mcmd = ms->commands[i].cmd;
        margc = ms->commands[i].argc;
        margv = ms->commands[i].argv;

        // 定位命令的键位置
        keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
        // 遍历命令中的所有键
        for (j = 0; j < numkeys; j++) {
            robj *thiskey = margv[keyindex[j]];
            int thisslot = keyHashSlot((char*)thiskey->ptr,
                                       sdslen(thiskey->ptr));

            if (firstkey == NULL) {
                // 这是事务中第一个被处理的键
                // 获取该键的槽和负责处理该槽的节点
                /* This is the first key we see. Check what is the slot
                 * and node. */
                firstkey = thiskey;
                slot = thisslot;
                n = server.cluster->slots[slot];
                redisAssertWithInfo(c,firstkey,n != NULL);
                /* If we are migrating or importing this slot, we need to check
                 * if we have all the keys in the request (the only way we
                 * can safely serve the request, otherwise we return a TRYAGAIN
                 * error). To do so we set the importing/migrating state and
                 * increment a counter for every missing key. */
                if (n == myself &&
                    server.cluster->migrating_slots_to[slot] != NULL)
                {
                    migrating_slot = 1;
                } else if (server.cluster->importing_slots_from[slot] != NULL) {
                    importing_slot = 1;
                }
            } else {
                /* If it is not the first key, make sure it is exactly
                 * the same key as the first we saw. */
                if (!equalStringObjects(firstkey,thiskey)) {
                    if (slot != thisslot) {
                        /* Error: multiple keys from different slots. */
                        getKeysFreeResult(keyindex);
                        if (error_code)
                            *error_code = REDIS_CLUSTER_REDIR_CROSS_SLOT;
                        return NULL;
                    } else {
                        /* Flag this request as one with multiple different
                         * keys. */
                        multiple_keys = 1;
                    }
                }
            }

            /* Migarting / Improrting slot? Count keys we don't have. */
            if ((migrating_slot || importing_slot) &&
                lookupKeyRead(&server.db[0],thiskey) == NULL)
            {
                missing_keys++;
            }
        }
        getKeysFreeResult(keyindex);
    }  // end for

    /* No key at all in command? then we can serve the request
     * without redirections or errors. */
    if (n == NULL) return myself;

    /* Return the hashslot by reference. */
    if (hashslot) *hashslot = slot;

    /* This request is about a slot we are migrating into another instance?
     * Then if we have all the keys. */

    /* If we don't have all the keys and we are migrating the slot, send
     * an ASK redirection. */
    if (migrating_slot && missing_keys) {
        if (error_code) *error_code = REDIS_CLUSTER_REDIR_ASK;
        return server.cluster->migrating_slots_to[slot];
    }

    /* If we are receiving the slot, and the client correctly flagged the
     * request as "ASKING", we can serve the request. However if the request
     * involves multiple keys and we don't have them all, the only option is
     * to send a TRYAGAIN error. */
    if (importing_slot &&
        (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
    {
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }

    /* Handle the read-only client case reading from a slave: if this
     * node is a slave and the request is about an hash slot our master
     * is serving, we can reply without redirection. */
    if (c->flags & REDIS_READONLY &&
        cmd->flags & REDIS_CMD_READONLY &&
        nodeIsSlave(myself) &&
        myself->slaveof == n)
    {
        return myself;
    }

    /* Base case: just return the right node. However if this node is not
     * myself, set error_code to MOVED since we need to issue a rediretion. */
    if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;

    // 返回负责处理槽 slot 的节点 n
    return n;
   }

以上就是 getQueryNode这个函数的功能,其实也比较简单,对着注释多看两遍就能看懂。无非就是找到处理命令的集群节点,如果找不到给error_code赋值原因。调用者根绝error_code做相应的处理。

2.在集群模式下我们应该注意哪些问题呢?
(1)在集群模式下应该尽量避免多key查询
(2)注意 key的散列与集群均衡

后记:参考书籍《redis设计与实现》《redis3.0源码》

上一篇:dotnet 在 Linux 下的 GDI 库对 EMF 图片格式的支持


下一篇:Redis Cluster集群底层原理的面试题(持续补充ing)