我们知道redis用的是epoll,但是底层的代码到底是怎样一步步运行起来的?本文解读redis的网络框架,一探究竟。
一、 核心数据结构
-
ConnectionType,
ConnectionType
定义了网络连接的接口,包含read
、write
等,具体定义如下。
/* Operations table for one connection (transport) type. Every operation
 * except ae_handler takes the connection as its first argument; ae_handler
 * has the event-loop callback signature instead. */
typedef struct ConnectionType {
    /* Callback handed to the event loop for this connection's fd, e.g.
     * connSocketSetReadHandler passes it to aeCreateFileEvent. */
    void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
    /* Outgoing connect to addr:port (optionally from source_addr);
     * connect_handler is presumably the completion callback -- confirm. */
    int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
    int (*write)(struct connection *conn, const void *data, size_t data_len);
    int (*read)(struct connection *conn, void *buf, size_t buf_len);
    void (*close)(struct connection *conn);
    /* Transport-level accept of an already accept()ed fd; accept_handler may
     * be called immediately or scheduled for later. */
    int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
    /* Install (or remove) the per-connection write / read callbacks. */
    int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
    int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
    /* Text of the last error, used in log messages. */
    const char *(*get_last_error)(struct connection *conn);
    /* Blocking variants. NOTE(review): timeout unit not visible here,
     * presumably milliseconds -- confirm. */
    int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
    ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
} ConnectionType;
在源码中实现了两种connection,分别是
CT_Socket
和CT_TLS
,这两种也是redis支持的连接类型。redis也支持本地unix socket,但它归根结底也是socket,其读写方式和网络socket是一样的。这两种的实现分别如下:CT_Socket:
ConnectionType CT_Socket = { .ae_handler = connSocketEventHandler, .close = connSocketClose, .write = connSocketWrite, .read = connSocketRead, .accept = connSocketAccept, .connect = connSocketConnect, .set_write_handler = connSocketSetWriteHandler, .set_read_handler = connSocketSetReadHandler, .get_last_error = connSocketGetLastError, .blocking_connect = connSocketBlockingConnect, .sync_write = connSocketSyncWrite, .sync_read = connSocketSyncRead, .sync_readline = connSocketSyncReadLine };
CT_TLS:
ConnectionType CT_TLS = { .ae_handler = tlsEventHandler, .accept = connTLSAccept, .connect = connTLSConnect, .blocking_connect = connTLSBlockingConnect, .read = connTLSRead, .write = connTLSWrite, .close = connTLSClose, .set_write_handler = connTLSSetWriteHandler, .set_read_handler = connTLSSetReadHandler, .get_last_error = connTLSGetLastError, .sync_write = connTLSSyncWrite, .sync_read = connTLSSyncRead, .sync_readline = connTLSSyncReadLine, }
这个结构是核心,后续的各种网络操作,都是通过
ConnectionType
中的指针调用的。 -
**struct connection:**该结构表示一个完整的连接,客户端的fd会被封装成一个
connection
。该结构与ConnectionType
配合使用,可以认为ConnectionType
是操作接口(除ae_handler外,所有函数的第一个参数都是connection
),而struct connection
是操作对象,set_read_handler/set_write_handler
都是设置struct connection
中的handler
。
/* One connection object per client fd; its behaviour is supplied by the
 * ConnectionType operations table referenced by `type`. */
struct connection {
    ConnectionType *type;                 /* operations table, e.g. &CT_Socket */
    ConnectionState state;                /* compared with CONN_STATE_ERROR by acceptCommonHandler */
    short int flags;
    short int refs;                       /* NOTE(review): presumably a reference count -- confirm */
    int last_errno;                       /* NOTE(review): presumably errno of the last failed op -- confirm */
    void *private_data;                   /* owner object; createClient() stores the client here */
    ConnectionCallbackFunc conn_handler;
    ConnectionCallbackFunc write_handler;
    ConnectionCallbackFunc read_handler;  /* set via set_read_handler, e.g. readQueryFromClient */
    int fd;                               /* socket fd registered with the event loop */
};
-
aeEventLoop,redis的主事件循环是单线程、非阻塞IO的(下文可以看到还会通过initThreadedIO创建IO线程),网络事件相关的状态封装在
aeEventLoop
结构中
/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events; indexed directly by fd (fd < setsize) */
    aeFiredEvent *fired; /* Fired events: (fd, mask) pairs that became ready */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;
