Memcached源码分析之请求处理(状态机)

作者:Calix

一)上文

在上一篇线程模型的分析中,我们知道,worker线程和主线程都调用了同一个函数,conn_new进行事件监听,并返回conn结构体对象。最终有事件到达时,调用同一个函数event_handler最终来到执行drive_machine。

二)conn结构体

首先,很有必要地先分析一个结构体:conn

这个conn在memcached里面是这样一个角色,听名字也知道它代表一个“连接”,但这个“连接”不一定是已经连接上的连接,可以是监听中的连接,例如主线程在监听listen fd的时候,也通过conn_new创建了一个conn实例对象,而这个conn对象的conn_states值为conn_listening,代表“监听中的连接”。

而worker线程监听的client fd是已经连接上了,也为这个连接创建一个“conn”对象,而连接状态conn_states则不是conn_listening,最开始的时候为conn_cmd_new,听名字也知道,这个连接处于“新命令”状态。

每一个“连接”都有当前的状态,监听中,还是等待新命令中,还是后面会看到的“写数据”中,“关闭中”等等,所以这个conn结构体的定义是合理的。

所以最后总结出,无论是主线程监听listen fd还是worker线程监听client fd,只要是与客户端有关的fd的监听都以一个conn对象来表示。

下面大概分析一下conn的结构,(建议先大体看下各个字段的意义,具体到某个字段被使用时再详讲):

  1. typedef struct conn conn;
  2. struct conn {
  3. int sfd; //连接的socket fd
  4. sasl_conn_t *sasl_conn;
  5. bool authenticated;
  6. enum conn_states state; //当前的连接状态
  7. enum bin_substates substate;
  8. rel_time_t last_cmd_time;
  9. struct event event; // 监听的事件
  10. short ev_flags; //监听的事件 类型
  11. short which; /** which events were just triggered */ //刚触发的事件
  12. /**
  13. 读buffer会涉及两个方向上的“读”:
  14. 一个是从socket读进来到rbuf里面
  15. 一个是从rbuf里面把数据读出去解析,读buffer相当于一个中介,从socket读进来最终还是得被别人读出去解析,而
  16. rcurr工作指针与rbytes就是在rbuf数据被读出去解析的时候用到,用来控制可以读出去被解析的数据还剩余多少。
  17. */
  18. char *rbuf; /** buffer to read commands into */ //读buffer
  19. char *rcurr; /** but if we parsed some already, this is where we stopped */ //读buffer的当前指针
  20. int rsize; /** total allocated size of rbuf */ //读buffer大小
  21. int rbytes; /** how much data, starting from rcur, do we have unparsed */ //剩余buffer字节数
  22. //下面4个属性和上面4个类似
  23. char *wbuf;
  24. char *wcurr;
  25. int wsize;
  26. int wbytes;
  27. /** which state to go into after finishing current write */
  28. enum conn_states write_and_go; //完成当前写操作后,连接状态将会置为此状态
  29. void *write_and_free; /** free this memory after finishing writing */
  30. char *ritem; /** when we read in an item's value, it goes here */ //这个指针指向item结构体中data中的value地址
  31. int rlbytes; //尚未读完item的data的value的字节数
  32. void *item; /* for commands set/add/replace */ //当执行set/add/replace 命令时,此指针用于指向分配的item空间
  33. /* data for the swallow state */
  34. int sbytes; /* how many bytes to swallow */
  35. //下面是往socket写出数据时用的字段
  36. struct iovec *iov;
  37. int iovsize; /* number of elements allocated in iov[] */
  38. int iovused; /* number of elements used in iov[] */
  39. struct msghdr *msglist;
  40. int msgsize; /* number of elements allocated in msglist[] */
  41. int msgused; /* number of elements used in msglist[] */
  42. int msgcurr; /* element in msglist[] being transmitted now */
  43. int msgbytes; /* number of bytes in current msg */
  44. item **ilist; /* list of items to write out */
  45. int isize;
  46. item **icurr;
  47. int ileft;
  48. char **suffixlist;
  49. int suffixsize;
  50. char **suffixcurr;
  51. int suffixleft;
  52. enum protocol protocol; /* which protocol this connection speaks */
  53. enum network_transport transport; /* what transport is used by this connection */
  54. //UDP相关的字段
  55. int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
  56. struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */
  57. socklen_t request_addr_size;
  58. unsigned char *hdrbuf; /* udp packet headers */
  59. int hdrsize; /* number of headers' worth of space is allocated */
  60. bool noreply; /* True if the reply should not be sent. */
  61. /* current stats command */
  62. struct {
  63. char *buffer;
  64. size_t size;
  65. size_t offset;
  66. } stats;
  67. // 二进制相关的字段
  68. protocol_binary_request_header binary_header;
  69. uint64_t cas; /* the cas to return */
  70. short cmd; /* current command being processed */
  71. int opaque;
  72. int keylen;
  73. conn *next; /* Used for generating a list of conn structures */
  74. LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
  75. };
  76. /* conn_states是一个枚举:*/
  77. enum conn_states {
  78. conn_listening, /**< the socket which listens for connections */
  79. conn_new_cmd, /**< Prepare connection for next command */
  80. conn_waiting, /**< waiting for a readable socket */
  81. conn_read, /**< reading in a command line */
  82. conn_parse_cmd, /**< try to parse a command from the input buffer */
  83. conn_write, /**< writing out a simple response */
  84. conn_nread, /**< reading in a fixed number of bytes */
  85. conn_swallow, /**< swallowing unnecessary bytes w/o storing */
  86. conn_closing, /**< closing this connection */
  87. conn_mwrite, /**< writing out many items sequentially */
  88. conn_closed, /**< connection is closed */
  89. conn_max_state /**< Max state value (used for assertion) */
  90. };

