redis 处理命令的过程

redis版本:redis-3.2.9

在客户端输入 set name zhang,调试redis服务器,得到调用栈如下:

redis 处理命令的过程

在 dictReplace 中加了断点,结果调试器里显示出 4 个线程。那么 redis 还是单进程单线程吗?是的:命令的读取、解析和执行都只在主线程的事件循环中完成,其余线程一般是后台线程(例如 bio 后台 I/O 线程),并不执行客户端命令。

上图的调用栈漏了一个栈帧:aeProcessEvents -> (networking.c) readQueryFromClient -> (networking.c) processInputBuffer

aeMain 事件循环

/* Main event loop.
 *
 * Clears the stop flag and then, until something sets eventLoop->stop to a
 * non-zero value, repeatedly runs the optional beforesleep hook followed by
 * one round of aeProcessEvents() for all event types. */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

aeProcessEvents 先处理文件事件(使用 epoll 选择文件事件),再处理时间事件

/* Run one round of event processing.
 *
 * File events are polled first (via aeApiPoll) and dispatched to their
 * read/write handlers; time events are processed afterwards when
 * AE_TIME_EVENTS is set in flags. With AE_DONT_WAIT the poll uses a zero
 * timeout; otherwise it sleeps until the nearest timer is due, or
 * indefinitely when there is no timer.
 *
 * Returns the number of processed file/time events. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                /* The timer is already due: poll without sleeping. */
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

            /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                /* Skip the write handler when it is the same function that
                 * was just invoked as the read handler. */
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

readQueryFromClient 客户端的命令通过网络传输到达 server,读取命令,设置到 client 的 querybuf 中

/* Readable-event handler for a client socket.
 *
 * Reads up to PROTO_IOBUF_LEN bytes from fd and appends them to
 * c->querybuf, updates the client/server input statistics, frees the
 * client on read error, EOF, or when the query buffer exceeds
 * server.client_max_querybuf_len, and finally hands the buffer to
 * processInputBuffer().
 *
 * privdata is the client that owns fd; el and mask are unused. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        /* +2 accounts for the trailing CRLF that follows the bulk payload. */
        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);

        if (remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            /* Nothing to read right now: wait for the next readable event. */
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        /* Log only the first 64 bytes of the offending buffer. */
        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }
    processInputBuffer(c);
}

processInputBuffer 客户端的命令全部保存在 client 的 querybuf 属性中,可能包含多条命令,例如管道,从 querybuf 中逐条解析命令,并设置 client 的 argc 和 argv 属性。

一条命令对应一个 argc 和 argv。例如 get name,此时 argv[0] 为 get,argv[1] 为 name。

/* Parse and execute every complete command currently held in c->querybuf.
 *
 * Each iteration determines the request type (multibulk when the buffer
 * starts with '*', inline otherwise), parses one command into the client's
 * argument vector, and runs it through processCommand(). The loop stops
 * early when the client is paused, blocked, or flagged for closing, or when
 * the parser reports that the command is not complete yet.
 *
 * server.current_client is set to c for the duration of the call and reset
 * to NULL on exit. */
void processInputBuffer(client *c) {
    server.current_client = c;
    /* Keep processing while there is something in the input buffer */
    while(sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            if (processCommand(c) == C_OK)
                resetClient(c);
            /* freeMemoryIfNeeded may flush slave output buffers. This may result
             * into a slave, that may be the active client, to be freed. */
            if (server.current_client == NULL) break;
        }
    }
    server.current_client = NULL;
}

processCommand 根据命令名(即 argv[0])寻找对应的命令函数,给 c->cmd 赋值,以 'get name' 为例,c->cmd = {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}

/* Look up and run (or queue) the command already parsed into c->argv/c->argc.
 *
 * The command named by argv[0] is resolved into c->cmd, then a series of
 * checks is applied in order: arity, authentication, cluster redirection,
 * maxmemory, persistence errors, min-slaves, read-only slave, Pub/Sub
 * context, stale slave data, dataset loading, and Lua script timeout. A
 * failed check replies with an error and returns. If every check passes the
 * command is queued (inside MULTI) or executed through call().
 *
 * Returns C_OK in the common case; C_ERR is returned for QUIT and when the
 * current client was freed while trying to reclaim memory. The caller in
 * this file resets the client only when C_OK is returned. */
