带你读《Redis 5设计与源码分析》之三:跳 跃 表-阿里云开发者社区

开发者社区> 被纵养的懒猫> 正文

带你读《Redis 5设计与源码分析》之三:跳 跃 表

简介: 本书从底层源码的角度,对 Redis的数据结构以及持久化、主从复制、哨兵和集群等特性的实现原理进行了详尽的剖析,图文并茂。
+关注继续查看

点击查看第一章
点击查看第二章
第3章

跳 跃 表

有序集合在生活中较常见,如根据成绩对学生进行排名、根据得分对游戏玩家进行排名等。对于有序集合的底层实现,我们可以使用数组、链表、平衡树等结构。数组不便于元素的插入和删除;链表的查询效率低,需要遍历所有元素;平衡树或者红黑树等结构虽然效率高但实现复杂。Redis采用了一种新型的数据结构——跳跃表。跳跃表的效率堪比红黑树,然而其实现却远比红黑树简单。

3.1 简介

在了解跳跃表之前,我们先了解一下有序链表。有序链表是所有元素以递增或递减方式有序排列的数据结构,其中每个节点都有指向下个节点的next指针,最后一个节点的next指针指向NULL。递增有序链表举例如图3-1所示。
image.png
如图3-1所示的有序链表,如果要查询值为51的元素,需要从第一个元素开始依次向后查找、比较才可以找到,查找顺序为1→11→21→31→41→51,共6次比较,时间复杂度为O(N)。有序链表的插入和删除操作都需要先找到合适的位置再修改next指针,修改操作基本不消耗时间,所以插入、删除、修改有序链表的耗时主要在查找元素上。
如果我们将有序链表中的部分节点分层,每一层都是一个有序链表。在查找时优先从最高层开始向后查找,当到达某节点时,如果next节点值大于要查找的值或next指针指向NULL,则从当前节点下降一层继续向后查找,这样是否可以提升查找效率呢?
image.png
分层有序链表如图3-2所示,我们再次查找值为51的节点,查找步骤如下。
1)从第2层开始,1节点比51节点小,向后比较。
2)21节点比51节点小,继续向后比较。第2层21节点的next指针指向NULL,所以从21节点开始需要下降一层到第1层继续向后比较。
3)第1层中,21节点的next节点为41节点,41节点比51节点小,继续向后比较。第1层41节点的next节点为61节点,比要查找的51节点大,所以从41节点开始下降一层到第0层继续向后比较。
4)在第0层,51节点为要查询的节点,节点被找到。
采用图3-2所示的数据结构后,总共查找4次就可以找到51节点,比有序链表少2次。当数据量大时,优势会更明显。
综上所述,通过将有序集合的部分节点分层,由最上层开始依次向后查找,如果本层的next节点大于要查找的值或next节点为NULL,则从本节点开始,降低一层继续向后查找,依次类推,如果找到则返回节点;否则返回NULL。采用该原理查找节点,在节点数量比较多时,可以跳过一些节点,查询效率大大提升,这就是跳跃表的基本思想。
跳跃表的实现过程如图3-3所示。
image.png
从图3-3中我们可以看出跳跃表有如下性质。
1)跳跃表由很多层构成。
2)跳跃表有一个头(header)节点,头节点中有一个64层的结构,每层的结构包含指向本层的下个节点的指针, 指向本层下个节点中间所跨越的节点个数为本层的跨度(span)。
3)除头节点外,层数最多的节点的层高为跳跃表的高度(level),图3-3中跳跃表的高度为3。
4)每层都是一个有序链表,数据递增。
5)除header节点外,一个元素在上层有序链表中出现,则它一定会在下层有序链表中出现。
6)跳跃表每层最后一个节点指向NULL,表示本层有序链表的结束。
7)跳跃表拥有一个tail指针,指向跳跃表最后一个节点。
8)最底层的有序链表包含所有节点,最底层的节点个数为跳跃表的长度(length)(不包括头节点),图3-3中跳跃表的长度为7。
9)每个节点包含一个后退指针,头节点和第一个节点指向NULL;其他节点指向最底层的前一个节点。
跳跃表每个节点维护了多个指向其他节点的指针,所以在跳跃表进行查找、插入、删除操作时可以跳过一些节点,快速找到操作需要的节点。归根结底,跳跃表是以牺牲空间的形式来达到快速查找的目的。跳跃表与平衡树相比,实现方式更简单,只要熟悉有序链表,就可以轻松地掌握跳跃表。

