redis集群源码分为两个部分来讲,第一部分主要分析一个命令在集群模式下是如何定位到一个server来处理该命令。第二部分主要分析集群故障转移部分的源码。
本篇文章主要包括
1、一个命令请求在集群模式下是如何被处理的
2、集群模式下我们要注意哪些问题?
我们知道在redis集群模式下,每个实例分担了部分的命令请求。比如说我们要查找一个key,那么查找这个key的命令究竟要在哪个示例上被处理呢?说到这里我们有必要先给出redis集群的数据结构。
// 集群模式下节点数据结构
struct clusterNode {
// 创建节点的时间
mstime_t ctime; /* Node object creation time. */
// 节点的名字,由 40 个十六进制字符组成
// 例如 68eef66df23420a5862208ef5b1a7005b806f2ff
char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
// 节点标识
// 使用各种不同的标识值记录节点的角色(比如主节点或者从节点),
// 以及节点目前所处的状态(比如在线或者下线)。
int flags; /* REDIS_NODE_... */
// 节点当前的配置纪元,用于实现故障转移
uint64_t configEpoch; /* Last configEpoch observed for this node */
// 由这个节点负责处理的槽
// 一共有 REDIS_CLUSTER_SLOTS / 8 个字节长
// 每个字节的每个位记录了一个槽的保存状态
// 位的值为 1 表示槽正由本节点处理,值为 0 则表示槽并非本节点处理
// 比如 slots[0] 的第一个位保存了槽 0 的保存情况
// slots[0] 的第二个位保存了槽 1 的保存情况,以此类推
unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */
// 该节点负责处理的槽数量
int numslots; /* Number of slots handled by this node */
// 如果本节点是主节点,那么用这个属性记录从节点的数量
int numslaves; /* Number of slave nodes, if this is a master */
// 指针数组,指向各个从节点
struct clusterNode **slaves; /* pointers to slave nodes */
// 如果这是一个从节点,那么指向主节点
struct clusterNode *slaveof; /* pointer to the master node */
// 最后一次发送 PING 命令的时间
mstime_t ping_sent; /* Unix time we sent latest ping */
// 最后一次接收 PONG 回复的时间戳
mstime_t pong_received; /* Unix time we received the pong */
// 最后一次被设置为 FAIL 状态的时间
mstime_t fail_time; /* Unix time when FAIL flag was set */
// 最后一次给某个从节点投票的时间
mstime_t voted_time; /* Last time we voted for a slave of this master */
// 最后一次从这个节点接收到复制偏移量的时间
mstime_t repl_offset_time; /* Unix time we received offset for this node */
// 这个节点的复制偏移量
long long repl_offset; /* Last known repl offset for this node. */
// 节点的 IP 地址
char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */
// 节点的端口号
int port; /* Latest known port of this node */
// 保存连接节点所需的有关信息
clusterLink *link; /* TCP/IP link with this node */
// 一个链表,记录了所有其他节点对该节点的下线报告
list *fail_reports; /* List of nodes signaling this as failing */
};
我们需要注意的是clusterNode节点中的 slots数组,这个数组中保存了该节点要处理的槽位。如果该位置为1表示该节点负责该槽位的处理,否则该节点不负责该槽位的处理。
关于槽位的解释:在集群模式下,redis总共划分16384个槽位,每个集群中的节点(注:在本文中 节点=实例)负责部分槽位的处理。
下面再给出在集群模式下集群状态的数据结构
// 集群状态,每个节点都保存着一个这样的状态,记录了它们眼中的集群的样子。
// 另外,虽然这个结构主要用于记录集群的属性,但是为了节约资源,
// 有些与节点有关的属性,比如 slots_to_keys 、 failover_auth_count
// 也被放到了这个结构里面。
typedef struct clusterState {
// 指向当前节点的指针
clusterNode *myself; /* This node */
// 集群当前的配置纪元,用于实现故障转移
uint64_t currentEpoch;
// 集群当前的状态:是在线还是下线
int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */
// 集群中至少处理着一个槽的节点的数量。
int size; /* Num of master nodes with at least one slot */
// 集群节点名单(包括 myself 节点)
// 字典的键为节点的名字,字典的值为 clusterNode 结构
dict *nodes; /* Hash table of name -> clusterNode structures */
// 节点黑名单,用于 CLUSTER FORGET 命令
// 防止被 FORGET 的命令重新被添加到集群里面
// (不过现在似乎没有在使用的样子,已废弃?还是尚未实现?)
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
// 记录要从当前节点迁移到目标节点的槽,以及迁移的目标节点
// migrating_slots_to[i] = NULL 表示槽 i 未被迁移
// migrating_slots_to[i] = clusterNode_A 表示槽 i 要从本节点迁移至节点 A
clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];
// 记录要从源节点迁移到本节点的槽,以及进行迁移的源节点
// importing_slots_from[i] = NULL 表示槽 i 未进行导入
// importing_slots_from[i] = clusterNode_A 表示正从节点 A 中导入槽 i
clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];
// 负责处理各个槽的节点
// 例如 slots[i] = clusterNode_A 表示槽 i 由节点 A 处理
clusterNode *slots[REDIS_CLUSTER_SLOTS];
// 跳跃表,表中以槽作为分值,键作为成员,对槽进行有序排序
// 当需要对某些槽进行区间(range)操作时,这个跳跃表可以提供方便
// 具体操作定义在 db.c 里面
zskiplist *slots_to_keys;
/* The following fields are used to take the slave state on elections. */
// 以下这些域被用于进行故障转移选举
// 上次执行选举或者下次执行选举的时间
mstime_t failover_auth_time; /* Time of previous or next election. */
// 节点获得的投票数量
int failover_auth_count; /* Number of votes received so far. */
// 如果值为 1 ,表示本节点已经向其他节点发送了投票请求
int failover_auth_sent; /* True if we already asked for votes. */
int failover_auth_rank; /* This slave rank for current auth request. */
uint64_t failover_auth_epoch; /* Epoch of the current election. */
/* Manual failover state in common. */
/* 共用的手动故障转移状态 */
// 手动故障转移执行的时间限制
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
It is zero if there is no MF in progress. */
/* Manual failover state of master. */
/* 主服务器的手动故障转移状态 */
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* Manual failover state of slave. */
/* 从服务器的手动故障转移状态 */
long long mf_master_offset; /* Master offset the slave needs to start MF
or zero if stil not received. */
// 指示手动故障转移是否可以开始的标志值
// 值为非 0 时表示各个主服务器可以开始投票
int mf_can_start; /* If non-zero signal that the manual failover
can start requesting masters vote. */
/* The followign fields are uesd by masters to take state on elections. */
/* 以下这些域由主服务器使用,用于记录选举时的状态 */
// 集群最后一次进行投票的纪元
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
// 在进入下个事件循环之前要做的事情,以各个 flag 来记录
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
// 通过 cluster 连接发送的消息数量
long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */
// 通过 cluster 接收到的消息数量
long long stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/
} clusterState;
我们需要特别注意的是 clusterState结构中引用了clusterNode数据结构。在clusterState结构中我们也看到了一个slots数组,这个数组记录了槽i被指派给了哪个节点处理。所以clusterNode中的slots与clusterState中的slots数组不同之处在于:clusterNode.slots只记录了本节点需要处理的槽位信息,而clusterState.slots记录了槽位i分配给了哪个节点,之所以用两个数组是为了效率上的考虑,我们知道这点就行了。
下面进入正题,当在集群模式下,一个请求命令到底要被哪个server进行处理呢?这里大家记住,每一个命令请求都是从函数 processCommand(redisClient *c) 开始的
在这里插入代码片int processCommand(redisClient *c) {
// 这里就是有关集群的入口
/* If cluster is enabled perform the cluster redirection here.
*
* 如果开启了集群模式,那么在这里进行转向操作。
*
* However we don't perform the redirection if:
*
* 不过,如果有以下情况出现,那么节点不进行转向:
*
* 1) The sender of this command is our master.
* 命令的发送者是本节点的主节点
*
* 2) The command has no key arguments.
* 命令没有 key 参数
*/
if (server.cluster_enabled &&
!(c->flags & REDIS_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
{
int hashslot;
// 集群已下线
if (server.cluster->state != REDIS_CLUSTER_OK) {
flagTransaction(c);
addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
return REDIS_OK;
// 集群运作正常
} else {
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
// 不能执行多键处理命令
if (n == NULL) {
flagTransaction(c);
if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
} else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
/* The request spawns mutliple keys in the same slot,
* but the slot is not "stable" currently as there is
* a migration or import in progress. */
addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
} else {
redisPanic("getNodeByQuery() unknown error.");
}
return REDIS_OK;
// 命令针对的槽和键不是本节点处理的,进行转向
} else if (n != server.cluster->myself) {
flagTransaction(c);
// -<ASK or MOVED> <slot> <ip>:<port>
// 例如 -ASK 10086 127.0.0.1:12345
addReplySds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d\r\n",
(error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot,n->ip,n->port));
return REDIS_OK;
}
// 如果执行到这里,说明键 key 所在的槽由本节点处理
// 或者客户端执行的是无参数命令
}
}
上面的代码段就是集群模式下处理命令请求的入口。其中getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code)这个函数就是找到处理命令对应的redis 实例。
我们看到入参有 error_code这个参数,这个参数会在此函数中被赋值。那么它有几个值呢?每个不同的code码,都是代表什么意思呢?下面给出几个枚举值
/* 由 getNodeByQuery() 函数返回的转向错误。 /
// 节点可以处理这个命令
#define REDIS_CLUSTER_REDIR_NONE 0 / Node can serve the request. /
// 键在其他槽
#define REDIS_CLUSTER_REDIR_CROSS_SLOT 1 / Keys in different slots. /
// 键所处的槽正在进行 reshard
#define REDIS_CLUSTER_REDIR_UNSTABLE 2 / Keys in slot resharding. /
// 需要进行 ASK 转向
#define REDIS_CLUSTER_REDIR_ASK 3 / -ASK redirection required. */
// 需要进行 MOVED 转向
#define REDIS_CLUSTER_REDIR_MOVED 4
我们这里先给出这几个值,后续源码中会有关于这几个值的解释。
我们开始分析getNodeByQuery()这个函数
```c
/*
该函数返回处理命令的集群节点
Return the pointer to the cluster node that is able to serve the command.
集群处理的命令只能是1.单个key 2.多个key,但是这些key对应的redis 实例是同一个,并且集群是稳定的【未在进行重新分片】
* For the function to succeed the command should only target either:
*
* 1) A single key (even multiple times like LPOPRPUSH mylist mylist).
* 2) Multiple keys in the same hash slot, while the slot is stable (no
* resharding in progress).
*
如果成功,该函数返回能够处理命令请求的redis 实例
* On success the function returns the node that is able to serve the request.
* 如果节点不是 'myself',会执行重定向
* If the node is not 'myself' a redirection must be perfomed.
*
重定向的方式有 ask和remvoed两种
The kind of
* redirection is specified setting the integer passed by reference
* 'error_code', which will be set to REDIS_CLUSTER_REDIR_ASK or
* REDIS_CLUSTER_REDIR_MOVED.
*
当处理命令的节点是 `myself`, error_code被赋值为REDIS_CLUSTER_REDIR_NONE
* When the node is 'myself' 'error_code' is set to REDIS_CLUSTER_REDIR_NONE.
*当这个命令不能处理时,返回NULL,error_code被赋值为原因
* If the command fails NULL is returned, and the reason of the failure is
* provided via 'error_code', which will be set to:
*当这个命令包含多个key并且这些key不在同一个slot中时,
error_code被赋值为 REDIS_CLUSTER_REDIR_CROSS_SLOT
* REDIS_CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
* don't belong to the same hash slot.
*
当命令中含有多个key,并且这些key同属一个slot,但是集群正在resharding,error_code被赋值为RREDIS_CLUSTER_REDIR_UNSTABLE
* REDIS_CLUSTER_REDIR_UNSTABLE if the request contains mutliple keys
* belonging to the same slot, but the slot is not stable (in migration or
* importing state, likely because a resharding is in progress). */
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
// 初始化为 NULL ,
// 如果输入命令是无参数命令,那么 n 就会继续为 NULL
clusterNode *n = NULL;
robj *firstkey = NULL;
int multiple_keys = 0;
multiState *ms, _ms;
multiCmd mc;
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
/* Set error code optimistically for the base case. */
if (error_code) *error_code = REDIS_CLUSTER_REDIR_NONE;
/* We handle all the cases as if they were EXEC commands, so we have
* a common code path for everything */
// 集群可以执行事务,
// 但必须确保事务中的所有命令都是针对某个相同的键进行的
// 这个 if 和接下来的 for 进行的就是这一合法性检测
if (cmd->proc == execCommand) {
/* If REDIS_MULTI flag is not set EXEC is just going to return an
* error. */
if (!(c->flags & REDIS_MULTI)) return myself;
ms = &c->mstate;
} else {
/* In order to have a single codepath create a fake Multi State
* structure if the client is not in MULTI/EXEC state, this way
* we have a single codepath below. */
ms = &_ms;
_ms.commands = &mc;
_ms.count = 1;
mc.argv = argv;
mc.argc = argc;
mc.cmd = cmd;
}
/* Check that all the keys are in the same hash slot, and obtain this
* slot and the node associated. */
for (i = 0; i < ms->count; i++) {
struct redisCommand *mcmd;
robj **margv;
int margc, *keyindex, numkeys, j;
mcmd = ms->commands[i].cmd;
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;
// 定位命令的键位置
keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
// 遍历命令中的所有键
for (j = 0; j < numkeys; j++) {
robj *thiskey = margv[keyindex[j]];
int thisslot = keyHashSlot((char*)thiskey->ptr,
sdslen(thiskey->ptr));
if (firstkey == NULL) {
// 这是事务中第一个被处理的键
// 获取该键的槽和负责处理该槽的节点
/* This is the first key we see. Check what is the slot
* and node. */
firstkey = thiskey;
slot = thisslot;
n = server.cluster->slots[slot];
redisAssertWithInfo(c,firstkey,n != NULL);
/* If we are migrating or importing this slot, we need to check
* if we have all the keys in the request (the only way we
* can safely serve the request, otherwise we return a TRYAGAIN
* error). To do so we set the importing/migrating state and
* increment a counter for every missing key. */
if (n == myself &&
server.cluster->migrating_slots_to[slot] != NULL)
{
migrating_slot = 1;
} else if (server.cluster->importing_slots_from[slot] != NULL) {
importing_slot = 1;
}
} else {
/* If it is not the first key, make sure it is exactly
* the same key as the first we saw. */
if (!equalStringObjects(firstkey,thiskey)) {
if (slot != thisslot) {
/* Error: multiple keys from different slots. */
getKeysFreeResult(keyindex);
if (error_code)
*error_code = REDIS_CLUSTER_REDIR_CROSS_SLOT;
return NULL;
} else {
/* Flag this request as one with multiple different
* keys. */
multiple_keys = 1;
}
}
}
/* Migarting / Improrting slot? Count keys we don't have. */
if ((migrating_slot || importing_slot) &&
lookupKeyRead(&server.db[0],thiskey) == NULL)
{
missing_keys++;
}
}
getKeysFreeResult(keyindex);
} // end for
/* No key at all in command? then we can serve the request
* without redirections or errors. */
if (n == NULL) return myself;
/* Return the hashslot by reference. */
if (hashslot) *hashslot = slot;
/* This request is about a slot we are migrating into another instance?
* Then if we have all the keys. */
/* If we don't have all the keys and we are migrating the slot, send
* an ASK redirection. */
if (migrating_slot && missing_keys) {
if (error_code) *error_code = REDIS_CLUSTER_REDIR_ASK;
return server.cluster->migrating_slots_to[slot];
}
/* If we are receiving the slot, and the client correctly flagged the
* request as "ASKING", we can serve the request. However if the request
* involves multiple keys and we don't have them all, the only option is
* to send a TRYAGAIN error. */
if (importing_slot &&
(c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
{
if (multiple_keys && missing_keys) {
if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
return NULL;
} else {
return myself;
}
}
/* Handle the read-only client case reading from a slave: if this
* node is a slave and the request is about an hash slot our master
* is serving, we can reply without redirection. */
if (c->flags & REDIS_READONLY &&
cmd->flags & REDIS_CMD_READONLY &&
nodeIsSlave(myself) &&
myself->slaveof == n)
{
return myself;
}
/* Base case: just return the right node. However if this node is not
* myself, set error_code to MOVED since we need to issue a rediretion. */
if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;
// 返回负责处理槽 slot 的节点 n
return n;
}
以上就是 getQueryNode这个函数的功能,其实也比较简单,对着注释多看两遍就能看懂。无非就是找到处理命令的集群节点,如果找不到给error_code赋值原因。调用者根绝error_code做相应的处理。
2.在集群模式下我们应该注意哪些问题呢?
(1)在集群模式下应该尽量避免多key查询
(2)注意 key的散列与集群均衡
后记:参考书籍《redis设计与实现》《redis3.0源码》