int processCommand(client *c) {
    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        /* Positive arity: exact argument count. Negative arity: at least
         * -arity arguments. */
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }

    /* Check if the user is authenticated */
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        flagTransaction(c);
        addReply(c,shared.noautherr);
        return C_OK;
    }

    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) {
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            clusterRedirectClient(c,n,hashslot,error_code);
            return C_OK;
        }
    }

    /* Handle the maxmemory directive.
     *
     * First we try to free some memory if possible (if there are volatile
     * keys in the dataset). If there are not the only thing we can do
     * is returning an error. */
    if (server.maxmemory) {
        int retval = freeMemoryIfNeeded();
        /* freeMemoryIfNeeded may flush slave output buffers. This may result
         * into a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return C_ERR;

        /* It was impossible to free enough memory, and the command the client
         * is trying to execute is denied during OOM conditions? Error. */
        if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return C_OK;
        }
    }

    /* Don't accept write commands if there are problems persisting on disk
     * and if this is a master instance. */
    if (((server.stop_writes_on_bgsave_err &&
          server.saveparamslen > 0 &&
          server.lastbgsave_status == C_ERR) ||
          server.aof_last_write_status == C_ERR) &&
        server.masterhost == NULL &&
        (c->cmd->flags & CMD_WRITE ||
         c->cmd->proc == pingCommand))
    {
        flagTransaction(c);
        if (server.aof_last_write_status == C_OK)
            addReply(c, shared.bgsaveerr);
        else
            addReplySds(c,
                sdscatprintf(sdsempty(),
                "-MISCONF Errors writing to the AOF file: %s\r\n",
                strerror(server.aof_last_write_errno)));
        return C_OK;
    }

    /* Don't accept write commands if there are not enough good slaves and
     * user configured the min-slaves-to-write option. */
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        c->cmd->flags & CMD_WRITE &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
        flagTransaction(c);
        addReply(c, shared.noreplicaserr);
        return C_OK;
    }

    /* Don't accept write commands if this is a read only slave. But
     * accept write commands if this is our master. */
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        c->cmd->flags & CMD_WRITE)
    {
        addReply(c, shared.roslaveerr);
        return C_OK;
    }

    /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }

    /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
     * we are a slave with a broken link with master. */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        !(c->cmd->flags & CMD_STALE))
    {
        flagTransaction(c);
        addReply(c, shared.masterdownerr);
        return C_OK;
    }

    /* Loading DB? Return an error if the command has not the
     * CMD_LOADING flag. */
    if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
        addReply(c, shared.loadingerr);
        return C_OK;
    }

    /* Lua script too slow? Only allow a limited number of commands. */
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != replconfCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
        flagTransaction(c);
        addReply(c, shared.slowscripterr);
        return C_OK;
    }

    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        /* Inside MULTI: queue everything except the transaction-control
         * commands themselves. */
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
    return C_OK;
}

redis 所有的命令都硬编码在一个数组中,数组元素包含命令名、执行函数等属性

/* Static table with one entry per supported command.
 *
 * NOTE(review): every numeric field in this listing is empty (",," / ",-,"),
 * so the table is not valid C as shown. The text earlier in this file gives
 * the complete "get" entry as
 *     {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}
 * and the other rows need their numbers restored the same way from the
 * Redis 3.2.9 source before this can compile.
 *
 * Fields that are still readable here: the command name, the handler
 * function, the flag string (converted into bit flags from c->sflags by
 * populateCommandTable()), and the optional key-extraction callback.
 * The number following the handler is presumably the arity checked in
 * processCommand(), where a negative value means "at least" - TODO confirm
 * against the struct redisCommand declaration. */