3.2 跳跃表节点与结构

由3.1节我们知道,跳跃表由多个节点构成,每个节点由很多层构成,每层都有指向本层下个节点的指针。那么,Redis中的跳跃表是如何实现的呢?

3.2.1 跳跃表节点

下面我们来看跳跃表节点的zskiplistNode结构体。

typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned int span;
    } level[];
} zskiplistNode;

该结构体包含如下属性。
1)ele:用于存储字符串类型的数据。
2)score:用于存储排序的分值。
3)backward:后退指针,只能指向当前节点最底层的前一个节点,头节点和第一个节点—backward指向NULL,从后向前遍历跳跃表时使用。
4)level:为柔性数组。每个节点的数组长度不一样,在生成跳跃表节点时,随机生成一个1~64的值,值越大出现的概率越低。
level数组的每项包含以下两个元素。

  • forward:指向本层下一个节点,尾节点的forward指向NULL。
  • span:forward指向的节点与本节点之间的元素个数。span值越大,跳过的节点个数越多。

跳跃表是Redis有序集合的底层实现方式之一,所以每个节点的ele存储有序集合的成员member值,score存储成员score值。所有节点的分值是按从小到大的方式排序的,当有序集合的成员分值相同时,节点会按member的字典序进行排序。

3.2.2 跳跃表结构

除了跳跃表节点外,还需要一个跳跃表结构来管理节点,Redis使用zskiplist结构体,定义如下:

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

该结构体包含如下属性。
1)header:指向跳跃表头节点。头节点是跳跃表的一个特殊节点,它的level数组元素个数为64。头节点在有序集合中不存储任何member和score值,ele值为NULL,score值为0;也不计入跳跃表的总长度。头节点在初始化时,64个元素的forward都指向NULL,span值都为0。
2)tail:指向跳跃表尾节点。
3)length:跳跃表长度,表示除头节点之外的节点总数。
4)level:跳跃表的高度。
通过跳跃表结构体的属性我们可以看到,程序可以在O(1)的时间复杂度下, 快速获取到跳跃表的头节点、尾节点、长度和高度。

3.3 基本操作

我们已经知道了跳跃表节点和跳跃表结构体的定义,下面我们分别介绍跳跃表的创建、插入、查找和删除操作。

3.3.1 创建跳跃表

1.节点层高
节点层高的最小值为1,最大值是ZSKIPLIST_MAXLEVEL,Redis5中节点层高的值为64。

#define ZSKIPLIST_MAXLEVEL 64

Redis通过zslRandomLevel函数随机生成一个1~64的值,作为新建节点的高度,值越大出现的概率越低。节点层高确定之后便不会再修改。生成随机层高的代码如下。

#define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */
int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

上述代码中,level的初始值为1,通过while循环,每次生成一个随机值,取这个值的低16位作为x,当x小于0.25倍的0xFFFF时,level的值加1;否则退出while循环。最终返回level和ZSKIPLIST_MAXLEVEL两者中的最小值。
下面计算节点的期望层高。假设p=ZSKIPLIST_P:
1)节点层高为1的概率为(1-p)。
2)节点层高为2的概率为p(1-p)。
3)节点层高为3的概率为p2(1-p)。
4)……
5)节点层高为n的概率为pn-1(1-p)。
所以节点的期望层高为

E=1×(1-p)+2×p(1-p)+3×p2(1-p)+…
=(1-p)∑+∞i=1ipi-1
=1/(1-p)

当p=0.25时,跳跃表节点的期望层高为1/(1-0.25)≈1.33。
2.创建跳跃表节点
跳跃表的每个节点都是有序集合的一个元素,在创建跳跃表节点时,待创建节点的层高、分值、member等都已确定。对于跳跃表的每个节点,我们需要申请内存来存储,代码如下。

zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));

zskiplistNode结构体的最后一个元素为柔性数组,申请内存时需要指定柔性数组的大小,一个节点占用的内存大小为zskiplistNode的内存大小与level个zskiplistLevel的内存大小之和。
分配好空间之后,进行节点变量初始化。代码如下。

zn->score = score;
zn->ele = ele;
return zn;

3.头节点
头节点是一个特殊的节点,不存储有序集合的member信息。头节点是跳跃表中第一个插入的节点,其level数组的每项forward都为NULL,span值都为0。

for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
     zsl->header->level[j].forward = NULL;
     zsl->header->level[j].span = 0;
}

