Redis数据结构之——跳表skiplist

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Redis数据结构之——跳表skiplist

写在前面

以下内容是基于Redis 6.2.6 版本整理总结

一、跳表(skiplist)

如何理解跳表?在了解跳表之前,我们先从普通链表开始,一点点揭开跳表的神秘面纱~

首先,普通单链表来说,即使链表是有序的,我们要查找某个元素,也需要从头到尾遍历整个链表。这样效率很低,时间复杂度是O(n)。

那么有没有方法提升查询效率呢?我们可以尝试为链表建立“索引”来提升查询效率。如下图,我们在原始链表的基础上,每两个元素提取一个索引,down指向原始链表的节点:

此时,假如我们要查询值为19的节点,我们从索引层开始遍历,当遍历到16时,下个节点的值为23,所以,19一定在这两个节点之间。我们通过16节点的down指针来到原始链表,将继续遍历,直到找到值为19的节点。在没有建“索引”之前,我们需要遍历8次,才能找到19,而在建立“索引”后,需要6次就能找到,也就是,索引帮我们减少了查询的次数。

那如果我们再建一级索引呢?哈哈哈,没想到吧也是6次,这是因为我们的数据量太少,即便加了两级索引,优化效果也不是很明显。在数据量大时,优化效果还是很明显的,有兴趣可以自己动手画一画。

1.1 跳表的时间复杂度

假设链表有n个节点,每两个节点生成一个索引,则有第一层索引节点的个数为n/2,第二层索引节点的个数是第一层个数的一半n/4,以此类推,第h层节点的个数就是n/(2^h)。假设,第h层有两个节点,则:h = log2n - 1,再算上原始链表,则整个跳表的高度就是log2n。

我们在查询某个数据的时候,每一层需要遍历m个节点,那么在跳表中查询某个数据的时间复杂度就是:O(m*log2n)。那m是多少呢? 按照上面每两个节点上升一个索引节点的索引结构,我们每一层索引最多遍历3个节点,为什么呢?解释如下:

假设我们查找的还是19,在第k层索引中,当我们遍历到11时,发现19在11和23之间,我们通过11的down节点,来到第k-1层。在第k-1层索引中,11 到 23 最多包含3个节点(包含11 和 23 的节点),所以在第k-1索引,我们最多需要遍历3个节点,依次类推,每一层索引都最多只需要遍历3个节点。

通过上面的分析,我们知道了m = 3,也就是说在跳表中查询任意节点的时间复杂度是O(3*log2n),去掉常数项后,时间复杂度就是:O(log2n)。这个查找跟二分查找的时间复杂度一样。换句话说,我们是基于单链表实现了二分查找,神奇吧。但是,这种查询效率的提升是有代价的,也就是我们需要维护多层级索引,才能实现。这也是一种空间换时间的思路。

1.2 空间复杂度

要实现log2n的时间复杂度,跳表就需要额外存储这些索引的空间。那么,需要多大的空间呢?我们来分析一下:

假设原始链表有n个节点,按照每连个节点上升一个索引节点的索引结构,第一层有n/2,第二层n/4,依次类推,第h层有n/2^h个索引节点。假设第h层有2个节点。则总共有:

n/2 + n/4 + n/8 + … + 2 = n-2。所以,跳表的空间复杂度是O(n)

也就是说,如果将含有n个节点的链表构造成跳表,我们还需要额外再用接近n个节点来存储这些索引,还有没有办法较少索引占的空间呢?答案是有的,上面的分析是基于每两个节点上升一个索引节点,那么换成3个、5个呢?如果为3,也很好分析,需要的索引总数为:n/3 + n/9 + n/27 + … + 3 + 1 = n/2。尽管空间复杂还是O(n),但实际上索引的数量已经减少了一半了。

在实际开发中,原始链表中的对象可能是很大的对象,而索引节点只是存储关键的值和指针,相较于原始节点,大小可以忽略不计。

