redis源码分析——1、网络框架

我们知道redis用的epoll,但是底层的代码到底是怎样一步步起来的,本文解读redis的网络框架,一探究竟。

一、 核心数据结构

  1. ConnectionType:ConnectionType定义了网络连接的接口,包含read、write等,具体定义如下。

    /*
     * Table of function pointers that together make up the interface of one
     * connection type. Apart from ae_handler, every operation receives the
     * connection it acts on as its first argument.
     */
    typedef struct ConnectionType {
        /* Callback with the event-loop file-event signature (loop, fd,
         * clientData, mask); the socket read-handler setup registers it for
         * the connection's fd. */
        void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
        /* Connection setup, data transfer and teardown.
         * NOTE(review): return-value conventions are not visible here -- confirm
         * against the implementations. */
        int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
        int (*write)(struct connection *conn, const void *data, size_t data_len);
        int (*read)(struct connection *conn, void *buf, size_t buf_len);
        void (*close)(struct connection *conn);
        int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
        /* Install the write / read callbacks on the connection.
         * NOTE(review): the meaning of `barrier` is not shown here; presumably
         * related to the AE_BARRIER event flag -- confirm. */
        int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
        int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
        /* Returns a textual description of the last error (presumably; only the
         * signature is visible here). */
        const char *(*get_last_error)(struct connection *conn);
        /* Variants that take a timeout argument.
         * NOTE(review): timeout units are not shown here -- confirm. */
        int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
        ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
        ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
        ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    } ConnectionType;
    

    在源码中实现了两种connection,分别是CT_Socket和CT_TLS,这两种也是redis支持的连接类型。redis也支持本地unix socket,但这归根结底也是socket,其读写和网络socket是一样的。这两种的实现分别如下:

    CT_Socket:

    /*
     * Operation table for plain socket connections. Entries are listed in
     * the same order as the members of ConnectionType.
     */
    ConnectionType CT_Socket = {
        /* event-loop entry point */
        .ae_handler = connSocketEventHandler,

        /* setup, transfer, teardown */
        .connect = connSocketConnect,
        .write = connSocketWrite,
        .read = connSocketRead,
        .close = connSocketClose,
        .accept = connSocketAccept,

        /* callback installation */
        .set_write_handler = connSocketSetWriteHandler,
        .set_read_handler = connSocketSetReadHandler,

        /* error reporting */
        .get_last_error = connSocketGetLastError,

        /* operations that take a timeout */
        .blocking_connect = connSocketBlockingConnect,
        .sync_write = connSocketSyncWrite,
        .sync_read = connSocketSyncRead,
        .sync_readline = connSocketSyncReadLine,
    };
    

    CT_TLS:

    /*
     * Operation table for TLS connections. Entries are listed in the same
     * order as the members of ConnectionType. The declaration is terminated
     * with a semicolon, which the original snippet was missing.
     */
    ConnectionType CT_TLS = {
        /* event-loop entry point */
        .ae_handler = tlsEventHandler,

        /* setup, transfer, teardown */
        .connect = connTLSConnect,
        .write = connTLSWrite,
        .read = connTLSRead,
        .close = connTLSClose,
        .accept = connTLSAccept,

        /* callback installation */
        .set_write_handler = connTLSSetWriteHandler,
        .set_read_handler = connTLSSetReadHandler,

        /* error reporting */
        .get_last_error = connTLSGetLastError,

        /* operations that take a timeout */
        .blocking_connect = connTLSBlockingConnect,
        .sync_write = connTLSSyncWrite,
        .sync_read = connTLSSyncRead,
        .sync_readline = connTLSSyncReadLine,
    };
    

    这个结构是核心,后续的各种网络操作,都是通过ConnectionType中的指针调用的。

  2. **struct connection:**该结构表示一个完整的连接,客户端的fd会被封装成一个connection。该结构与ConnectionType配合使用,可以认为ConnectionType是操作接口(除ae_handler外,所有函数的第一个参数都是connection),而struct connection是操作对象,set_read_handler/set_write_handler都是设置struct connection中的handler。

    /*
     * One connection: the object that the ConnectionType operations act on.
     */
    struct connection {
        /* Operation table for this connection (e.g. &CT_Socket). */
        ConnectionType *type;
        /* Current connection state; compared against CONN_STATE_ERROR after a
         * failed accept. */
        ConnectionState state;
        short int flags;
        /* NOTE(review): presumably a reference count -- confirm. */
        short int refs;
        /* NOTE(review): presumably the errno saved from the last failed
         * operation -- confirm. */
        int last_errno;
        /* Opaque pointer owned by the user of the connection; the client
         * creation path stores the client here via connSetPrivateData()
         * (presumably in this field -- confirm). */
        void *private_data;
        /* NOTE(review): presumably the connect/accept completion callback --
         * confirm. */
        ConnectionCallbackFunc conn_handler;
        ConnectionCallbackFunc write_handler;
        /* Callback for readable events; installed by the set_read_handler
         * operation. */
        ConnectionCallbackFunc read_handler;
        /* Underlying file descriptor, the one registered with the event loop. */
        int fd;
    };
    
  3. aeEventLoop,redis是单线程非阻塞io,网络的结构封装在aeEventLoop结构中

    /* State of an event based program.
     *
     * `events` and `fired` point to arrays allocated outside this struct;
     * `events` is indexed directly by file descriptor number. */
    typedef struct aeEventLoop {
        int maxfd;   /* highest file descriptor currently registered */
        int setsize; /* max number of file descriptors tracked */
        long long timeEventNextId; /* NOTE(review): presumably the id for the next time event -- confirm */
        time_t lastTime;     /* Used to detect system clock skew */
        aeFileEvent *events; /* Registered events */
        aeFiredEvent *fired; /* Fired events */
        aeTimeEvent *timeEventHead; /* NOTE(review): presumably head of the time-event list -- confirm */
        int stop; /* NOTE(review): presumably makes the main loop exit when set -- confirm */
        void *apidata; /* This is used for polling API specific data */
        aeBeforeSleepProc *beforesleep; /* NOTE(review): presumably hooks run around the polling wait -- confirm */
        aeBeforeSleepProc *aftersleep;
        int flags;
    } aeEventLoop;
    

    最核心的两个数据就是events和fired,分别代表了正在监听的fd和有事件触发的fd。

    aeFileEvent:

    /* File event structure: the registration record kept for one file
     * descriptor in the event loop's `events` array. */
    typedef struct aeFileEvent {
        int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
        aeFileProc *rfileProc; /* callback stored when registering with AE_READABLE */
        aeFileProc *wfileProc; /* callback stored when registering with AE_WRITABLE */
        void *clientData;      /* opaque pointer handed back to the callbacks */
    } aeFileEvent;
    

    其中rfileProc和wfileProc分别是该事件对应的读、写回调。

    aeFiredEvent:

    /* A fired event: a file descriptor together with the event mask that
     * was reported for it. */
    typedef struct aeFiredEvent {
        int fd;   /* descriptor the event belongs to */
        int mask; /* reported events (readable / writable flags) */
    } aeFiredEvent;
    

    aeFiredEvent的定义很简单,就一个fd,再加一个该fd是否可读写。

    可以看到,在aeEventLoop中定义的events和fired都是指针,指向了外面分配的数组,其大小是server.maxclients+**CONFIG_FDSET_INCR**。