struct redisCommand redisCommandTable[] = {
{"get",getCommand,,"rF",,NULL,,,,,},
{"set",setCommand,-,"wm",,NULL,,,,,},
{"setnx",setnxCommand,,"wmF",,NULL,,,,,},
{"setex",setexCommand,,"wm",,NULL,,,,,},
{"psetex",psetexCommand,,"wm",,NULL,,,,,},
{"append",appendCommand,,"wm",,NULL,,,,,},
{"strlen",strlenCommand,,"rF",,NULL,,,,,},
{"del",delCommand,-,"w",,NULL,,-,,,},
{"exists",existsCommand,-,"rF",,NULL,,-,,,},
{"setbit",setbitCommand,,"wm",,NULL,,,,,},
{"getbit",getbitCommand,,"rF",,NULL,,,,,},
{"bitfield",bitfieldCommand,-,"wm",,NULL,,,,,},
{"setrange",setrangeCommand,,"wm",,NULL,,,,,},
{"getrange",getrangeCommand,,"r",,NULL,,,,,},
{"substr",getrangeCommand,,"r",,NULL,,,,,},
{"incr",incrCommand,,"wmF",,NULL,,,,,},
{"decr",decrCommand,,"wmF",,NULL,,,,,},
{"mget",mgetCommand,-,"r",,NULL,,-,,,},
{"rpush",rpushCommand,-,"wmF",,NULL,,,,,},
{"lpush",lpushCommand,-,"wmF",,NULL,,,,,},
{"rpushx",rpushxCommand,,"wmF",,NULL,,,,,},
{"lpushx",lpushxCommand,,"wmF",,NULL,,,,,},
{"linsert",linsertCommand,,"wm",,NULL,,,,,},
{"rpop",rpopCommand,,"wF",,NULL,,,,,},
{"lpop",lpopCommand,,"wF",,NULL,,,,,},
{"brpop",brpopCommand,-,"ws",,NULL,,,,,},
{"brpoplpush",brpoplpushCommand,,"wms",,NULL,,,,,},
{"blpop",blpopCommand,-,"ws",,NULL,,-,,,},
{"llen",llenCommand,,"rF",,NULL,,,,,},
{"lindex",lindexCommand,,"r",,NULL,,,,,},
{"lset",lsetCommand,,"wm",,NULL,,,,,},
{"lrange",lrangeCommand,,"r",,NULL,,,,,},
{"ltrim",ltrimCommand,,"w",,NULL,,,,,},
{"lrem",lremCommand,,"w",,NULL,,,,,},
{"rpoplpush",rpoplpushCommand,,"wm",,NULL,,,,,},
{"sadd",saddCommand,-,"wmF",,NULL,,,,,},
{"srem",sremCommand,-,"wF",,NULL,,,,,},
{"smove",smoveCommand,,"wF",,NULL,,,,,},
{"sismember",sismemberCommand,,"rF",,NULL,,,,,},
{"scard",scardCommand,,"rF",,NULL,,,,,},
{"spop",spopCommand,-,"wRF",,NULL,,,,,},
{"srandmember",srandmemberCommand,-,"rR",,NULL,,,,,},
{"sinter",sinterCommand,-,"rS",,NULL,,-,,,},
{"sinterstore",sinterstoreCommand,-,"wm",,NULL,,-,,,},
{"sunion",sunionCommand,-,"rS",,NULL,,-,,,},
{"sunionstore",sunionstoreCommand,-,"wm",,NULL,,-,,,},
{"sdiff",sdiffCommand,-,"rS",,NULL,,-,,,},
{"sdiffstore",sdiffstoreCommand,-,"wm",,NULL,,-,,,},
{"smembers",sinterCommand,,"rS",,NULL,,,,,},
{"sscan",sscanCommand,-,"rR",,NULL,,,,,},
{"zadd",zaddCommand,-,"wmF",,NULL,,,,,},
{"zincrby",zincrbyCommand,,"wmF",,NULL,,,,,},
{"zrem",zremCommand,-,"wF",,NULL,,,,,},
{"zremrangebyscore",zremrangebyscoreCommand,,"w",,NULL,,,,,},
{"zremrangebyrank",zremrangebyrankCommand,,"w",,NULL,,,,,},
{"zremrangebylex",zremrangebylexCommand,,"w",,NULL,,,,,},
{"zunionstore",zunionstoreCommand,-,"wm",,zunionInterGetKeys,,,,,},
{"zinterstore",zinterstoreCommand,-,"wm",,zunionInterGetKeys,,,,,},
{"zrange",zrangeCommand,-,"r",,NULL,,,,,},
{"zrangebyscore",zrangebyscoreCommand,-,"r",,NULL,,,,,},
{"zrevrangebyscore",zrevrangebyscoreCommand,-,"r",,NULL,,,,,},
{"zrangebylex",zrangebylexCommand,-,"r",,NULL,,,,,},
{"zrevrangebylex",zrevrangebylexCommand,-,"r",,NULL,,,,,},
{"zcount",zcountCommand,,"rF",,NULL,,,,,},
{"zlexcount",zlexcountCommand,,"rF",,NULL,,,,,},
{"zrevrange",zrevrangeCommand,-,"r",,NULL,,,,,},
{"zcard",zcardCommand,,"rF",,NULL,,,,,},
{"zscore",zscoreCommand,,"rF",,NULL,,,,,},
{"zrank",zrankCommand,,"rF",,NULL,,,,,},
{"zrevrank",zrevrankCommand,,"rF",,NULL,,,,,},
{"zscan",zscanCommand,-,"rR",,NULL,,,,,},
{"hset",hsetCommand,,"wmF",,NULL,,,,,},
{"hsetnx",hsetnxCommand,,"wmF",,NULL,,,,,},
{"hget",hgetCommand,,"rF",,NULL,,,,,},
{"hmset",hmsetCommand,-,"wm",,NULL,,,,,},
{"hmget",hmgetCommand,-,"r",,NULL,,,,,},
{"hincrby",hincrbyCommand,,"wmF",,NULL,,,,,},
{"hincrbyfloat",hincrbyfloatCommand,,"wmF",,NULL,,,,,},
{"hdel",hdelCommand,-,"wF",,NULL,,,,,},
{"hlen",hlenCommand,,"rF",,NULL,,,,,},
{"hstrlen",hstrlenCommand,,"rF",,NULL,,,,,},
{"hkeys",hkeysCommand,,"rS",,NULL,,,,,},
{"hvals",hvalsCommand,,"rS",,NULL,,,,,},
{"hgetall",hgetallCommand,,"r",,NULL,,,,,},
{"hexists",hexistsCommand,,"rF",,NULL,,,,,},
{"hscan",hscanCommand,-,"rR",,NULL,,,,,},
{"incrby",incrbyCommand,,"wmF",,NULL,,,,,},
{"decrby",decrbyCommand,,"wmF",,NULL,,,,,},
{"incrbyfloat",incrbyfloatCommand,,"wmF",,NULL,,,,,},
{"getset",getsetCommand,,"wm",,NULL,,,,,},
{"mset",msetCommand,-,"wm",,NULL,,-,,,},
{"msetnx",msetnxCommand,-,"wm",,NULL,,-,,,},
{"randomkey",randomkeyCommand,,"rR",,NULL,,,,,},
{"select",selectCommand,,"lF",,NULL,,,,,},
{"move",moveCommand,,"wF",,NULL,,,,,},
{"rename",renameCommand,,"w",,NULL,,,,,},
{"renamenx",renamenxCommand,,"wF",,NULL,,,,,},
{"expire",expireCommand,,"wF",,NULL,,,,,},
{"expireat",expireatCommand,,"wF",,NULL,,,,,},
{"pexpire",pexpireCommand,,"wF",,NULL,,,,,},
{"pexpireat",pexpireatCommand,,"wF",,NULL,,,,,},
{"keys",keysCommand,,"rS",,NULL,,,,,},
{"scan",scanCommand,-,"rR",,NULL,,,,,},
{"dbsize",dbsizeCommand,,"rF",,NULL,,,,,},
{"auth",authCommand,,"sltF",,NULL,,,,,},
{"ping",pingCommand,-,"tF",,NULL,,,,,},
{"echo",echoCommand,,"F",,NULL,,,,,},
{"save",saveCommand,,"as",,NULL,,,,,},
{"bgsave",bgsaveCommand,-,"a",,NULL,,,,,},
{"bgrewriteaof",bgrewriteaofCommand,,"a",,NULL,,,,,},
{"shutdown",shutdownCommand,-,"alt",,NULL,,,,,},
{"lastsave",lastsaveCommand,,"RF",,NULL,,,,,},
{"type",typeCommand,,"rF",,NULL,,,,,},
{"multi",multiCommand,,"sF",,NULL,,,,,},
{"exec",execCommand,,"sM",,NULL,,,,,},
{"discard",discardCommand,,"sF",,NULL,,,,,},
{"sync",syncCommand,,"ars",,NULL,,,,,},
{"psync",syncCommand,,"ars",,NULL,,,,,},
{"replconf",replconfCommand,-,"aslt",,NULL,,,,,},
{"flushdb",flushdbCommand,,"w",,NULL,,,,,},
{"flushall",flushallCommand,,"w",,NULL,,,,,},
{"sort",sortCommand,-,"wm",,sortGetKeys,,,,,},
{"info",infoCommand,-,"lt",,NULL,,,,,},
{"monitor",monitorCommand,,"as",,NULL,,,,,},
{"ttl",ttlCommand,,"rF",,NULL,,,,,},
{"touch",touchCommand,-,"rF",,NULL,,,,,},
{"pttl",pttlCommand,,"rF",,NULL,,,,,},
{"persist",persistCommand,,"wF",,NULL,,,,,},
{"slaveof",slaveofCommand,,"ast",,NULL,,,,,},
{"role",roleCommand,,"lst",,NULL,,,,,},
{"debug",debugCommand,-,"as",,NULL,,,,,},
{"config",configCommand,-,"lat",,NULL,,,,,},
{"subscribe",subscribeCommand,-,"pslt",,NULL,,,,,},
{"unsubscribe",unsubscribeCommand,-,"pslt",,NULL,,,,,},
{"psubscribe",psubscribeCommand,-,"pslt",,NULL,,,,,},
{"punsubscribe",punsubscribeCommand,-,"pslt",,NULL,,,,,},
{"publish",publishCommand,,"pltF",,NULL,,,,,},
{"pubsub",pubsubCommand,-,"pltR",,NULL,,,,,},
{"watch",watchCommand,-,"sF",,NULL,,-,,,},
{"unwatch",unwatchCommand,,"sF",,NULL,,,,,},
{"cluster",clusterCommand,-,"a",,NULL,,,,,},
{"restore",restoreCommand,-,"wm",,NULL,,,,,},
{"restore-asking",restoreCommand,-,"wmk",,NULL,,,,,},
{"migrate",migrateCommand,-,"w",,migrateGetKeys,,,,,},
{"asking",askingCommand,,"F",,NULL,,,,,},
{"readonly",readonlyCommand,,"F",,NULL,,,,,},
{"readwrite",readwriteCommand,,"F",,NULL,,,,,},
{"dump",dumpCommand,,"r",,NULL,,,,,},
{"object",objectCommand,,"r",,NULL,,,,,},
{"client",clientCommand,-,"as",,NULL,,,,,},
{"eval",evalCommand,-,"s",,evalGetKeys,,,,,},
{"evalsha",evalShaCommand,-,"s",,evalGetKeys,,,,,},
{"slowlog",slowlogCommand,-,"a",,NULL,,,,,},
{"script",scriptCommand,-,"s",,NULL,,,,,},
{"time",timeCommand,,"RF",,NULL,,,,,},
{"bitop",bitopCommand,-,"wm",,NULL,,-,,,},
{"bitcount",bitcountCommand,-,"r",,NULL,,,,,},
{"bitpos",bitposCommand,-,"r",,NULL,,,,,},
{"wait",waitCommand,,"s",,NULL,,,,,},
{"command",commandCommand,,"lt",,NULL,,,,,},
{"geoadd",geoaddCommand,-,"wm",,NULL,,,,,},
{"georadius",georadiusCommand,-,"w",,NULL,,,,,},
{"georadiusbymember",georadiusByMemberCommand,-,"w",,NULL,,,,,},
{"geohash",geohashCommand,-,"r",,NULL,,,,,},
{"geopos",geoposCommand,-,"r",,NULL,,,,,},
{"geodist",geodistCommand,-,"r",,NULL,,,,,},
{"pfselftest",pfselftestCommand,,"a",,NULL,,,,,},
{"pfadd",pfaddCommand,-,"wmF",,NULL,,,,,},
{"pfcount",pfcountCommand,-,"r",,NULL,,-,,,},
{"pfmerge",pfmergeCommand,-,"wm",,NULL,,-,,,},
{"pfdebug",pfdebugCommand,-,"w",,NULL,,,,,},
{"post",securityWarningCommand,-,"lt",,NULL,,,,,},
{"host:",securityWarningCommand,-,"lt",,NULL,,,,,},
{"latency",latencyCommand,-,"aslt",,NULL,,,,,}
};

