零基础手把手带你阅读Redis源代码系列-ZSet底层原理详解(跳表SkipList)

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: >之前就说了要来西索Redis,现在来辣!>本文的部分内容参考自《小林Coding》,部分地方根据源代码进行剖析。>Redis源码地址:https://github.com/redis/redis.git

之前就说了要来西索Redis,现在来辣!
本文的部分内容参考自《小林Coding》,部分地方根据源代码进行剖析。
Redis源码地址:https://github.com/redis/redis.git

ZSet

观其面

和Set类似,但是新增了一个排序字段。

image-20230723015004842

常用操作

# 往有序集合key中加入带分值元素
ZADD key score member [[score member]...]   
# 往有序集合key中删除元素
ZREM key member [member...]                 
# 返回有序集合key中元素member的分值
ZSCORE key member
# 返回有序集合key中元素个数
ZCARD key 
# 为有序集合key中元素member的分值加上increment
ZINCRBY key increment member 
# 正序获取有序集合key从start下标到stop下标的元素
ZRANGE key start stop [WITHSCORES]
# 倒序获取有序集合key从start下标到stop下标的元素
ZREVRANGE key start stop [WITHSCORES]
# 返回有序集合中指定分数区间内的成员,分数由低到高排序。
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
# 返回指定成员区间内的成员,按字典正序排列, 分数必须相同。
ZRANGEBYLEX key min max [LIMIT offset count]
# 返回指定成员区间内的成员,按字典倒序排列, 分数必须相同
ZREVRANGEBYLEX key max min [LIMIT offset count]

运算操作

# 并集计算(相同元素分值相加),numberkeys一共多少个key,WEIGHTS每个key对应的分值乘积
ZUNIONSTORE destkey numberkeys key [key...] 
# 交集计算(相同元素分值相加),numberkeys一共多少个key,WEIGHTS每个key对应的分值乘积
ZINTERSTORE destkey numberkeys key [key...]

应用场景

排行榜

点赞数量和内容ID相关联,最后再根据ID去查,或者异步更新也可以

为什么不存全部数据?部分数据会修改,那么可能导致value伪重复,加大了业务复杂度

电话、姓名区间排序

使用有序集合的 ZRANGEBYLEXZREVRANGEBYLEX 可以帮助我们实现电话号码或姓名的排序,我们以 ZRANGEBYLEX (返回指定成员区间内的成员,按 key 正序排列,分数必须相同)为例。

注意:不要在分数不一致的 SortSet 集合中去使用 ZRANGEBYLEX和 ZREVRANGEBYLEX 指令,因为获取的结果会不准确。

1、电话排序

我们可以将电话号码存储到 SortSet 中,然后根据需要来获取号段:

> ZADD phone 0 13100111100 0 13110114300 0 13132110901 
(integer) 3
> ZADD phone 0 13200111100 0 13210414300 0 13252110901 
(integer) 3
> ZADD phone 0 13300111100 0 13310414300 0 13352110901 
(integer) 3

获取所有号码:

> ZRANGEBYLEX phone - +
1) "13100111100"
2) "13110114300"
3) "13132110901"
4) "13200111100"
5) "13210414300"
6) "13252110901"
7) "13300111100"
8) "13310414300"
9) "13352110901"

获取 132 号段的号码:

> ZRANGEBYLEX phone [132 (133
1) "13200111100"
2) "13210414300"
3) "13252110901"

获取132、133号段的号码:

> ZRANGEBYLEX phone [132 (134
1) "13200111100"
2) "13210414300"
3) "13252110901"
4) "13300111100"
5) "13310414300"
6) "13352110901"

2、姓名排序

> zadd names 0 Toumas 0 Jake 0 Bluetuo 0 Gaodeng 0 Aimini 0 Aidehua 
(integer) 6

获取所有人的名字:

> ZRANGEBYLEX names - +
1) "Aidehua"
2) "Aimini"
3) "Bluetuo"
4) "Gaodeng"
5) "Jake"
6) "Toumas"

获取名字中大写字母A开头的所有人:

> ZRANGEBYLEX names [A (B
1) "Aidehua"
2) "Aimini"

获取名字中大写字母 C 到 Z 的所有人:

> ZRANGEBYLEX names [C [Z
1) "Gaodeng"
2) "Jake"
3) "Toumas"

究其身

cv完了,现在来看看源码。

