Redis:del/unlink 命令源码解析

Redis(四):del/unlink 命令源码解析

本篇我们来看一下删除过程。

对于客户端来说,删除操作无需区分何种数据类型,只管进行 del 操作即可。

零、删除命令 del 的定义

主要有两个: del/unlink, 差别是 unlink 速度会更快, 因为其使用了异步删除优化模式, 其定义如下:

// 标识只有一个 w, 说明就是一个普通的写操作,没啥好说的
{"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0}
// 标识为 wF, 说明它是一个快速写的操作,其实就是有一个异步优化的过程,稍后详解
{"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0}

一、delCommand

delCommand 的作用就是直接删除某个 key 的数据,释放内存即可。

// db.c, del 命令处理
void delCommand(client *c) {

// 同步删除
delGenericCommand(c,0);

}

/ This command implements DEL and LAZYDEL. /
void delGenericCommand(client *c, int lazy) {

int numdel = 0, j;

for (j = 1; j < c->argc; j++) {
    // 自动过期数据清理
    expireIfNeeded(c->db,c->argv[j]);
    // 此处分同步删除和异步删除, 主要差别在于对于复杂数据类型的删除方面,如hash,list,set...
    // 针对 string 的删除是完全一样的
    int deleted  = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
                          dbSyncDelete(c->db,c->argv[j]);
    // 写命令的传播问题
    if (deleted) {
        signalModifiedKey(c->db,c->argv[j]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,
            "del",c->argv[j],c->db->id);
        server.dirty++;
        numdel++;
    }
}
// 响应删除数据量, 粒度到 key 级别
addReplyLongLong(c,numdel);

}

框架代码一看即明,只是相比于我们普通的删除是多了不少事情。否则也不存在设计了。

二、unlinkCommand

如下,其实和del是一毛一样的,仅是变化了一个 lazy 标识而已。

// db.c, unlink 删除处理
void unlinkCommand(client *c) {

// 与 del 一致,只是 lazy 标识不一样
delGenericCommand(c,1);

}

三、删除数据过程详解

删除数据分同步和异步两种实现方式,道理都差不多,只是一个是后台删一个是前台删。我们分别来看看。

  1. 同步删除 dbSyncDelete

同步删除很简单,只要把对应的key删除,val删除就行了,如果有内层引用,则进行递归删除即可。

// db.c, 同步删除数据
/ Delete a key, value, and associated expiration entry if any, from the DB /
int dbSyncDelete(redisDb db, robj key) {

/* Deleting an entry from the expires dict will not free the sds of
 * the key, because it is shared with the main dictionary. */
// 首先从 expires 队列删除,然后再从 db->dict 中删除
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
    if (server.cluster_enabled) slotToKeyDel(key);
    return 1;
} else {
    return 0;
}

}
// dict.c, 如上, 仅仅是 dictDelete() 就可以了,所以真正的删除动作是在 dict 中实现的。
int dictDelete(dict ht, const void key) {

// nofree: 0, 即要求释放内存
return dictGenericDelete(ht,key,0);

}
// dict.c, nofree: 0:要释放相应的val内存, 1:不释放相应val内存只删除key
/ Search and remove an element /
static int dictGenericDelete(dict d, const void key, int nofree)
{

unsigned int h, idx;
dictEntry *he, *prevHe;
int table;

if (d->ht[0].size == 0) return DICT_ERR; /* d->ht[0].table is NULL */
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
// ht[0] 和 ht[1] 如有可能都进行扫描
for (table = 0; table <= 1; table++) {
    idx = h & d->ht[table].sizemask;
    he = d->ht[table].table[idx];
    prevHe = NULL;
    while(he) {
        if (dictCompareKeys(d, key, he->key)) {
            /* Unlink the element from the list */
            if (prevHe)
                prevHe->next = he->next;
            else
                d->ht[table].table[idx] = he->next;
            // no nofree, 就是要 free 内存咯
            if (!nofree) {
                // 看起来 key/value 需要单独释放内存哦
                dictFreeKey(d, he);
                dictFreeVal(d, he);
            }
            zfree(he);
            d->ht[table].used--;
            return DICT_OK;
        }
        prevHe = he;
        he = he->next;
    }
    // 如果没有进行 rehashing, 只需扫描0就行了
    if (!dictIsRehashing(d)) break;
}
return DICT_ERR; /* not found */

}

其实对于有GC收集器的语言来说,根本不用关注内存的释放问题,自有后台工具处理,然而对于 c 语言这种级别语言,则是需要自行关注内存的。这也是本文存在的意义,不然对于一个 hash 表的元素删除操作,如上很难吗?并没有。

下面,我们就来看看 redis 是如何具体释放内存的吧。

// dict.h, 释放key, value 的逻辑也是非常简单,用一个宏就定义好了
// 释放依赖于 keyDestructor, valDestructor

define dictFreeKey(d, entry) \

if ((d)->type->keyDestructor) \
    (d)->type->keyDestructor((d)->privdata, (entry)->key)

define dictFreeVal(d, entry) \

if ((d)->type->valDestructor) \
    (d)->type->valDestructor((d)->privdata, (entry)->v.val)

// 所以,我们有必要回去看看 key,value 的析构方法
// 而这,又依赖于具体的数据类型,也就是你在 setXXX 的时候用到的数据类型
// 我们看一下这个 keyDestructor,valDestructor 初始化的样子
// server.c kv的析构函数定义
/ Db->dict, keys are sds strings, vals are Redis objects. /
dictType dbDictType = {

dictSdsHash,                /* hash function */
NULL,                       /* key dup */
NULL,                       /* val dup */
dictSdsKeyCompare,          /* key compare */
dictSdsDestructor,          /* key destructor */
dictObjectDestructor   /* val destructor */

};

// 1. 先看看 key destructor, key 的释放
// server.c, 直接调用 sds 提供的服务即可
void dictSdsDestructor(void privdata, void val)
{

DICT_NOTUSED(privdata);
// sds 直接释放key就行了
sdsfree(val);

}
// sds.c, 真正释放 value 内存
/ Free an sds string. No operation is performed if 's' is NULL. /
void sdsfree(sds s) {

if (s == NULL) return;
// zfree, 确实很简单嘛, 因为 sds 是连续的内存空间,直接使用系统提供的方法即可删除
s_free((char*)s-sdsHdrSize(s[-1]));

}

// 2. value destructor 对value的释放, 如果说 key 一定是string格式的话,value可主不一定了,因为 redis提供丰富的数据类型呢
// server.c
void dictObjectDestructor(void privdata, void val)
{

DICT_NOTUSED(privdata);

if (val == NULL) return; /* Lazy freeing will set value to NULL. */
decrRefCount(val);

}
// 减少 value 的引用计数
void decrRefCount(robj *o) {

if (o->refcount == 1) {
    switch(o->type) {
        // string 类型
        case OBJ_STRING: freeStringObject(o); break;
        // list 类型
        case OBJ_LIST: freeListObject(o); break;
        // set 类型
        case OBJ_SET: freeSetObject(o); break;
        // zset 类型
        case OBJ_ZSET: freeZsetObject(o); break;
        // hash 类型
        case OBJ_HASH: freeHashObject(o); break;
        default: serverPanic("Unknown object type"); break;
    }
    zfree(o);
} else {
    if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
    if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
}

}

额,可以看出,对key的释放自然是简单之极。而对 value 则谨慎许多,首先它表面上只对引用做减操作。只有发只剩下1个引用即只有当前引用的情况下,本次释放就是最后一次释放,所以才会回收内存。

// 在介绍不同数据类型的内存释放前,我们可以先来看下每个元素的数据结构
// dict.h
typedef struct dictEntry {

// 存储 key 字段内容
void *key;
// 用一个联合体存储value
union {
    // 存储数据时使用 *val 存储
    void *val;
    uint64_t u64;
    // 存储过期时间时使用该字段
    int64_t s64;
    // 存储 score 时使用
    double d;
} v;
// 存在hash冲突时,作链表使用
struct dictEntry *next;

} dictEntry;

// 1. string 类型的释放
// object.c
void freeStringObject(robj *o) {

// 直接调用 sds服务释放
if (o->encoding == OBJ_ENCODING_RAW) {
    sdsfree(o->ptr);
}

}

// 2. list 类型的释放
// object.c
void freeListObject(robj *o) {

switch (o->encoding) {
case OBJ_ENCODING_QUICKLIST:
    quicklistRelease(o->ptr);
    break;
default:
    serverPanic("Unknown list encoding type");
}

}
// quicklist.c
/ Free entire quicklist. /
void quicklistRelease(quicklist *quicklist) {

unsigned long len;
quicklistNode *current, *next;

current = quicklist->head;
len = quicklist->len;
// 链表依次迭代就可以释放完成了
while (len--) {
    next = current->next;
    // 释放list具体值
    zfree(current->zl);
    quicklist->count -= current->count;
    // 释放list对象
    zfree(current);

    quicklist->len--;
    current = next;
}
zfree(quicklist);

}

// 3. set 类型的释放
// object.c, set 分两种类型, ht, intset
void freeSetObject(robj *o) {

switch (o->encoding) {
case OBJ_ENCODING_HT:
    // hash 类型则需要删除每个 hash 的 kv
    dictRelease((dict*) o->ptr);
    break;
case OBJ_ENCODING_INTSET:
    // intset 直接释放
    zfree(o->ptr);
    break;
default:
    serverPanic("Unknown set encoding type");
}

}
// dict.c,
/ Clear & Release the hash table /
void dictRelease(dict *d)
{

// ht[0],ht[1] 依次清理
_dictClear(d,&d->ht[0],NULL);
_dictClear(d,&d->ht[1],NULL);
zfree(d);

}
// dict.c,
/ Destroy an entire dictionary /
int _dictClear(dict d, dictht ht, void(callback)(void *)) {

unsigned long i;

/* Free all the elements */
for (i = 0; i < ht->size && ht->used > 0; i++) {
    dictEntry *he, *nextHe;

    if (callback && (i & 65535) == 0) callback(d->privdata);
    // 元素为空,hash未命中,但只要 used > 0, 代表就还有需要删除的元素存在
    // 其实对于只有少数几个元素的情况下,这个效率就呵呵了
    if ((he = ht->table[i]) == NULL) continue;
    while(he) {
        nextHe = he->next;
        // 这里的释放 kv 逻辑和前面是一致的
        // 看起来像是递归,其实不然,因为redis不存在数据类型嵌套问题,比如 hash下存储hash, 所以不会存在递归
        // 具体结构会在后续解读到
        dictFreeKey(d, he);
        dictFreeVal(d, he);
        zfree(he);
        ht->used--;
        he = nextHe;
    }
}
/* Free the table and the allocated cache structure */
zfree(ht->table);
/* Re-initialize the table */
_dictReset(ht);
return DICT_OK; /* never fails */

}

// 4. hash 类型的释放
// object.c, hash和set其实是很相似的,代码也做了大量的复用
void freeHashObject(robj *o) {

switch (o->encoding) {
case OBJ_ENCODING_HT:
    // ht 形式与set一致
    dictRelease((dict*) o->ptr);
    break;
case OBJ_ENCODING_ZIPLIST:
    // ziplist 直接释放
    zfree(o->ptr);
    break;
default:
    serverPanic("Unknown hash encoding type");
    break;
}

}

// 5. zset 类型的释放
// object.c, zset 的存储形式与其他几个
void freeZsetObject(robj *o) {

zset *zs;
switch (o->encoding) {
case OBJ_ENCODING_SKIPLIST:
    zs = o->ptr;
    // 释放dict 数据, ht 0,1 的释放
    dictRelease(zs->dict);
    // 释放skiplist 数据, 主要看下这个
    zslFree(zs->zsl);
    zfree(zs);
    break;
case OBJ_ENCODING_ZIPLIST:
    zfree(o->ptr);
    break;
default:
    serverPanic("Unknown sorted set encoding");
}

}
// t_zset.c, 释放跳表数据
/ Free a whole skiplist. /
void zslFree(zskiplist *zsl) {

zskiplistNode *node = zsl->header->level[0].forward, *next;

zfree(zsl->header);
while(node) {
    // 基于第0层数据释放,也基于第0层做迭代,直到删除完成
    // 因为其他层数据都是引用的第0层的数据,所以释放时无需关注
    next = node->level[0].forward;
    zslFreeNode(node);
    node = next;
}
zfree(zsl);

}
// t_zset 也很简单,只是把 node.ele 释放掉,再把自身释放到即可
// 这样的删除方式依赖于其存储结构,咱们后续再聊
/* Free the specified skiplist node. The referenced SDS string representation

  • of the element is freed too, unless node->ele is set to NULL before calling
  • this function. */
  1. zslFreeNode(zskiplistNode *node) {
    sdsfree(node->ele);
    zfree(node);
    }
  1. 异步删除过程

异步删除按理说会更复杂,更有意思些。只不过我们前面已经把核心的东西撸了个遍,这剩下的也不多了。

// lazyfree.c,
int dbAsyncDelete(redisDb db, robj key) {

/* Deleting an entry from the expires dict will not free the sds of
 * the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

/* If the value is composed of a few allocations, to free in a lazy way
 * is actually just slower... So under a certain limit we just free
 * the object synchronously. */
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
    robj *val = dictGetVal(de);
    size_t free_effort = lazyfreeGetFreeEffort(val);

    /* If releasing the object is too much work, let's put it into the
     * lazy free list. */
    // 其实异步方法与同步方法的差别在这,即要求 删除的元素影响须大于某阀值(64)
    // 否则按照同步方式直接删除,因为那样代价更小
    if (free_effort > LAZYFREE_THRESHOLD) {
        // 异步释放+1,原子操作
        atomicIncr(lazyfree_objects,1,&lazyfree_objects_mutex);
        // 将 value 的释放添加到异步线程队列中去,后台处理, 任务类型为 异步释放内存
        bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
        // 设置val为NULL, 以便在外部进行删除时忽略释放value相关内存
        dictSetVal(db->dict,de,NULL);
    }
}

/* Release the key-val pair, or just the key if we set the val
 * field to NULL in order to lazy free it later. */
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
    if (server.cluster_enabled) slotToKeyDel(key);
    return 1;
} else {
    return 0;
}

}
// bio.c, 添加异步任务到线程中, 类型由type决定,线程安全地添加
// 然后嘛,后台线程就不会停地运行了任务了
void bioCreateBackgroundJob(int type, void arg1, void arg2, void *arg3) {

struct bio_job *job = zmalloc(sizeof(*job));

job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
// 上锁操作
pthread_mutex_lock(&bio_mutex[type]);
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
// 唤醒任务线程
pthread_cond_signal(&bio_newjob_cond[type]);
pthread_mutex_unlock(&bio_mutex[type]);

}
// bio.c, 后台线程任务框架,总之还是有事情可做了。
void bioProcessBackgroundJobs(void arg) {

struct bio_job *job;
unsigned long type = (unsigned long) arg;
sigset_t sigset;

/* Check that the type is within the right interval. */
if (type >= BIO_NUM_OPS) {
    serverLog(LL_WARNING,
        "Warning: bio thread started with wrong type %lu",type);
    return NULL;
}

/* Make the thread killable at any time, so that bioKillThreads()
 * can work reliably. */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

pthread_mutex_lock(&bio_mutex[type]);
/* Block SIGALRM so we are sure that only the main thread will
 * receive the watchdog signal. */
sigemptyset(&sigset);
sigaddset(&sigset, SIGALRM);
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
    serverLog(LL_WARNING,
        "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
// 任务一直运行
while(1) {
    listNode *ln;

    /* The loop always starts with the lock hold. */
    if (listLength(bio_jobs[type]) == 0) {
        // 注意此处将会释放锁哟,以便外部可以添加任务进来
        pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
        continue;
    }
    /* Pop the job from the queue. */
    ln = listFirst(bio_jobs[type]);
    job = ln->value;
    /* It is now possible to unlock the background system as we know have
     * a stand alone job structure to process.*/
    pthread_mutex_unlock(&bio_mutex[type]);

    /* Process the job accordingly to its type. */
    if (type == BIO_CLOSE_FILE) {
        close((long)job->arg1);
    } else if (type == BIO_AOF_FSYNC) {
        aof_fsync((long)job->arg1);
    } 
    // 也就是这玩意了,会去处理提交过来的任务
    else if (type == BIO_LAZY_FREE) {
        /* What we free changes depending on what arguments are set:
         * arg1 -> free the object at pointer.
         * arg2 & arg3 -> free two dictionaries (a Redis DB).
         * only arg3 -> free the skiplist. */
        // 本文介绍的删除value形式,用第一种情况
        if (job->arg1)
            lazyfreeFreeObjectFromBioThread(job->arg1);
        else if (job->arg2 && job->arg3)
            lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
        else if (job->arg3)
            lazyfreeFreeSlotsMapFromBioThread(job->arg3);
    } else {
        serverPanic("Wrong job type in bioProcessBackgroundJobs().");
    }
    zfree(job);

    /* Unblock threads blocked on bioWaitStepOfType() if any. */
    // 唤醒所有相关等待线程
    pthread_cond_broadcast(&bio_step_cond[type]);

    /* Lock again before reiterating the loop, if there are no longer
     * jobs to process we'll block again in pthread_cond_wait(). */
    pthread_mutex_lock(&bio_mutex[type]);
    listDelNode(bio_jobs[type],ln);
    bio_pending[type]--;
}

}

// lazyfree.c, 和同步删除一致了
/* Release objects from the lazyfree thread. It's just decrRefCount()

  • updating the count of objects to release. */
  1. lazyfreeFreeObjectFromBioThread(robj *o) {
    decrRefCount(o);
    atomicDecr(lazyfree_objects,1,&lazyfree_objects_mutex);
    }

从此处redis异步处理过程,我们可以看到,redis并不是每次进入异步时都进行异步操作,而是在必要的时候才会进行。这也提示我们,不要为了异步而异步,而是应该计算利弊。

如此,整个 del/unlink 的过程就完成了。总体来说,删除都是基于hash的简单remove而已,唯一有点难度是对内存的回收问题,这其实就是一个简单的使用引用计数器算法实现的垃圾回收器应该做的事而已。勿须多言。具体过程需依赖于数据的存储结构,主要目的自然是递归释放空间,避免内存泄漏了。

原文地址https://www.cnblogs.com/yougewe/p/12231419.html

上一篇:redis4.0、codis、阿里云redis 3种redis集群对比分析


下一篇:一文带你迅速看懂Kafka可观测优秀实践