而这些命令会存放在一个字典中 dict *commands

/* Register every entry of redisCommandTable in the server dictionaries.
 *
 * For each command the single-character flag string (c->sflags) is
 * converted into the c->flags bitmask, then the command is added under its
 * name to both server.commands and server.orig_commands. An unknown flag
 * character triggers serverPanic(); a failed dictionary insertion trips the
 * assertion. */
void populateCommandTable(void) {
    int j;
    int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

    for (j = 0; j < numcommands; j++) {
        struct redisCommand *c = redisCommandTable+j;
        char *f = c->sflags;
        int retval1, retval2;

        while(*f != '\0') {
            switch(*f) {
            case 'w': c->flags |= CMD_WRITE; break;
            case 'r': c->flags |= CMD_READONLY; break;
            case 'm': c->flags |= CMD_DENYOOM; break;
            case 'a': c->flags |= CMD_ADMIN; break;
            case 'p': c->flags |= CMD_PUBSUB; break;
            case 's': c->flags |= CMD_NOSCRIPT; break;
            case 'R': c->flags |= CMD_RANDOM; break;
            case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break;
            case 'l': c->flags |= CMD_LOADING; break;
            case 't': c->flags |= CMD_STALE; break;
            case 'M': c->flags |= CMD_SKIP_MONITOR; break;
            case 'k': c->flags |= CMD_ASKING; break;
            case 'F': c->flags |= CMD_FAST; break;
            default: serverPanic("Unsupported command flag"); break;
            }
            f++;
        }

        retval1 = dictAdd(server.commands, sdsnew(c->name), c);
        /* Populate an additional dictionary that will be unaffected
         * by rename-command statements in redis.conf. */
        retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
        serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
    }
}

