redis6.0.5之zset阅读笔记4--压缩列表(ziplist)排序集相关API

***********************************************************************************************
获取压缩链表中节点的数值
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    serverAssert(sptr != NULL);
    serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {  如果是字符串编码,需要转化为数值
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        score = strtod(buf,NULL);
    } else {
        score = vlong;  是数值就直接赋值
    }

    return score;
}
***********************************************************************************************
/* Return a ziplist element as an SDS string. */
以SDS字符串形式返回一个压缩链表的元素
sds ziplistGetObject(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;

    serverAssert(sptr != NULL);
    serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        return sdsnewlen((char*)vstr,vlen);  新建字符串
    } else {
        return sdsfromlonglong(vlong); 将数值转化为字符串
    }
}
***********************************************************************************************
/* Compare element in sorted set with given element. */
有序集合中的元素和给定的元素 进行比较
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    unsigned char vbuf[32];
    int minlen, cmp;

    serverAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
    if (vstr == NULL) {
        /* Store string representation of long long in buf. */
        vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);  转为字符串
        vstr = vbuf;
    }

    minlen = (vlen < clen) ? vlen : clen; 取两个字符短的长度
    cmp = memcmp(vstr,cstr,minlen);比较字符串
    if (cmp == 0) return vlen-clen; 如果在同样长度的情况下一样,那么只要谁长就是谁大
    return cmp;
}
***********************************************************************************************
unsigned int zzlLength(unsigned char *zl) {
    return ziplistLen(zl)/2;  因为需要用两个实体来表示一个元素  数值  和  字符串 ,所以需要除以2
}
***********************************************************************************************
/* Move to next entry based on the values in eptr and sptr. Both are set to
 * NULL when there is no next entry. */
基于指针eptr 和 sptr指向的内容 移动到下个实体。没有下个实体,eptr 和 sptr都指向空
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *_eptr, *_sptr;
    serverAssert(*eptr != NULL && *sptr != NULL);

    _eptr = ziplistNext(zl,*sptr);      获取字符串
    if (_eptr != NULL) {
        _sptr = ziplistNext(zl,_eptr);  获取score,即数值
        serverAssert(_sptr != NULL);
    } else {
        /* No next entry. */ 没有下个实体了
        _sptr = NULL;
    }

    *eptr = _eptr; 给指针指向的内容赋值
    *sptr = _sptr;
}
***********************************************************************************************
/* Move to the previous entry based on the values in eptr and sptr. Both are
 * set to NULL when there is no next entry. */
基于指针eptr 和 sptr指向的内容 移动到前一个实体。没有前一个实体,eptr 和 sptr都指向空
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *_eptr, *_sptr;
    serverAssert(*eptr != NULL && *sptr != NULL);

    _sptr = ziplistPrev(zl,*eptr);  获取score,即数值
    if (_sptr != NULL) {
        _eptr = ziplistPrev(zl,_sptr); 获取字符串
        serverAssert(_eptr != NULL);
    } else {
        /* No previous entry. */
        _eptr = NULL;
    }

    *eptr = _eptr;
    *sptr = _sptr;
}
***********************************************************************************************
/* Returns if there is a part of the zset is in range. Should only be used
 * internally by zzlFirstInRange and zzlLastInRange. */
返回zset的一部分是不是在给定区间中(和给定区间是否有交集)。
只能被函数zzlFirstInRange 和 zzlLastInRange 内部使用
int zzlIsInRange(unsigned char *zl, zrangespec *range) {
    unsigned char *p;
    double score;

    /* Test for ranges that will always be empty. */ 确认给定区间不为空
    if (range->min > range->max ||
            (range->min == range->max && (range->minex || range->maxex)))
        return 0;

    p = ziplistIndex(zl,-1); /* Last score. */  最后一个实体  里面是数值
    if (p == NULL) return 0; /* Empty sorted set */ 最后一个元素的空,那就是空集
    score = zzlGetScore(p);  获取具体的数值
    if (!zslValueGteMin(score,range))   是否大于区间的最小值
        return 0;

    p = ziplistIndex(zl,1); /* First score. */ 第一个数值实体
    serverAssert(p != NULL);
    score = zzlGetScore(p); 获取数值
    if (!zslValueLteMax(score,range))  最小值是否大于区间的最大值
        return 0;

    return 1;
}
***********************************************************************************************
/* Find pointer to the first element contained in the specified range.
 * Returns NULL when no element is contained in the range. */
