深入分析redis之rax底层原理,前缀树?

文章目录


前言

本文参考源码版本为 redis6.2

前缀树是字符串查找时,经常使用的一种数据结构,能够在一个字符串集合中快速查找到某个字符串。

由于树中每个节点只存储字符串中的一个字符,故而有时会造成空间的浪费。Rax 的出现就是为了解决这一问题。Redis 对于 Rax 的解释为 A radix tree implement,基数树的一种实现。Rax中不仅可以存储字符串,同时还可以为这个字符串设置一个值,也就是key-value。


一、Radix Tree 是什么?

Radix Tree 是属于前缀树的一种类型。前缀树也称为 Trie Tree,其特点是,保存在树上的每个 key 会被拆分成单字符,然后逐一保存在树上的节点中。

前缀树的根节点不保存任何字符,而除了根节点以外的其他节点,每个节点只保存一个字符。当把从根节点到当前节点的路径上的字符拼接在一起时,就可以得到相应 key 的值了。

不同的是,rax 在前缀树上做了一些优化,来,继续往下看~

二、Radix Tree 数据结构

1. 结构

raxNode定义:


typedef struct raxNode {
    uint32_t iskey:1;     //节点是否包含key
    uint32_t isnull:1;    //节点的值是否为NULL
    uint32_t iscompr:1;   //节点是否被压缩
    uint32_t size:29;     //节点大小
    unsigned char data[]; //节点的实际存储数据
} raxNode;

该结构中的成员变量包括 4 个元数据,这四个元数据的含义分别如下。

  • iskey:表示从 Radix Tree 的根节点到当前节点路径上的字符组成的字符串,是否表示了一个完整的 key。如果是的话,那么 iskey 的值为 1。否则,iskey 的值为 0。不过,这里需要注意的是,当前节点所表示的 key,并不包含该节点自身的内容。
  • isnull:表示当前节点是否为空节点。如果当前节点是空节点,那么该节点就不需要为指向 value 的指针分配内存空间了。
  • iscompr:表示当前节点是非压缩节点,还是压缩节点。
  • size:表示当前节点的大小,具体值会根据节点是压缩节点还是非压缩节点而不同。如果当前节点是压缩节点,该值表示压缩数据的长度;如果是非压缩节点,该值表示该节点指向的子节点个数。

这 4 个元数据就对应了压缩节点非压缩节点头部的 HDR,其中,iskey、isnull 和 iscompr 分别用 1 bit 表示,而 size 占用 29 bit

另外,从 raxNode 结构体中,可以看到,除了元数据,该结构体中还有 char 类型数组 data。我们知道,data 是用来保存实际数据的。不过,这里保存的数据会根据当前节点的类型而有所不同:

  • 对于非压缩节点来说,data 数组包括子节点对应的字符、指向子节点的指针,以及节点表示 key 时对应的 value 指针;
  • 对于压缩节点来说,data 数组包括子节点对应的合并字符串、指向子节点的指针,以及节点为 key 时的 value 指针。

在 raxNode 的实现中,无论是非压缩节点还是压缩节点,具有两个特点:

  • 它们所代表的 key,是从根节点到当前节点路径上的字符串,但并不包含当前节点;
  • 它们本身就已经包含了子节点代表的字符或合并字符串。而对于它们的子节点来说,也都属于非压缩或压缩节点,所以,子节点本身又会保存,子节点的子节点所代表的字符或合并字符串。

而这两个特点给 Radix Tree 实际保存数据时的结构,带来了两个方面的变化。

  • 一方面,Radix Tree 非叶子节点,要不然是压缩节点,只指向单个子节点,要不然是非压缩节点,指向多个子节点,但每个子节点只表示一个字符。所以,非叶子节点无法同时指向表示单个字符的子节点和表示合并字符串的子节点。
  • 另一方面,对于 Radix Tree 的叶子节点来说,因为它没有子节点,所以,Redis 会用一个不包含子节点指针的 raxNode 节点来表示叶子节点,也就是说,叶子节点的 raxNode 元数据 size 为 0,没有子节点指针。如果叶子节点代表了一个 key,那么它的 raxNode 中是会保存这个 key 的 value 指针的。

如果你觉得晦涩不易懂,坚持,继续往下看~

2. 非压缩节点

这类节点会包含多个指向不同子节点的指针,以及多个子节点所对应的字符

同时,如果从根节点到一个非压缩节点的路径上的字符串,已经对应了 Radix Tree 中保存的一个 key,那么这个非压缩节点中还包含了指向这个 key 对应的 value 的指针。

