redis基础

一、关于redis的基本说明

What does Redis actually mean?

It means REmote DIctionary Server.

Redis clients communicate with the Redis server using a protocol called RESP (REdis Serialization Protocol). While the protocol was designed specifically for Redis, it can be used for other client-server software projects.

RESP is a compromise between the following things:

Simple to implement.

Fast to parse.

Human readable.

key-value格式的一些限制

It is not hard, but we have not yet looked at how following / follower relationships are created. If user ID 1000 (antirez) wants to follow user ID 5000 (pippo), we need to create both a following and a follower relationship. We just need two ZADD calls (ZADD takes a score before the member; here the score is the Unix time at which the relationship was created):

    ZADD following:1000 <unix_time> 5000

    ZADD followers:5000 <unix_time> 1000

Note the same pattern again and again. In theory with a relational database, the list of following and followers would be contained in a single table with fields like following_id and follower_id. You can extract the followers or following of every user using an SQL query. With a key-value DB things are a bit different since we need to set both the 1000 is following 5000 and 5000 is followed by 1000 relations. This is the price to pay, but on the other hand accessing the data is simpler and extremely fast. Having these things as separate sets allows us to do interesting stuff. For example, using ZINTERSTORE we can have the intersection of 'following' of two different users, so we may add a feature to our Twitter clone so that it is able to tell you very quickly when you visit somebody else's profile, "you and Alice have 34 followers in common", and things like that.

You can find the code that sets or removes a following / follower relation in the follow.php file. 

二、一些基本的数据结构

 1、构建

 

直接在根目录下执行构建就好,为了便于调试,这里禁掉优化

make OPTIMIZATION= V=1 

2、客户端连接的接受处理

 void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {

    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;

    char cip[NET_IP_STR_LEN];

    UNUSED(el);

    UNUSED(mask);

    UNUSED(privdata);

 

    while(max--) {

        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);

        if (cfd == ANET_ERR) {

            if (errno != EWOULDBLOCK)

                serverLog(LL_WARNING,

                    "Accepting client connection: %s", server.neterr);

            return;

        }

        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);

        acceptCommonHandler(cfd,0,cip);

    }

3、每个client的创建

client *createClient(int fd) {

    client *c = zmalloc(sizeof(client));

 

    /* passing -1 as fd it is possible to create a non connected client.

     * This is useful since all the commands needs to be executed

     * in the context of a client. When commands are executed in other

     * contexts (for instance a Lua script) we need a non connected client. */

    if (fd != -1) {

        anetNonBlock(NULL,fd);

        anetEnableTcpNoDelay(NULL,fd);

        if (server.tcpkeepalive)

            anetKeepAlive(NULL,fd,server.tcpkeepalive);

        if (aeCreateFileEvent(server.el,fd,AE_READABLE,

            readQueryFromClient, c) == AE_ERR)

        {

            close(fd);

            zfree(c);

            return NULL;

        }

    }

 

 ……

    initClientMultiState(c);

    return c;

4、每个客户端数据的处理

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {

    client *c = (client*) privdata;

    int nread, readlen;

    size_t qblen;

    UNUSED(el);

    UNUSED(mask);

 

    readlen = PROTO_IOBUF_LEN;

    /* If this is a multi bulk request, and we are processing a bulk reply

     * that is large enough, try to maximize the probability that the query

     * buffer contains exactly the SDS string representing the object, even

     * at the risk of requiring more read(2) calls. This way the function

     * processMultiBulkBuffer() can avoid copying buffers to create the

     * Redis Object representing the argument. */

    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1

        && c->bulklen >= PROTO_MBULK_BIG_ARG)

    {

        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);

 

        if (remaining < readlen) readlen = remaining;

    }

 

    qblen = sdslen(c->querybuf);

    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;

    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);

    nread = read(fd, c->querybuf+qblen, readlen);

……

5、binary格式内容的传输

由于字符串是"$长度\r\nblob\r\n"的格式,在开始已经包含了字符串的长度,解析时按长度读取而不依赖分隔符,所以这个字符串中的内容可以是任意的二进制数据。

