/* ================================ MULTI/EXEC ============================== */ /* Client state initialization for MULTI/EXEC */ MULTI/EXEC的客户端状态初始化 void initClientMultiState(client *c) { c->mstate.commands = NULL; 初始化命令为空 c->mstate.count = 0; 命令个数为0 c->mstate.cmd_flags = 0; 命令标志 } typedef struct multiState { multiCmd *commands; /* Array of MULTI commands */ MULTI命令数组 int count; /* Total number of MULTI commands */ 总的MULTI命令数 int cmd_flags; /* The accumulated command flags OR-ed together. So if at least a command has a given flag, it will be set in this field. */ 累积使用异或操作在一起的命令标志,因此如果至少有一个命令有一个给定的标志,这个域就会被设值 int minreplicas; /* MINREPLICAS for synchronous replication */ 同步复制的时间 time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */同步复制 过期时间 } multiState; /* Release all the resources associated with MULTI/EXEC state */ 释放与MULTI/EXEC状态关联的所有资源 void freeClientMultiState(client *c) { int j; for (j = 0; j < c->mstate.count; j++) { int i; multiCmd *mc = c->mstate.commands+j; 根据j依次获取命令 for (i = 0; i < mc->argc; i++) decrRefCount(mc->argv[i]); 释放每个命令的参数 zfree(mc->argv); } zfree(c->mstate.commands); 释放命令指针本身 } /* Add a new command into the MULTI commands queue */ 添加一个新的命令到MULTI命令的队列中 void queueMultiCommand(client *c) { multiCmd *mc; int j; /* No sense to waste memory if the transaction is already aborted. * this is useful in case client sends these in a pipeline, or doesn't * bother to read previous responses and didn't notice the multi was already * aborted. */ 如果事务已经终止,浪费内存就毫无意义。当客户端在管道宏发出这些响应, 或者不必读之前的响应并且注意到multi命令已经终止,这将会非常有用 if (c->flags & CLIENT_DIRTY_EXEC) 事务终止了,直接返回 return; c->mstate.commands = zrealloc(c->mstate.commands, 给新命令分配内存,这里的+1就是这个作用 sizeof(multiCmd)*(c->mstate.count+1)); mc = c->mstate.commands+c->mstate.count; mc->cmd = c->cmd; 命令本身 mc->argc = c->argc; 该命令参数个数 mc->argv = zmalloc(sizeof(robj*)*c->argc); 给命令的具体参数分配内存 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);拷贝具体的内容 for (j = 0; j < c->argc; j++) incrRefCount(mc->argv[j]); 对每个参数减少引用 c->mstate.count++; 命令数+1 c->mstate.cmd_flags |= c->cmd->flags; 设置命令标志到异或的总值 } 取消事务 void discardTransaction(client *c) { freeClientMultiState(c); 释放事务关联的所有的资源 initClientMultiState(c); 设置成初始化状态 c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC); 将标志清空 unwatchAllKeys(c); 取消客户端关注的键 } /* Flag the transacation as DIRTY_EXEC so that EXEC will fail. * Should be called every time there is an error while queueing a command. */ 标记事务为DIRTY_EXEC,这样命令EXEC就会失败。当命令排队时,每当有错误时都需要调用 void flagTransaction(client *c) { if (c->flags & CLIENT_MULTI) 是客户端事务 c->flags |= CLIENT_DIRTY_EXEC; 设置失败 } 开启事务命令 void multiCommand(client *c) { if (c->flags & CLIENT_MULTI) { 客户端事务不能嵌套 addReplyError(c,"MULTI calls can not be nested"); return; } c->flags |= CLIENT_MULTI; 设置成执行客户端事务标志 addReply(c,shared.ok); } 取消事务命令 void discardCommand(client *c) { if (!(c->flags & CLIENT_MULTI)) { 无客户端事务标志 addReplyError(c,"DISCARD without MULTI"); return; } discardTransaction(c); 取消事务 addReply(c,shared.ok); } /* Send a MULTI command to all the slaves and AOF file. Check the execCommand * implementation for more information. */ 发送一个MULTI命令到所有的从机和AOF文件。获取更多细节请查找执行命令的实现 传播事务开启 void execCommandPropagateMulti(client *c) { propagate(server.multiCommand,c->db->id,&shared.multi,1, PROPAGATE_AOF|PROPAGATE_REPL); } 传播执行命令 void execCommandPropagateExec(client *c) { propagate(server.execCommand,c->db->id,&shared.exec,1, PROPAGATE_AOF|PROPAGATE_REPL); } 执行命令 void execCommand(client *c) { int j; robj **orig_argv; int orig_argc; struct redisCommand *orig_cmd; int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */ 是否需要传播给 事务 AOF 或者从机 int was_master = server.masterhost == NULL; 判断是否是主机 if (!(c->flags & CLIENT_MULTI)) { 非事务 addReplyError(c,"EXEC without MULTI"); return; } /* If we are in -BUSY state, flag the transaction and return the * -BUSY error, like Redis <= 5. This is a temporary fix, may be changed * ASAP, see issue #7353 on Github. */ 如果系统在忙的状态,标记事务失败,返回忙错误。例如redis<=5.这是一个临时修改,会尽快修改。 具体情况见github上的问题7353。 if (server.lua_timedout) { flagTransaction(c); 设置事务失败 addReply(c, shared.slowscripterr); return; } /* Check if we need to abort the EXEC because: 检测我们是否需要终止执行EXEC * 1) Some WATCHed key was touched. 一些键被标记了 * 2) There was a previous error while queueing commands. 排队的命令中有一个之前的错误 * A failed EXEC in the first case returns a multi bulk nil object * (technically it is not an error but a special behavior), while * in the second an EXECABORT error is returned. */ 第一种情况下一个失败的EXEC返回一个事务的空对象(从技术角度讲这个不是错误是一个特殊状态), 诶二中情况一个EXECABORT会被返回 if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) { 有被标记的脏标识 addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr : shared.nullarray[c->resp]); discardTransaction(c); 取消事务 goto handle_monitor; } /* If there are write commands inside the transaction, and this is a read * only slave, we want to send an error. This happens when the transaction * was initiated when the instance was a master or a writable replica and * then the configuration changed (for example instance was turned into * a replica). */ 如果事务中有写命令,但是这个是一个只读的从机,我们需要发送一个错误信息。 这种情况发生在 当事务初始化的时候实例是主机或者可写的副本,然后配置被修改了(举例来说实例被转化为副本) if (!server.loading && server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE) 不是正在加载 存在主(自身为从) 只读 非主机 命令可写 不可写的从机需要执行可写的事务,需要终止执行 { addReplyError(c, "Transaction contains write commands but instance " "is now a read-only replica. EXEC aborted."); discardTransaction(c); 取消事务 goto handle_monitor; } /* Exec all the queued commands */ 执行所有排队的命令 unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */尽快取消监视,否则我们会浪费CPU orig_argv = c->argv; orig_argc = c->argc; orig_cmd = c->cmd; addReplyArrayLen(c,c->mstate.count); for (j = 0; j < c->mstate.count; j++) { c->argc = c->mstate.commands[j].argc; c->argv = c->mstate.commands[j].argv; c->cmd = c->mstate.commands[j].cmd; /* Propagate a MULTI request once we encounter the first command which * is not readonly nor an administrative one. * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ 传播一个MULTI的请求,当我们遇到一个不是只读或者管理命令时。 通过这种方式,我们会发送一整块MULTI/..../EXEC当做一个整体,无论是AOF还是复制链路将会拥有相同的一致性和原子性保证 if (!must_propagate && !server.loading && 不正在加载 !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) 非 只读 和 管理命令 { execCommandPropagateMulti(c);开启传播事务 must_propagate = 1; } int acl_keypos; int acl_retval = ACLCheckCommandPerm(c,&acl_keypos); 检测ACL规则是否可执行 if (acl_retval != ACL_OK) { 不可执行 addACLLogEntry(c,acl_retval,acl_keypos,NULL); 将信息添加到acl的日志 addReplyErrorFormat(c, "-NOPERM ACLs rules changed between the moment the " "transaction was accumulated and the EXEC call. " "This command is no longer allowed for the " "following reason: %s", (acl_retval == ACL_DENIED_CMD) ? "no permission to execute the command or subcommand" : "no permission to touch the specified keys"); } else { call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL); 执行命令 } /* Commands may alter argc/argv, restore mstate. */ 命令可能改变argc/argv参数,恢复mstate状态。 c->mstate.commands[j].argc = c->argc; c->mstate.commands[j].argv = c->argv; c->mstate.commands[j].cmd = c->cmd; } c->argv = orig_argv; c->argc = orig_argc; c->cmd = orig_cmd; discardTransaction(c);取消事务 /* Make sure the EXEC command will be propagated as well if MULTI * was already propagated. */ 如果已经传播了MULTI,请确保EXEC命令也将被传播 if (must_propagate) { int is_master = server.masterhost == NULL; 是否是主机,主机不需要传播 server.dirty++; 有变动 /* If inside the MULTI/EXEC block this instance was suddenly * switched from master to slave (using the SLAVEOF command), the * initial MULTI was propagated into the replication backlog, but the * rest was not. We need to make sure to at least terminate the * backlog with the final EXEC. */ 如果在事务块中,这个实例突然从主机转化为从机(使用SLAVEOF命令), 初始的事务会被传播到复制的积压工作中,但是剩下的不会。 我们需要确保至少用最后的exec执行积压的复制信息 if (server.repl_backlog && was_master && !is_master) { 有积压的赋值工作 原来是主机 现在不是主机 char *execcmd = "*1\r\n$4\r\nEXEC\r\n"; 执行命令 feedReplicationBacklog(execcmd,strlen(execcmd)); 将执行命令放到复制的积压信息中,这样前面的积压命令就会被执行 } } handle_monitor: /* Send EXEC to clients waiting data from MONITOR. We do it here * since the natural order of commands execution is actually: * MUTLI, EXEC, ... commands inside transaction ... * Instead EXEC is flagged as CMD_SKIP_MONITOR in the command * table, and we do it here with correct ordering. */ 发送执行命令给等待监视数据的客户端。我们这里这样做,是因为命令执行的自然顺序实际上是: MUTLI, EXEC, ... 事务中的命令,相反,EXEC在命令表中被标记为CMD_SKIP_MONITOR,我们在这里按照正确的顺序执行 if (listLength(server.monitors) && !server.loading) 如果监视器不为空 并且不是正在加载文件 replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); 向监视器发送命令信息 } /* ===================== WATCH (CAS alike for MULTI/EXEC) =================== 监视(适用于MULTI/EXEC的CAS) CAS( Compare And Swap ) * * The implementation uses a per-DB hash table mapping keys to list of clients * WATCHing those keys, so that given a key that is going to be modified * we can mark all the associated clients as dirty. 该实现使用每个数据库的哈希表将键映射到监视这些键的客户端列表, 因此给定将要修改的键,我们能够标记所有关联的客户端为脏 * Also every client contains a list of WATCHed keys so that's possible to * un-watch such keys when the client is freed or when UNWATCH is called. */ 虽然每个客户端包含监视键的列表,因此可能取消这样的键的监视,当客户端已经释放或者取消监视被调用 /* In the client->watched_keys list we need to use watchedKey structures * as in order to identify a key in Redis we need both the key name and the * DB */ 在客户端的监视键列表中,我们需要使用监视键的结构,为了标识redis中断额键,我们需要键名和对应的数据库 typedef struct watchedKey { robj *key; redisDb *db; } watchedKey; /* Watch for the specified key */ void watchForKey(client *c, robj *key) { list *clients = NULL; listIter li; listNode *ln; watchedKey *wk; /* Check if we are already watching for this key */ 检查我们是否已经监视了这个键 listRewind(c->watched_keys,&li); while((ln = listNext(&li))) { wk = listNodeValue(ln); if (wk->db == c->db && equalStringObjects(key,wk->key)) 数据库一样 键一样 相等 return; /* Key already watched */ 已经被监视 } /* This key is not already watched in this DB. Let's add it */ 这个数据库的这个键还没有被监视,让我们添加它 clients = dictFetchValue(c->db->watched_keys,key); if (!clients) { clients = listCreate(); 创建监视列表 dictAdd(c->db->watched_keys,key,clients);添加列表为监视键的值 incrRefCount(key);增加引用计数 } listAddNodeTail(clients,c); 将当前客户端添加到监视键的列表中,这样主服务器有消息就可以通知这些监视的客户端 /* Add the new key to the list of keys watched by this client */ 将新键添加到此客户端监视的键列表中 wk = zmalloc(sizeof(*wk)); 为新对象分配空间 wk->key = key; 键 wk->db = c->db; 数据库 incrRefCount(key); 增加键的引用 listAddNodeTail(c->watched_keys,wk);添加到客户端的监视键列表中 } /* Unwatch all the keys watched by this client. To clean the EXEC dirty * flag is up to the caller. */ 由客户端除去对所有监视键的监视。由调用者清除命令EXEC的脏标志 void unwatchAllKeys(client *c) { listIter li; listNode *ln; if (listLength(c->watched_keys) == 0) return; 没有监视的键 listRewind(c->watched_keys,&li);非空开始初始化遍历 while((ln = listNext(&li))) { 逐个遍历 list *clients; watchedKey *wk; /* Lookup the watched key -> clients list and remove the client * from the list */ 检查数据库中被监视的键中是否有这个客户端,如果有就从列表中移除这个客户端 wk = listNodeValue(ln); clients = dictFetchValue(wk->db->watched_keys, wk->key); serverAssertWithInfo(c,NULL,clients != NULL); 键的客户端列表不为空 listDelNode(clients,listSearchKey(clients,c)); 从列表中删除客户端 /* Kill the entry at all if this was the only client */ 如果这个是列表唯一的客户端了,删除这个列表对应的字典元素 if (listLength(clients) == 0) 列表的元素为空 dictDelete(wk->db->watched_keys, wk->key); 从字典删除这个列表对应的键和值 /* Remove this watched key from the client->watched list */ 从客户端列表删除监视的键 listDelNode(c->watched_keys,ln); 从客户端列表删除键 decrRefCount(wk->key);减少引用 zfree(wk);释放监视的键 } } /* "Touch" a key, so that if this key is being WATCHed by some client the * next EXEC will fail. */ 标记一个键,如果这个键正在被一些客户端监视,那么杰西莱的命令EXEC将会失败 void touchWatchedKey(redisDb *db, robj *key) { list *clients; listIter li; listNode *ln; if (dictSize(db->watched_keys) == 0) return; 监视的键为空 clients = dictFetchValue(db->watched_keys, key); 不为空的情况下,获取监视键的客户端列表 if (!clients) return; 如果监视的客户端列表已空,就返回 /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */ /* Check if we are already watching for this key */ 标记所有监视这个键的客户端为CLIENT_DIRTY_CAS,检测我们是否已经监视这个键 listRewind(clients,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); 获取客户端 c->flags |= CLIENT_DIRTY_CAS; 给每个客户端增加标记CLIENT_DIRTY_CAS } } /* On FLUSHDB or FLUSHALL all the watched keys that are present before the * flush but will be deleted as effect of the flushing operation should * be touched. "dbid" is the DB that's getting the flush. -1 if it is * a FLUSHALL operation (all the DBs flushed). */ 当做FLUSHDB 或者 FLUSHALL时, 所有在刷新前出现的被监视的键,作为刷新操作的功能将会被删除, 所以应当标记。dbid是正在做刷新的数据库。 如果是FLUSHALL操作(所有数据库做刷新)那就是-1 void touchWatchedKeysOnFlush(int dbid) { listIter li1, li2; listNode *ln; /* For every client, check all the waited keys */ 对每个客户端,检测所有的被监视键 listRewind(server.clients,&li1); while((ln = listNext(&li1))) { 对每个客户端 client *c = listNodeValue(ln); listRewind(c->watched_keys,&li2);每个客户端监视的键列表 while((ln = listNext(&li2))) { 遍历监视键列表 watchedKey *wk = listNodeValue(ln); /* For every watched key matching the specified DB, if the * key exists, mark the client as dirty, as the key will be * removed. */ 对每个监视键匹配指定的数据库,如果键存在,标记客户端为脏,因为这个键会被移除 if (dbid == -1 || wk->db->id == dbid) { 所有数据库刷新 或者 单个数据库刷新 if (dictFind(wk->db->dict, wk->key->ptr) != NULL) 确认在数据库中 c->flags |= CLIENT_DIRTY_CAS; 增加标记CLIENT_DIRTY_CAS } } } } 监视命令 void watchCommand(client *c) { int j; if (c->flags & CLIENT_MULTI) { 不能嵌套事务 addReplyError(c,"WATCH inside MULTI is not allowed"); return; } for (j = 1; j < c->argc; j++) watchForKey(c,c->argv[j]); 对每个参数键进行监视 addReply(c,shared.ok); } void unwatchCommand(client *c) { unwatchAllKeys(c); 清除所有监视的键 c->flags &= (~CLIENT_DIRTY_CAS); 减少标记CLIENT_DIRTY_CAS addReply(c,shared.ok); }