找到指向  压缩链表 中 第一个在给定区间内 元素 的指针。 返回空如果没有元素在给定区间内。
unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range) {
    unsigned char *eptr = ziplistIndex(zl,0), *sptr;
    double score;

    /* If everything is out of range, return early. */ 如果没有交集,尽早返回空
    if (!zzlIsInRange(zl,range)) return NULL;

    while (eptr != NULL) {
        sptr = ziplistNext(zl,eptr);  获取数值
        serverAssert(sptr != NULL);

        score = zzlGetScore(sptr);
        if (zslValueGteMin(score,range)) { 确认是否大于给定区间最小值
            /* Check if score <= max. */
            if (zslValueLteMax(score,range)) 确认是否大于给定区间最大值
                return eptr; 满足条件就返回
            return NULL;
        }

        /* Move to next element. */ 当小于给定区间最小值时,继续向下一个比较
        eptr = ziplistNext(zl,sptr); 跳过一个字符串
    }

    return NULL;
}
***********************************************************************************************
/* Find pointer to the last element contained in the specified range.
 * Returns NULL when no element is contained in the range. */
找到指向  压缩链表 中 最后一个在给定区间内 元素 的指针。 返回空如果没有元素在给定区间内。
unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range) {
    unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
    double score;

    /* If everything is out of range, return early. */
    if (!zzlIsInRange(zl,range)) return NULL;

    while (eptr != NULL) {
        sptr = ziplistNext(zl,eptr);  获取下一个元素,指向数值的指针
        serverAssert(sptr != NULL);

        score = zzlGetScore(sptr);  获取数值
        if (zslValueLteMax(score,range)) {  如果压缩链表最后一个数值 小于给定区间最大值
            /* Check if score >= min. */
            if (zslValueGteMin(score,range))  同时满足大于 给定区间最小值
                return eptr; 那么就是我们要寻找的元素
            return NULL;
        }

        /* Move to previous element by moving to the score of previous element.
         * When this returns NULL, we know there also is no element. */
        sptr = ziplistPrev(zl,eptr); 回退到前面的 指向数值的指针
        if (sptr != NULL)
            serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL);  
            回退到前面的指向字符串的指针。理论上是不能为空的,因为是成对出现的。
        else
            eptr = NULL;
    }

    return NULL;
}
***********************************************************************************************
int zslLexValueGteMin(sds value, zlexrangespec *spec) {
    return spec->minex ?
        (sdscmplex(value,spec->min) > 0) :
        (sdscmplex(value,spec->min) >= 0);
}

int zslLexValueLteMax(sds value, zlexrangespec *spec) {
    return spec->maxex ?
        (sdscmplex(value,spec->max) < 0) :
        (sdscmplex(value,spec->max) <= 0);
}

int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) {
    sds value = ziplistGetObject(p);  返回压缩链表的值 
    int res = zslLexValueGteMin(value,spec); 是否大于给定区间的最小值
    sdsfree(value); 释放字符串,免得引起内存泄漏
    return res;
}

int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) {
    sds value = ziplistGetObject(p);
    int res = zslLexValueLteMax(value,spec); 是否小于给定区间的最大值
    sdsfree(value);
    return res;
}
***********************************************************************************************
/* Returns if there is a part of the zset is in range. Should only be used
 * internally by zzlFirstInRange and zzlLastInRange. */
返回zset的一部分是不是在给定区间中(和给定区间是否有交集)。
只能被函数zzlFirstInLexRange 和 zzlLastInLexRange 内部使用(这里两个函数名称是作者的笔误)
int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
    unsigned char *p;

    /* Test for ranges that will always be empty. */  确认区间非空
    int cmp = sdscmplex(range->min,range->max);
    if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
        return 0;

    p = ziplistIndex(zl,-2); /* Last element. */ 最后一个sds字符串
    if (p == NULL) return 0;
    if (!zzlLexValueGteMin(p,range)) 字典序比较 压缩链表最大值 是否小于给定区间最小值
        return 0;

    p = ziplistIndex(zl,0); /* First element. */第一个sds字符串
    serverAssert(p != NULL);
    if (!zzlLexValueLteMax(p,range))字典序比较 压缩链表最小值 是否大于给定区间最大值
        return 0;

    return 1;
}
***********************************************************************************************
/* Find pointer to the first element contained in the specified lex range.
 * Returns NULL when no element is contained in the range. */
