快表
快速列表 quicklist
是 3.2
版本新添加的编码类型,结合了 ziplist
和 linkedlist
的一种编码。
同时在 3.2
版本中,列表也废弃了 ziplist
和 linkedlist
,这也是《Redis设计与实现》一书中没有提到的一种底层数据结构。
快表构成
这里我直接下载了Redis6.0源码,快表实现在quicklist.h
:
typedef struct quicklist {
quicklistNode *head;
quicklistNode *tail;
unsigned long count; /* total count of all entries in all ziplists */
unsigned long len; /* number of quicklistNodes */
int fill : QL_FILL_BITS; /* fill factor for individual nodes */
unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */
unsigned int bookmark_count: QL_BM_BITS;
quicklistBookmark bookmarks[];
} quicklist;
- count:所有ziplist中的节点数。
- len:quicklistNode的数量。
- fill:限制ziplist的最大大小,可以通过配置文件来配置。
- compress:压缩程度,0表示不压缩,可通过配置文件配置。
fill
- 当数字为正数时:表示每个节点的
ziplist
最多包含的entry
个数。 - 当数字为负数:
- -1:每个节点的
ziplist
字节大小不能超过4kb。 - -2:每个节点的
ziplist
字节大小不能超过8kb (redis默认值)。 - -3:每个节点的
ziplist
字节大小不能超过16kb。 - -4:每个节点的
ziplist
字节大小不能超过32kb。 - -5:每个节点的
ziplist
字节大小不能超过64kb。
- -1:每个节点的
compress
因为链表的特性,一般首尾两端操作较频繁,中部操作相对较少,所以 redis
提供压缩深度配置:list-compress-depth
,也就是属性 compress
。
- 0:表示都不压缩。这是Redis的默认值。
- 1:表示
quicklist
两端各有1个节点不压缩,中间的节点压缩。 - 2:表示
quicklist
两端各有2个节点不压缩,中间的节点压缩。 - 3:表示
quicklist
两端各有3个节点不压缩,中间的节点压缩。
快表节点构成
typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;
unsigned char *zl;
unsigned int sz; /* ziplist size in bytes */
unsigned int count : 16; /* count of items in ziplist */
unsigned int encoding : 2; /* RAW==1 or LZF==2 */
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
unsigned int recompress : 1; /* was this node previous compressed? */
unsigned int attempted_compress : 1; /* node can't compress; too small */
unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;
- zl:不设置压缩数据参数 recompress 时指向一个 ziplist 结构,设置压缩数据参数recompress 时指向 quicklistLZF 结构。
- sz:ziplist的字节数。
- count:ziplist中包含的节点数量。
- encoding:1表示压缩过,2表示没压缩。
- container:1表示NONE,2表示ziplist。
- recompress:标记 quicklist 节点的 ziplist 之前是否被解压缩过,如果recompress 为 1,则等待被再次压缩。
压缩过的ziplist
typedef struct quicklistLZF {
unsigned int sz; /* LZF size in bytes*/
char compressed[];
} quicklistLZF;
源码分析
快表的初始化
quicklist *quicklistCreate(void) {
struct quicklist *quicklist;
quicklist = zmalloc(sizeof(*quicklist));
quicklist->head = quicklist->tail = NULL;
quicklist->len = 0;
quicklist->count = 0;
quicklist->compress = 0;
quicklist->fill = -2;
quicklist->bookmark_count = 0;
return quicklist;
}
快表的插入
quicklist
可以在头部或者尾部插入数据:quicklist.c/quicklistPushHead
、quicklist.c/quicklistPushTail
,我们就挑一个从头部插入的代码来看看吧
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(quicklist->head);
} else {
quicklistNode *node = quicklistCreateNode();
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(node);
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
}
quicklist->count++;
quicklist->head->count++;
return (orig_head != quicklist->head);
}
首先判断头节点上的ziplist
大小是否超过限制,如果没有超过限制,就插入到ziplist
中;如果ziplist
超过大小限制,则创建一个新的quicklistNode
,再创建新的ziplist
,然后把ziplist
放到节点中,最后把quicklistNode
插入原来的头结点上,形成新的头节点。
当然quicklist
也可以从任意指定的位置插入
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz, int after) {
int full = 0, at_tail = 0, at_head = 0, full_next = 0, full_prev = 0;
int fill = quicklist->fill;
quicklistNode *node = entry->node;
quicklistNode *new_node = NULL;
if (!node) {
/* we have no reference node, so let's create only node in the list */
D("No node given!");
new_node = quicklistCreateNode();
new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
__quicklistInsertNode(quicklist, NULL, new_node, after);
new_node->count++;
quicklist->count++;
return;
}
/* Populate accounting flags for easier boolean checks later */
if (!_quicklistNodeAllowInsert(node, fill, sz)) {
D("Current node is full with count %d with requested fill %lu",
node->count, fill);
full = 1;
}
if (after && (entry->offset == node->count)) {
D("At Tail of current ziplist");
at_tail = 1;
if (!_quicklistNodeAllowInsert(node->next, fill, sz)) {
D("Next node is full too.");
full_next = 1;
}
}
if (!after && (entry->offset == 0)) {
D("At Head");
at_head = 1;
if (!_quicklistNodeAllowInsert(node->prev, fill, sz)) {
D("Prev node is full too.");
full_prev = 1;
}
}
/* Now determine where and how to insert the new element */
if (!full && after) {
D("Not full, inserting after current position.");
quicklistDecompressNodeForUse(node);
unsigned char *next = ziplistNext(node->zl, entry->zi);
if (next == NULL) {
node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL);
} else {
node->zl = ziplistInsert(node->zl, next, value, sz);
}
node->count++;
quicklistNodeUpdateSz(node);
quicklistRecompressOnly(quicklist, node);
} else if (!full && !after) {
D("Not full, inserting before current position.");
quicklistDecompressNodeForUse(node);
node->zl = ziplistInsert(node->zl, entry->zi, value, sz);
node->count++;
quicklistNodeUpdateSz(node);
quicklistRecompressOnly(quicklist, node);
} else if (full && at_tail && node->next && !full_next && after) {
/* If we are: at tail, next has free space, and inserting after:
* - insert entry at head of next node. */
D("Full and tail, but next isn't full; inserting next node head");
new_node = node->next;
quicklistDecompressNodeForUse(new_node);
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
} else if (full && at_head && node->prev && !full_prev && !after) {
/* If we are: at head, previous has free space, and inserting before:
* - insert entry at tail of previous node. */
D("Full and head, but prev isn't full, inserting prev node tail");
new_node = node->prev;
quicklistDecompressNodeForUse(new_node);
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
} else if (full && ((at_tail && node->next && full_next && after) ||
(at_head && node->prev && full_prev && !after))) {
/* If we are: full, and our prev/next is full, then:
* - create new node and attach to quicklist */
D("\tprovisioning new node...");
new_node = quicklistCreateNode();
new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
__quicklistInsertNode(quicklist, node, new_node, after);
} else if (full) {
/* else, node is full we need to split it. */
/* covers both after and !after cases */
D("\tsplitting node...");
quicklistDecompressNodeForUse(node);
new_node = _quicklistSplitNode(node, entry->offset, after);
new_node->zl = ziplistPush(new_node->zl, value, sz,
after ? ZIPLIST_HEAD : ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
__quicklistInsertNode(quicklist, node, new_node, after);
_quicklistMergeNodes(quicklist, node);
}
quicklist->count++;
}
代码老长了,大概可以总结为:
- 当前节点是
NULL
:创建一个新的节点,插入就好。 - 当前节点的
ziplist
大小没有超过限制时:直接插入到ziplist
就好。 - 当前节点的
ziplist
大小超过限制时:- 如果插入的位置是
ziplist
的两端:- 如果相邻的节点的
ziplist
大小没有超过限制,那么就插入到相邻节点的ziplist
中。 - 如果相邻的节点的
ziplist
大小也超过限制,这时需要创建一个新的节点插入。
- 如果相邻的节点的
- 如果插入的位置是
ziplist
的中间:
则需要把当前ziplist
从插入位置 分裂 (_quicklistSplitNode
) 为两个节点,然后把数据插入第二个节点上。
- 如果插入的位置是