4.创建跳跃表的步骤
创建完头节点后,就可以创建跳跃表。创建跳跃表的步骤如下。
1)创建跳跃表结构体对象zsl。
2)将zsl的头节点指针指向新创建的头节点。
3)跳跃表层高初始化为1,长度初始化为0,尾节点指向NULL。
相关代码如下。

zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl)); 
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
zsl->header->backward = NULL;
zsl->level = 1;
zsl->length = 0;
zsl->tail = NULL;

3.3.2 插入节点

插入节点的步骤:① 查找要插入的位置;② 调整跳跃表高度;③ 插入节点;④ 调整backward。
1.查找要插入的位置
查找是跳跃表操作中使用最多的操作,无论是获取、插入还是删除,都需要查找到指定的节点位置。通过3.1节内容,我们已经大概知道了跳跃表查找的基本逻辑,下面借助跳跃表的插入节点的过程深入了解跳跃表的查找过程。
如图3-4所示的跳跃表,长度为3,高度为2。若要插入一个节点,分值为31,层高为3,则插入节点时查找被更新节点的代码如下。
image.png

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
    rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
    while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                sdscmp(x->level[i].forward->ele,ele) < 0)))
    {
        rank[i] += x->level[i].span;
        x = x->level[i].forward;
    }
    update[i] = x;
}

为了找到要更新的节点,我们需要以下两个长度为64的数组来辅助操作。
update[]:插入节点时,需要更新被插入节点每层的前一个节点。由于每层更新的节点不一样,所以将每层需要更新的节点记录在update[i]中。
rank[]:记录当前层从header节点到update[i]节点所经历的步长,在更新update[i]的span和设置新插入节点的span时用到。
查找节点(score=31,level=3)的插入位置,逻辑如下。
1)第一次for循环,i=1。x为跳跃表的头节点。
2)此时i的值与zsl->level-1相等,所以rank[1]的值为0。
3)header->level[1].forward存在,并且header->level[1].forward->score==1小于要插入的score,所以可以进入while循环,rank[1]=1,x为第一个节点。
4)第一个节点的第1层的forward指向NULL,所以不会再进入while循环。经过第一次for循环,rank[1]=1。x和update[1]都为第一个节点(score=1)。
5)经过第二次for循环,i=0。x为跳跃表的第一个节点(score=1)。
6)此时i的值与zsl->level-1不相等,所以rank[0]等于rank[1]的值,值为1。
7)x->level[0]->forward存在,并且x->level[0].foreard->score==21小于要插入的score,所以可以进入while循环,rank[0]=2。x为第二个节点(score=21)。
8)x->level[0]->forward存在,并且x->level[0].foreard->score==41大于要插入的score,所以不会再进入while,经过第二次for循环,rank[0]=2。x和update[0]都为第二个节点(score=21)。
update和rank赋值后的跳跃表如图3-5所示。
image.png
2.调整跳跃表高度
由上文可知,插入节点的高度是随机的,假设要插入节点的高度为3,大于跳跃表的高度2,所以我们需要调整跳跃表的高度。代码如下。

level = zslRandomLevel();
for (i = zsl->level; i < level; i++) {
    rank[i] = 0;
    update[i] = zsl->header;
    update[i]->level[i].span = zsl->length;
}
zsl->level = level;

此时,i的值为2,level的值为3,所以只能进入一次for循环。由于header的第0层到第1层的forward都已经指向了相应的节点,而新添加的节点的高度大于跳跃表的原高度,所以第2层只需要更新header节点即可。前面我们介绍过,rank是用来更新span的变量,其值是头节点到update[i]所经过的节点数,而此次修改的是头节点,所以rank[2]为0,update[2]一定为头节点。update[2]->level[2].span的值先赋值为跳跃表的总长度,后续在计算新插入节点level[2]的span时会用到此值。在更新完新插入节点level[2]的span之后会对update[2]->level[2].span的值进行重新计算赋值。
调整高度后的跳跃表如图3-6所示。
image.png
3.插入节点
当update和rank都赋值且节点已创建好后,便可以插入节点了。代码如下。

x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
    x->level[i].forward = update[i]->level[i].forward;
    update[i]->level[i].forward = x;
    x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
    update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