结合 raxNode 结构来看看,如果是一个非压缩 node ,当我们有size个字符(bytes),就会有相对应size 个指向子节点 raxNode 的指针;

需要注意的是,字符不是存在子节点,而是存在于父节点中(可以思考下, 为什么不存在子节点中?)。

[header iscompr=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)

3. 压缩节点

这类节点会包含一个指向子节点的指针,以及子节点所代表的合并的字符串

和非压缩节点类似,如果从根节点到一个压缩节点的路径上的字符串,已经对应了 Radix Tree 中保存的一个 key,那么,这个压缩节点中还包含指向这个 key 对应的 value 的指针。

注意,压缩节点仅有一个子节点

[header iscompr=1][abc][c-ptr](value-ptr?)

4. 图解Radix Tree

假设Radix Tree树中包含以下几个字符串 “foo”, “foobar” 和 “footer”;如果node节点代表rax树中的一个key,就写在 [] 里面,反之写在 () 里面。

              (f) ""
                \
                (o) "f"
                  \
                  (o) "fo"
                    \
                  [t   b] "foo"
                  /     \
         "foot" (e)     (a) "foob"
                /         \
      "foote" (r)         (r) "fooba"
              /             \
    "footer" []             [] "foobar"

然而,这里有个常见优化点,对于连续只有单个子节点的node可以压缩成一个 「压缩节点」,因此,上面的树形图变成了下面这种:

                  ["foo"] ""
                     |
                  [t   b] "foo"
                  /     \
        "foot" ("er")    ("ar") "foob"
                 /          \
       "footer" []          [] "foobar"

这便是Radix Tree的模型图了。

不过,这种树形图实现上却是有点麻烦,比如 当字符串 “first” 要插入上面的 Radix Tree 中,这里就涉及到节点切割了,因为此时 “foo” 就不再是一个公共前缀了,最终树形图如下:

                    (f) ""
                    /
                 (i o) "f"
                 /   \
    "fi"  ("rst")  (o) "fo"
              /        \
    "first" []       [t   b] "foo"
                     /     \
           "foot" ("er")    ("ar") "foob"
                    /          \
          "footer" []          [] "foobar"

三、基本操作

1. 查询

rax提供了查找key的接口raxFind,该接口用于获取key对应的value值,定义如下:

/* Find a key in the rax, returns raxNotFound special void pointer value
 * if the item was not found, otherwise the value associated with the
 * item is returned. */
void *raxFind(rax *rax, unsigned char *s, size_t len) {
    raxNode *h;

    int splitpos = 0;
    size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,NULL);
    if (i != len || (h->iscompr && splitpos != 0) || !h->iskey)
        return raxNotFound;
    return raxGetData(h);
}

其中,该方法的作用有是,在rax中查找长度为 len 的字符串 s (s为rax中的key),找到之后返回 key 对应的 value。

可以看到,具体的查询逻辑交给了 raxLowWalk 去完成,该方法作为底层方法,在增删查改操作中均有调用,接口定义为:

static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts) 

其中,入参定义为:

  • rax 为待查找的 Rax;
  • s 为待查找的 key;
  • len 为 s 的长度;
  • stopnode 为查找过程中的终止节点,也就意味着,当 rax 查找到该节点时,待查找的 key 已经匹配完成,或者当前节点无法与带查找的 key 匹配;
  • plink 用于记录父节点中指向 stopnode 的指针的位置,当 stopnode 变化时,也需要修改父节点指向该节点的指针;
  • splitpos 用于记录压缩节点的匹配位置;用于压缩节点中切割并插入
  • 当 ts 不为空时,会将查找该 key 的路径写入该变量