Zset 类型的底层数据结构是由压缩列表或跳表实现的:

  • 如果有序集合的元素个数小于 128 个,并且每个元素的值小于 64 字节时,Redis 会使用压缩列表作为 Zset 类型的底层数据结构;
  • 如果有序集合的元素不满足上面的条件,Redis 会使用跳表作为 Zset 类型的底层数据结构;

在 Redis 7.0 中,压缩列表数据结构已经废弃了,交由 listpack 数据结构来实现了。

listpack和压缩列表在之前讲List的时候就已经讲过了,这里主要来讲一讲跳表。

跳表

跳表是从链表的结构上面改进出来的,可以说是多层链表

image-20230723042434378

图中头节点有 L0~L2 三个头指针,分别指向了不同层级的节点,然后每个层级的节点都通过指针连接起来:

  • L0 层级共有 5 个节点,分别是节点1、2、3、4、5;
  • L1 层级共有 3 个节点,分别是节点 2、3、5;
  • L2 层级只有 1 个节点,也就是节点 3 。

下面我们来看看实际的结构

如上图,记录头节点和尾节点(图上没有,后面细说),我们封装在一起,成了整个链跳表体

typedef struct zskiplist {
   
   
    struct zskiplistNode *header, *tail;    // 头节点、尾节点
    unsigned long length;                    // 跳表长度
    int level;                                // 最高的层级
} zskiplist;

那么节点又是怎么定义的呢?如果让我来做的话,我会用给一个数组来表示下一个节点(详细看上图),不过也确实是这样做的

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
   
   
    sds ele;        // 内容
    double score;    // 用于排序的字段,我们称之为权重
    struct zskiplistNode *backward;    //后向指针 前继
    // level 数组中的每一个元素代表跳表的一层,也就是由 zskiplistLevel 结构体表示,比如 leve[0] 就表示第一层,leve[1] 就表示第二层。
    struct zskiplistLevel {
   
   
        struct zskiplistNode *forward;    // 每层的前向指针 后继
        unsigned long span;        // 跨度
    } level[];    
} zskiplistNode;

这里对前向和后向进行一个解释:
其实就是一个双链表,不过后续指针是一个指针数组,前向指针是后继,后向指针是前继

那么这个跨度是啥?
跨度

用于计算这个节点在跳表中的排位。具体怎么做的呢?因为跳表中的节点都是按序排列的,那么计算某个节点排位的时候,从头节点点到该结点的查询路径上,将==沿途访问过==的所有层的跨度累加起来,得到的结果就是目标节点在跳表中的排位。

没懂?看上图,把这个节点前面的阶段求个前缀和,他们的结果就是跨度,也就是当前节点的编号。

跳表的创建

// 定义最高层数 Redis 7.0 定义为 32,Redis 5.0 定义为 64,Redis 3.0 定义为 32
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */

/* Create a new skiplist. */
zskiplist *zslCreate(void) {
   
   
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;            // 初始只有一层
    zsl->length = 0;
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);    // 创建一个节点,节点创建在后面讲
    // 把每一层都初始化
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
   
   
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    // 初始化跳表
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

节点创建

/* Create a skiplist node with the specified number of levels.
 * The SDS string 'ele' is referenced by the node after the call. */
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
   
   
    zskiplistNode *zn =
        zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->ele = ele;
    return zn;
}

跳表的释放

/* Free a whole skiplist. */
void zslFree(zskiplist *zsl) {
   
   
    zskiplistNode *node = zsl->header->level[0].forward, *next;

    zfree(zsl->header);    // 释放头节点
    // 在初始化的时候直接指向了第一个元素
    while(node) {
   
   
        next = node->level[0].forward;
        // 释放每一个节点
        zslFreeNode(node);
        node = next;
    }
    zfree(zsl);
}

/* Free the specified skiplist node. The referenced SDS string representation
 * of the element is freed too, unless node->ele is set to NULL before calling
 * this function. */
void zslFreeNode(zskiplistNode *node) {
   
   
    sdsfree(node->ele);    // 释放member
    zfree(node);    // 释放节点
}

/* Free an sds string. No operation is performed if 's' is NULL. */
void sdsfree(sds s) {
   
   
    if (s == NULL) return;
    s_free((char*)s-sdsHdrSize(s[-1]));    // s_free就是z_free
}
// 呃呃呃,这不解释了
void zfree(void *ptr) {
   
   
#ifndef HAVE_MALLOC_SIZE
    void *realptr;
    size_t oldsize;
#endif

    if (ptr == NULL) return;
#ifdef HAVE_MALLOC_SIZE
    update_zmalloc_stat_free(zmalloc_size(ptr));
    free(ptr);
#else
    realptr = (char*)ptr-PREFIX_SIZE;
    oldsize = *((size_t*)realptr);
    update_zmalloc_stat_free(oldsize+PREFIX_SIZE);
    free(realptr);
#endif
}