1.3 跳表的插入和删除

我们想在跳表中插入和删除一个节点,第一步是要找到插入和删除的位置,然后再执行插入或者删除,因为跳表的查询时间复杂度是O(log2n),插入和删除的时间复杂度也是O(log2n)。

1.3.1 插入

1.3.2 删除

删除操作就需要注意一下,如果删除的节点也存在于索引节点中,那么,索引中的节点也要删除。单链表中的删除,需要拿到前驱节点的指针,如果是双向链表就不用考虑了。

1.4 跳表索引的动态更新

当我们一直往跳表中添加元素,如果不更新索引就可能出现,某2个索引之间的索引数过多,极端情况下,会退化为单向链表。

作为一种动态数据结构,我们需要某种手段作为索引节点和原始链表大小的平衡,也就是说,当链表中的节点数增多时,也响应的增加一些索引节点,避免复杂度的退化。红黑树和AVL树是通过左旋和右旋来维持左右子树的平衡。跳表则是通过随机函数来维护这种平衡。

2、跳表在Redis中的应用

有序集合 zet 的底层实现就是跳表。大部分情况下,跳表的效率可以和平衡树媲美,平均时间复杂度O(logn),最坏O(n)。

2.1 跳表源码

每次创建一个新的跳表节点时,会根据幂次定律(越大的数出现的概率越小)随机生成一个介于1到32之间的数作为level数组的大小,这个数组大小就是层的高度。level层数确定源码:

// src/t_zset.c
int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

两个宏定义

// src/sever.h
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */
#define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */

server.h

// 996行
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
  // sds 对象,唯一的
    sds ele;
    // 分值
    double score;
    // 后退指针,用于从后往前遍历使用
    struct zskiplistNode *backward;
    // 层数
    struct zskiplistLevel {
      // 前进指针
        struct zskiplistNode *forward;
        // 跨度,用来确定本节点再链表中的排位  zrank
        unsigned long span;
    } level[];
} zskiplistNode;
typedef struct zskiplist {
  // 指向跳表头节点和尾节点的指针
    struct zskiplistNode *header, *tail;
    // 跳表中的元素个数,不包含头节点 zcard
    unsigned long length;
    // 跳表中层数最高的节点的层数
    int level;
} zskiplist;
1.3 创建skiplist
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
    zskiplistNode *zn =
        zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->ele = ele;
    return zn;
}
/* Create a new skiplist. */
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;
    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}
1.4 跳表的插入和删除
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;
    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}
// 删除
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}

三、总结

  1. 跳表是有序集合zset的实现之一
  2. 跳表由zskiplist 和 zskiplistNode两个结构组成,zskiplist保存跳表的信息,如表头和表尾节点、跳表的长度等,zskiplistNode 保存节点详细信息
  3. 每个跳表节点的层高都是 1~32 之间的随机数
  4. 跳表中的对象是唯一的
  5. 跳表中的元素是按照分值从小到大排列,当分值相同时,按照成员对象的大小排序

文章参考与<零声教育>的C/C++linux服务期高级架构系统教程学习