该函数返回 s 的匹配长度,当 s ! = len 时,表示未查找到该key;当s == len时,需要检验 stopnode 是否为 key,并且当 stopnode 为压缩节点时,还需要检查splitpos是否为0(可能匹配到某个压缩节点中间的某个元素),具体实现如下:

    raxNode *h = rax->head; // 从跟节点开始匹配
    raxNode **parentlink = &rax->head;

    size_t i = 0; /* Position in the string. */
    size_t j = 0; /* Position in the node children (or bytes if compressed).*/
    while(h->size && i < len) {
        unsigned char *v = h->data;
          
        if (h->iscompr) {
            // 压缩节点,挨个比对,相同则继续往下找;反之,则退出,说明匹配到这就开始不同或者已经到末尾了
            for (j = 0; j < h->size && i < len; j++, i++) {
                if (v[j] != s[i]) break;
            }
            if (j != h->size) break;
        } else { // 非压缩节点,只要找到一个相同的就可以继续往下,在子节点中查找了
            /* Even when h->size is large, linear scan provides good
             * performances compared to other approaches that are in theory
             * more sounding, like performing a binary search. */ 
            for (j = 0; j < h->size; j++) {
                if (v[j] == s[i]) break;
            }
            if (j == h->size) break;
            // 可以匹配到当前字符,继续往下匹配下一个字符,然后会移动到子节点进行匹配
            i++;
        }
        // 记录路径信息
        if (ts) raxStackPush(ts,h); /* Save stack of parent nodes. */
        // 找到第一个子节点的地址
        raxNode **children = raxNodeFirstChildPtr(h);
        if (h->iscompr) j = 0; /* Compressed node only child is at index 0. */
        // 将当前节点移动到其第j个子节点
        memcpy(&h,children+j,sizeof(h));
        parentlink = children+j;
        j = 0; /* If the new node is non compressed and we do not
                  iterate again (since i == len) set the split
                  position to 0 to signal this node represents
                  the searched key. */
    }
    if (stopnode) *stopnode = h;
    if (plink) *plink = parentlink;
    if (splitpos && h->iscompr) *splitpos = j;
    return i;

实现逻辑:

  • 从根节点开始匹配,直到当前待查找节点无子节点或者 s 查找完毕。对于每个节点来说,如果为压缩节点,则需要与 s 中的字符完全匹配。如果为非压缩节点,则查找与当前待匹配字符相同的字符。
  • 如果当前待匹配节点能够与 s 匹配,则移动位置到其子节点,继续匹配,直至匹配完成。
  • 记录相关信息,比如路径等

2. 插入

插入元素比较复杂,源码中也花了大量注释说明。

向 rax 中插入 key-value 对,对于已存在的 key, rax 提供了2种方案,覆盖或者不覆盖原有的 value,对应的接口分别为 raxInsert、raxTryInsert。

插入操作的真正实现函数 raxGenericInsert,方法定义如下:

int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old, int overwrite)

第一步,当完全匹配并且目标位置的节点不需要切割

    size_t i;
    int j = 0; // 切割位置,当定位到具体的压缩节点时,j用于定位该压缩节点待插入的位置
    raxNode *h, **parentlink;
    
    // 在插入元素前,会通过 raxLowWalk 判断 key 是否存在或者找到待插入位置。
    i = raxLowWalk(rax,s,len,&h,&parentlink,&j,NULL);

     // 1. i == len(代表已经匹配到了整个字符串)
     // 2. 如果不是压缩节点 或者 是压缩节点但该压缩节点不需要切割(j==0)
     // 注:对于压缩节点node,如果要在node中插入新元素 或者 该node中代表了这次要插入元素的key, 那么将会重新分配该node, 确保空间足够
    if (i == len && (!h->iscompr || j == 0 /* not in the middle if j is 0 */)) {
        /* Make space for the value pointer if needed. */
        if (!h->iskey || (h->isnull && overwrite)) {
            h = raxReallocForData(h,data);
            if (h) memcpy(parentlink,&h,sizeof(h));
        }
        if (h == NULL) {
            errno = ENOMEM;
            return 0;
        }
        
        // 如果key存在,根据方法入参进行处理(获取旧数据或者重写)
        if (h->iskey) {
            if (old) *old = raxGetData(h);
            if (overwrite) raxSetData(h,data);
            errno = 0;
            return 0; /* Element already exists. */
        }

        // 给新增节点赋值
        raxSetData(h,data);
        rax->numele++;
        return 1; /* Element inserted. */
    }

第二步:对于需要切割的压缩节点,有几种情况:

假设我们当前所在节点 h 包含字符串 “ANNIBALE” ,h 有且仅有一个子节点(“SC”),并且该节点没有子节点,用 ‘0’ 表示,结构如下:

"ANNIBALE" -> "SCO" -> []

案例1: 插入 “ANNIENTARE”

  
               |B| -> "ALE" -> "SCO" -> []
     "ANNI" -> |-|
               |E| -> (... continue algo ...) "NTARE" -> []

需要将 “ANNIBALE” 拆分成3部分:压缩节点,非压缩节点,压缩节点

案例2: 插入 “ANNIBALI”

                  |E| -> "SCO" -> []
     "ANNIBAL" -> |-|
                  |I| -> (... continue algo ...) []

需要将 “ANNIBALE” 拆成2部分:压缩节点,非压缩节点。

案例3: 插入 “AGO”(和案例1类似,不过,需要在原节点中设置 iscompr = 0)

            |N| -> "NIBALE" -> "SCO" -> []
     |A| -> |-|
            |G| -> (... continue algo ...) |O| -> []

