twemproxy接收流程探索——twemproxy代码分析正编

本文旨在帮助大家探索出twemproxy接收流程的代码逻辑框架,有些具体的实现需要我们在未来抽空去探索或者大家自行探索。在这篇文章开始前,大家要做好一个小小的心理准备,由于twemproxy代码是一份优秀的c语言代码,为此,在twemproxy的代码中会大篇幅使用c指针。但是不论是普通类型的指针还是函数指针,都可以让我们这些c语言使用者大饱眼福,生出一种“原来还可以这样写!!!”的快感。

数据结构

在探索twemproxy接收流程之前,我们必须对一些我们会用到的数据结构进行说明,以便我们更好地去探索,这边在讲解结构时,仅仅讲解与twemproxy接收流程相关的代码,其他代码暂时不进行剖析。

mbuf

在nc_mbuf.h里

 struct mbuf {
uint32_t magic; /* mbuf magic (const) 这个值不是很理解是什么意思,一般是0xdeadbeef*/
STAILQ_ENTRY(mbuf) next; /* next mbuf 下一块mbuf,代码里所有的mbuf几乎都是以单向链表的形式存储的*/
uint8_t *pos; /* read marker 表示这块mbuf已经读到那个字节了*/
uint8_t *last; /* write marker 表示这块mbuf已经写到哪个字节*/
uint8_t *start; /* start of buffer (const) 表示这块mbuf的起始位置*/
uint8_t *end; /* end of buffer (const) 表示这块mbuf的结束位置*/
};
STAILQ_HEAD(mhdr, mbuf); /*mhdr是mbuf单向队列的队列头部*/

这里要对mbuf解释几句,这里涉及到nc_mbuf.c里的代码:

1.mbuf的每一块可以通过配置规定其大小 ,可以说每一块mbuf的大小都是一个固定值,为此在生成时mbuf会去申请一个固定大小的内存,如果这个大小是mbuf_chunk_size,那么end = start + mbuf_chunk_size - sizeof(struct mbuf),为此start,end,以及magic都是定值。

2.mbuf在申请后一般不会被释放,在使用完后会被放入static struct mhdr free_mbufq这个队列中,一旦要使用mbuf时首先从free_mbufq中取出未使用的mbuf,如果这个队列为空时,它才会去向系统申请新的mbuf。

msg

在nc_message.h里

 struct msg {
/*
...
*/
struct conn *owner; /* message owner - client | server 服务端或客户端连接*/
/*
...
*/
struct mhdr mhdr; /* message mbuf header mbuf单向队列的队列头部*/
uint32_t mlen; /* message length mbuf字节长度*/
/*
...
*/
uint8_t *pos; /* parser position marker 现在解析到哪个个字节*/
msg_parse_t parser; /* message parser 消息解析函数指针*/
msg_parse_result_t result; /* message parsing result 消息解析结果*/
/*
...
*/ };

msg是用来存储每一条发送过来的redis包的内容,一般一个msg对应一个redis包,所有收发网络数据都存储在mhdr中。

conn

在connection.h中

 struct conn {
/*
...
*/
int sd; /* socket descriptor 套接字描述符*/
/*
...
*/
conn_recv_t recv; /* recv (read) handler 接收msg函数指针*/
conn_recv_next_t recv_next; /* recv next message handler 接收下一个msg的函数指针*/
conn_recv_done_t recv_done; /* read done handler 接收完成的函数指针*/
/*
...
*/
size_t recv_bytes; /* received (read) bytes 接收数据的字节数*/
size_t send_bytes; /* sent (written) bytes 发送数据的字节数*/
/*
...
*/
err_t err; /* connection errno 接受数据错误*/
unsigned recv_active:; /* recv active? 是否在接收数据*/
unsigned recv_ready:; /* recv ready? 是否准备接收数据*/
/*
...
*/
unsigned eof:; /* eof? aka passive close? 数据读到尾部*/
unsigned done:; /* done? aka close? 完成数据接收*/
unsigned redis:; /* redis? 网络协议是不是redis*/
/*
...
*/
};

conn是与服务端或客户端的连接,用于管理连接上的所有事件和网络数据

接收流程

首先看下主要流程,很简单的代码在nc_message.c中的msg_recv

 rstatus_t
msg_recv(struct context *ctx, struct conn *conn)
{
rstatus_t status;
struct msg *msg; ASSERT(conn->recv_active); conn->recv_ready = ;//表示准备接收网络数据
do {
msg = conn->recv_next(ctx, conn, true);
if (msg == NULL) {
return NC_OK;
} status = msg_recv_chain(ctx, conn, msg);//接收函数链,在这个流程中会改变conn->recv_ready的值,表示本次接收流程终止
if (status != NC_OK) {
return status;
}
} while (conn->recv_ready);//一旦不准备接收网络数据,就停止 return NC_OK;
}