最核心的两个数据就是
events
和fired
,分别代表已注册监听的fd(按fd下标索引)和已有事件触发的fd。aeFileEvent:
/* Per-fd registration record; aeEventLoop.events holds one per fd. */
struct aeFileEvent {
    int mask;              /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc; /* callback bound for readable events */
    aeFileProc *wfileProc; /* callback bound for writable events */
    void *clientData;      /* opaque pointer handed back to the callbacks */
};
typedef struct aeFileEvent aeFileEvent;
其中
rfileProc
和wfileProc
分别是该事件对应的读写回调。aeFiredEvent:
/* A fired event: a ready descriptor plus the event bits that fired on it. */
struct aeFiredEvent {
    int fd;   /* descriptor that became ready */
    int mask; /* AE_READABLE / AE_WRITABLE bits that fired */
};
typedef struct aeFiredEvent aeFiredEvent;
aeFiredEvent
的定义很简单,就一个fd,再加一个mask,表示该fd上触发的是可读还是可写事件。可以看到,在
aeEventLoop
中定义的events
和fired
都是指针,指向了在外部分配的数组,其大小是server.maxclients+**CONFIG_FDSET_INCR**(即传给aeCreateEventLoop的setsize)。
二、网络框架
-
等待连接
main()
    initServer()
        server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR)
        listenToPort(server.port,server.ipfd,&server.ipfd_count)
        aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) // 只注册了AE_READABLE,所以只把rfileProc设为acceptTcpHandler(wfileProc不会被设置)
    InitServerLast()
        initThreadedIO()
            pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i)
    aeMain(server.el)
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
            fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 如果触发的是监听的fd,则走acceptTcpHandler回调
                acceptTcpHandler()
                    anetTcpAccept()
                    connCreateAcceptedSocket()
                        connCreateSocket()
                            conn->type = &CT_Socket; // 最终创建了ConnectionType为CT_Socket的一个连接
                    acceptCommonHandler()
                        createClient() // 创建一个真正的client
                            connSetReadHandler(conn, readQueryFromClient); // 给客户端fd设置 readQueryFromClient 回调,epoll触发时调用
                                set_read_handler->connSocketSetReadHandler(conn, func)
                                    aeCreateFileEvent(server.el,conn->fd, AE_READABLE,conn->type->ae_handler,conn) // 注册fd
                            linkClient()
                                raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
                        connAccept(conn, clientAcceptHandler)
                            connSocketAccept
                                callHandler(conn, accept_handler)
                            // 即 accept -> connSocketAccept
            fe->wfileProc(eventLoop,fd,fe->clientData,mask);
-
读请求
aeMain(server.el)
    aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 如果触发的是客户端fd,rfileProc是注册时传入的conn->type->ae_handler(对CT_Socket即connSocketEventHandler),由它转到conn->read_handler,也就是 readQueryFromClient 回调
            readQueryFromClient()