需要将 “ANNIBALE” 节点拆分为3部分:非压缩节点,非压缩节点,压缩节点。

案例4: 插入 “CIAO”

     |A| -> "NNIBALE" -> "SCO" -> []
     |-|
     |C| -> (... continue algo ...) "IAO" -> []

此时需要将 “ANNIBALE” 节点拆分为2部分:第一部分是非压缩节点,第二部分为压缩节点。

案例5: 插入 “ANNI”

"ANNI" -> "BALE" -> "SCO" -> []

将 “ANNIBALE” 拆分成2个压缩节点。


综上,对于压缩节点拆分过程,总体而言分为2类:
  • 一类是新插入的key是当前节点的一部分
  • 另一类是新插入的key和压缩节点的某个位置不匹配

源码实现上,是用了两种算法覆盖以上5种案例:

算法1: 覆盖案例 1~4,出现字符不匹配的位置,是在某个压缩节点上。

SPLITPOS 代表压缩节点切割位置,例如 压缩节点包含 “ANNIBALE”,将插入 “ANNIENTARE”,那么 SPLITPOS = 4,也就是 未匹配元素的下标。

  • 1.保存压缩节点 NEXT 指针(指向唯一的子节点)
  • 2.创建切割节点 “split node”,即第一个未匹配位置,len = 1;
  • 3a.如果 SPLITPOS == 0,用 “split node” 代替旧节点,并拷贝数据;也就意味着,该压缩节点只会被被分成两部分
  • 3b. 如果 SPLITPOS != 0,也就说该压缩节点存在匹配成功的前缀部分,切割压缩节点,并修改相应指针。如果新压缩节点len == 1, 设置 iscompr = 0
  • 4a. 如果原压缩节点切割后,剩余后缀长度 postfix len 大于 0,就创建一个 “postfix node” 后缀节点;如果 “postfix node” len == 1, 设置 iscompr = 0;设置 “postfix node” 的子节点指针 指向 NEXT
  • 4b. 如果 postfix len == 0, 用 NEXT 作为 postfix 指针
  • 5.设置 postfix 节点作为 splitnode 的第一个子节点
  • 6.设置 “split node” 为当前节点, 然后将处理,新key未匹配部分