相关文章
|
17天前
|
消息中间件 缓存 NoSQL
Redis各类数据结构详细介绍及其在Go语言Gin框架下实践应用
这只是利用Go语言和Gin框架与Redis交互最基础部分展示;根据具体业务需求可能需要更复杂查询、事务处理或订阅发布功能实现更多高级特性应用场景。
144 86
|
17天前
|
存储 缓存 NoSQL
Redis基础命令与数据结构概览
Redis是一个功能强大的键值存储系统,提供了丰富的数据结构以及相应的操作命令来满足现代应用程序对于高速读写和灵活数据处理的需求。通过掌握这些基础命令,开发者能够高效地对Redis进行操作,实现数据存储和管理的高性能方案。
57 12
|
16天前
|
存储 消息中间件 NoSQL
【Redis】常用数据结构之List篇:从常用命令到典型使用场景
本文将系统探讨 Redis List 的核心特性、完整命令体系、底层存储实现以及典型实践场景,为读者构建从理论到应用的完整认知框架,助力开发者在实际业务中高效运用这一数据结构解决问题。
|
25天前
|
存储 缓存 NoSQL
【Redis】 常用数据结构之String篇:从SET/GET到INCR的超全教程
无论是需要快速缓存用户信息,还是实现高并发场景下的精准计数,深入理解String的特性与最佳实践,都是提升Redis使用效率的关键。接下来,让我们从基础命令开始,逐步揭开String数据结构的神秘面纱。
|
10月前
|
C语言
【数据结构】栈和队列(c语言实现)(附源码)
本文介绍了栈和队列两种数据结构。栈是一种只能在一端进行插入和删除操作的线性表,遵循“先进后出”原则;队列则在一端插入、另一端删除,遵循“先进先出”原则。文章详细讲解了栈和队列的结构定义、方法声明及实现,并提供了完整的代码示例。栈和队列在实际应用中非常广泛,如二叉树的层序遍历和快速排序的非递归实现等。
872 9
|
10月前
|
存储 算法
非递归实现后序遍历时,如何避免栈溢出?
后序遍历的递归实现和非递归实现各有优缺点,在实际应用中需要根据具体的问题需求、二叉树的特点以及性能和空间的限制等因素来选择合适的实现方式。
221 59
|
3月前
|
编译器 C语言 C++
栈区的非法访问导致的死循环(x64)
这段内容主要分析了一段C语言代码在VS2022中形成死循环的原因,涉及栈区内存布局和数组越界问题。代码中`arr[15]`越界访问,修改了变量`i`的值,导致`for`循环条件始终为真,形成死循环。原因是VS2022栈区从低地址到高地址分配内存,`arr`数组与`i`相邻,`arr[15]`恰好覆盖`i`的地址。而在VS2019中,栈区先分配高地址再分配低地址,因此相同代码表现不同。这说明编译器对栈区内存分配顺序的实现差异会导致程序行为不一致,需避免数组越界以确保代码健壮性。
49 0
栈区的非法访问导致的死循环(x64)
232.用栈实现队列,225. 用队列实现栈
在232题中,通过两个栈(`stIn`和`stOut`)模拟队列的先入先出(FIFO)行为。`push`操作将元素压入`stIn`,`pop`和`peek`操作则通过将`stIn`的元素转移到`stOut`来实现队列的顺序访问。 225题则是利用单个队列(`que`)模拟栈的后入先出(LIFO)特性。通过多次调整队列头部元素的位置,确保弹出顺序符合栈的要求。`top`操作直接返回队列尾部元素,`empty`判断队列是否为空。 两题均仅使用基础数据结构操作,展示了栈与队列之间的转换逻辑。
|
8月前
|
存储 C语言 C++
【C++数据结构——栈与队列】顺序栈的基本运算(头歌实践教学平台习题)【合集】
本关任务:编写一个程序实现顺序栈的基本运算。开始你的任务吧,祝你成功!​ 相关知识 初始化栈 销毁栈 判断栈是否为空 进栈 出栈 取栈顶元素 1.初始化栈 概念:初始化栈是为栈的使用做准备,包括分配内存空间(如果是动态分配)和设置栈的初始状态。栈有顺序栈和链式栈两种常见形式。对于顺序栈,通常需要定义一个数组来存储栈元素,并设置一个变量来记录栈顶位置;对于链式栈,需要定义节点结构,包含数据域和指针域,同时初始化栈顶指针。 示例(顺序栈): 以下是一个简单的顺序栈初始化示例,假设用C语言实现,栈中存储
332 77
|
7月前
|
算法 调度 C++
STL——栈和队列和优先队列
通过以上对栈、队列和优先队列的详细解释和示例,希望能帮助读者更好地理解和应用这些重要的数据结构。
162 11