找到指向  压缩链表 中 第一个在给定字典序区间内 元素 的指针。 返回空如果没有元素在给定区间内。
unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec *range) {
    unsigned char *eptr = ziplistIndex(zl,0), *sptr;

    /* If everything is out of range, return early. */ 没有交集,尽早返回空
    if (!zzlIsInLexRange(zl,range)) return NULL;

    while (eptr != NULL) {
        if (zzlLexValueGteMin(eptr,range)) {  是否大于给定字典序区间最小值
            /* Check if score <= max. */
            if (zzlLexValueLteMax(eptr,range))  如果同时满足小于给定字典序区间最大值,就满足条件了,直接返回
                return eptr;
            return NULL;
        }

        /* Move to next element. */ 小于给定字典序区间最小值,继续查找下一个
        sptr = ziplistNext(zl,eptr); /* This element score. Skip it. */ 跳过数值元素
        serverAssert(sptr != NULL); 理论上是不为空的,实际检测一下,防止出现bug
        eptr = ziplistNext(zl,sptr); /* Next element. */  下个元素
    }

    return NULL;
}
***********************************************************************************************
/* Find pointer to the last element contained in the specified lex range.
 * Returns NULL when no element is contained in the range. */
找到指向  压缩链表 中 最后一个在给定字典序区间内 元素 的指针。 返回空如果没有元素在给定区间内。
unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) {
    unsigned char *eptr = ziplistIndex(zl,-2), *sptr;

    /* If everything is out of range, return early. */
    if (!zzlIsInLexRange(zl,range)) return NULL;

    while (eptr != NULL) {
        if (zzlLexValueLteMax(eptr,range)) {  最后一个元素 小于给定字典序区间最大值
            /* Check if score >= min. */
            if (zzlLexValueGteMin(eptr,range))  同时 大于给定字典序区间最小值,满足条件,直接返回
                return eptr;
            return NULL;
        }

        /* Move to previous element by moving to the score of previous element.
         * When this returns NULL, we know there also is no element. */
移动到前一个元素通过移动到前一个元素的数值部分。
当数值部分是空的,我们就知道前面已经没有元素了。
        sptr = ziplistPrev(zl,eptr);  移动到前一个的数值部分
        if (sptr != NULL) 不为空,继续移动到字符串部分
            serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
        else
            eptr = NULL;
    }

    return NULL;
}
***********************************************************************************************
unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) {
    unsigned char *eptr = ziplistIndex(zl,0), *sptr;

    while (eptr != NULL) {  从压缩链表头节点开始查找
        sptr = ziplistNext(zl,eptr);  获取数值
        serverAssert(sptr != NULL);

        if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) {  比较sds字符串
            /* Matching element, pull out score. */ 匹配字符串,获取数值
            if (score != NULL) *score = zzlGetScore(sptr);
            return eptr;
        }

        /* Move to next element. */ 移动到下个字符串
        eptr = ziplistNext(zl,sptr);
    }
    return NULL;
}
***********************************************************************************************
/* Delete (element,score) pair from ziplist. Use local copy of eptr because we
 * don't want to modify the one given as argument. */
从压缩链表删除(元素,数值)对。使用指针eptr的本地变量,因为我们不希望修改传入的参数eptr
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
    unsigned char *p = eptr;

    /* TODO: add function to ziplist API to delete N elements from offset. */
    将来要做  增加从偏移量开始删除N个元素的函数
    zl = ziplistDelete(zl,&p);  删除字符串部分
    zl = ziplistDelete(zl,&p);  删除数值部分
    return zl;
}
***********************************************************************************************
unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) {
    unsigned char *sptr;
    char scorebuf[128];
    int scorelen;
    size_t offset;

    scorelen = d2string(scorebuf,sizeof(scorebuf),score); 将double数值转化为字符串
    if (eptr == NULL) {
        zl = ziplistPush(zl,(unsigned char*)ele,sdslen(ele),ZIPLIST_TAIL); 在压缩链表尾部插入字符串实体
        zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL); 在压缩链表尾部插入数值实体
    } else {
        /* Keep offset relative to zl, as it might be re-allocated. */ 记住压缩链表的偏移量,因为压缩链表可能被重新分配
        offset = eptr-zl;
        zl = ziplistInsert(zl,eptr,(unsigned char*)ele,sdslen(ele)); 在指针eptr指向的实体 插入新的字符串
        eptr = zl+offset;  定位到新插入实体的开始位置

        /* Insert score after the element. */ 在字符串后面插入数值实体
        serverAssert((sptr = ziplistNext(zl,eptr)) != NULL); 获取插入实体的位置
        zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
    }
    return zl;
}
***********************************************************************************************
/* Insert (element,score) pair in ziplist. This function assumes the element is
 * not yet present in the list. */