源码实现如下:

    /* ------------------------- ALGORITHM 1 --------------------------- */
    if (h->iscompr && i != len) {
        /* 1: Save next pointer. */
        // 用 NEXT 保存 h的子节点指针
        raxNode **childfield = raxNodeLastChildPtr(h);
        raxNode *next;
        memcpy(&next,childfield,sizeof(next));

        if (h->iskey) {
            debugf("key value is %p\n", raxGetData(h));
        }

        // 设置新增节点长度,这里 j 是压缩节点切割位置,因此压缩节点会被切割成两部分,长度分别是 j 和 postfixlen
        size_t trimmedlen = j;
        size_t postfixlen = h->size - j - 1;
        // 仔细想想?当压缩节点被切割成两部分,也就是说该压缩节点部分与 s 没有相同部分;如果该压缩节点本身 iskey, 那么切割后 split_node 自然也是 iskey (注意:iskey表示从根节点到压缩节点的父节点组成的key)
        int split_node_is_key = !trimmedlen && h->iskey && !h->isnull;
        size_t nodesize;

        /* 2: 创建 split node 节点,并分配空间 */
        // 可以看到,这里会切割成三部分,当切割位置在第一个字符时(j==0),即trimmedlen=0 表示没有公共前缀,此时会切割成两部分
        raxNode *splitnode = raxNewNode(1, split_node_is_key);  // 切割位置
        raxNode *trimmed = NULL; // 已匹配的前缀部分
        raxNode *postfix = NULL; // 切割位置之后,未匹配的部分

        // 如果匹配前缀大于0,分配新空间
        if (trimmedlen) {
            nodesize = sizeof(raxNode)+trimmedlen+raxPadding(trimmedlen)+
                       sizeof(raxNode*);
            if (h->iskey && !h->isnull) nodesize += sizeof(void*);
            trimmed = rax_malloc(nodesize);
        }

        // 未匹配部分长度大于0,分配空间
        if (postfixlen) {
            nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+
                       sizeof(raxNode*);
            postfix = rax_malloc(nodesize);
        }

        /* OOM? Abort now that the tree is untouched. */
        // 分配失败
        if (splitnode == NULL ||
            (trimmedlen && trimmed == NULL) ||
            (postfixlen && postfix == NULL))
        {
            rax_free(splitnode);
            rax_free(trimmed);
            rax_free(postfix);
            errno = ENOMEM;
            return 0;
        }
        splitnode->data[0] = h->data[j];
        
        // j == 0,意味着压缩节点只需拆分成两部分,split node 与 后缀部分已经拆分了,因此 parentlink 将指向 split node
        if (j == 0) {
            // 3a. 仔细想想?原压缩节点 iskey, 到这一步,split node 同样 iskey,因此将数据拷贝过去
            if (h->iskey) {
                void *ndata = raxGetData(h);
                raxSetData(splitnode,ndata);
            }
            memcpy(parentlink,&splitnode,sizeof(splitnode));
        } else {
            // 3b. 切割公共前缀,也就意味着,压缩节点将被分成三部分
            trimmed->size = j;
            memcpy(trimmed->data,h->data,j);
            trimmed->iscompr = j > 1 ? 1 : 0;
            trimmed->iskey = h->iskey;
            trimmed->isnull = h->isnull;
            if (h->iskey && !h->isnull) {
                void *ndata = raxGetData(h);
                raxSetData(trimmed,ndata);
            }
            raxNode **cp = raxNodeLastChildPtr(trimmed);
            memcpy(cp,&splitnode,sizeof(splitnode));
            memcpy(parentlink,&trimmed,sizeof(trimmed));
            parentlink = cp; /* Set parentlink to splitnode parent. */
            rax->numnodes++;
        }

        // 4. 创建 postfix 节点,即原压缩节点未匹配的后缀部分
        if (postfixlen) {
            /* 4a: create a postfix node. */
            postfix->iskey = 0;
            postfix->isnull = 0;
            postfix->size = postfixlen;
            postfix->iscompr = postfixlen > 1;
            memcpy(postfix->data,h->data+j+1,postfixlen);
            raxNode **cp = raxNodeLastChildPtr(postfix);
            memcpy(cp,&next,sizeof(next));
            rax->numnodes++;
        } else {
            /* 4b: just use next as postfix node. */
            postfix = next;
        }

        // 5. 设置 postfix 节点作为 splitnode 的第一个子节点
        raxNode **splitchild = raxNodeLastChildPtr(splitnode);
        memcpy(splitchild,&postfix,sizeof(postfix));

         // 6. 继续插入。以上只处理了原压缩节点,对于新插入key, 未匹配部分还未插入,源码稍后介绍;这里插入之后,splitnode将会多一个子节点
        rax_free(h);
        h = splitnode;
    }

算法2:应对场景5,即在压缩节点某个位置处,完成了字符串 s 的匹配,此时,只需将压缩节点拆分成两部分即可。

例如,rax树中存在节点 “ANNIBALE”,将插入 “ANNI”,此时 SPLITPOS = 4。

  • 1.保存当前压缩节点 NEXT 指针
  • 2.创建后缀节点 postfix node,保存从 SPLITPOS 之后的所有字符,如果 postfix node 的 len == 1, 设置 iscompr = 0
  • 3.切割匹配的公共部分 trimmed 节点,下标为0~SPLITPOS
  • 4.设置 postfix 为 trimmed 的第一个子节点

源码实现如下:

else if (h->iscompr && i == len) {
    /* ------------------------- ALGORITHM 2 --------------------------- */
        // 切割并分配 postfix 和 trimmed 节点
        size_t postfixlen = h->size - j;
        size_t nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+
                          sizeof(raxNode*);
        if (data != NULL) nodesize += sizeof(void*);
        raxNode *postfix = rax_malloc(nodesize);

        nodesize = sizeof(raxNode)+j+raxPadding(j)+sizeof(raxNode*);
        if (h->iskey && !h->isnull) nodesize += sizeof(void*);
        raxNode *trimmed = rax_malloc(nodesize);

        // 分配失败
        if (postfix == NULL || trimmed == NULL) {
            rax_free(postfix);
            rax_free(trimmed);
            errno = ENOMEM;
            return 0;
        }

        /* 1: Save next pointer. */
        raxNode **childfield = raxNodeLastChildPtr(h);
        raxNode *next;
        memcpy(&next,childfield,sizeof(next));

        /* 2: Create the postfix node. */
        postfix->size = postfixlen;
        postfix->iscompr = postfixlen > 1;
        postfix->iskey = 1;
        postfix->isnull = 0;
        memcpy(postfix->data,h->data+j,postfixlen);
        raxSetData(postfix,data);
        raxNode **cp = raxNodeLastChildPtr(postfix);
        memcpy(cp,&next,sizeof(next));
        rax->numnodes++;

        /* 3: Trim the compressed node. */
        trimmed->size = j;
        trimmed->iscompr = j > 1;
        trimmed->iskey = 0;
        trimmed->isnull = 0;
        memcpy(trimmed->data,h->data,j);
        memcpy(parentlink,&trimmed,sizeof(trimmed));
        if (h->iskey) {
            void *aux = raxGetData(h);
            raxSetData(trimmed,aux);
        }

        /* Fix the trimmed node child pointer to point to
         * the postfix node. */
        cp = raxNodeLastChildPtr(trimmed);
        memcpy(cp,&postfix,sizeof(postfix));

        /* Finish! We don't need to continue with the insertion
         * algorithm for ALGO 2. The key is already inserted. */
        rax->numele++;
        rax_free(h);
        return 1; /* Key inserted. */
    }

