内存编码数据结构的实现
这一部分主要介绍Redis特制的内存编码数据结构,建议结合图像来理解。
整数集合
源码:intset.h和intset.c。
整数集合(intset)是集合键的底层实现之一: 当一个集合只包含整数值元素,并且这个集合的元素数量(见后)不多时,Redis就会使用整数集合作为集合键的底层实现。
结构定义
typedef struct intset { // 编码方式 uint32_t encoding; // 集合包含的元素数量 uint32_t length; // 保存元素的数组 int8_t contents[]; } intset;
其中,content是整数集合的底层实现,所有的元素都是该contents数组的一项,值得注意的是,虽然该数组声明为int8_t类型,但是该数组中存储的数据类型由encoding来决定:
encoding有三个选择:INTSET_ENC_INT16/32/64。
//intset 的编码方式 #define INTSET_ENC_INT16 (sizeof(int16_t)) #define INTSET_ENC_INT32 (sizeof(int32_t)) #define INTSET_ENC_INT64 (sizeof(int64_t))
升级操作
当向当前的整数集合插入一个大于当前encodeing类型的数据时,必须先进行一次不可逆的升级操作:即将所有的元素编码升级为足以容纳新元素的encoding。
具体分为三步:
1.根据新元素的类型, 扩展整数集合底层数组的空间大小。
2.将底层数组现有的所有元素都转换成与新元素相同的类型,并将类型转换后的元素放置到正确的位上,而且在放置元素的过程中,需要继续维持底层数组的有序性质不变。
3.将新元素添加到底层数组里面。
举个例子:
开始时,整数集合的contents数据类型为INT16,其中存储的元素如下:
现在需要插入一个数字:65535,超过了INT16的数据范围,故需要进行升级操作。先进行空间的拓展,64*4=256位,现在只有48位,更新数组空间之后:
空间分配完毕,现在需要对原有的元素进行转移(因为原有的数字是INT16位格式存储的),为了保证元素有序性,从最后一位3进行转移:
转移原有字符之后,插入新元素65535:
升级操作的时间复杂度为O(N)。而且值得注意的是,引起升级操作的元素要么存在索引0位置(负的太离谱),要么在length-1(大的太离谱)。
之所以Redis的整形数组采用这种升级的方式,有两个好处:
一是节约内存,如果想容纳INT64的数字,那么传统做法是直接声明INT64的数组,但有时其实并不需要INT64的格式存储,浪费内存;只有当真正需要INT64时,才会分配相应的内存空间。
二是高灵活性,整数集合可以通过自动升级底层数组来适应新元素, 所以我们可以随意地将 int16_t 、int32_t 或者int64_t类型的整数添加到集合中,而不必担心出现类型错误,这种做法非常灵活。
PS:升级之后不可降级。
接口API
总览
函数 | 作用 | 时间复杂度 |
intsetNew | 创建一个新的整数集合。 | O(1) |
intsetAdd | 将给定元素添加到整数集合里面。 | O(N) |
intsetRemove | 从整数集合中移除给定元素。 | O(N) |
intsetFind | 检查给定值是否存在于集合。 | 因为底层数组有序,查找可以通过二分查找法来进行, 所以复杂度为 O(logN) 。 |
intsetRandom | 从整数集合中随机返回一个元素。 | O(1) |
intsetGet | 取出底层数组在给定索引上的元素。 | O(1) |
intsetLen | 返回整数集合包含的元素个数。 | O(1) |
intsetBlobLen | 返回整数集合占用的内存字节数。 | O(1) |
比较好奇的是从底层数组怎么返回我想要的值:
/* * 根据给定的编码方式 enc ,返回集合的底层数组在 pos 索引上的元素。 * T = O(1) */ static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) { int64_t v64; int32_t v32; int16_t v16; // ((ENCODING*)is->contents) 首先将数组转换回被编码的类型 // 然后 ((ENCODING*)is->contents)+pos 计算出元素在数组中的正确位置 // 之后 memcpy(&vEnc, ..., sizeof(vEnc)) 再从数组中拷贝出正确数量的字节 // 如果有需要的话, memrevEncifbe(&vEnc) 会对拷贝出的字节进行大小端转换 // 最后将值返回 if (enc == INTSET_ENC_INT64) { memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64)); memrev64ifbe(&v64); return v64; } else if (enc == INTSET_ENC_INT32) { memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32)); memrev32ifbe(&v32); return v32; } else { memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16)); memrev16ifbe(&v16); return v16; } } /* * 根据集合的编码方式,返回底层数组在 pos 索引上的值 */ static int64_t _intsetGet(intset *is, int pos) { return _intsetGetEncoded(is,pos,intrev32ifbe(is->encoding)); }
不对,上述的操作只是在指定的pos插入,那么如何确定这个pos呢?其实整数集合中的整数都是有序的,从小到大排列,因此在插入或者查找某值时都是先使用二分查找的方式进行查找,以下是往整数集合中插入一个value的函数:
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) { // 计算编码 value 所需的长度 uint8_t valenc = _intsetValueEncoding(value); uint32_t pos; // 默认设置插入为成功 if (success) *success = 1; // 如果 value 的编码比整数集合现在的编码要大 // 那么表示 value 必然可以添加到整数集合中 // 并且整数集合需要对自身进行升级,才能满足 value 所需的编码 if (valenc > intrev32ifbe(is->encoding)) { return intsetUpgradeAndAdd(is,value); } else { // 运行到这里,表示整数集合现有的编码方式适用于 value // 在整数集合中查找 value ,看他是否存在: // - 如果存在,那么将 *success 设置为 0 ,并返回未经改动的整数集合 // - 如果不存在,那么可以插入 value 的位置将被保存到 pos 指针中 // 等待后续程序使用 if (intsetSearch(is,value,&pos)) { if (success) *success = 0; return is; } // 运行到这里,表示 value 不存在于集合中 // 程序需要将 value 添加到整数集合中 // 为 value 在集合中分配空间 is = intsetResize(is,intrev32ifbe(is->length)+1); // 如果新元素不是被添加到底层数组的末尾 // 那么需要对现有元素的数据进行移动,空出 pos 上的位置,用于设置新值 // 举个例子 // 如果数组为: // | x | y | z | ? | // |<----->| // 而新元素 n 的 pos 为 1 ,那么数组将移动 y 和 z 两个元素 // | x | y | y | z | // |<----->| // 这样就可以将新元素设置到 pos 上了: // | x | n | y | z | // T = O(N) if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1); } // 将新值设置到底层数组的指定位置中 _intsetSet(is,pos,value); // 增一集合元素数量的计数器 is->length = intrev32ifbe(intrev32ifbe(is->length)+1); // 返回添加新元素后的整数集合 return is; }
那么怎么插入一个数据呢?
/* * 根据集合的编码方式,将底层数组在 pos 位置上的值设为 value 。 */ static void _intsetSet(intset *is, int pos, int64_t value) { // 取出集合的编码方式 uint32_t encoding = intrev32ifbe(is->encoding); // 根据编码 ((Enc_t*)is->contents) 将数组转换回正确的类型 // 然后 ((Enc_t*)is->contents)[pos] 定位到数组索引上 // 接着 ((Enc_t*)is->contents)[pos] = value 将值赋给数组 // 最后, ((Enc_t*)is->contents)+pos 定位到刚刚设置的新值上 // 如果有需要的话, memrevEncifbe 将对值进行大小端转换 if (encoding == INTSET_ENC_INT64) { ((int64_t*)is->contents)[pos] = value; memrev64ifbe(((int64_t*)is->contents)+pos); } else if (encoding == INTSET_ENC_INT32) { ((int32_t*)is->contents)[pos] = value; memrev32ifbe(((int32_t*)is->contents)+pos); } else { ((int16_t*)is->contents)[pos] = value; memrev16ifbe(((int16_t*)is->contents)+pos); } }
看到其中有个memrev16/32/64ifbe函数,有些好奇,遂F12,找到了个:
/*将16位小端序转为大端序*/ void memrev16(void *p) { unsigned char *x = p, t; t = x[0]; x[0] = x[1]; x[1] = t; } /* 将32位小端序转为大端序 */ void memrev32(void *p) { unsigned char *x = p, t; t = x[0]; x[0] = x[3]; x[3] = t; t = x[1]; x[1] = x[2]; x[2] = t; }
补充关于大端和小端序的转换,值得注意的是,大端小端在内存的存储的顺序是按照字节,而不是按位!
例如小端中的:00000000 10000000,
在大端中存储不是:00000001 00000000 (即按位逆序)
而是按字节相反:10000000 00000000。
示意图:
转换方式:
所以现在来看memrev16/32就明白了,先用char*来获取单一字节的数据,而后两两换位,即实现大端到小端的转变。不妨试试不看源码,写写memrev64?
那么集合是怎么进行删除的呢?盲猜是靠内存的拷贝移动,看了一下果然如此:
static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) { void *src, *dst; // 要移动的元素个数 uint32_t bytes = intrev32ifbe(is->length)-from; // 集合的编码方式 uint32_t encoding = intrev32ifbe(is->encoding); // 根据不同的编码 // src = (Enc_t*)is->contents+from 记录移动开始的位置 // dst = (Enc_t*)is_.contents+to 记录移动结束的位置 // bytes *= sizeof(Enc_t) 计算一共要移动多少字节 if (encoding == INTSET_ENC_INT64) { src = (int64_t*)is->contents+from; dst = (int64_t*)is->contents+to; bytes *= sizeof(int64_t); } else if (encoding == INTSET_ENC_INT32) { src = (int32_t*)is->contents+from; dst = (int32_t*)is->contents+to; bytes *= sizeof(int32_t); } else { src = (int16_t*)is->contents+from; dst = (int16_t*)is->contents+to; bytes *= sizeof(int16_t); } // 进行移动 // T = O(N) memmove(dst,src,bytes); }
压缩列表
源码参见:ziplist.h和ziplist.c。
压缩列表是 Redis 为了节约内存而开发的,由一系列特殊编码的连续内存块组成的顺序型(sequential)数据结构。
Redis的有序集合、哈希以及列表都直接或者间接使用了压缩列表。当有序集合或哈希的元素数目比较少,且元素都是短字符串或整形 时,Redis便使用压缩列表作为其底层数据存储方式。列表使用快速链表(quicklist)数据结构存储,而快速链表就是双向链表与压缩列表的组合(见后)。
一个压缩列表可以包含任意多个节点(entry), 每个节点可以保存一个字节数组或者一个整数值。以下就是一个压缩列表的结构:
1、zlbytes:压缩列表的字节长度,占4个字节,因此压缩列表最长**(2^32)-1字节**;
2、zltail:压缩列表尾元素相对于压缩列表起始地址的偏移量,占4个字节;
3、zllen:压缩列表的元素数目,占两个字节;那么当压缩列表的元素数目超过(2^16)-2怎么处理呢?此时通过zllen字段无法获得压缩列表的元素数目,必须遍历整个压缩列表才能获取到元素数目;
4、entry:压缩列表存储的元素,可以为字节数组或者整数;
5、zlend:压缩列表的结尾(不是尾元素的意思),占一个字节,恒为0xFF。
在Redis中,压缩列表定义如下:
/* * 创建并返回一个新的 ziplist */ unsigned char *ziplistNew(void) { // ZIPLIST_HEADER_SIZE 是 ziplist 表头的大小 // +1 字节是表末端 ZIP_END 的大小 unsigned int bytes = ZIPLIST_HEADER_SIZE+1; // 为表头和表末端分配空间 unsigned char *zl = zmalloc(bytes); // 初始化表属性 ZIPLIST_BYTES(zl) = intrev32ifbe(bytes); ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE); ZIPLIST_LENGTH(zl) = 0; // 设置表末端 zl[bytes-1] = ZIP_END; return zl; }
为便于快速操作压缩列表获取各字段的数据,redis有以下宏定义(char * zl指向压缩列表首地址):
// 定位到bytes 属性,该属性记录了整个 ziplist 所占用的内存字节数 #define ZIPLIST_BYTES(zl) (*((uint32_t*)(zl))) // 定位到offset 属性,该属性记录了到达表尾节点的偏移量 #define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t)))) // 定位到length 属性,该属性记录了 ziplist 包含的节点数量 #define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2))) // 返回 ziplist 表头的大小 #define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t)) // 返回指向 ziplist 第一个节点(的起始位置)的指针 #define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE) // 返回指向 ziplist 最后一个节点(的起始位置)的指针 #define ZIPLIST_ENTRY_TAIL(zl) ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) // 返回指向 ziplist 末端 ZIP_END (的起始位置)的指针 #define ZIPLIST_ENTRY_END(zl) ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)
结构定义
压缩列表的节点信息定义如下:
// 保存 ziplist 节点信息的结构 typedef struct zlentry { // prevrawlen :前置节点的长度 // prevrawlensize :编码 prevrawlen 所需的字节大小 unsigned int prevrawlensize, prevrawlen; // len :当前节点值的长度 // lensize :编码 len 所需的字节大小 unsigned int lensize, len; // 当前节点 header 的大小 // 等于 prevrawlensize + lensize unsigned int headersize; // 当前节点值所使用的编码类型 unsigned char encoding; // 指向当前节点的指针 unsigned char *p; } zlentry;
Redis源码、面试指南(2)内存编码数据结构(下):https://developer.aliyun.com/article/1508226