二、网络框架

  1. 等待连接

    main()
    initServer()
    	server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR)
    	listenToPort(server.port,server.ipfd,&server.ipfd_count)
    	aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL)
    		// mask为AE_READABLE,因此只把rfileProc设置为 acceptTcpHandler(wfileProc不会被设置)
    InitServerLast()
    	initThreadedIO()
    		pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i)
    aeMain(server.el)
    	aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    		fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 如果触发的是监听的fd,则走acceptTcpHandler回调
    			acceptTcpHandler()
    				anetTcpAccept()
                    connCreateAcceptedSocket()
                    	connCreateSocket()
                    		conn->type = &CT_Socket;  // 最终创建了ConnectionType为CT_Socket的一个连接
    				acceptCommonHandler()
                        createClient() // 创建一个真正的client
                        	connSetReadHandler(conn, readQueryFromClient); // 给客户端fd设置 readQueryFromClient 回调,epoll触发时调用
    							set_read_handler->connSocketSetReadHandler(conn, func)
                                    aeCreateFileEvent(server.el,conn->fd, AE_READABLE,conn->type->ae_handler,conn) // 注册fd
                        	linkClient()
                        		raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
    					connAccept(conn, clientAcceptHandler) 
                            connSocketAccept
                            	callHandler(conn, accept_handler)
                            		accept->connSocketAccept
    		fe->wfileProc(eventLoop,fd,fe->clientData,mask);
    
  2. 读请求

    aeMain(server.el)
    	aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    		fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 如果触发的是客户端fd,则走 readQueryFromClient 回调
    			readQueryFromClient()
    