在压缩链表中插入(字符串,数值)对。这个函数假设字符串没有在链表中出现过
unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
    unsigned char *eptr = ziplistIndex(zl,0), *sptr;
    double s;

    while (eptr != NULL) {
        sptr = ziplistNext(zl,eptr); 获取数值指向指针
        serverAssert(sptr != NULL);
        s = zzlGetScore(sptr); 获取数值

        if (s > score) {  链表中实体的数值 大于 要插入实体的数值
            /* First element with score larger than score for element to be
             * inserted. This means we should take its spot in the list to
             * maintain ordering. */
             这个是第一个大于传入待插入数值的实体数值。
             意味着我们该在这个点插入新实体来保持链表顺序
            zl = zzlInsertAt(zl,eptr,ele,score); 插入字符串和数值
            break;
        } else if (s == score) { 相等的情况,需要比较字符串的大小
            /* Ensure lexicographical ordering for elements. */ 确认字符串的字典序大小
            if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) { 
            如果链表中的字符串比较大,那么就在这个位置插入新字符串
                zl = zzlInsertAt(zl,eptr,ele,score);
                break;
            }
        }

        /* Move to next element. */ 还是传入的参数大,继续下一个比较
        eptr = ziplistNext(zl,sptr);
    }

    /* Push on tail of list when it was not yet inserted. */ 
    如果没有找到合适的位置,意味传入参数 比链表中任何一个实体都要大,那么就添加在最后面
    if (eptr == NULL)
        zl = zzlInsertAt(zl,NULL,ele,score);
    return zl;
}
***********************************************************************************************
unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec *range, unsigned long *deleted) {
    unsigned char *eptr, *sptr;
    double score;
    unsigned long num = 0;

    if (deleted != NULL) *deleted = 0;

    eptr = zzlFirstInRange(zl,range); 查找交集的第一个元素
    if (eptr == NULL) return zl; 

    /* When the tail of the ziplist is deleted, eptr will point to the sentinel
     * byte and ziplistNext will return NULL. */
     如果压缩链表的尾巴节点被删除,那么指针eptr就指向哨兵字节(即尾部字节FF),函数ziplistNext返回空
    while ((sptr = ziplistNext(zl,eptr)) != NULL) {
        score = zzlGetScore(sptr);
        if (zslValueLteMax(score,range)) { 数值小于给定区间的最大值
            /* Delete both the element and the score. */ 把字符串和数值两者全部删除
            zl = ziplistDelete(zl,&eptr);
            zl = ziplistDelete(zl,&eptr);
            num++;删除对数加1
        } else {
            /* No longer in range. */ 不在给定区间范围内,无需删除
            break;
        }
    }

    if (deleted != NULL) *deleted = num; 传入指针deleted非空,返回删除对数
    return zl;
}
***********************************************************************************************
删除给定字典序区间中的链表元素,分析完全同上个函数一样
unsigned char *zzlDeleteRangeByLex(unsigned char *zl, zlexrangespec *range, unsigned long *deleted) {
    unsigned char *eptr, *sptr;
    unsigned long num = 0;

    if (deleted != NULL) *deleted = 0;

    eptr = zzlFirstInLexRange(zl,range);  字典序 ,第一个交集元素
    if (eptr == NULL) return zl;

    /* When the tail of the ziplist is deleted, eptr will point to the sentinel
     * byte and ziplistNext will return NULL. */
     如果压缩链表的尾巴节点被删除,那么指针eptr就指向哨兵字节(即尾部字节FF),函数ziplistNext返回空
    while ((sptr = ziplistNext(zl,eptr)) != NULL) {
        if (zzlLexValueLteMax(eptr,range)) {  字典序 小于给定区间的最大值
            /* Delete both the element and the score. */
            zl = ziplistDelete(zl,&eptr);
            zl = ziplistDelete(zl,&eptr);
            num++;
        } else {
            /* No longer in range. */
            break;
        }
    }

    if (deleted != NULL) *deleted = num;
    return zl;
}
***********************************************************************************************
/* Delete all the elements with rank between start and end from the skiplist.这里笔误,应为ziplist
 * Start and end are inclusive. Note that start and end need to be 1-based */
删除压缩链表中所有位于从开始位置到结束位置的节点。
包括开始和结束.注意开始和结束位置都是基于1的(至少是1,并且都是1的整数倍)
unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
    unsigned int num = (end-start)+1;  删除的对数
    if (deleted) *deleted = num;
    zl = ziplistDeleteRange(zl,2*(start-1),2*num);  从2*(start-1) 开始, 后面连续删除num对,即2*num个实体
    return zl;
}
***********************************************************************************************

 

上一篇:NoSQL之Redis对zset(有序集合)数据类型的操作


下一篇:RedisTemplate与zset