知道conn的意义之后,主线程和worker线程都调用conn_new监听fd并创建conn对象就合情合理了,大家都有conn对象,只是状态不一样,event_handler被触发,调用drive_machine,进入不一样的case完成不一样的操作。

这句话压缩来说:“根据状态不同去做不同的事情”,这种工作方式其实就是下面要讲的“状态机”。

三)状态机

状态机drive_machine函数是worker线程网络请求进行业务逻辑处理的核心。

它的实现方式是:

一个while循环里面有一个巨大的switch case,根据连接对象 conn当前的连接状态conn_state,进入不同的case,而每个case可能会改变conn的连接状态,也就是说在这个while+switch中,conn会不断的发生状态转移,最后被分发到合适的case上作处理。可以理解为,这里是一个有向图,每个case是一个顶点,有些case通过改变conn对象的连接状态让程序在下一次循环中进入另一个case,几次循环后程序最终进入到“无出度的顶点”然后结束状态机,这里的无出度的顶点就是带设置stop=true的case分支。

看下大概的代码结构:

  1. static void drive_machine(conn *c) {
  2. while (!stop) {
  3. switch(c->state) {
  4. case conn_listening:
  5. //。。。。
  6. case conn_waiting:
  7. //。。。
  8. stop = true; break;
  9. //。。。
  10. }
  11. }
  12. }

主线程状态机的行为我们已经知道了,永远只会是conn_listening状态,永远只会进入drive_machine的conn_listening分支,accept连接把client fd 通过dispatch_conn_new函数分发给worker线程。

下面我们来看一下worker线程执行状态机:

当主线程调用dispatch_conn_new的时候,worker线程创建conn对象,初始状态为conn_new_cmd。所以当有worker线程监听的client fd有请求过来时,例如客户端发了一行命令(set xxx\r\n)会进入conn_new_cmd分支:

  1. case conn_new_cmd:
  2. /*
  3. 这里的reqs是请求的意思,其实叫“命令”更准确。一次event发生,有可能包含多个命令,
  4. 从client fd里面read到的一次数据,不能保证这个数据只是包含一个命令,有可能是多个
  5. 命令数据堆在一起的一次事件通知。这个nreqs是用来控制一次event最多能处理多少个命令。
  6. */
  7. --nreqs;
  8. if (nreqs >= 0) {
  9. /**
  10. 准备执行命令。为什么叫reset cmd,reset_cmd_handler其实做了一些解析执行命令之前
  11. 的初始化动下一个,都会重新进入这个case作。而像上面说的,一次event有可能有多个命令,每执行一个命令,如果还有
  12. conn_new_cmd,reset一下再执行下一个命令。
  13. */
  14. reset_cmd_handler(c);
  15. } else {
  16. //。。。
  17. }
  18. break;