跳表节点查询过程

查找一个跳表节点的过程时,跳表会从头节点的最高层开始,逐一遍历每一层。在遍历某一层的跳表节点时,会用跳表节点中的 SDS 类型的元素和元素的权重来进行判断,共有两个判断条件:

  • 如果当前节点的权重 < 要查找的权重时,跳表就会访问该层上的下一个节点。
  • 如果当前节点的权重 = 要查找的权重时,并且当前节点的 SDS 类型数据 < 要查找的数据时,跳表就会访问该层上的下一个节点。

如果上面两个条件都不满足,或者下一个节点为空时,跳表就会使用目前遍历到的节点的 level 数组里的下一层指针,然后沿着下一层指针继续查找,这就相当于跳到了下一层接着查找。

举个例子,下图有个 3 层级的跳表。

image-20230723053340966

如果要查找「元素:abcd,权重:4」的节点,查找的过程是这样的:

  • 先从头节点的最高层开始,L2 指向了「元素:abc,权重:3」节点,这个节点的权重比要查找节点的小,所以要访问该层上的下一个节点;
  • 但是该层的下一个节点是空节点( leve[2]指向的是空节点),于是就会跳到「元素:abc,权重:3」节点的下一层去找,也就是 leve[1];
  • 「元素:abc,权重:3」节点的 leve[1] 的下一个指针指向了「元素:abcde,权重:4」的节点,然后将其和要查找的节点比较。虽然「元素:abcde,权重:4」的节点的权重和要查找的权重相同,但是当前节点的 SDS 类型数据 > 要查找的数据,所以会继续跳到「元素:abc,权重:3」节点的下一层去找,也就是 leve[0];
  • 「元素:abc,权重:3」节点的 leve[0] 的下一个指针指向了「元素:abcd,权重:4」的节点,该节点正是要查找的节点,查询结束。
// 这里是查找的源码,来自下面新增节点的函数
    x = zsl->header;
    //从顶层开始,遍历开始,每次移动不只是移动头节点的level,是所有节点的level,每次都是从上一次遍历到的节点开始
    for (i = zsl->level-1; i >= 0; i--) {
   
   
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];        //如果从头开始,跨度为0,否则累加上一层已经计算的跨度
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||            // 权值小于目标
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))    // 当前节点的memer小于目标节点
        {
   
   
            rank[i] += x->level[i].span;        //计算跨度
            x = x->level[i].forward;            //到下一个索引位置
        }
        update[i] = x;        //记录当前层开始修改的节点的位置
    }

新增节点