call 执行具体命令,并且传播命令,等等。

/* Execute the command currently bound to the client (c->cmd->proc).
 *
 * Around the actual invocation this function feeds MONITOR clients,
 * measures the execution time and the change in server.dirty, records
 * slowlog/latency samples and per-command statistics, and propagates the
 * command to the AOF and replication link.
 *
 * flags is a bitmask of CMD_CALL_* values selecting which of those side
 * activities (slowlog, stats, propagation) are enabled for this call. */
void call(client *c, int flags) {
    long long dirty, start, duration;
    int client_old_flags = c->flags;

    /* Sent the command to clients in MONITOR mode, only if the commands are
     * not generated from reading an AOF. */
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
    {
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }

    /* Initialization: clear the flags that must be set by the command on
     * demand, and initialize the array for additional commands propagation. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    redisOpArrayInit(&server.also_propagate);

    /* Call the command. */
    dirty = server.dirty;
    start = ustime();
    c->cmd->proc(c);
    duration = ustime()-start;
    dirty = server.dirty-dirty;
    if (dirty < 0) dirty = 0;

    /* When EVAL is called loading the AOF we don't want commands called
     * from Lua to go into the slowlog or to populate statistics. */
    if (server.loading && c->flags & CLIENT_LUA)
        flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);

    /* If the caller is Lua, we want to force the EVAL caller to propagate
     * the script if the command flag or client flag are forcing the
     * propagation. */
    if (c->flags & CLIENT_LUA && server.lua_caller) {
        if (c->flags & CLIENT_FORCE_REPL)
            server.lua_caller->flags |= CLIENT_FORCE_REPL;
        if (c->flags & CLIENT_FORCE_AOF)
            server.lua_caller->flags |= CLIENT_FORCE_AOF;
    }

    /* Log the command into the Slow log if needed, and populate the
     * per-command statistics that we show in INFO commandstats. */
    if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        /* duration comes from ustime(); the latency sample is scaled
         * down by 1000. */
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
    }
    if (flags & CMD_CALL_STATS) {
        c->lastcmd->microseconds += duration;
        c->lastcmd->calls++;
    }

    /* Propagate the command into the AOF and replication link */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;

        /* Check if the command operated changes in the data set. If so
         * set for replication / AOF propagation. */
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        /* If the client forced AOF / replication of the command, set
         * the flags regardless of the command effects on the data set. */
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

        /* However prevent AOF / replication propagation if the command
         * implementation called preventCommandPropagation() or similar,
         * or if we don't have the call() flags to do so. */
        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
                propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
                propagate_flags &= ~PROPAGATE_AOF;

        /* Call propagate() only if at least one of AOF / replication
         * propagation is needed. */
        if (propagate_flags != PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }

    /* Restore the old replication flags, since call() can be executed
     * recursively. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    c->flags |= client_old_flags &
        (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);

    /* Handle the alsoPropagate() API to handle commands that want to propagate
     * multiple separated commands. Note that alsoPropagate() is not affected
     * by CLIENT_PREVENT_PROP flag. */
    if (server.also_propagate.numops) {
        int j;
        redisOp *rop;

        if (flags & CMD_CALL_PROPAGATE) {
            for (j = 0; j < server.also_propagate.numops; j++) {
                rop = &server.also_propagate.ops[j];
                int target = rop->target;
                /* Whatever the command wish is, we honor the call() flags. */
                if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
                if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
                if (target)
                    propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
            }
        }
        redisOpArrayFree(&server.also_propagate);
    }
    server.stat_numcommands++;
}