level的值为3,所以可以执行三次for循环,插入过程如下。
(1)第一次for循环
1)x的level[0]的forward为update[0]的level[0]的forward节点,即x->level[0].forward为score=41的节点。
2)update[0]的level[0]的下一个节点为新插入的节点。
3)rank[0]-rank[0]=0,update[0]->level[0].span=1,所以x->level[0].span=1。
4)update[0]->level[0].span=0+1=1。
插入节点并更新第0层后的跳跃表如图3-7所示。
image.png
(2)第2次for循环
1)x的level[1]的forward为update[1]的level[1]的forward节点,即x->level[1].forward为NULL。
2)update[1]的level[1]的下一个节点为新插入的节点。
3)rank[0]-rank[1]=1,update[1]->level[1].span=2,所以x->level[1].span=1。
4)update[1]->level[1].span=1+1=2。
插入节点并更新第1层后的跳跃表如图3-8所示。
(3)第3次for循环
1)x的level[2]的forward为update[2]的level[2]的forward节点,即x->level[2].forward为NULL。
2)update[2]的level[2]的下一个节点为新插入的节点。
3)rank[0]-rank[2]=2,因为update[2]->level[2].span=3,所以x->level[2].span=1。
4)update[2]->level[2].span=2+1=3。
插入节点并更新第2层后的跳跃表如图3-9所示。
新插入节点的高度大于原跳跃表高度,所以下面代码不会运行。但如果新插入节点的高度小于原跳跃表高度,则从level到zsl->level-1层的update[i]节点forward不会指向新插入的节点,所以不用更新update[i]的forward指针,只将这些level层的span加1即可。代码如下。
image.png
image.png

for (i = level; i < zsl->level; i++) {
    update[i]->level[i].span++;
}

4.调整backward
根据update的赋值过程,新插入节点的前一个节点一定是update[0],由于每个节点的后退指针只有一个,与此节点的层数无关,所以当插入节点不是最后一个节点时,需要更新被插入节点的backward指向update[0]。如果新插入节点是最后一个节点,则需要更新跳跃表的尾节点为新插入节点。插入节点后,更新跳跃表的长度加1。代码如下。

x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
    x->level[0].forward->backward = x;
else
    zsl->tail = x;
zsl->length++;
return x;

插入新节点后的跳跃表如图3-10所示。
image.png

3.3.3 删除节点

删除节点的步骤:1)查找需要更新的节点;2)设置span和forward。
图3-10中的跳跃表的长度为3,高度为3,此时删除score=31的节点,将此节点记录为x。
1.查找需要更新的节点
查找需要更新的节点要借助update数组,数组的赋值方式与3.3.2中update的赋值方式相同,不再赘述。查找完毕之后,update[2]=header,update[1]为score=1的节点,update[0]为score=21的节点。删除节点前的跳跃表如图3-11所示。
image.png
2.设置span和forward
删除节点需要设置update数组中每个节点的span和forward。
假设x的第i层的span值为a,update[i]第i层的span值为b,由于删除了一个节点,所以a+b-1的值就是update[i]第i层的span新值。update[i]的第i的新forward就是x节点第i层的forward,这个类似链表删除元素的操作。
如果update[i]第i层的forward不为x,说明update[i]的层高大于x的层高,即update[i]第i层指向了指向了x的后续节点或指向NULL。由于删除了一个节点,所以update[i]的leve[i]的span需要减1。
如果update[i]的forward不为x,在要删除的节点的高度小于跳跃表高度的情况下出现,i大于x高度的节点的forward与x无关,所以这些节点只需更新其span减1即可。
设置span和forward的代码如下。

void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            update[i]->level[i].span -= 1;
        }
    }
}

设置span和forward后的跳跃表如图3-12所示。
image.png
update节点更新完毕之后,需要更新backward指针、跳跃表高度和长度。如果x不为最后一个节点,直接将第0层后一个节点的backward赋值为x的backward即可;否则,将跳跃表的尾指针指向x的backward节点即可。代码如下。