到这里,还有一点工作需要完成, 那就是新key未匹配的部分,还需要插入rax树,源码实现如下:

    while(i < len) {
        raxNode *child;

        // 这里就是判断要处理成压缩节点还是非压缩节点
        if (h->size == 0 && len-i > 1) {
            debugf("Inserting compressed node\n");
            size_t comprsize = len-i;
            if (comprsize > RAX_NODE_MAX_SIZE)
                comprsize = RAX_NODE_MAX_SIZE;
            raxNode *newh = raxCompressNode(h,s+i,comprsize,&child);
            if (newh == NULL) goto oom;
            h = newh;
            memcpy(parentlink,&h,sizeof(h));
            parentlink = raxNodeLastChildPtr(h);
            i += comprsize;
        } else {
            debugf("Inserting normal node\n");
            raxNode **new_parentlink;
            raxNode *newh = raxAddChild(h,s[i],&child,&new_parentlink);
            if (newh == NULL) goto oom;
            h = newh;
            memcpy(parentlink,&h,sizeof(h));
            parentlink = new_parentlink;
            i++;
        }
        rax->numnodes++;
        h = child;
    }
    raxNode *newh = raxReallocForData(h,data);
    if (newh == NULL) goto oom;
    h = newh;
    if (!h->iskey) rax->numele++;
    raxSetData(h,data);
    memcpy(parentlink,&h,sizeof(h));
    return 1; /* Element inserted. */

以上便是插入新key的全部过程。可以看出,实现上根据列举的5种案例进行处理。

3. 删除

rax 的删除操作主要有3个接口,可以删除 rax 中的某个 key,或者释放整个 rax,在释放 rax 时,还可以设置释放回调函数,在释放rax 的每个 key 时,都会调用这个回调函数,3个接口的定义如下:

// 在 rax 中删除长度为 len 的 s
int raxRemove(rax *rax, unsigned char *s, size_t len, void **old);
// 释放 rax
void raxFree(rax *rax);
// 释放 rax, 释放每个 key 时,都会调用 free_callback 方法
void raxFreeWithCallback(rax *rax, void (*free_callback)(void*));

其中,rax 的释放操作,采用的是深度优先算法

下面重点介绍 raxRemove 方法:

第一步:当删除 rax 中的某个 key-value 对时,首先查找 key 是否存在,不存在则直接返回,存在则需要进行删除操作,实现如下:

    raxNode *h;
    raxStack ts;
    
    raxStackInit(&ts); // 保存路径
    int splitpos = 0;
    size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,&ts);
    if (i != len || (h->iscompr && splitpos != 0) || !h->iskey) { // 未找到目标 key, 直接退出
        raxStackFree(&ts);
        return 0;
    }

第二步,如果 key 存在,则需要进行删除操作,删除操作完成后,rax 树可能需要进行压缩。具体可以分为下面 2 种情况,此处所说的压缩是指将某个节点与其子节点压缩成一个节点,叶子节点没有子节点,不能进行压缩。

  • 某个节点只有一个子节点,该子节点之前是key,经删除操作后不再是key,此时可以将该节点与其子节点压缩。
  • 某个节点有两个子节点,经过删除操作后,只剩下一个子节点,如果这个子节点不是key,则可以将该节点与这个子节点压缩。

案例1:
假设 rax 树存储了 “FOO” = 1 和 “FOOBAR” = 2,结构如下:

"FOO" -> "BAR" -> [] (2)
          (1)

删除 “FOO” ,压缩后,结构如下:

"FOOBAR" -> [] (2)

案例2:
假设 rax 树存储了 “FOOBAR” = 1 和 “FOOTER” = 2

          |B| -> "AR" -> [] (1)
 "FOO" -> |-|
          |T| -> "ER" -> [] (2)

删除 “FOOTER” 后,结构如下:

"FOO" -> |B| -> "AR" -> [] (1)