在这个代码中我们会发现一个conn->recv_next,目前我们只要知道它是准备接收下一个msg的函数,不需要知道他的具体实现,因为他在《twemproxy代码框架概述——剖析twemproxy代码前编》提到的客户层服务层扮演的角色是不同的,为此,实现也是不同的,这里主要指的是《twemproxy代码框架概述——剖析twemproxy代码前编》提到的模块1模块3,在这里我们居然看到了c语言的代码里出现了一个在面向对象语言中才有的特性——多态,在下面几篇文章的探索中会讲到,不小心做了广告,请无视上面的部分内容。

接下来我们来看msg_recv函数中的msg_recv_chain,同样也是一个框架

 static rstatus_t
msg_recv_chain(struct context *ctx, struct conn *conn, struct msg *msg)
{
rstatus_t status;
struct msg *nmsg;
struct mbuf *mbuf;
size_t msize;
ssize_t n; mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next);//找到目前收到mbuf队列的最后一个mbuf
//如果这个mbuf满了或者为空,则取得一个空的mbuf,加入到msg->mhdr队列中
if (mbuf == NULL || mbuf_full(mbuf)) {
mbuf = mbuf_get();
if (mbuf == NULL) {
return NC_ENOMEM;
}
mbuf_insert(&msg->mhdr, mbuf);
msg->pos = mbuf->pos;//这时解析指针指向该mbuf的读取指针
}
ASSERT(mbuf->end - mbuf->last > );
msize = mbuf_size(mbuf); //计算剩余的mbuf的值msize n = conn_recv(conn, mbuf->last, msize);//读取最大为msize的网络数据
if (n < ) {
if (n == NC_EAGAIN) {
return NC_OK;
}
return NC_ERROR;
} ASSERT((mbuf->last + n) <= mbuf->end); mbuf->last += n; //将写指针偏移到正确的位置
msg->mlen += (uint32_t)n;
//解析网络数据内容,在其中将网络数据分成不同的msg,因为网络包可能黏合,可能会接收到不同的redis包
for (;;) {
status = msg_parse(ctx, conn, msg);//解析网络数据完成分包
if (status != NC_OK) {
return status;
} /* get next message to parse */
nmsg = conn->recv_next(ctx, conn, false);
if (nmsg == NULL || nmsg == msg) {
/* no more data to parse */
break;
} msg = nmsg;//使指针指向下一个包
} return NC_OK;
}

在前面我们看到在代码中大量使用了断言ASSERT,如ASSERT(mbuf->end - mbuf->last > 0),就表示该内存还没有被写满,查看这些断言会使我们对代码有更好的认识。同时,它也是一个很好的代码习惯

接着就是在connection.c中的接受函数conn_recv,比较简单,一些对于收发网络数据遇到的情况的处理值得学习

 ssize_t
conn_recv(struct conn *conn, void *buf, size_t size)
{
ssize_t n; ASSERT(buf != NULL);
ASSERT(size > );
ASSERT(conn->recv_ready); for (;;) {
n = nc_read(conn->sd, buf, size);//相当于read函数 log_debug(LOG_VERB, "recv on sd %d %zd of %zu", conn->sd, n, size);
//如果收到的数据不为空,一旦收到数据小于size,表示没有更多的数据能被读取,为此将conn->recv_ready = 0
if (n > ) {
if (n < (ssize_t) size) {
conn->recv_ready = ;
}
conn->recv_bytes += (size_t)n;
return n;
}
//如果收到的数据为空,表示没有更多的数据能被读取,为此将conn->recv_ready = 0
if (n == ) {
conn->recv_ready = ;
conn->eof = ;
log_debug(LOG_INFO, "recv on sd %d eof rb %zu sb %zu", conn->sd,
conn->recv_bytes, conn->send_bytes);
return n;
}
//如果收发数据出现不是EINTR的错误,表示收发数据断链或者遇到错误,为此也将conn->recv_ready = 0
if (errno == EINTR) {
log_debug(LOG_VERB, "recv on sd %d not ready - eintr", conn->sd);
continue;
} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
conn->recv_ready = ;
log_debug(LOG_VERB, "recv on sd %d not ready - eagain", conn->sd);
return NC_EAGAIN;
} else {
conn->recv_ready = ;
conn->err = errno;
log_error("recv on sd %d failed: %s", conn->sd, strerror(errno));
return NC_ERROR;
}
} NOT_REACHED(); return NC_ERROR;
}

下面就是解析分包框架msg_parse

 static rstatus_t