if (x->level[0].forward) {
    x->level[0].forward->backward = x->backward;
else {
    zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
    zsl->level--;
zsl->length--;

当删除的x节点是跳跃表的最高节点,并且没有其他节点与x节点的高度相同时,需要将跳跃表的高度减1。
由于删除了一个节点,跳跃表的长度需要减1。
删除节点后的跳跃表如图3-13所示。

3.3.4 删除跳跃表

获取到跳跃表对象之后,从头节点的第0层开始,通过forward指针逐步向后遍历,每遇到一个节点便将释放其内存。当所有节点的内存都被释放之后,释放跳跃表对象,即完成了跳跃表的删除操作。代码如下。
image.png

void zslFree(zskiplist *zsl) {
    zskiplistNode *node = zsl->header->level[0].forward, *next;

    zfree(zsl->header);
    while(node) {
        next = node->level[0].forward;
        zslFreeNode(node);
        node = next;
    }
    zfree(zsl);
}

3.4 跳跃表的应用

在Redis中,跳跃表主要应用于有序集合的底层实现(有序集合的另一种实现方式为压缩列表)。
Redis的配置文件中关于有序集合底层实现的两个配置。
1)zset-max-ziplist-entries 128:zset采用压缩列表时,元素个数最大值。默认值为128。
2)zset-max-ziplist-value 64:zset采用压缩列表时,每个元素的字符串长度最大值。默认值为64。
zset添加元素的主要逻辑位于t_zset.c的zaddGenericCommand函数中。zset插入第一个元素时,会判断下面两种条件:

  • zset-max-ziplist-entries的值是否等于0;
  • zset-max-ziplist-value小于要插入元素的字符串长度。

满足任一条件Redis就会采用跳跃表作为底层实现,否则采用压缩列表作为底层实现方式。

if (server.zset_max_ziplist_entries == 0 ||
    server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
    zobj = createZsetObject();//创建跳跃表结构
} else {
    zobj = createZsetZiplistObject();//创建压缩列表结构
}

一般情况下,不会将zset-max-ziplist-entries配置成0,元素的字符串长度也不会太长,所以在创建有序集合时,默认使用压缩列表的底层实现。zset新插入元素时,会判断以下两种条件:

  • zset中元素个数大于zset_max_ziplist_entries;
  • 插入元素的字符串长度大于zset_max_ziplist_value。

当满足任一条件时,Redis便会将zset的底层实现由压缩列表转为跳跃表。代码如下。

if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
    zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (sdslen(ele) > server.zset_max_ziplist_value)
    zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);

值得注意的是,zset在转为跳跃表之后,即使元素被逐渐删除,也不会重新转为压缩列表。

3.5 本章小结

本章介绍了跳跃表的基本原理和实现过程。跳跃表的原理简单,其查询、插入、删除的平均复杂度都为O(logN)。跳跃表主要应用于有序集合的底层实现。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
redis redis-2.6.17 安装失败 原因分析
  redis 安装 redis-2.6.17   redis 集群 安装 http://knight-black-bob.iteye.com/blog/2343192       zmalloc.
737 0
[redis设计与实现][3]基本数据结构——字典
Redis字典采用哈希表实现。 哈希表: [cce lang=”c”] typedef struct dictht { //哈希表数组 dictEntry **table; //哈希表大小 unsigned long size; //哈希表掩码,用于计算索引值,总是等于size –
1141 0
更高效地提高redis client多线程操作的并发吞吐设计
Redis是一个非常高效的基于内存的NOSQL数据库,它提供非常高效的数据读写效能.在实际应用中往往是带宽和CLIENT库读写损耗过高导致无法更好地发挥出Redis更出色的能力.下面结合一些redis本身的特性和一些client操作上的改变来提高整个redis操作的交通.
896 0
[redis设计与实现][1]基本数据结构——sds
SDS(Simple Dynamic String):对C字符串的封装,可修改、可自动伸缩的字符串实现。Redis默认的字符串实现。 SDS定义:(sds.h) [cce lang=”c”] struct sdshdr { unsigned int len; unsigned int fr
1515 0
【需求设计】从一个小需求感受Redis的独特魅力
从一个小小的需求看Redis在实际项目是如何使用的
441 0
[redis设计与实现][4]基本数据结构——跳跃表
Redis使用跳跃表和字典共同来实现有序集合键(sorted set)。 定义: 跳跃表节点: [cce lang=”c”] typedef struct zskiplistNode { //成员对象 robj *obj; //分值 double score; //后退指针 st
1805 0
JVM源码分析之System.currentTimeMillis及nanoTime原理详解
##概述 上周`@望陶`问了我一个现象很诡异的问题,说JDK7和JDK8下的`System.nanoTime()`输出完全不一样,而且差距还非常大,是不是两个版本里的实现不一样,之前我也没注意过这个细节,觉得非常奇怪,于是自己也在本地mac机器上马上测试了一下,得到如下输出: ``` ~/
6493 0
560
文章
1795
问答
来源圈子
更多
+ 订阅
文章排行榜
最热
最新
相关电子书
更多
文娱运维技术
立即下载
《SaaS模式云原生数据仓库应用场景实践》
立即下载
《看见新力量:二》电子书
立即下载