三、主要函数
-
aeCreateFileEvent: 该函数把一个fd加到epoll中(aeApiAddEvent),并按mask为它绑定读/写回调函数proc。redis启动时就是用它注册监听fd,并绑定回调函数
acceptTcpHandler
用于接收客户端的连接请求。
/* Register `fd` with the event loop for the events in `mask` and bind `proc`
 * as the callback for each requested direction.
 *
 * - mask:       AE_READABLE and/or AE_WRITABLE; OR-ed into fe->mask, so
 *               repeated calls accumulate interest.
 * - proc:       stored in rfileProc only if AE_READABLE is set, and in
 *               wfileProc only if AE_WRITABLE is set; the other slot is
 *               left untouched.
 * - clientData: opaque pointer passed back to proc when the event fires.
 *
 * Returns AE_OK on success. Returns AE_ERR with errno = ERANGE if fd does
 * not fit in the loop's setsize, or AE_ERR if the polling backend rejects
 * the fd. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    /* events[] is indexed directly by fd. */
    aeFileEvent *fe = &eventLoop->events[fd];

    /* Add the fd to the polling backend (epoll) first. */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    /* Only the directions present in mask receive proc. The listening socket
     * is registered with AE_READABLE only, so for it this sets
     * rfileProc = acceptTcpHandler and leaves wfileProc alone. */
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
-
acceptTcpHandler: 用于接收客户端连接请求,其中
anetTcpAccept
内部是系统调用accept
接口。该函数内部的acceptCommonHandler
是重要操作;在调用它之前,会先通过connCreateAcceptedSocket
将客户端的fd转换成一个connection
,在connCreateAcceptedSocket
内部通过connCreateSocket
创建一个connection
,其类型为CT_Socket
。
/* Event-loop read callback for a listening TCP socket.
 * Accepts at most MAX_ACCEPTS_PER_CALL pending connections per invocation.
 * Each new fd is wrapped into a CT_Socket connection and handed to
 * acceptCommonHandler(). el, privdata and mask are unused. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        /* anetTcpAccept() wraps accept(2): returns the client fd or ANET_ERR. */
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            /* EWOULDBLOCK only means nothing is left to accept right now,
             * so it is not logged; any other error is. */
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        /* connCreateAcceptedSocket() wraps cfd into a CT_Socket connection. */
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}
-
**acceptCommonHandler:**根据
connection
创建一个client,并将该客户端链接到server.clients
。在新建client的时候有个重要操作connSetReadHandler
,这个操作绑定了客户端的读取实现。
/* Common path for every newly accepted connection:
 * 1. reject it if server.maxclients is already reached;
 * 2. create the client object for it;
 * 3. start the transport-level accept via connAccept().
 * `flags` are OR-ed into the new client's flags; `ip` is unused. */
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    client *c;
    UNUSED(ip);

    /* Admission control will happen before a client is created and connAccept()
     * called, because we don't want to even start transport-level negotiation
     * if rejected.
     */
    if (listLength(server.clients) >= server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors.
         * Note that for TLS connections, no handshake was done yet so nothing is written
         * and the connection will just drop.
         */
        if (connWrite(conn,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;
        connClose(conn);
        return;
    }

    /* Create connection and client */
    /* createClient() creates the client and links it into the server.clients
     * list (via linkClient()). */
    if ((c = createClient(conn)) == NULL) {
        char conninfo[100];
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (conn: %s)",
            connGetLastError(conn),
            connGetInfo(conn, conninfo, sizeof(conninfo)));
        connClose(conn); /* May be already closed, just ignore errors */
        return;
    }

    /* Last chance to keep flags */
    c->flags |= flags;

    /* Initiate accept.
     *
     * Note that connAccept() is free to do two things here:
     * 1. Call clientAcceptHandler() immediately;
     * 2. Schedule a future call to clientAcceptHandler().
     *
     * Because of that, we must do nothing else afterwards.
     */
    if (connAccept(conn, clientAcceptHandler) == C_ERR) {
        char conninfo[100];
        if (connGetState(conn) == CONN_STATE_ERROR)
            serverLog(LL_WARNING,
                "Error accepting a client connection: %s (conn: %s)",
                connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
        freeClient(connGetPrivateData(conn));
        return;
    }
}
-
**createClient:**最主要的是
connSetReadHandler
,其实是调用CT_Socket
的set_read_handler
接口,也就是connSocketSetReadHandler
函数,读取请求就是readQueryFromClient
函数。client *createClient(connection *conn) { client *c = zmalloc(sizeof(client)); /* passing NULL as conn it is possible to create a non connected client. * This is useful since all the commands needs to be executed * in the context of a client. When commands are executed in other * contexts (for instance a Lua script) we need a non connected client. */ if (conn) { connNonBlock(conn); connEnableTcpNoDelay(conn); if (server.tcpkeepalive) connKeepAlive(conn,server.tcpkeepalive); connSetReadHandler(conn, readQueryFromClient); connSetPrivateData(conn, c); } 省略代码/
-
**connSocketSetReadHandler:**该函数设置connection的read_handler(传入NULL则删除可读事件),并以AE_READABLE把客户端的
connection
注册到epoll队列中,等待读写触发,从而实现了非阻塞。
/* Register a read handler, to be called when the connection is readable.
 * If NULL, the existing handler is removed. */
/* Only read interest is managed here. The fd is registered for AE_READABLE
 * with conn->type->ae_handler (connSocketEventHandler for CT_Socket) as the
 * event-loop callback and conn as its clientData. NOTE(review): that handler
 * presumably dispatches to conn->read_handler; its body is not shown here.
 * Returns C_OK, or C_ERR if the event loop rejects the registration. */
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    /* Same handler already installed: nothing to change. */
    if (func == conn->read_handler) return C_OK;

    conn->read_handler = func;
    /* NULL handler: stop watching the fd for readability;
     * otherwise (re)register it for AE_READABLE. */
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
        if (aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}
原文: http://ditanshow.com/articles/p/313.html