一、跳表(zsl)的基本结构
了解redis中地跳表zsl之前,首先要了解理想的跳表
理想跳表:
上图总共有4层,自下往上,分别为level=1,2,3,4
每层都有个span,但是最底层,不需要span=1,因为直接forward就行了。
span=2是什么意思呢?比如在1号节点,可以直接跳到3号节点。
每个节点有好几个level,比如1号节点,有span=2,4,8,它可以跳到2,3,5,8这几个节点。
因此,在访问8号节点的时候,如果使用普通链表,那么需要从1号节点遍历到8号节点,比较费事。如果使用了跳表,那么可以直接可以由1号节点跳到8号节点,从而提升查找性能。
而redis中的跳表(zsl),不同于理想跳表,在插入删除上性能更加高。但主要原理是一样的。
1、跳表结构zskiplist
typedef struct zskiplist { struct zskiplistNode *header, *tail; unsigned long length; int level; } zskiplist;
header
,tail
: 链表的头和尾,数据全在底层,是个双向链表length
:最底层链表的长度(节点数量),也就是命令zcard的返回值level
:跳表的总层数(比如,上图总层数为4)
2、跳表节点zskiplistNode
/* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode { sds ele; double score; struct zskiplistNode *backward; struct zskiplistLevel { struct zskiplistNode *forward; unsigned long span; } level[]; } zskiplistNode;
ele
:member,具体的值score
:根据score用来排序的,因为zset是一个有序集合backward
: 链表当前节点的上一个节点struct zskiplistLevel level[]
:每个层级结构体,(是一个柔性数组(变长数组))
forward
:跳表对应层级的下一个节点
span
:对应层级的跳跃的长度
score、member是什么,看到这个redis命令就知道了
zadd key score member
二、跳表的创建与删除
1、创建一个跳表节点zslCreateNode
zskiplistNode *zslCreateNode(int level, double score, sds ele) { zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel)); zn->score = score; zn->ele = ele; return zn; }
2、创建跳表zslCreate
/* Create a new skiplist. */ zskiplist *zslCreate(void) { int j; zskiplist *zsl; zsl = zmalloc(sizeof(*zsl)); zsl->level = 1; zsl->length = 0; zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { zsl->header->level[j].forward = NULL; zsl->header->level[j].span = 0; } zsl->header->backward = NULL; zsl->tail = NULL; return zsl; }
3、删除一个跳表节点zslFreeNode
void zslFreeNode(zskiplistNode *node) { sdsfree(node->ele); zfree(node); }
4、删除跳表zslFree
/* Free a whole skiplist. */ void zslFree(zskiplist *zsl) { zskiplistNode *node = zsl->header->level[0].forward, *next; zfree(zsl->header); while(node) { next = node->level[0].forward; zslFreeNode(node); node = next; } zfree(zsl); }
三、跳表查找节点
1、跳表查找RankzslGetRank
查找元素的排序rank,如果不存在则返回0
是通过score从小到大排序的
从level最顶层开始查找
- 如果当前节点的score 小于 要查找的score
- 如果当前节点的score和要查找的score 相等,则比较字符串(sds)小的
如果上述条件满足之一,就向右移动span长度,直到不满足为止。
然后跳跃至下一层,直到最后一层为止。
比较最终选择的节点和 要查找的节点,如果score和ele都相等,则返回对应的rank
/* Find the rank for an element by both score and key. * Returns 0 when the element cannot be found, rank otherwise. * Note that the rank is 1-based due to the span of zsl->header to the * first element. */ unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) { zskiplistNode *x; unsigned long rank = 0; int i; x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) <= 0))) { rank += x->level[i].span; x = x->level[i].forward; } /* x might be equal to zsl->header, so test if obj is non-NULL */ if (x->ele && x->score == score && sdscmp(x->ele,ele) == 0) { return rank; } } return 0; }
四、跳表插入与删除节点
1、理想跳表与redis中跳表区别
理想的跳表是,每间隔一个节点,创建一层,但是如果对理想跳表结构进行删除增加操作,很有可能改变跳表结构;如果重构理想结构,将是巨大的运算;考虑用概率的方法来进行优化。zslRandomLevel
是通过概率,来获得要修改的层,这样可以避免,大幅度修改整个跳表结构。经过证明,当数据量足够大(256)时,通过概率构造的跳表趋向于理想跳表,并且此时如果删除节点,无需重构跳表结构,此时依然趋向于理想跳表。
从节约内存出发,redis 考虑牺牲一点时间复杂度让跳表结构更加变扁平,就像二叉堆改成四叉堆结构;并且redis 还限制了跳表的最高层级为 32 ;节点数量大于 128 或者有一个字符串长度大于 64 ,则使用跳表( skiplist )
因此在redis跳表中,当向level=1 (底层)插入节点的时候,通过zslRandomLevel
概率计算获取层数,对小于该层的就行修改指针关系。目的是为了节约计算量,如果每次都全部调整,会影响性能,通过随机算法,有选择地进行调整。
2、获得随机levelzslRandomLevel
获得跳表一个随机最高的层,ZSKIPLIST_P
默认为1/4
得到第一层的概率为0.75,第二层的概率为(1-0.25)*0.75,以此类推,但不超过ZSKIPLIST_MAXLEVEL
(默认32层)
/* Returns a random level for the new skiplist node we are going to create. * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL * (both inclusive), with a powerlaw-alike distribution where higher * levels are less likely to be returned. */ int zslRandomLevel(void) { int level = 1; while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) level += 1; return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL; }
3、插入节点zslInsert
/* Insert a new node in the skiplist. Assumes the element does not already * exist (up to the caller to enforce that). The skiplist takes ownership * of the passed SDS string 'ele'. */ zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; //update:记录从顶层到插入位置进行跳跃的节点 unsigned long rank[ZSKIPLIST_MAXLEVEL];//rank:记录每层从header到当前位置经过的总路径,rank[0]就是header到插入位置前一个节点的总路径了 int i, level; serverAssert(!isnan(score)); x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; //横向查找,如果下一个节点存在,并且下一个节点score小于要插入的score,就横向继续移动。如果不满足要求,就往下跳一层 while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span;//记录每一层,走过的总路径 x = x->level[i].forward; } update[i] = x;//记录每层要跳跃的节点(下一轮循环,就是下一层了) } /* we assume the element is not already inside, since we allow duplicated * scores, reinserting the same element should never happen since the * caller of zslInsert() should test in the hash table if the element is * already inside or not. */ //根据随机算法去获得一个随机的level,用于调整不大于该level的跳表结构,而不是整个跳表调整,是为了保证插入性能 level = zslRandomLevel(); //如果随机得到的level值比当前level还要大,就要设置最大层为新的level if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; } x = zslCreateNode(level,score,ele);//创建要插入的节点 //插入节点,并建立指针关系 for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward;//x指向当前层最后一跳的节点 的下一个 update[i]->level[i].forward = x;//当前层最后一跳的节点 指向 x /* update span covered by update[i] as x is inserted here */ x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);//x跳到,level[i]原来要跳的节点,还需要的span。也就是说原来update[i]->level[i]要跳s1长度,但是它跳到x节点,跳了s2,那么x还要跳s1-s2长度才能到达原先要跳的地方 update[i]->level[i].span = (rank[0] - rank[i]) + 1;//由于在level[i]对应节点之后插了一个节点x,那么如果要跳到原来下一个节点,那么需要+1 } /* increment span for untouched levels */ //由于插入了一个节点,大于之前最大level的层,跨度要是整个跳表,因此每层数量都要+1 for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } //插入到最底层的节点update[0]后面(比x的score小又最接近的节点) x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x; zsl->length++; return x; }
update
:记录从顶层到插入位置进行跳跃的节点
rank
:记录每层从header到当前位置经过的总路径,rank[0]就是header到插入位置的总路径了
下图为理解update和rank的一个例子:
理解插入过程
假设现在处于update[1],要调整与x(新插入的节点),之间的指针关系
比如当前i=1
- x指向 update[1]原来应该指向的位置
x->level[i].forward = update[i]->level[i].forward;
- update[1]指向新插入的x
update[i]->level[i].forward = x
- x的span=1
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i])
- 新的update[1]的span=2 (其中rank[0]-rank[1]=1,那么1+1=2)
update[i]->level[i].span = (rank[0] - rank[i]) + 1
可以发现,对于顶底的span会越来越小,而顶层的span是整个跳表的长度。
redis中的跳表并不是理想的跳表,每层的span不一定相等,只对经常插入的位置进行再细分span。
4、删除节点zslDelete
、zslDeleteNode
zslDelete
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; int i; x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { x = x->level[i].forward; } update[i] = x; } /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. */ x = x->level[0].forward; if (x && score == x->score && sdscmp(x->ele,ele) == 0) { zslDeleteNode(zsl, x, update); if (!node) zslFreeNode(x); else *node = x; return 1; } return 0; /* not found */ }
zslDeleteNode
/* Internal function used by zslDelete, zslDeleteRangeByScore and * zslDeleteRangeByRank. */ void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { int i; for (i = 0; i < zsl->level; i++) { if (update[i]->level[i].forward == x) { update[i]->level[i].span += x->level[i].span - 1; update[i]->level[i].forward = x->level[i].forward; } else { update[i]->level[i].span -= 1; } } if (x->level[0].forward) { x->level[0].forward->backward = x->backward; } else { zsl->tail = x->backward; } while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL) zsl->level--; zsl->length--; }