static int processBulkItem(redisReader *r) {

    redisReadTask *cur = &(r->rstack[r->ridx]);

    void *obj = NULL;

    char *p, *s;

    long len;

    unsigned long bytelen;

    int success = 0;

 

    p = r->buf+r->pos;

    s = seekNewline(p,r->len-r->pos);

    if (s != NULL) {

        p = r->buf+r->pos;

        bytelen = s-(r->buf+r->pos)+2; /* include \r\n */

        len = readLongLong(p);

……

} 

三、异步框架中定时器和epoll的兼容问题

在异步框架中,定时器和select/epoll这类多路复用操作都要依赖系统调用:一旦以阻塞方式进入poll系统调用,用户态的定时器检查就无法进行。所以redis在进入poll系统调用之前,会先找出最近一个即将到期的定时器,并把距离它到期的剩余时间作为poll的超时时间,保证阻塞在poll中时不会错过定时器:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)

{

……

        aeTimeEvent *shortest = NULL;

        struct timeval tv, *tvp;

 

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))

            shortest = aeSearchNearestTimer(eventLoop);

        if (shortest) {

            long now_sec, now_ms;

 

            aeGetTime(&now_sec, &now_ms);

            tvp = &tv;

 

            /* How many milliseconds we need to wait for the next

             * time event to fire? */

            long long ms =

                (shortest->when_sec - now_sec)*1000 +

                shortest->when_ms - now_ms;

 

            if (ms > 0) {

                tvp->tv_sec = ms/1000;

                tvp->tv_usec = (ms % 1000)*1000;

            } else {

                tvp->tv_sec = 0;

                tvp->tv_usec = 0;

            }

        } else {

            /* If we have to check for events but need to return

             * ASAP because of AE_DONT_WAIT we need to set the timeout

             * to zero */

            if (flags & AE_DONT_WAIT) {

                tv.tv_sec = tv.tv_usec = 0;

                tvp = &tv;

            } else {

                /* Otherwise we can block */

                tvp = NULL; /* wait forever */

            }

        }

 

        numevents = aeApiPoll(eventLoop, tvp);

……

}

四、数据落地处理

redis-3.2.3\src\rdb.c

/* Save the DB on disk. Return C_ERR on error, C_OK on success. */

int rdbSave(char *filename) {

    char tmpfile[256];

    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */

    FILE *fp;

    rio rdb;

    int error = 0;

 

    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());

    fp = fopen(tmpfile,"w");

    if (!fp) {

        char *cwdp = getcwd(cwd,MAXPATHLEN);

        serverLog(LL_WARNING,

            "Failed opening the RDB file %s (in server root dir %s) "

            "for saving: %s",

            filename,

            cwdp ? cwdp : "unknown",

            strerror(errno));

        return C_ERR;

    }

 

    rioInitWithFile(&rdb,fp);

    if (rdbSaveRio(&rdb,&error) == C_ERR) {

        errno = error;

        goto werr;

    }

 

    /* Make sure data will not remain on the OS's output buffers */

    if (fflush(fp) == EOF) goto werr;

    if (fsync(fileno(fp)) == -1) goto werr;

    if (fclose(fp) == EOF) goto werr;

 

    /* Use RENAME to make sure the DB file is changed atomically only

     * if the generate DB file is ok. */

    if (rename(tmpfile,filename) == -1) {

        char *cwdp = getcwd(cwd,MAXPATHLEN);

        serverLog(LL_WARNING,

            "Error moving temp DB file %s on the final "

            "destination %s (in server root dir %s): %s",

            tmpfile,

            filename,

            cwdp ? cwdp : "unknown",

            strerror(errno));

        unlink(tmpfile);

        return C_ERR;

    }

 

    serverLog(LL_NOTICE,"DB saved on disk");

    server.dirty = 0;

    server.lastsave = time(NULL);

    server.lastbgsave_status = C_OK;

    return C_OK;

 

werr:

    serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));

    fclose(fp);

    unlink(tmpfile);

    return C_ERR;

}

上一篇:IfcDistributionChamberElement


下一篇:Java中四种引用的介绍