msg_parse(struct context *ctx, struct conn *conn, struct msg *msg)
{
rstatus_t status; if (msg_empty(msg)) {
/* no data to parse */
conn->recv_done(ctx, conn, msg, NULL);
return NC_OK;
} msg->parser(msg);//解析函数器,这个我们会在后续的文章中提到,即完整的redis协议解析流程 switch (msg->result) {
case MSG_PARSE_OK:
status = msg_parsed(ctx, conn, msg);//解析一个包完成,进行分包
break; case MSG_PARSE_REPAIR:
status = msg_repair(ctx, conn, msg);//将受到的网络数据分到不同的buffer中
break; case MSG_PARSE_AGAIN:
status = NC_OK;
break; default:
status = NC_ERROR;
conn->err = errno;
break;
} return conn->err != ? NC_ERROR : status;
}

在这个代码中我们又会发现一个conn->recv_done,目前我们只要知道它是接收结束的函数,同样不需要知道他的具体实现,因为它也是在《twemproxy代码框架概述——剖析twemproxy代码前编》提到的客户层服务层扮演的角色是不同的,为此,实现也是不同的,这里主要指的是《twemproxy代码框架概述——剖析twemproxy代码前编》提到的模块1模块3

下面就是msg_parsed,用于解析一个包完成后分包

 static rstatus_t
msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg)
{
struct msg *nmsg;
struct mbuf *mbuf, *nbuf; mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next);
if (msg->pos == mbuf->last) {//正好结束分包
/* no more data to parse */
conn->recv_done(ctx, conn, msg, NULL);
return NC_OK;
} /*
* Input mbuf has un-parsed data. Split mbuf of the current message msg
* into (mbuf, nbuf), where mbuf is the portion of the message that has
* been parsed and nbuf is the portion of the message that is un-parsed.
* Parse nbuf as a new message nmsg in the next iteration.
*/
//下面的所有工作就是把mbuf收到的网络数据,将不属于这个包msg的而属于下个包nmsg的内容分割出去放到下一个包nmsg
nbuf = mbuf_split(&msg->mhdr, msg->pos, NULL, NULL);
if (nbuf == NULL) {
return NC_ENOMEM;
} nmsg = msg_get(msg->owner, msg->request, conn->redis);
if (nmsg == NULL) {
mbuf_put(nbuf);
return NC_ENOMEM;
}
mbuf_insert(&nmsg->mhdr, nbuf);
nmsg->pos = nbuf->pos; /* update length of current (msg) and new message (nmsg)*/
nmsg->mlen = mbuf_length(nbuf);
msg->mlen -= nmsg->mlen; conn->recv_done(ctx, conn, msg, nmsg); return NC_OK;
}

上面的流程可以用图1表示,我们可以看到图1中的mbuf收到了两个包的数据,分别是一个包msg(红色)的结尾和一个包nmsg(黄色)的开始,根据我们前文的说法一个msg对应一个包,为此必须把这个mbuf分割到到两个msg中。

twemproxy接收流程探索——twemproxy代码分析正编

图1.分包示意图

最后是分muf的msg_repair

 static rstatus_t
msg_repair(struct context *ctx, struct conn *conn, struct msg *msg)
{
struct mbuf *nbuf;
//取出一个新的nbuf去读取下轮的网络数据
nbuf = mbuf_split(&msg->mhdr, msg->pos, NULL, NULL);
if (nbuf == NULL) {
return NC_ENOMEM;
}
mbuf_insert(&msg->mhdr, nbuf);
msg->pos = nbuf->pos; return NC_OK;
}

在redis包中可能会存在多key的情况,一个msg中的mbuf具体是怎么存的,还需要完成对于redis协议的解读,我们才能明白为什么需要msg_repair,,在这里稍稍挖个坑。目前我们可以理解为它产生了一个新的nbuf去读下一轮的网络数据。

这样我们完成了整个接收流程的探索,至于发送流程需要在下几个篇章中完成。

总结

本文完成了对于twemproxy整个接收流程的探索,首先介绍了相关的数据结构——mbuf、msg以及conn,在下面的日子里我们会更多地去了解它们,在未来的解析中它们是主角,接着分析了接收流程中的各个函数msg_repair、msg_parse、msg_parsed、msg_recv_chain、msg_recv以及conn_recv,最后较为介绍了它们在接收中的作用,当然稍稍挖了几个坑,表示以后再填。下面我们会着重探索twemproxy的redis协议解析和twemproxy发送流程,敬请期待!!

另外,对于博文有问题的请大家在评论中留言与博主讨论,博主会及时回复的!!!!

上一篇:linux系统重启后提示An error occurred during the file system check.


下一篇:crypto 简单了解