可以压缩成的结果如下:

"FOOBAR" -> [] (1)

删除操作具体可以分为2个阶段,删除阶段以及压缩阶段

1)删除阶段:

利用查找key时记录的匹配路径,依次向上直到无法删除为止。

    if (h->size == 0) {
        debugf("Key deleted in node without children. Cleanup needed.\n");
        raxNode *child = NULL;
        while(h != rax->head) {
            child = h;
            debugf("Freeing child %p [%.*s] key:%d\n", (void*)child,
                (int)child->size, (char*)child->data, child->iskey);
            rax_free(child);
            rax->numnodes--;
            h = raxStackPop(&ts);
            // 如果节点为 key,或者子节点 size != 1,则无法删除
            if (h->iskey || (!h->iscompr && h->size != 1)) break;
        }
        if (child) {
            raxNode *new = raxRemoveChild(h,child);
            if (new != h) {
                raxNode *parent = raxStackPeek(&ts);
                raxNode **parentlink;
                if (parent == NULL) {
                    parentlink = &rax->head;
                } else {
                    parentlink = raxFindParentLink(parent,h);
                }
                memcpy(parentlink,&new,sizeof(new));
            }

            // 删除后尝试进行压缩
            if (new->size == 1 && new->iskey == 0) {
                trycompress = 1;
                h = new;
            }
        }
    } else if (h->size == 1) {
        // 可以尝试进行压缩
        trycompress = 1;
    }

2)压缩阶段: 压缩过程可以细化为2步。

① 找到可以进行压缩的第一个元素,之后将所有可进行压缩的节点进行压缩。由于raxRowWalk函数已经记录了查找key的过程,压缩时只需从记录栈中不断弹出元素,即可找到可进行压缩的第一个元素,过程如下:

        raxNode *parent;
        while(1) {
            parent = raxStackPop(&ts);
            if (!parent || parent->iskey ||
                (!parent->iscompr && parent->size != 1)) break;
            h = parent;
        }
        // 可以进行压缩的第一个节点
        raxNode *start = h; 

② 找到第一个可压缩节点后,进行数据压缩。由于可压缩的节点都只有一个子节点,压缩过程只需要读取每个节点的内容,创建新的节点,并填充新节点的内容即可:

       /* Scan chain of nodes we can compress. */
        size_t comprsize = h->size;
        int nodes = 1;
        while(h->size != 0) {
            raxNode **cp = raxNodeLastChildPtr(h);
            memcpy(&h,cp,sizeof(h));
            if (h->iskey || (!h->iscompr && h->size != 1)) break;
            /* Stop here if going to the next node would result into
             * a compressed node larger than h->size can hold. */
            if (comprsize + h->size > RAX_NODE_MAX_SIZE) break;
            nodes++;
            comprsize += h->size;
        }
        if (nodes > 1) {
            /* If we can compress, create the new node and populate it. */
            size_t nodesize =
                sizeof(raxNode)+comprsize+raxPadding(comprsize)+sizeof(raxNode*);
            raxNode *new = rax_malloc(nodesize);
            /* An out of memory here just means we cannot optimize this
             * node, but the tree is left in a consistent state. */
            if (new == NULL) {
                raxStackFree(&ts);
                return 1;
            }
            new->iskey = 0;
            new->isnull = 0;
            new->iscompr = 1;
            new->size = comprsize;
            rax->numnodes++;

            /* Scan again, this time to populate the new node content and
             * to fix the new node child pointer. At the same time we free
             * all the nodes that we'll no longer use. */
            comprsize = 0;
            h = start;
            while(h->size != 0) {
                memcpy(new->data+comprsize,h->data,h->size);
                comprsize += h->size;
                raxNode **cp = raxNodeLastChildPtr(h);
                raxNode *tofree = h;
                memcpy(&h,cp,sizeof(h));
                rax_free(tofree); rax->numnodes--;
                if (h->iskey || (!h->iscompr && h->size != 1)) break;
            }

            /* Now 'h' points to the first node that we still need to use,
             * so our new node child pointer will point to it. */
            raxNode **cp = raxNodeLastChildPtr(new);
            memcpy(cp,&h,sizeof(h));

            /* Fix parent link. */
            if (parent) {
                raxNode **parentlink = raxFindParentLink(parent,start);
                memcpy(parentlink,&new,sizeof(new));
            } else {
                rax->head = new;
            }
        }

4. 遍历

