写在前面
以下内容是基于Redis 6.2.6 版本整理总结
一、跳表(skiplist)
如何理解跳表?在了解跳表之前,我们先从普通链表开始,一点点揭开跳表的神秘面纱~
首先,普通单链表来说,即使链表是有序的,我们要查找某个元素,也需要从头到尾遍历整个链表。这样效率很低,时间复杂度是O(n)。
那么有没有方法提升查询效率呢?我们可以尝试为链表建立“索引”来提升查询效率。如下图,我们在原始链表的基础上,每两个元素提取一个索引,down指向原始链表的节点:
此时,假如我们要查询值为19的节点,我们从索引层开始遍历,当遍历到16时,下个节点的值为23,所以,19一定在这两个节点之间。我们通过16节点的down指针来到原始链表,将继续遍历,直到找到值为19的节点。在没有建“索引”之前,我们需要遍历8次,才能找到19,而在建立“索引”后,需要6次就能找到,也就是,索引帮我们减少了查询的次数。
那如果我们再建一级索引呢?哈哈哈,没想到吧也是6次,这是因为我们的数据量太少,即便加了两级索引,优化效果也不是很明显。在数据量大时,优化效果还是很明显的,有兴趣可以自己动手画一画。
1.1 跳表的时间复杂度
假设链表有n个节点,每两个节点生成一个索引,则有第一层索引节点的个数为n/2,第二层索引节点的个数是第一层个数的一半n/4,以此类推,第h层节点的个数就是n/(2^h)。假设,第h层有两个节点,则:h = log2n - 1,再算上原始链表,则整个跳表的高度就是log2n。
我们在查询某个数据的时候,每一层需要遍历m个节点,那么在跳表中查询某个数据的时间复杂度就是:O(m*log2n)。那m是多少呢? 按照上面每两个节点上升一个索引节点的索引结构,我们每一层索引最多遍历3个节点,为什么呢?解释如下:
假设我们查找的还是19,在第k层索引中,当我们遍历到11时,发现19在11和23之间,我们通过11的down节点,来到第k-1层。在第k-1层索引中,11 到 23 最多包含3个节点(包含11 和 23 的节点),所以在第k-1索引,我们最多需要遍历3个节点,依次类推,每一层索引都最多只需要遍历3个节点。
通过上面的分析,我们知道了m = 3,也就是说在跳表中查询任意节点的时间复杂度是O(3*log2n),去掉常数项后,时间复杂度就是:O(log2n)。这个查找跟二分查找的时间复杂度一样。换句话说,我们是基于单链表实现了二分查找,神奇吧。但是,这种查询效率的提升是有代价的,也就是我们需要维护多层级索引,才能实现。这也是一种空间换时间的思路。
1.2 空间复杂度
要实现log2n的时间复杂度,跳表就需要额外存储这些索引的空间。那么,需要多大的空间呢?我们来分析一下:
假设原始链表有n个节点,按照每连个节点上升一个索引节点的索引结构,第一层有n/2,第二层n/4,依次类推,第h层有n/2^h个索引节点。假设第h层有2个节点。则总共有:
n/2 + n/4 + n/8 + … + 2 = n-2。所以,跳表的空间复杂度是O(n)。
也就是说,如果将含有n个节点的链表构造成跳表,我们还需要额外再用接近n个节点来存储这些索引,还有没有办法较少索引占的空间呢?答案是有的,上面的分析是基于每两个节点上升一个索引节点,那么换成3个、5个呢?如果为3,也很好分析,需要的索引总数为:n/3 + n/9 + n/27 + … + 3 + 1 = n/2。尽管空间复杂还是O(n),但实际上索引的数量已经减少了一半了。
在实际开发中,原始链表中的对象可能是很大的对象,而索引节点只是存储关键的值和指针,相较于原始节点,大小可以忽略不计。
1.3 跳表的插入和删除
我们想在跳表中插入和删除一个节点,第一步是要找到插入和删除的位置,然后再执行插入或者删除,因为跳表的查询时间复杂度是O(log2n),插入和删除的时间复杂度也是O(log2n)。
1.3.1 插入
1.3.2 删除
删除操作就需要注意一下,如果删除的节点也存在于索引节点中,那么,索引中的节点也要删除。单链表中的删除,需要拿到前驱节点的指针,如果是双向链表就不用考虑了。
1.4 跳表索引的动态更新
当我们一直往跳表中添加元素,如果不更新索引就可能出现,某2个索引之间的索引数过多,极端情况下,会退化为单向链表。
作为一种动态数据结构,我们需要某种手段作为索引节点和原始链表大小的平衡,也就是说,当链表中的节点数增多时,也响应的增加一些索引节点,避免复杂度的退化。红黑树和AVL树是通过左旋和右旋来维持左右子树的平衡。跳表则是通过随机函数来维护这种平衡。
2、跳表在Redis中的应用
有序集合 zet 的底层实现就是跳表。大部分情况下,跳表的效率可以和平衡树媲美,平均时间复杂度O(logn),最坏O(n)。
2.1 跳表源码
每次创建一个新的跳表节点时,会根据幂次定律(越大的数出现的概率越小)随机生成一个介于1到32之间的数作为level数组的大小,这个数组大小就是层的高度。level层数确定源码:
// src/t_zset.c int zslRandomLevel(void) { int level = 1; while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) level += 1; return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL; }
两个宏定义
// src/sever.h #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */ #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
server.h
// 996行 /* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode { // sds 对象,唯一的 sds ele; // 分值 double score; // 后退指针,用于从后往前遍历使用 struct zskiplistNode *backward; // 层数 struct zskiplistLevel { // 前进指针 struct zskiplistNode *forward; // 跨度,用来确定本节点再链表中的排位 zrank unsigned long span; } level[]; } zskiplistNode; typedef struct zskiplist { // 指向跳表头节点和尾节点的指针 struct zskiplistNode *header, *tail; // 跳表中的元素个数,不包含头节点 zcard unsigned long length; // 跳表中层数最高的节点的层数 int level; } zskiplist;
1.3 创建skiplist
zskiplistNode *zslCreateNode(int level, double score, sds ele) { zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel)); zn->score = score; zn->ele = ele; return zn; } /* Create a new skiplist. */ zskiplist *zslCreate(void) { int j; zskiplist *zsl; zsl = zmalloc(sizeof(*zsl)); zsl->level = 1; zsl->length = 0; zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { zsl->header->level[j].forward = NULL; zsl->header->level[j].span = 0; } zsl->header->backward = NULL; zsl->tail = NULL; return zsl; }
1.4 跳表的插入和删除
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level; serverAssert(!isnan(score)); x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span; x = x->level[i].forward; } update[i] = x; } /* we assume the element is not already inside, since we allow duplicated * scores, reinserting the same element should never happen since the * caller of zslInsert() should test in the hash table if the element is * already inside or not. */ level = zslRandomLevel(); if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; } x = zslCreateNode(level,score,ele); for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; /* update span covered by update[i] as x is inserted here */ x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; } /* increment span for untouched levels */ for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x; zsl->length++; return x; } // 删除 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { int i; for (i = 0; i < zsl->level; i++) { if (update[i]->level[i].forward == x) { update[i]->level[i].span += x->level[i].span - 1; update[i]->level[i].forward = x->level[i].forward; } else { update[i]->level[i].span -= 1; } } if (x->level[0].forward) { x->level[0].forward->backward = x->backward; } else { zsl->tail = x->backward; } while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL) zsl->level--; zsl->length--; }
三、总结
- 跳表是有序集合zset的实现之一
- 跳表由zskiplist 和 zskiplistNode两个结构组成,zskiplist保存跳表的信息,如表头和表尾节点、跳表的长度等,zskiplistNode 保存节点详细信息
- 每个跳表节点的层高都是 1~32 之间的随机数
- 跳表中的对象是唯一的
- 跳表中的元素是按照分值从小到大排列,当分值相同时,按照成员对象的大小排序