c->cmd->proc(c); 是调用具体的命令。以 'get name' 为例,c->cmd->proc = getCommand

/* Shared implementation of GET.
 *
 * Looks up the key given as argv[1]. When the lookup returns NULL the
 * function returns C_OK without adding anything itself (shared.nullbulk is
 * handed to lookupKeyReadOrReply for that case). A value that is not a
 * string produces the wrong-type error and C_ERR; otherwise the value is
 * sent as a bulk reply and C_OK is returned. */
int getGenericCommand(client *c) {
    robj *o;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
        return C_OK;

    if (o->type != OBJ_STRING) {
        addReply(c,shared.wrongtypeerr);
        return C_ERR;
    } else {
        addReplyBulk(c,o);
        return C_OK;
    }
}

/* GET command entry point: delegates to getGenericCommand() and ignores
 * its return value. */
void getCommand(client *c) {
    getGenericCommand(c);
}

执行命令,把响应添加到缓冲区

client 有 2 个输出缓冲区可用,一个是 char buf[PROTO_REPLY_CHUNK_BYTES],一个是 list *reply,首先使用 buf,当 buf 不足时,使用 reply

发送响应: (t_string.c) setGenericCommand -> (networking.c) addReply

/* Append the object's string representation to the client's output.
 *
 * The static buffer c->buf is tried first through _addReplyToBuffer(); when
 * that fails the object is appended to the c->reply list instead.
 * Integer-encoded objects are formatted into a small stack buffer when the
 * static buffer has room, otherwise they are decoded into a string object
 * first. Nothing is added when prepareClientToWrite() does not return
 * C_OK. */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    /* This is an important place where we can avoid copy-on-write
     * when there is a saving child running, avoiding touching the
     * refcount field of the object if it's not needed.
     *
     * If the encoding is RAW and there is room in the static buffer
     * we'll be able to send the object to the client without
     * messing with its page. */
    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(c,obj);
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* Optimization: if there is room in the static buffer for 32 bytes
         * (more than the max chars a 64 bit integer can take as string) we
         * avoid decoding the object and go for the lower level approach. */
        if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
            char buf[32];
            int len;

            len = ll2string(buf,sizeof(buf),(long)obj->ptr);
            if (_addReplyToBuffer(c,buf,len) == C_OK)
                return;
            /* else... continue with the normal code path, but should never
             * happen actually since we verified there is room. */
        }
        obj = getDecodedObject(obj);
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(c,obj);
        decrRefCount(obj);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

/* Try to copy len bytes from s into the client's static output buffer.
 *
 * Returns C_OK when the bytes were copied, and also when the client is
 * flagged CLIENT_CLOSE_AFTER_REPLY (in which case the data is dropped).
 * Returns C_ERR when the reply list already has entries or when the
 * remaining space in c->buf is smaller than len. */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos;

    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;

    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
    if (listLength(c->reply) > 0) return C_ERR;

    /* Check that the buffer has enough space available for this string. */
    if (len > available) return C_ERR;

    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return C_OK;
}

那么客户端的响应是什么时候从缓冲区发送出去的呢?

redis 处理命令的过程

/* Main event loop.
 *
 * Clears the stop flag and then, until something sets eventLoop->stop to a
 * non-zero value, repeatedly runs the optional beforesleep hook followed by
 * one round of aeProcessEvents() for all event types. The beforesleep hook
 * therefore runs once before every round of event processing. */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}
上一篇:salesforce 零基础学习(四十二)简单文件上传下载


下一篇:Magento-找出没有图片的产品