为了能够遍历 rax 中所有的 key, Redis 提供了迭代器操作。Redis 中实现的迭代器为双向迭代器,可以向前,也可以向后,顺序是按照 key 的字典序排列的。通过 rax 的结构图可以看出,如果某个节点为 key,则其子节点的 key 按照字典序比该节点的 key 大。另外,如果当前节点为非压缩节点,则其最左侧节点的key是其所有子节点的 key 中最小的。主要方法有:

void raxStart(raxIterator *it, rax *rt);
int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);
int raxNext(raxIterator *it);
int raxPrev(raxIterator *it);

1)raxStart 用于初始化 raxIterator 结构:

/* Initialize a Rax iterator. This call should be performed a single time
 * to initialize the iterator, and must be followed by a raxSeek() call,
 * otherwise the raxPrev()/raxNext() functions will just return EOF. */
void raxStart(raxIterator *it, rax *rt) {
    it->flags = RAX_ITER_EOF; /* No crash if the iterator is not seeked. */
    it->rt = rt;
    it->key_len = 0;
    it->key = it->key_static_string;
    it->key_max = RAX_ITER_STATIC_LEN;
    it->data = NULL;
    it->node_cb = NULL;
    raxStackInit(&it->stack);
}

2)raxSeek,在raxStart初始化迭代器后,必须调用raxSeek函数初始化迭代器的位置。

int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);
  • it为raxStart初始化的迭代器。
  • op为查找操作符,可以为大于(>)、小于(<)、大于等于(>=)、小于等于(<=)、等于(=)、首个元素(^)、末尾元素($)。
  • ele为待查找的key。
  • len为ele的长度。

查找末尾元素可以直接在Rax中找到最右侧的叶子节点,查找首个元素被转换为查找大于等于空的操作。

处理大于、小于、等于等操作主要分为以下几步:

  • 在rax中查找key:
  • 如果key找到,并且op中设置了等于,则操作完成:
  • 如果仅仅设置等于,并没有找到key,则将迭代器的标志位设置为末尾。
  • 如果设置了等于但没有找到key,或者设置了大于或者小于符号,则需要继续查找。

3)raxNext & raxPrev,主要用于迭代上一个或者下一个key

/* Go to the next element in the scope of the iterator 'it'.
 * If EOF (or out of memory) is reached, 0 is returned, otherwise 1 is
 * returned. In case 0 is returned because of OOM, errno is set to ENOMEM. */
int raxNext(raxIterator *it) {
    if (!raxIteratorNextStep(it,0)) {
        errno = ENOMEM;
        return 0;
    }
    if (it->flags & RAX_ITER_EOF) {
        errno = 0;
        return 0;
    }
    return 1;
}

/* Go to the previous element in the scope of the iterator 'it'.
 * If EOF (or out of memory) is reached, 0 is returned, otherwise 1 is
 * returned. In case 0 is returned because of OOM, errno is set to ENOMEM. */
int raxPrev(raxIterator *it) {
    if (!raxIteratorPrevStep(it,0)) {
        errno = ENOMEM;
        return 0;
    }
    if (it->flags & RAX_ITER_EOF) {
        errno = 0;
        return 0;
    }
    return 1;
}

4)raxStop & raxEOF:
raxEOF用于标识迭代器迭代结束,raxStop用于结束迭代并释放相关资源:

/* Free the iterator. */
void raxStop(raxIterator *it) {
    if (it->key != it->key_static_string) rax_free(it->key);
    raxStackFree(&it->stack);
}

/* Return if the iterator is in an EOF state. This happens when raxSeek()
 * failed to seek an appropriate element, so that raxNext() or raxPrev()
 * will return zero, or when an EOF condition was reached while iterating
 * with raxNext() and raxPrev(). */
int raxEOF(raxIterator *it) {
    return it->flags & RAX_ITER_EOF;
}

总结

首先,要理解 rax 底层实现,第一步需要理解 rax 的结构;其次,要理解 压缩节点 和 非压缩节点 的区别

来,再回顾下 压缩节点 和 非压缩节点 的区别:
它们的相同之处在于:

  • 都有保存元数据的节点头 HDR;
  • 都会包含指向子节点的指针,
  • 以及子节点所代表的字符串。从根节点到当前节点路径上的字符串如果是 Radix Tree 的一个 key,它们都会包含指向 key 对应 value 的指针。

不同之处在于:

  • 非压缩节点指向的子节点,每个子节点代表一个字符,非压缩节点可以指向多个子节点;
  • 压缩节点指向的子节点,代表的是一个合并字符串,压缩节点只能指向一个子节点。



相关参考:

redis readme.md
Redis · 引擎特性 · radix tree 源码解析

上一篇:C++多态原理


下一篇:pwn -- 沙盒机制详解