Redis 6.0中新加入了多线程版本的网络通信。
从总体来看,在开启多线程的情况下,会创建IO线程进行读写。但是对于内容的修改,读取等依然是通过主线程完成的。
下面是从大佬偷来的一副时序图。https://zhuanlan.zhihu.com/p/144805500
通过时序图,可以按照顺序查看过程。
1.初始化
整个sever的初始化是通过 server.c/initServer函数完成的
其中可以看到里面做了一下比较重要的事情:
1. 创建了事件循环。事件循环是redis进行相应的事件处理的一个载体。将想要做的事件注册倒事件循环中,然后循环去处理。
2.aeCreateTimeEvent向事件循环中添加了时间事件。
3.createSocketAcceptHandler向事件循环中添加了文件事件。同时也是为监听的文件描述符注册了回调函数。之后会用到。
void initServer(void) {
....
// 创建时间循环。
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
if (server.el == NULL) {
serverLog(LL_WARNING,
"Failed creating the event loop. Error message: '%s'",
strerror(errno));
exit(1);
}
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
// 监听端口
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,&server.ipfd) == C_ERR) {
serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
exit(1);
}
if (server.tls_port != 0 &&
listenToPort(server.tls_port,&server.tlsfd) == C_ERR) {
serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port);
exit(1);
}
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
server.unixsocketperm, server.tcp_backlog);
if (server.sofd == ANET_ERR) {
serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
exit(1);
}
anetNonBlock(NULL,server.sofd);
anetCloexec(server.sofd);
}
// 向时间循环中添加时间事件。时间事件也就是一些定时任务,具体做什么可以查看serverCron函数的内容
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
// 为监听的文件描述符注册回调函数。当有请求连接过来时,后续会调用此处理函数。
if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
serverPanic("Unrecoverable error creating TCP socket accept handler.");
}
.....
}
1.1 创建事件循环
创建事件循环里面会调用aeApiCreate函数。根据当前系统的不同,选择不同的IO处理方式。如select,epoll等。
如果选择了epoll,则会调用ae_epoll.c文件的aeApiCreate函数。
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
monotonicInit(); /* just in case the calling app didn't initialize */
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
// ae_epoll.c文件
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 创建epoll对象 */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
anetCloexec(state->epfd);
eventLoop->apidata = state;
return 0;
}
1.2 添加时间事件
创建时间事件aeCreateTimeEvent,则是创建一个时间事件,然后添加倒事件循环的timeEventHead队列中。
在之后进行操作时会从队列中拿出要执行的时间事件,然后进行处理
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
te->when = getMonotonicUs() + milliseconds * 1000;
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
te->refcount = 0;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}
1.3 添加文件事件
可以看出,为监听的socket添加回调方法,底层其实是像事件循环添加文件事件。
在aeCreateFileEvent函数中,会调用aeApiAddEvent函数。以epoll为例,则会将文件描述符加入到事件循环的初始化时创建的epoll对象中。后续进行监听。
另外,传入的回调函数则是被赋值到rfileProc和wfileProc。表示当前事件的读写回调函数一开始都为acceptTcpHandler。该函数内容后续分析。
int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
int j;
for (j = 0; j < sfd->count; j++) {
if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL) == AE_ERR) {
/* Rollback */
for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
return C_ERR;
}
}
return C_OK;
}
// ae.c文件
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
// ae_epoll.c
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
1.4 创建多线程
IO多线程的创建则是通过server.c/InitServerLast函数创建的。
当未开启多线程模式的时候是直接返回的。
void InitServerLast() {
bioInit();
initThreadedIO();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
server.io_threads_active = 0; /* We start with threads not active. */
// 如果没有开启多线程模式,则返回。使用主线程处理。
if (server.io_threads_num == 1) return;
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
setIOPendingCount(i, 0);
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
// 创建线程,执行的代码是IOThreadMain方法
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
2.网络通信过程
当一些初始化工作完成之后,通过调用aeMain方法开始事件循环。
通过下面的代码可以看出来,执行的顺序为:
beforesleep -> aeApiPoll -> aftersleep -> 对需要处理的文件事件进行处理 -> 处理时间事件
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
....
// 调用beforesleep
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
// 调用相应的poll方法。其中tvp超时时间会通过最近的时间事件算出。
numevents = aeApiPoll(eventLoop, tvp);
// 调用aftersleep
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
// 对需要处理的事件进行处理
for (j = 0; j < numevents; j++) {
int fd = eventLoop->fired[j].fd;
aeFileEvent *fe = &eventLoop->events[fd];
int mask = eventLoop->fired[j].mask;
int fired = 0; /* Number of events fired for current fd. */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
// 执行时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
2.1 aeApiPoll等处理
这里先绕过beforesleep方法以及aftersleep方法。看一下一个事件来了之后会发生什么。
aeApiPoll方法会把需要处理的文件描述符放入到事件循环的fired队列中。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask; // 加入事件循环的fired队列中
}
} else if (retval == -1 && errno != EINTR) {
panic("aeApiPoll: epoll_wait, %s", strerror(errno));
}
return numevents;
}
之后在外部函数中,也就是上面的aeProcessEvents函数中,会对这些激活的事件进行处理。而处理方法就是通过是读事件还是写事件执行相应的回调函数。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
....
for (j = 0; j < numevents; j++) {
int fd = eventLoop->fired[j].fd;
aeFileEvent *fe = &eventLoop->events[fd];
int mask = eventLoop->fired[j].mask;
int fired = 0; /* Number of events fired for current fd. */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
...
}
还记得一开始的添加文件事件时候调用的createSocketAcceptHandler方法吗?当时为事件注册了统一的回调函数acceptTcpHandler。因此不管是读写事件,此时都会调用该方法进行处理。
在该方法中,会依次调用accept方法,获取全连接队列里面的请求。然后为每个请求新生成一个socket。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
while(max--) {
// accept并且生成一个新的socket
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
anetCloexec(cfd);
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
// 处理这个新的socket
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
对于新生成的socket,进行封装。而其中比较核心的一点是会为这个socket注册回调函数。
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
...
/* Create connection and client */
if ((c = createClient(conn)) == NULL) {
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (conn: %s)",
connGetLastError(conn),
connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn); /* May be already closed, just ignore errors */
return;
}
...
}
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
if (conn) {
connNonBlock(conn);
connEnableTcpNoDelay(conn);
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
// 设置回调函数,此处readQueryFromClient就是读取处理的关键函数
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
...
return c;
}
// 最后会调用到这个方法
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
if (func == conn->read_handler) return C_OK;
conn->read_handler = func;
if (!conn->read_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
else
// 回调函数为ae_handler。此后该连接有数据来时,会使用该回调函数进行处理
if (aeCreateFileEvent(server.el,conn->fd,
AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}
当此后连接有读事件发生时,回调函数会在读事件发生时,使用readQueryFromClient方法进行相应的处理。
未完待续