上面的nreqs在这里暂不详细分析。当client fd第一次有请求过来的时候,会进入reset_cmd_handler函数:

  1. static void reset_cmd_handler(conn *c) {
  2. c->cmd = -1;
  3. c->substate = bin_no_state;
  4. if(c->item != NULL) {
  5. item_remove(c->item);
  6. c->item = NULL;
  7. }
  8. conn_shrink(c);
  9. //第一次有请求过来触发到此函数时,c->rbytes为0
  10. if (c->rbytes > 0) {
  11. conn_set_state(c, conn_parse_cmd);
  12. } else {
  13. conn_set_state(c, c
  14. onn_waiting);  //第一次请求进入此分支
  15. }
  16. }

我们在conn_new函数里面把c->rbytes被始化为0,而直至此我们也没有看到这个c->rbytes有被重新赋新值,所以其实第一次有请求过来,这个值还是0,所以进入else分支,即执行conn_set_state(c,conn_waiting);然后重新回到状态机执行下一次循环,进入conn_waiting分支:

  1. case conn_waiting:
  2. if (!update_event(c, EV_READ | EV_PERSIST)) {
  3. //。。。
  4. }
  5. conn_set_state(c, conn_read);
  6. stop = true;
  7. break;

在conn_waiting分支你会发现,这里的代码仅仅是把状态改变conn_read然后就stop=true,结束状态机了!没错,退出while循环了!这次事件触发就此结束了!
你会觉得很奇怪,我客户端明明发了一个请求,(set xxx\r\n),你什么都没处理就只是把连接状态改成conn_read就完事了?!没错,至少这一次状态机的执行行为是这样!

到底是怎么回事?其实这里是利用了一点:libevent的epoll默认是“水平触发”!也就是说,客户端发来一个set xxx\r\n,我这边一天没有read,epoll还会有下一次通知,也就是说,这个请求有两次事件通知!第一次通知的作用仅是为了把连接状态改为conn_read! 当worker线程因为同一个client fd同一个请求收到第二次通知的时候,再次执行状态机,然后进入conn_read分支。