/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
   
   
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    //从顶层开始,遍历开始,每次移动不只是移动头节点的level,是所有节点的level,每次都是从上一次遍历到的节点开始
    for (i = zsl->level-1; i >= 0; i--) {
   
   
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];        //如果从头开始,跨度为0,否则累加上一层已经计算的跨度
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||            // 权值小于目标
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))    // 当前节点的Memer小于目标节点
        {
   
   
            rank[i] += x->level[i].span;        //计算跨度
            x = x->level[i].forward;            //到下一个索引位置
        }
        update[i] = x;        //记录当前层开始修改的节点的位置
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
     // 假设要插入的元素在跳跃表中尚不存在,因为跳跃表允许有相同的分值(score)。
    // 再次插入相同的元素是不应该发生的,因为调用 zslInsert() 函数的调用者应该在哈希表(或其他数据结构)中先测试元素是否已经存在。
    // 在跳跃表中,允许多个节点具有相同的分值,但它们的元素值必须不同。
    level = zslRandomLevel();    //随机生成一个层数
    if (level > zsl->level) {
   
           
        // 如果这个level达到新高,那么往上面走,走到顶全部初始化(你可以理解为对level进行扩容)
        for (i = zsl->level; i < level; i++) {
   
   
            rank[i] = 0;
            update[i] = zsl->header;
            // 新的level,那么再old_level到level之间的跨度每层都是length
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);    // 创建一个新的节点
    for (i = 0; i < level; i++) {
   
   
        //插入节点(基础知识:单链表的节点插入),修改第Li层的后继
        x->level[i].forward = update[i]->level[i].forward;    
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        // (rank[0] - rank[i])为新节点到前一层节点的跨度差
        // update[i]->level[i].span 表示的是原本前一个节点到后一个节点建的跨度
        // 这里新来了一个节点,前一个节点新节点的跨度是rank[0] - rank[i],另外一个就互补,见一下就可以了
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        // 修改前一个节点的跨度,但是这个节点本身也算一个系欸但,所以要加1
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    // 前面我们只修改了同层,上面的每一层因为中间多了一个,所以跨度加一
    for (i = level; i < zsl->level; i++) {
   
   
        update[i]->level[i].span++;
    }
    // 修改x的前继指针
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;    // 只修改最下面一层的
    else
        zsl->tail = x;    //把x放在最后
    zsl->length++;
    return x;
}

删除节点

/* Delete an element with matching score/element from the skiplist.
 * The function returns 1 if the node was found and deleted, otherwise
 * 0 is returned.
 *
 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
 * it is not freed (but just unlinked) and *node is set to the node pointer,
 * so that it is possible for the caller to reuse the node (including the
 * referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
   
   
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    // 这里再将新增的时候讲过了
    for (i = zsl->level-1; i >= 0; i--) {
   
   
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
   
   
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
   
   
        // 进行节点删除,并且把关联系欸但进行修改
        zslDeleteNode(zsl, x, update);
        if (!node)
            // 把空间释放
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}
解决连锁反应
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
   
   
    int i;
    for (i = 0; i < zsl->level; i++) {
   
   
        if (update[i]->level[i].forward == x) {
   
   
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
   
   
            update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {
   
   
        x->level[0].forward->backward = x->backward;
    } else {
   
   
        zsl->tail = x->backward;
    }
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
4天前
|
缓存 NoSQL Apache
【Redis】布隆过滤器原理与应用
【Redis】布隆过滤器原理与应用
20 1
|
4天前
|
负载均衡 NoSQL 关系型数据库
深入浅出Redis(六):Redis的主从架构与主从复制原理
深入浅出Redis(六):Redis的主从架构与主从复制原理
|
4天前
|
SQL NoSQL Java
Redis数据类型 Hash Set Zset Bitmap HyperLogLog GEO
Redis数据类型 Hash Set Zset Bitmap HyperLogLog GEO
18 0
|
4天前
|
监控 NoSQL 算法
深入剖析Redis哨兵模式的原理和应用
Redis的哨兵模式是实现高可用性和自动故障转移的机制,当主服务器故障时,哨兵能自动检测并进行故障转移,确保服务连续和稳定性。哨兵模式通过监控主从服务器状态、自动故障转移、防止数据不一致,提高容错能力和负载均衡,降低运维成本,实现高可用性。哨兵通过检测主观下线和客观下线状态,以及选举Leader Sentinel来协调故障转移。Raft算法在其中用于领导者选举和状态一致性。哨兵模式通过综合考虑多种因素选举新主服务器并执行故障转移,保障集群稳定运行。
88 0
深入剖析Redis哨兵模式的原理和应用
|
4天前
|
存储 NoSQL Java
redis zset详解:排行榜绝佳选择
新发布的App中,搜索功能使用Redis的有序集合(ZSET)来显示四个热门搜索词。由于应用初期,热门搜索显示的是测试词汇,为提升专业形象,计划删除这些测试词。文章介绍了ZSET的特性,如有序性、唯一性和快速查找,并讲解了如何在命令行中操作ZSET。此外,还分享了利用ZSET实现热搜功能的思路,每次搜索时增加对应词的分数以实现排序。最后,提供了Java代码示例展示了如何在Redisson中操作ZSET数据,以及如何实现热搜词汇功能。
56 1
|
4天前
|
存储 NoSQL Redis
Redis入门到通关之Redis数据结构-ZSet篇
Redis入门到通关之Redis数据结构-ZSet篇
23 1
|
4天前
|
存储 NoSQL Redis
Redis入门到通关之数据结构解析-SkipList
Redis入门到通关之数据结构解析-SkipList
30 0
|
4天前
|
NoSQL Redis
Redis入门到通关之Redis主从数据同步原理
Redis入门到通关之Redis主从数据同步原理
31 0
|
4天前
|
存储 NoSQL Redis
Redis入门到通关之Redission原理
Redis入门到通关之Redission原理
23 0
|
4天前
|
存储 NoSQL Java
Redis入门到通关之ZSet命令
Redis入门到通关之ZSet命令
18 0