三、主要函数

  1. aeCreateFileEvent: 该函数把传入的fd加到epoll队列中,并按mask为其绑定回调函数proc。对于redis监听的fd,传入的回调是acceptTcpHandler,用于接收客户端的连接请求;对于客户端fd,传入的则是连接类型的ae_handler。

    /*
     * Register interest in the `mask` events on `fd` and store `proc` as the
     * callback for each requested direction, together with `clientData`.
     *
     * Returns AE_ERR with errno set to ERANGE when fd does not fit in the
     * event table (fd >= setsize), AE_ERR when aeApiAddEvent() reports
     * failure, and AE_OK otherwise.
     */
    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
            aeFileProc *proc, void *clientData)
    {
        aeFileEvent *slot;

        /* The descriptor must fall inside the table of tracked descriptors. */
        if (fd >= eventLoop->setsize) {
            errno = ERANGE;
            return AE_ERR;
        }

        /* The events table is indexed directly by descriptor number. */
        slot = &eventLoop->events[fd];

        /* Hand the descriptor to the polling layer before touching the slot. */
        if (aeApiAddEvent(eventLoop, fd, mask) == -1) {
            return AE_ERR;
        }

        slot->mask |= mask;
        /* `proc` is stored only for the directions requested in `mask`. */
        if (mask & AE_READABLE) {
            slot->rfileProc = proc;
        }
        if (mask & AE_WRITABLE) {
            slot->wfileProc = proc;
        }
        slot->clientData = clientData;

        /* Keep track of the highest registered descriptor. */
        if (eventLoop->maxfd < fd) {
            eventLoop->maxfd = fd;
        }
        return AE_OK;
    }
    
  2. acceptTcpHandler: 用于接收客户端连接请求,其中anetTcpAccept内部是系统调用accept接口。该函数内部的acceptCommonHandler是重要操作,通过connCreateAcceptedSocket将客户端的fd转换成一个connection,在connCreateAcceptedSocket内部通过connCreateSocket创建一个connection,其类型为CT_Socket

    /*
     * Event callback registered for the listening sockets: accepts pending
     * connections (the loop counter starts at MAX_ACCEPTS_PER_CALL) and hands
     * each accepted descriptor, wrapped in a socket connection, to
     * acceptCommonHandler().
     */
    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        char peer_ip[NET_IP_STR_LEN];
        int peer_port;
        int client_fd;
        int budget = MAX_ACCEPTS_PER_CALL;

        UNUSED(el);
        UNUSED(mask);
        UNUSED(privdata);

        while (budget--) {
            /* anetTcpAccept() yields the accepted descriptor, or ANET_ERR. */
            client_fd = anetTcpAccept(server.neterr, fd, peer_ip, sizeof(peer_ip), &peer_port);
            if (client_fd == ANET_ERR) {
                /* EWOULDBLOCK is not logged; any other errno is. Either way,
                 * stop accepting for this invocation. */
                if (errno == EWOULDBLOCK) return;
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
                return;
            }
            serverLog(LL_VERBOSE,"Accepted %s:%d", peer_ip, peer_port);
            acceptCommonHandler(connCreateAcceptedSocket(client_fd),0,peer_ip);
        }
    }
    
  3. **acceptCommonHandler:**根据connection创建一个client,并将该客户端链接到server.clients。在新建client的时候有个重要操作connSetReadHandler,这个操作绑定了客户端的读取实现。

    /*
     * Turn an accepted connection into a client.
     *
     * Steps, in order: refuse the connection when the client limit has been
     * reached, create the client object, merge `flags` into the client's
     * flags, then start the accept phase with clientAcceptHandler as the
     * completion callback.
     */
    static void acceptCommonHandler(connection *conn, int flags, char *ip) {
        client *cl;
        char conninfo[100];   /* scratch buffer for connGetInfo() in log lines */
        UNUSED(ip);

        /* Admission control comes first, before a client exists and before
         * connAccept() is called, so that no transport-level negotiation is
         * even started for a connection that is going to be rejected. */
        if (listLength(server.clients) >= server.maxclients) {
            char *reply = "-ERR max number of clients reached\r\n";

            /* Best-effort error message: write errors are deliberately not
             * handled. For TLS connections no handshake has happened yet, so
             * nothing is written and the connection simply drops. */
            if (connWrite(conn,reply,strlen(reply)) == -1) {
                /* Intentionally empty; the test only silences a warning. */
            }
            server.stat_rejected_conn++;
            connClose(conn);
            return;
        }

        /* Create the client that goes with this connection. */
        cl = createClient(conn);
        if (cl == NULL) {
            serverLog(LL_WARNING,
                "Error registering fd event for the new client: %s (conn: %s)",
                connGetLastError(conn),
                connGetInfo(conn, conninfo, sizeof(conninfo)));
            connClose(conn); /* May be already closed, just ignore errors */
            return;
        }

        /* Last chance to keep flags */
        cl->flags |= flags;

        /* Start the accept phase. connAccept() may either invoke
         * clientAcceptHandler() right away or schedule it for later, so
         * nothing else may be done after a successful call. */
        if (connAccept(conn, clientAcceptHandler) != C_ERR) return;

        if (connGetState(conn) == CONN_STATE_ERROR)
            serverLog(LL_WARNING,
                    "Error accepting a client connection: %s (conn: %s)",
                    connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
        freeClient(connGetPrivateData(conn));
    }
    
  4. **createClient:**最主要的是connSetReadHandler,其实是调用CT_Socket的set_read_handler接口,也就是connSocketSetReadHandler函数;注册的读回调(即读取请求的函数)就是readQueryFromClient。

    client *createClient(connection *conn) {
        client *c = zmalloc(sizeof(client));
    
        /* passing NULL as conn it is possible to create a non connected client.
         * This is useful since all the commands needs to be executed
         * in the context of a client. When commands are executed in other
         * contexts (for instance a Lua script) we need a non connected client. */
        if (conn) {
            connNonBlock(conn);
            connEnableTcpNoDelay(conn);
            if (server.tcpkeepalive)
                connKeepAlive(conn,server.tcpkeepalive);
            connSetReadHandler(conn, readQueryFromClient);
            connSetPrivateData(conn, c);
        }
        /* ……省略其余代码…… */
    
  5. **connSocketSetReadHandler:**该函数的目的是把客户端的connection注册到epoll队列中,等待读写触发,从而实现了非阻塞。

    /* Install `func` as the callback invoked when the connection becomes
     * readable. Passing NULL removes the current handler and drops the
     * readable-event registration for the connection's fd.
     *
     * Returns C_OK on success (including when `func` is already the installed
     * handler) and C_ERR when the event registration fails.
     */
    static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
        /* Nothing to do when the requested handler is already in place. */
        if (conn->read_handler == func) return C_OK;

        conn->read_handler = func;

        if (!func) {
            aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
            return C_OK;
        }

        /* The event loop calls the connection type's ae_handler, with the
         * connection itself as client data. */
        int rc = aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn);
        return (rc == AE_ERR) ? C_ERR : C_OK;
    }
    

原文: http://ditanshow.com/articles/p/313.html

上一篇:一个用来把一个工作簿按其中一个工作表关键词列拆分成多个工作簿的VBA代码


下一篇:国民经济行业分类与代码GB/T 4754-2002、GB/T 4754-2011、GB/T 4754-2017 存入mysql数据库