我们再来学习如何从跳跃表中查询数据,跳跃表本质上是一个链表,但它允许我们像数组一样定位某个索引区间内的节点,并且与数组不同的是,跳跃表允许我们将头节点L0层的前驱节点(即跳跃表分值最小的节点)zsl->header.level[0].forward当成索引0的节点,尾节点zsl->tail(跳跃表分值最大的节点)当成索引zsl->length-1的节点,索引按分值从小到大递增查询;也允许我们将尾节点当成索引0的节点,头节点L0层的前驱节点当做索引zsl->length-1的节点,索引按分值从大到小递增查询。当我们调用下面的方法按照索引区间来查询时,会把我们的索引转换成跨度,然后查找落在跨度的第一个节点,之后根据reverse(逆向状态)决定是要正向查询还是逆向查询。
假设我们要进行正向查询(即:索引按分值从小到大递增查询),给定的索引区间是[0,2],那么我们要找到跨度为1的节点,然后从跨度为1的节点L0层逐个递进,直到停留在跨度为3的节点,头节点L0层的前驱节点zsl->header.level[0].forward在跳跃表的索引为0,跨度为1,在跨度为1的节点从L0层逐个递进,一直递进到跨度为3的节点,这样便完成了索引区间[0,2]的查询。如果我们要进行逆向查询(即:索引按分值从大到小递增查询),索引区间依旧是[0,2],那么我们要找到跨度为跨度为zsl->length-0=zsl->length的节点,那自然是尾节点,找到跨度区间的第一个节点后,我们通过backward指针逐个后退,一直后退到跨度为zsl->length-2的节点,如此便完成查询。
void zrangeGenericCommand(client *c, int reverse) {
robj *key = c->argv[1];
robj *zobj;
int withscores = 0;
long start;
long end;
long llen;
long rangelen;
//读取起始索引和终止索引,如果存在一个索引读取失败,则退出
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK))
return;
//判断是否要返回分值
if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr, "withscores")) {
withscores = 1;
} else if (c->argc >= 5) {
addReply(c, shared.syntaxerr);
return;
}
//判断key是否存在,如果不存在则退出,如果存在但类型不为ZSET也退出。
if ((zobj = lookupKeyReadOrReply(c, key, shared.emptyarray)) == NULL
|| checkType(c, zobj, OBJ_ZSET))
return;
/*
* Sanitize indexes.
* 审查索引,这里主要针对传入索引为负数的情况,大家都知道,如果一个
* 跳跃表的节点个数为N,我们要从起始节点查询到末尾节点,可以用[0,N-1]
* 或者[0,-1],当传入的end<0时,这里会重新规正end的索引,llen为zset的长度,
* 因此查询[0,-1],这里会规正为[0,N-1]。同理,start也会被规正,如果我们查询
* [-5,-3],即代表查询有序集合倒数第5个节点至倒数第三个节点,前提是N>=5这个
* 查询才有意义。如果我们的起始索引传入的是一个绝对值>N的负数,那么llen + start的
* 结果也为负数,如果判断start<0,则start会被规正为0。
* */
llen = zsetLength(zobj);
if (start < 0) start = llen + start;
if (end < 0) end = llen + end;
if (start < 0) start = 0;
/*
* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length.
* 如果起始索引大于终止索引,或者起始索引大于等于有序集合节点数量,则直接
* 返回空数组。
* */
if (start > end || start >= llen) {
addReply(c, shared.emptyarray);
return;
}
//如果判断终止索引大于等于节点数,则规整为llen-1
if (end >= llen) end = llen - 1;
//计算要返回的节点数
rangelen = (end - start) + 1;
//……
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
//压缩列表逻辑……
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
sds ele;
/*
* Check if starting point is trivial, before doing log(N) lookup.
* 这里会根据给定的开始索引,查找该索引对应的节点,并将ln指向该节点。
* 需要注意的一点是:平常我们都认为header.level[0].forward指向的节点,在跳跃表
* 中的索引为0,但有时候跳跃表的末尾节点zsl->tail的索引值也有可能为0,这里就要提到
* 逆向查询。
* 当我们使用ZRANGE key min max [WITHSCORES]命令查询时,成员的位置是按照其分值
* 从小到大来排序,这时候header.level[0].forward的索引值为0,
* header.level[0].forward.level[0].forward的索引值为1。而zls->tail的索引值
* 为zls->length-1。
* 当我们使用ZREVRANGE key start stop [WITHSCORES]命令查询时,成员的位置是按照
* 其分值从大到小来排序,这时候zls->tail的索引值为0,header.level[0].forward的
* 索引值为zls->length-1,header.level[0].forward.level[0].forward的索引值为
* zls->length-2。当reverse为1时,本次查询即为逆向查询。
* 我们注意到不管是if还是else分值,只要start>0,最终都会执行zslGetElementByRank()
* 将ln定位到起始节点。当start为0时,如果是逆向查询,则索引0的位置是尾节点zsl->tail,
* 如果是正向查询,索引0的位置则是zsl->header->level[0].forward。
* 那么(llen - start)和(start + 1)又代表什么含义呢?为什么zslGetElementByRank()
* 可以根据这两个公式的计算结果,定位到索引对应的节点呢?其实这两个公式计算的是跨度,而
* zslGetElementByRank()则是根据给定的跳跃表和跨度查找节点而已。
* 如果是正常查询,假设起始索引为0,则跨度为start(0)+1=1,刚好为头节点L0层到达第一个节点的
* 跨度为1;如果起始索引为1,则跨度为start(1)+1=2,刚好是头节点到达索引值为1的节点的跨度。
* 如果是逆向查询,索引值为0代表尾节点,而llen-start(0)=llen为头节点到达尾节点的跨度;同理,
* 倒数第二个节点的索引值为1,头节点到达倒数第二个节点的跨度为llen-start(1)=llen-1。
* */
if (reverse) {
ln = zsl->tail;
if (start > 0)
ln = zslGetElementByRank(zsl, llen - start);
} else {
ln = zsl->header->level[0].forward;
if (start > 0)
ln = zslGetElementByRank(zsl, start + 1);
}
//定位到起始节点后,根据逆向状态,不为0时后退查询(ln-backward),为0时递进查询(ln->level[0].forward)
while (rangelen--) {
serverAssertWithInfo(c, zobj, ln != NULL);
ele = ln->ele;
if (withscores && c->resp > 2) addReplyArrayLen(c, 2);
addReplyBulkCBuffer(c, ele, sdslen(ele));
if (withscores) addReplyDouble(c, ln->score);
ln = reverse ? ln->backward : ln->level[0].forward;
}
} else {
serverPanic("Unknown sorted set encoding");
}
}
查找到跨度对应的节点,查找到跨度对应的节点,则在<1>处返回,如果我们传入的跨度大于头节点到尾节点的跨度,则返回NULL。
/*
* Finds an element by its rank. The rank argument needs to be 1-based.
* */
zskiplistNode *zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;//累计跨度
int i;
x = zsl->header;
/*
* 从头节点的最高层出发,如果基于当前层能够递进到前一个节点,
* 则把当前节点的跨度加到traversed。
*/
for (i = zsl->level - 1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) <= rank) {
traversed += x->level[i].span;
x = x->level[i].forward;
}
/*
* 如果累计跨度与调用方传入的跨度相等,则代表x已经前进到调用方
* 所要求达到的跨度的节点,返回x。
*/
if (traversed == rank) {
return x;//<1>
}
}
//如果传入的跨度大于头节点到尾节点的跨度,则返回NULL。
return NULL;
}
跳跃表除了可以根据索引区间来查询,还可以根据分值区间来查询,这里我们又见到了结构体zrangespec。当我们需要判断一个节点是否落在我们指定的分值区间内,需要调用zslValueGteMin()和zslValueLteMax(),当传入一个指定的分值和区间,zslValueGteMin()和zslValueLteMax()的结果不为0,则表明节点落在分值区间内。此外,这两个方法还可以判断一个跳跃表是否和区间有交集,比如调用zslValueGteMin()时,传入尾节点(跳跃表分值最大的节点)及一个指定区间,如果尾节点没有落在指定区间,代表此区间都大于尾节点,此时我们不需要遍历跳跃表即可返回一个空数组,告诉客户端在指定区间内找不到任何节点;同理,调用zslValueLteMax()时传入头节点L0层的前驱节点(跳跃表分值最小节点)没有落在区间内,则表明区间小于跳跃表,同样不需要遍历跳跃表即可返回空数组给客户端,告诉客户端在指定区间内找不到任何节点。
/*
* Struct to hold a inclusive/exclusive range spec by score comparison.
* 此结构体用于表示一个指定区间,minex为0时表示在进行最小值比较时,要包含最小值本身
* 同理maxex为0时表示进行最大值比较时,要包含最大值本身。
* 比如:min=2,max=9
* 当minex=0,maxex=0时,区间为:[2,9]
* 当minex=1,maxex=0时,区间为:(2,9]
* 当minex=0,maxex=1时,区间为:[2,9)
* 当minex=1,maxex=1时,区间为:(2,9)
* */
typedef struct {
double min, max;
int minex, maxex; /* are min or max exclusive? */
} zrangespec;
/*
* 如果spec->minex不为0,返回分值是否大于区间最小值的比较结果,
* 为0则返回分值是否大于等于区间最小值的比较结果。
* 如果传入一个跳跃表尾节点的分值zsl->tail.score(即:跳跃表最大分值)和区间返回结果为0,
* 则表示跳跃表和区间没有交集。
* 这里分两种情况:
* spec->minex不为0:区间要查询分值大于spec->min的元素,
* zsl->tail.score<=spec->min代表跳跃表最大分值小于等于min,返回结果为0。
* spec->minex为0:区间要查询分值大于等于spec->min的元素,
* zsl->tail.score<spec->min代表跳跃表最大分值小于min,返回结果为0。
*/
int zslValueGteMin(double value, zrangespec *spec) {
return spec->minex ? (value > spec->min) : (value >= spec->min);
}
/*
* 如果spec->maxex不为0,返回分值是否小于区间最大值的比较结果,为0则返回分值
* 是否小于等于区间最大值的比较结果。
* 如果传入一个跳跃表头节点L0层指向节点的分值
* zsl->header.level[0].forward.score(即:跳跃表最小分值)和
* 区间返回结果为0,则表示跳跃表和区间没有交集。
* 这里分两种情况:
* spec->maxex不为0:区间要查询分值小于spec->max的元素,
* zsl->header.level[0].forward.score>=spec->max代表跳跃表最小分值大于等于区间
* 最大分值,返回结果为0。
* spec->maxex为0:区间要查询分值小于等于spec->max的元素,
* zsl->header.level[0].forward.score>spec->max代表跳跃表最小分值大于区间最大分值,
* 返回结果为0。
*/
int zslValueLteMax(double value, zrangespec *spec) {
return spec->maxex ? (value < spec->max) : (value <= spec->max);
}
在真正根据分值区间查询跳跃表前,会校验区间是否有效,如果我们输入一个区间[a,b],但a>b,那么这个区间肯定是无效区间,无须遍历跳跃表;如果a=b,如果区间的开闭状态出现:(a,b)、(a,b]、[a,b)这三种情况,也是无效区间,只有[a,b]才会去查询节点,表示需要查找分值为a(或者b)的节点。当校验完区间是有效后,还会调用zslValueGteMin()和zslValueLteMax()判断跳跃表和区间是否存在交集,即区间是否整体大于跳跃表或整体小于跳跃表,如果出现这两种情况则表明区间和跳跃表无交集,也就不需要遍历。
/*
* Returns if there is a part of the zset is in range.
* 判断跳跃表和区间是否存在交集
* */
int zslIsInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
/*
* Test for ranges that will always be empty.
* 校验区间范围是否有效,无效则返回0表示查询结果为空:
* 1.如果最小值大于最大值,则无效。
* 2.如果最小值等于最大值,且区间为:(min,max)、(min,max]、[min,max)则无效。
* */
if (range->min > range->max ||
(range->min == range->max && (range->minex || range->maxex)))
return 0;
/*
* 如果尾节点不为NULL,则把跳跃表最大分值zsl->tail.score与区间比较,
* 如果range->minex不为0,则查询分值大于range->min的元素,如果跳跃表
* 最大分值zsl->tail.score小于等于range->min,则表示跳跃表和区间没有交集,
* 无须遍历跳跃表查询;同理如果range->minex为0,则查询分值大于等于range->min
* 的元素,如果zsl->tail.score小于range->min,则表示跳跃表和区间没有交集,
* 也无须遍历跳跃表查询。
*/
x = zsl->tail;
if (x == NULL || !zslValueGteMin(x->score, range))
return 0;
/*
* 如果头节点L0层的前驱节点不为NULL,则把跳跃表最小分值zsl->header->level[0].forward.score
* 与区间比较,如果range->maxex不为0,则查询分值小于range->maxex的元素,如果跳跃表最小
* 分值zsl->header->level[0].forward.score大于等于range->max,则表示跳跃表和区间没有交集,
* 无须遍历跳跃表查询;同理如果range->maxex为0,则查询分值小于等于range->maxex的元素,如果跳跃表
* 最小分值zsl->header->level[0].forward.score大于range->maxex,则表示跳跃表和区间没有交集,
* 也无须遍历跳跃表查询。
*/
x = zsl->header->level[0].forward;
if (x == NULL || !zslValueLteMax(x->score, range))
return 0;
//跳跃表和区间存在交集,需要遍历跳跃表查询。
return 1;
}
跳跃表允许我们根据分值区间进行正向查询(分值从小到大)或逆向查询(分值从大到小),如果是正向查询,则调用zslFirstInRange()方法,先判断跳跃表和指定区间是否存在交集,如果存在则查找指定区间内的分值最小的节点并返回。
/*
* Find the first node that is contained in the specified range.
* Returns NULL when no element is contained in the range.
* 查找落在指定区间的第一个节点,如果没有元素落在这个区间则返回NULL。
* */
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
int i;
/*
* If everything is out of range, return early.
* 如果跳跃表和区间没有交集则无须遍历,直接返回NULL。
* */
if (!zslIsInRange(zsl, range)) return NULL;
//从头节点的最高层开始遍历
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
/*
* Go forward while *OUT* of range.
* 如果x->level[i].forward不为NULL,根据其分值x->level[i].forward->score和
* range->minex判断前驱节点是否能前进。
* 这里分两种情况:
* range->minex不为0:判断前驱节点的分值是否大于range->min,如果小于等于的话
* expression=zslValueGteMin(x->level[i].forward->score, range)为0,
* 代表需要前进,找到大于min的节点,而!expression为1,while条件成立,x会
* 前进到它的前驱节点。当x的前驱节点的分值大于min,就会停止循环,x会停留在区间内
* 第一个节点的后继节点。
* range->minex为0:判断前驱节点的分值是否大于等于range->min,如果小于的话,
* expression=zslValueGteMin(x->level[i].forward->score, range),expression为0,
* 需要前进,找到大于等于min的节点,而!expression为1,while条件成立,x会
* 前进到它的前驱节点。当x的前驱节点的分值大于等于min,就会停止循环,x会停留在区间内
* 第一个节点的后继节点。
* */
while (x->level[i].forward &&
!zslValueGteMin(x->level[i].forward->score, range))
x = x->level[i].forward;
}
/*
* This is an inner range, so the next node cannot be NULL.
* 上面的循环会让x停留在区间内第一个节点的后继节点,为了达到区间内的
* 第一个节点,x要在L0层前进到它的前驱节点。
* */
x = x->level[0].forward;
serverAssert(x != NULL);
/*
* Check if score <= max.
* range->maxex不为0:如果区间内第一个节点的分值大于等于spec->max,
* expression=zslValueLteMax(x->score, range),expression结果为0
* !expression为1,表示查询异常,返回NULL。
* range->maxex为0:如果区间内第一个节点的分值大于spec->max,
* 则expression=zslValueLteMax(x->score, range),expression结果为0
* !expression为1,表示查询异常,返回NULL。
* */
if (!zslValueLteMax(x->score, range)) return NULL;
return x;
}
如果要进行逆向查询,则调用zslLastInRange(),这里同样先判断跳跃表是否和区间存在交集,只有存在交集才会进行下一步的判断,查找指定区间内分值最大的节点并返回。
/*
* Find the last node that is contained in the specified range.
* Returns NULL when no element is contained in the range.
* 查找落在指定区间的最后一个节点,如果没有元素落在这个区间则返回NULL。
* */
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
int i;
/*If everything is out of range, return early.*/
if (!zslIsInRange(zsl, range)) return NULL;<br>
//从头节点的最高层开始遍历
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
/*
* Go forward while *IN* range.
* 根据区间range和前驱节点的分值判断是否前进,如果x->level[i].forward
* 不为NULL,根据range->maxex判断前驱节点是否能前进。
* 这里分两种情况:
* 如果range->maxex不为0,且前驱节点的分值小于range->max,则可以前进。
* 如果range->maxex为0,且前驱节点的分值小于等于range->max,则可以前进。
* */
while (x->level[i].forward &&
zslValueLteMax(x->level[i].forward->score, range))
x = x->level[i].forward;
}
/* This is an inner range, so this node cannot be NULL. */
serverAssert(x != NULL);
/*
* Check if score >= min.
* 如果range->minex不为0,x的分值小于或等于range->min,代表查询出现异常,则返回NULL。
* 如果range->minex为0,x的分值小于range->min,代表查询出现异常,则返回NULL。
* */
if (!zslValueGteMin(x->score, range)) return NULL;
return x;
}
在了解完上面的内容后,下面我们要步入正题:如何根据分值区间进行正向或逆向查找节点。在下面代码<1>处,会根据逆向状态选择ln是指向区间分值最大的节点,或是分值最小的节点。在定位到起始节点后,会在<2>处的while循环对节点进行偏移,如果到达偏移位置后的ln不为NULL,则会进入<3>处的while循环,查找分值落在区间内的节点,这里会根据逆向状态是否不为0,决定是用backward指针后退,还是向L0层的前驱节点递进,一直到分值不落在区间内跳出while循环,或者ln为NULL,又或者limit为0结束while循环。如果我们没有指定偏移(offset)和返回数量(limit),则不会进行偏移,limit默认值为-1,limit--永远不为0,这里会返回落在区间内的所有节点,能结束while循环只有遇到分值不落在区间内的节点,或者是ln为NULL。
/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
void genericZrangebyscoreCommand(client *c, int reverse) {
zrangespec range;//指定区间
robj *key = c->argv[1];
robj *zobj;
long offset = 0, limit = -1;//偏移和结果返回数量
int withscores = 0;
unsigned long rangelen = 0;
void *replylen = NULL;
int minidx, maxidx;
/*
* Parse the range arguments.
* 解析范围参数
* ZRANGEBYSCORE key min max和ZREVRANGEBYSCORE key max min两个命令
* 都是此函数实现的,如果客户端输入的命令为ZRANGEBYSCORE,则reverse为0,按
* 从小到大查找分值及元素,分值小的在前,分值大的在后。如果客户端输入的命令为
* ZREVRANGEBYSCORE,则reverse不为0,按从大到小查找分值及元素,分值大的在前,
* 分值小的在后。
*
* */
if (reverse) {
/* Range is given as [max,min] */
maxidx = 2;
minidx = 3;
} else {
/* Range is given as [min,max] */
minidx = 2;
maxidx = 3;
}
if (zslParseRange(c->argv[minidx], c->argv[maxidx], &range) != C_OK) {
addReplyError(c, "min or max is not a float");
return;
}
/*
* Parse optional extra arguments. Note that ZCOUNT will exactly have
* 4 arguments, so we'll never enter the following code path.
* 遍历可选参数,这里会判断是否要返回分值(withscores),是否要对查询结果进行偏移(offset)和数量(limit)的限制
* */
if (c->argc > 4) {
int remaining = c->argc - 4;
int pos = 4;
while (remaining) {
if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr, "withscores")) {
pos++;
remaining--;
withscores = 1;
} else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr, "limit")) {
if ((getLongFromObjectOrReply(c, c->argv[pos + 1], &offset, NULL)
!= C_OK) ||
(getLongFromObjectOrReply(c, c->argv[pos + 2], &limit, NULL)
!= C_OK)) {
return;
}
pos += 3;
remaining -= 3;
} else {
addReply(c, shared.syntaxerr);
return;
}
}
}
/*
* Ok, lookup the key and get the range
* 如果key所对应的zobj不存在,或者zobj的类型不为zset,则退出。
* */
if ((zobj = lookupKeyReadOrReply(c, key, shared.emptyarray)) == NULL ||
checkType(c, zobj, OBJ_ZSET))
return;
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
//压缩列表流程...
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {//zobj类型为跳跃表
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
/*
* If reversed, get the last node in range as starting point.
* 如果是逆向查询,ln会指向区间分值最大的节点,如果是正向查询,ln则指向区间分值最小的节点。
* */
if (reverse) {
ln = zslLastInRange(zsl, &range);
} else {
ln = zslFirstInRange(zsl, &range);
}
/*
* No "first" element in the specified interval.
* 如果没有落在区间的开始节点则退出
* */
if (ln == NULL) {
addReply(c, shared.emptyarray);
return;
}
/* We don't know in advance how many matching elements there are in the
* list, so we push this object that will represent the multi-bulk
* length in the output buffer, and will "fix" it later */
replylen = addReplyDeferredLen(c);
/*
* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop.
* <2>如果有偏移量,则根据reverse状态选择是后退还是递进,以达到偏移量。
* 如果客户端有传入偏移,则offset不为0,这里会循环到offset为0或ln为NULL时跳出循环。
* 否则offset默认值为0,不会进入此循环。
* */
while (ln && offset--) {
if (reverse) {
ln = ln->backward;
} else {
ln = ln->level[0].forward;
}
}
/*
* <3>如果客户端有传入偏移和数量,则limit不为0,此时会根据reverse状态后退或者前进至
* ln为NULL或者limit为0,否则limit默认值为-1,limit--永远为true(注:只要limit
* 不为0,则永远为true,即便是负数),这里就会循环到ln为NULL时,获取所有分值符合区间
* 节点的。
* 除了limit为0,或者ln为NULL会跳出while循环,在<4>处还会根据reverse状态判断分值是否在区间,
* 如果不在则跳出循环,如果分值符合区间,还会在<5>处根据reverse状态选择是后退到后一个节点(ln->backward),
* 还是前进到前一个节点(ln->level[0].forward)。
*/
while (ln && limit--) {
/*Abort when the node is no longer in range.*/
if (reverse) {//<4>
if (!zslValueGteMin(ln->score, &range)) break;
} else {
if (!zslValueLteMax(ln->score, &range)) break;
}
rangelen++;
if (withscores && c->resp > 2) addReplyArrayLen(c, 2);
addReplyBulkCBuffer(c, ln->ele, sdslen(ln->ele));
if (withscores) addReplyDouble(c, ln->score);
/* Move to next node */
if (reverse) {//<5>
ln = ln->backward;
} else {
ln = ln->level[0].forward;
}
}
} else {
serverPanic("Unknown sorted set encoding");
}
if (withscores && c->resp == 2) rangelen *= 2;
setDeferredArrayLen(c, replylen, rangelen);
}
最后,我们要了解如何获取一个元素在跳跃表中的索引,其实这里面的逻辑也是非常的简单,我们先从字典上获取节点的分值,然后根据分值及元素获取其在跳跃表中的索引,这里依旧支持正向查询或逆向查询,如果是正向查询,分值越小,索引越小,如果分值相等,则元素越小,索引越小;如果是逆向查询,则分值越大,索引越小,如果分值相等,则元素越大,索引越小。
/* Given a sorted set object returns the 0-based rank of the object or
* -1 if the object does not exist.
* 返回元素在有序集合中的索引,如果返回-1则代表元素不在有序集合内。
*
* For rank we mean the position of the element in the sorted collection
* of elements. So the first element has rank 0, the second rank 1, and so
* forth up to length-1 elements.
* 在跳跃表中第一个元素的索引为0,第二个元素索引为1,以此类推,最后一个元素索引为length-1。
*
* If 'reverse' is false, the rank is returned considering as first element
* the one with the lowest score. Otherwise if 'reverse' is non-zero
* the rank is computed considering as element with rank 0 the one with
* the highest score.
* 如果reverse为0,跳跃表索引从分值最小的节点开始,即zsl->header.level[0].forward索引为0、
* zsl->header.level[0].forward.level[0].forward索引为1,zsl->tail索引为zsl-length-1;
* 如果reverse不为0,跳跃表索引从分值最大的节点开始,即zsl->tail索引为0,zsl->tail.backward索引
* 为1,zsl->header.level[0].forward索引为zsl->length-1
* */
long zsetRank(robj *zobj, sds ele, int reverse) {
unsigned long llen;
unsigned long rank;
llen = zsetLength(zobj);
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
//压缩列表逻辑……
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
dictEntry *de;
double score;
de = dictFind(zs->dict, ele);
if (de != NULL) {
/*
* 如果元素存在在跳跃表上,则获取元素的分支,并根据
* 分支判断其在跳跃表中的跨度,根据跨度计算节点在跳跃表
* 中的索引。
* 如果是正向查询(reverse为0),则索引为跨度(rank)-1。
* 如果是逆向查询(reverse不为0),则索引为跳跃表长度(zsl->length)-跨度(rank)。
*/
score = *(double *) dictGetVal(de);
rank = zslGetRank(zsl, score, ele);
/* Existing elements always have a rank. */
serverAssert(rank != 0);
if (reverse)
return llen - rank;
else
return rank - 1;
} else {//如果元素不在跳跃表上,则返回-1
return -1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
}
获取完分值后,需要定位节点在跳跃表中的跨度,然后根据逆向状态及跨度,计算节点在跳跃表中的索引。
/* Find the rank for an element by both score and key.
* Returns 0 when the element cannot be found, rank otherwise.
* Note that the rank is 1-based due to the span of zsl->header to the
* first element.
* 根据给定的分支和元素查找其节点在跳跃表中的跨度,返回0代表节点不存在。
* */
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
//从头节点最高层遍历,如果能前进到前一个节点,则把当前节点的跨度加到rank上
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele, ele) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
/*
* x might be equal to zsl->header, so test if obj is non-NULL
* x可能停留在头节点,此处判断是保证节点的元素不为NULL。
* */
if (x->ele && sdscmp(x->ele, ele) == 0) {
return rank;
}
}
return 0;
}
至此,笔者和大家一起学习了Redis跳跃表的是如何插入节点、删除节点、更新节点以及如何对跳跃表中的节点进行不同维度(索引、分值)的查询。跳跃表是一种应用相当广泛的数据结构,很多场景下人们都用跳跃表代替B-Tree,因为跳跃表和B-Tree有着一样的查询时间复杂度O(logN),但跳跃表的实现却比B-Tree简单很多。而Redis正是借助了跳跃表的思路实现了有序集合,使得很多需要存储、排序海量数据的业务得以实现。
当然,由于笔者的时间精力有限,这里并没有完全介绍所有跳跃表命令的相关实现,但笔者相信能看到这里的人,基本已经掌握了跳跃表的整体脉络。