为了验证这一点,我在drive_machine函数代码执行的开头处打了一下log:

  1. static void drive_machine(conn *c) {
  2. fprintf(stderr, "event arrive!\n");

然后重新编译memcached运行,测试一下是否worker线程事件通知发生了两次(左边是服务端,右边是客户端):

客户端telnet发起连接,event_base通知主线程,所以这里会有一次调用drive_machine的情况:

Memcached源码分析之请求处理(状态机)

客户端输入“set testkey 0 0 4”的命令后:

Memcached源码分析之请求处理(状态机)

可以看到当服务端收到命令后,先利用第一次事件通知(上面图中的第二个event arrive)把状态置为conn_read,然后等待第二次事件通知。非常快地,第二次事件通知就到达(上面图中的第三个event arrive),然后进入conn_read状态继续执行。

下面我们看一下收到第二次通知的时候进入conn_read分支后的代码:

  1. case conn_read:
  2. res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
  3. switch (res) {
  4. case READ_NO_DATA_RECEIVED:
  5. conn_set_state(c, conn_waiting);
  6. break;
  7. case READ_DATA_RECEIVED:
  8. conn_set_state(c, conn_parse_cmd);
  9. break;
  10. case READ_ERROR:
  11. conn_set_state(c, conn_closing);
  12. break;
  13. case READ_MEMORY_ERROR:
  14. break;
  15. }
  16. break;

进入conn_read此时才调用函数try_read_network函数读出请求(set xxx\r\n):

  1. static enum try_read_result try_read_network(conn *c) {
  2. enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
  3. int res;
  4. int num_allocs = 0;
  5. assert(c != NULL);
  6. if (c->rcurr != c->rbuf) {
  7. if (c->rbytes != 0) /* otherwise there's nothing to copy */
  8. memmove(c->rbuf, c->rcurr, c->rbytes);
  9. c->rcurr = c->rbuf;
  10. }
  11. while (1) {
  12. if (c->rbytes >= c->rsize) {//读buffer空间扩充
  13. //。。。
  14. }
  15. int avail = c->rsize - c->rbytes; //读buffer的空间还剩余多少大小可以用
  16. res = read(c->sfd, c->rbuf + c->rbytes, avail); //往剩下的可用的地方里塞
  17. if (res > 0) {
  18. gotdata = READ_DATA_RECEIVED;
  19. /**
  20. rbytes是当前指针rcurr至读buffer末尾的数据大小,这里可简单地理解为对rbytes的初始化。
  21. */
  22. c->rbytes += res;
  23. if (res == avail) { //可能还没读完,此时读buffer可用空间满了,那么下次循环会进行读buffer空间扩充
  24. continue;
  25. } else {
  26. break; //socket的可读数据都读完了
  27. }
  28. }
  29. //。。。
  30. }
  31. return gotdata;
  32. }

try_read_network函数就是从socket中把数据读到c->rbuf中去而已,同时初始化一些变量例如rbytes等,读取数据成功则返回READ_DATA_RECEIVED,状态机 conn_set_state(c, conn_parse_cmd);进入conn_parse_cmd状态:

  1. case conn_parse_cmd :
  2. /**
  3. try_read_network后,到达conn_parse_cmd状态,但try_read_network并不确保每次到达
  4. 的数据都足够一个完整的cmd(ascii协议情况下往往是没有"\r\n",即回车换行),
  5. 所以下面的try_read_command之所以叫try就是这个原因,
  6. 当读到的数据还不够成为一个cmd的时候,返回0,conn继续进入conn_waiting状态等待更多的数据到达。
  7. */
  8. if (try_read_command(c) == 0) {
  9. /* wee need more data! */
  10. conn_set_state(c, conn_waiting);
  11. }
  12. break;

进行conn_parse_cmd主要是调用try_read_command函数读取命令,上面注释也说明了数据不够一个cmd的情况,下面我们进入try_read_command,看看try_read_command不返回0时,也就是足够一个cmd后是怎么解析这个cmd的(只说明tcp ascii协议的情况):

  1. static int try_read_command(conn *c) {
  2. char *el, *cont;
  3. if (c->rbytes == 0)  //读buffer没有待解析的数据
  4. return 0;
  5. el = memchr(c->rcurr, '\n', c->rbytes); //找第一个命令的末尾,即换行符
  6. if (!el) {
  7. //。。。
  8. /*
  9. 如果没有找到换行符,则说明读到的数据还不足以成为一个完整的命令,
  10. 返回0
  11. */
  12. return 0;
  13. }
  14. cont = el + 1; //下一个命令的开头
  15. /*
  16. 下面这个if的作用是把el指向当前命令最后一个有效字符的下一个字符,即\r
  17. 目的是为了在命令后面插上一个\0,字符串结束符。
  18. 例如 GET abc\r\n******,变成GET abc\0\n*****,这样以后读出的字符串就是一个命令。
  19. */
  20. if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
  21. el--;
  22. }
  23. *el = '\0';
  24. c->last_cmd_time = current_time;
  25. process_command(c, c->rcurr); //执行命令。分析详见process_command
  26. //当前命令执行完之后,把当前指针rcurr指向 下一个命令的开头,并调用rbytes(剩余未处理字节数大小)
  27. //逻辑上相当于把已处理的命令去掉。
  28. c->rbytes -= (cont - c->rcurr);
  29. c->rcurr = cont;
  30. }
  31. return 1;
  32. }

上面try_read_command把命令读出(其实只是简单地找出一个完整的命令,在后面加个\0而已)。

在这里插一下memcached的SET命令的协议,或者你可以看memcached/doc/protocol.txt中的说明:

完成一个SET命令,其实需要两行,也就是需要按两次回车换行“\r\n”,第一行叫“命令行”,格式是SET key flags exptime bytes\r\n,如SET name 0 0 5\r\n, 键为name,flags标志位可暂时不管,超时设为0,value的字节长度是4。然后才有第二行叫“数据行”,格式为:value\r\n,例如:calix\r\n。这两行分别敲下去,SET命令才算完成。

所以处理SET命令时上面的try_read_command首先处理的是SET name 0 0 5\r\n这个“命令行”。

看看进入process_command函数如何执行:

  1. /**
  2. 这里就是对命令的解析和执行了
  3. (其实准确来说,这里只是执行了命令的一半(例如如果是SET命令,则是“命令行”部分),
  4. 然后根据命令类型再次改变conn_state使程序再次进入状态机,完成命令的
  5. 另一半工作,后面详说)
  6. command此时的指针值等于conn的rcurr
  7. */
  8. static void process_command(conn *c, char *command) {
  9. token_t tokens[MAX_TOKENS];
  10. size_t ntokens;
  11. int comm; //命令类型
  12. c->msgcurr = 0;
  13. c->msgused = 0;
  14. c->iovused = 0;
  15. if (add_msghdr(c) != 0) {
  16. out_of_memory(c, "SERVER_ERROR out of memory preparing response");
  17. return;
  18. }
  19. /**
  20. 下面这个tokenize_command是一个词法分析,把command分解成一个个token
  21. */
  22. ntokens = tokenize_command(command, tokens, MAX_TOKENS);
  23. //下面是对上面分解出来的token再进行语法分析,解析命令,下面的comm变量为最终解析出来命令类型
  24. if (ntokens >= 3 &&
  25. ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
  26. (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
  27. process_get_command(c, tokens, ntokens, false);
  28. } else if ((ntokens == 6 || ntokens == 7) &&
  29. ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
  30. (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
  31. (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
  32. (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
  33. (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
  34. //add/set/replace/prepend/append为“更新”命令,调用同一个函数执行命令。详见process_update_command定义处
  35. process_update_command(c, tokens, ntokens, comm, false);
  36. }
  37. //。。。
  38. }

上面的代码可以看出首先我们要对命令进行“解析”,词法语法分析等等(属于编译原理知识,在这不详讲),最终我们的set name 0 0 5\r\n命令会进入process_update_command函数中执行:

static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
out_string(c, "CLIENT_ERROR bad command line format"); //key过长,out_string函数的作用是输出响应,
//详见out_string定义处
return;
}
key = tokens[KEY_TOKEN].value; //键名
nkey = tokens[KEY_TOKEN].length; //键长度
//下面这个if同时把命令相应的参数(如缓存超时时间等)赋值给相应变量:exptime_int等
if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
&& safe_strtol(tokens[3].value, &exptime_int)
&& safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
exptime = exptime_int;
if (exptime < 0)
exptime = REALTIME_MAXDELTA + 1;
//在这里执行内存分配工作。详见内存管理篇
it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
ITEM_set_cas(it, req_cas_id);
c->item = it; //将item指针指向分配的item空间
c->ritem = ITEM_data(it); //将 ritem 指向 it->data中要存放 value 的空间地址
c->rlbytes = it->nbytes; //data的大小
c->cmd = comm; //命令类型
conn_set_state(c, conn_nread); //继续调用状态机,执行命令的另一半工作。
}

process_update_command函数最终执行了item_alloc为我们要set的数据(称为item)分配了内存。同时,为c对象赋了相应的一些值。

但是其实这里仅仅是为item分配了空间,还没有把value塞进去,因为我们仅仅执行了SET命令的“命令行“部分,根据“命令行”部分的信息分配空间。代码最后一行看到在这里,我们又把c的状态变成了conn_nread,等“数据行”达到,epoll事件触发状态机下一次循环进入conn_nread分支,其实就是完成SET命令的第二部分,读出“数据行”:

  1. case conn_nread:
  2. /**
  3. 由process_update_command执行后进入此状态,process_update_command函数只执行了add/set/replace 等命令的一半,
  4. 剩下的一半由这里完成。
  5. 例如如果是上面的set命令,process_update_command只完成了“命令行”部分,分配了item空间,
  6. 但还没有把value塞到对应的 item中去。因此,在这一半要完成的动作就是把value的数据从socket中读出来,
  7. 塞到刚拿到的item空间中去
  8. */
  9. /*
  10. 下面的rlbytes字段表示要读的“value数据”还剩下多少字节 (注意与"rbytes"的区别)
  11. 如果是第一次由process_update_command进入到此,rlbytes此时在process_update_command中被初始化为item->nbytes,
  12. 即value的总字节数,SET name 0 0 5\r\n中的5。
  13. */
  14. if (c->rlbytes == 0) {
  15. /**
  16. 注意rlbytes为0才读完,否则状态机一直会进来这个conn_nread分支继续读value数据,
  17. 读完就调用complete_nread完成收尾工作,程序会跟着complete_nread进入下一个
  18. 状态。所以执行完complete_nread会break;
  19. */
  20. complete_nread(c);
  21. break;
  22. }
  23. //如果还有数据没读完,继续往下执行。可知,下面的动作就是继续从buffer中读value数据往item中的data的value位置塞。
  24. if (c->rbytes > 0) {
  25. /**
  26. 进入到这个if,是因为有可能先前读到的buffer已经有“数据行”部分,因为一次事件通知,
  27. 不保证socket可读数据只有一个\r\n。
  28. */
  29. /**
  30. 取rbytes与rlbytes中最小的值。
  31. 为啥?
  32. 因为这里我们的目的是剩下的还没读的value的字节,而rlbytes代表的是还剩下的字节数
  33. 如果rlbytes比rbytes小,只读rlbytes长度就够了,rbytes中多出来的部分不是我们这个时候想要的
  34. 如果rbytes比rlbytes小,即使你要rlbytes这么多,但buffer中没有这么多给你读。
  35. */
  36. int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
  37. if (c->ritem != c->rcurr) {
  38. memmove(c->ritem, c->rcurr, tocopy); //往分配的item中塞,即为key设置value的过程
  39. }
  40. c->ritem += tocopy;
  41. c->rlbytes -= tocopy;
  42. c->rcurr += tocopy;
  43. c->rbytes -= tocopy;
  44. if (c->rlbytes == 0) {
  45. break;
  46. }
  47. }
  48. //这里往往是我们先前读到buffer的数据还没足够的情况下,从socket中读。
  49. res = read(c->sfd, c->ritem, c->rlbytes);//往分配的item中塞,即为key设置value的过程
  50. if (res > 0) {
  51. if (c->rcurr == c->ritem) {
  52. c->rcurr += res;
  53. }
  54. c->ritem += res;
  55. c->rlbytes -= res;
  56. break;
  57. }

上面主要通过这一行 res = read(c->sfd, c->ritem, c->rlbytes); 把value塞到刚分配出来的item空间,完成“数据行”部分的工作,逻辑上就是对key“赋值”。赋值结束后,调用complete_nread做一些收尾的工作,在本篇“状态机”篇只需了解它的作用是向客户端输出命令执行结果(即往socket写“STORED”):

  1. static void complete_nread(conn *c) {
  2. //。。。
  3. complete_nread_ascii(c);
  4. //。。。
  5. }
  6. static void complete_nread_ascii(conn *c) {
  7. ret = store_item(it, comm, c);
  8. switch (ret)
  9. {
  10. case STORED:
  11. out_string(c, "STORED");
  12. break;
  13. //。。。
  14. }
  15. //。。。
  16. }
  17. static void out_string(conn *c, const char *str) {
  18. size_t len;
  19. c->msgcurr = 0;
  20. c->msgused = 0;
  21. c->iovused = 0;
  22. add_msghdr(c);
  23. len = strlen(str);
  24. memcpy(c->wbuf, str, len);
  25. memcpy(c->wbuf + len, "\r\n", 2);
  26. c->wbytes = len + 2;
  27. c->wcurr = c->wbuf;
  28. conn_set_state(c, conn_write);
  29. c->write_and_go = conn_new_cmd;
  30. return;
  31. }

进入状态机conn_write状态进行输出:

  1. case conn_write:
  2. //。。。
  3. /* fall through... */
  4. case conn_mwrite:
  5. transmit(c);
  6. //。。。
  7. static enum transmit_result transmit(conn *c) {
  8. //。。。
  9. res = sendmsg(c->sfd, m, 0);
  10. //。。。
  11. }

最后通过调用sendmsg把我们的”STORED”字符串响应给客户端。

附上 处理 SET 命令状态机的状态转换图:

Memcached源码分析之请求处理(状态机)

本文中我们分析了memcached是怎么利用状态机的方式对请求进行解析和处理,以及SET命令的代码实现细节。而在执行SET命令的时候,我们知道会调用item_alloc函数给数据分配空间,而到底item_alloc背后是怎么实现的?请看下一篇:《Memcached源码分析之内存管理》

上一篇:plsql查找不到带中文的纪录


下一篇:手把手带你走进MVP +Dagger2 + DataBinding+ Rxjava+Retrofit 的世界