对象编码
1.1、redisObject
redis 数据库通过 dict 实现映射关系。key 的固定类型是 string,底层由 sds 实现。value 的类型有多种,底层由 redisObject 实现,这种通用的数据结构可以存储不同类型的 value。
typedef struct redisObject { unsigned type:4; // 对象类型:string, hash, list, set unsigned encoding:4; // 编码方式 unsigned lru:LRU_BITS; // lru:24位,最近一次访问时间,单位秒, // lfu: 高16位,最近一次访问时间;低8位,逻辑访问次数 int refcount; // 引用计数,计数为 0,对象无人引用,可以回收 void *ptr; // 数据指针,指向对象内容 } robj;
编码方式
#define OBJ_ENCODING_RAW 0 /* Raw representation */ #define OBJ_ENCODING_INT 1 /* Encoded as integer */ #define OBJ_ENCODING_HT 2 /* Encoded as hash table */ #define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */ #define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */ #define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */ #define OBJ_ENCODING_INTSET 6 /* Encoded as intset */ #define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */ #define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */ #define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */ #define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */
数据结构
#define OBJ_STRING 0 /* String object. */ #define OBJ_LIST 1 /* List object. */ #define OBJ_SET 2 /* Set object. */ #define OBJ_ZSET 3 /* Sorted set object. */ #define OBJ_HASH 4 /* Hash object. */
1.2、string
编码方式
OBJ_ENCODING_INT
:字符串长度不超过 20 且能转成整数OBJ_ENCODING_EMBSTR
:字符串长度不超过 44OBJ_ENCODING_RAW
:字符串长度大于 44
数据存储
embstr, raw
:sds 存储,embstr 连续存储,malloc 一次;raw 离散存储,malloc 两次int
:转换成 long 类型存储在 redisObject 的 ptr 指针位置。
string
1.3、list
编码方式
OBJ_ENCODING_QUICKLIST
OBJ_ENCODING_ZIPLIST
数据存储
- 双向链表 quicklist
- 压缩列表 ziplis
list
1.4、set
编码方式
OBJ_ENCODING_INTSET
:元素全是整数且节点数量不超过 512 (set-max-intset-entries)OBJ_ENCODING_HT
:除上述情况外,HT编码(字典 dict)
数据存储
- 整数数组 intset
- 字典 dict
set
1.5、zset
编码方式
OBJ_ENCODING_ZIPLIST
:节点数量不超过 zset-max-ziplist-entries,默认128。且字符串长度不超过 zset-max-ziplist-value,默认64。OBJ_ENCODING_SKIPLIST
:节点数量大于 128 或字符串长度大于 64
数据存储
- 列表 ziplist
- 跳表 skiplist
zset
1.6、hash
编码方式
OBJ_ENCODING_ZIPLIST
:节点数量不超过 hash-max-ziplist-entries,默认512。且字符串长度不超过 hash-max-ziplist-value,默认64。OBJ_ENCODING_HT
:节点数量大于 512 或字符串长度大于 64
数据存储
- 列表 ziplist
- 字典 dict
hash
2、存储结构
2.1、dict
redis 是 KV 数据库,而 KV 映射关系是通过字典来实现的。
2.1.1、dict 结构
dict
字典 dict 采用数组 + 链表来解决哈希冲突。dict 由哈希表 dictht + 哈希节点 dictEntry 组成。哈希表有两个,通常 ht[0] 使用,ht[1] 不使用;rehash 时,ht[0] 存储 rehash 之前的数据,ht[1] 存储新数据和 ht[0] 迁移来的数据。
// 字典的实现,相当于 c++ 中类的封装 typedef struct dict { dictType *type; // dict 类型,封装成员函数 void *privdata; // 私有数据,连接的上下文 dictht ht[2]; // 散列表,一个存储当前数据,另一个 rehash 时使用 long rehashidx; // rehash 的位置,-1 表示未进行 unsigned long iterators; /* number of iterators currently running */ } dict; // 哈希表 typedef struct dictht { dictEntry **table; // entry 指针数组,保存 entry 的指针 unsigned long size; // 哈希表大小,2的n次幂 unsigned long sizemask; // 哈希表掩码 size-1,hash 取余运算优化成位运算 unsigned long used; // 实际存储元素 entry 的个数 } dictht; // 哈希节点 typedef struct dictEntry { void *key; union { void *val; uint64_t u64; int64_t s64; double d; } v; struct dictEntry *next; } dictEntry;
当向 dict 添加键值对时,首先根据 key 计算出 hash 值(h),然后利用 h & sizemask 来计算元素应该存储到数组中的哪个索引位置。利用 sizemask 优化取余运算 (hash(key) % size) 成位运算 hash(key) & sizemask,提升运算效率。
2.1.2、dict 扩容
负载因子:LoadFactor = used / size,表示哈希冲突的激烈程度。
dict 每次添加元素后,检查负载因子,满足以下两种情况时会触发哈希表扩容:
- LoadFactor > 1 且没有 fork (持久化rdb, aof, rdb-aof) 进程,扩容,大小翻倍。
- LoadFactor > 5,立即扩容,大小为恰好包含used的 2n ,涉及到写时复制原理。负载因子太大,索引效率极低,必须立即扩容。
// dict.c dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) { ... /* Get the index of the new element, or -1 if * the element already exists. */ if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1) return NULL; ... } static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing) { ... /* Expand the hash table if needed */ if (_dictExpandIfNeeded(d) == DICT_ERR) return -1; ... } /* Expand the hash table if needed */ static int _dictExpandIfNeeded(dict *d) { /* Incremental rehashing already in progress. Return. */ // 如果正在 rehash,则返回ok if (dictIsRehashing(d)) return DICT_OK; /* If the hash table is empty expand it to the initial size. */ // 如果哈希表为空,则初始化哈希表为默认大小为4 if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE); /* If we reached the 1:1 ratio, and we are allowed to resize the hash * table (global setting) or we should avoid it but the ratio between * elements/buckets is over the "safe" threshold, we resize doubling * the number of buckets. */ // 当负载因子达到1以上且没有 fork 进程 or 负载因子超过5,开始扩容 if (d->ht[0].used >= d->ht[0].size && (dict_can_resize || d->ht[0].used/d->ht[0].size > dict_force_resize_ratio)) { // 扩容翻倍 return dictExpand(d, d->ht[0].used*2); } return DICT_OK; } /* Expand or create the hash table */ // 重置哈希表的大小,扩容或缩容,重置大小为恰好包含 size 的二次幂 int dictExpand(dict *d, unsigned long size) { // 若正在 rehash 或如果当前存储元素 entry 数量超过了要申请的 size 大小,报错 if (dictIsRehashing(d) || d->ht[0].used > size) return DICT_ERR; // 申请新的哈希表,实际申请大小为第一个大于或等于size的2次幂 dictht n; unsigned long realsize = _dictNextPower(size); /* Rehashing to the same table size is not useful. */ if (realsize == d->ht[0].size) return DICT_ERR; /* Allocate the new hash table and initialize all pointers to NULL */ n.size = realsize; // 设置哈希表的 size 为实际申请大小 n.sizemask = realsize-1; // 设置哈希表掩码 size-1 n.table = zcalloc(realsize*sizeof(dictEntry*)); n.used = 0; /* Is this the first initialization? If so it's not really a rehashing * we just set the first hash table so that it can accept keys. */ // 第一次初始化,不需要rehash,直接设置ht[0]指向新创建的哈希表即可 if (d->ht[0].table == NULL) { d->ht[0] = n; return DICT_OK; } /* Prepare a second hash table for incremental rehashing */ // 否则,rehash d->ht[1] = n; // ht[1] 指向新创建的哈希表 d->rehashidx = 0; // 标识 rehash 的开始 return DICT_OK; }
2.1.3、dict 缩容
dict 每次删除元素后,检查负载因子,若LoadFactor < 0.1,发生缩容,收缩大小是恰好包含 used 的 2n 。
// t_hash.c int hashTypeDelete(robj *o, sds field) { ... if (dictDelete((dict*)o->ptr, field) == C_OK) { deleted = 1; // 删除成功后,检查是否需要缩容 if (htNeedsResize(o->ptr)) dictResize(o->ptr); } ... } // server.c // 是否需要 rehash int htNeedsResize(dict *dict) { long long size, used; size = dictSlots(dict); // 哈希表的大小 used = dictSize(dict); // entry 的数量 // 若哈希表的 size > 4 (哈希表的初始大小)且负载因子低于0.1 return (size > DICT_HT_INITIAL_SIZE && (used*100/size < HASHTABLE_MIN_FILL)); } // dict.c int dictResize(dict *d) { // 哈希表重置后的大小 unsigned long minimal; // 如果正在 rehash or bgsave or bgwriteof ,返回错误 if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR; // 获取 entry 个数 minimal = d->ht[0].used; // 若缩容后小于哈希表初始大小(4),则重置为哈希表初始大小 if (minimal < DICT_HT_INITIAL_SIZE) minimal = DICT_HT_INITIAL_SIZE; // 哈希表缩容 return dictExpand(d, minimal); }
2.1.4、dict rehash
根据负载因子的值,决定当前扩容还是收缩
- LoadFactor >= 1,扩容
- LoadFactor < 0.1,收缩
当扩容缩容造成映射规则发生改变,需要 rehash。rehash 以数组槽为单位,每个槽位对应一个链表 entry,链接着多个冲突的元素。
渐进式 rehash
背景:为了解决哈希表存储大量元素,rehash 占用主线程,其他命令无法及时响应。
发生 rehash 时,ht[0] 存储旧数据,ht[1] 存储新数据和 ht[0] 迁移来的数据。也就是说,新增数据,写入 ht[1]。查询、修改和删除则会在 ht[0] 和 ht[1] 依次查找并执行。这样可以确保 ht[0] 的数据只减不增,直至 rehash 为空。处理渐进式 rehash 的过程中,不会发生扩容和缩容。
此外,采用分治的方法,将 rehash 分散到
- 增删改查: rehash 1 次
- 定时器:rehash 1ms,最大执行 1ms,每次步长100个数组槽位。
rehash 函数原型
int dictRehash(dict *d, int n) { // n: rehash 的次数 // 访问空槽位的最大次数,否则一直移动空桶,没有 rehash,造成长时间阻塞 int empty_visits = n * 10; // 正在 rehash,返回 if (!dictIsRehashing(d)) return 0; // 执行n次 reash 且 ht[0] 有元素 while(n-- && d->ht[0].used != 0) { dictEntry *de, *nextde; assert(d->ht[0].size > (unsigned long)d->rehashidx); // 循环寻找非空槽位 while(d->ht[0].table[d->rehashidx] == NULL) { d->rehashidx++; // ht[0] 向后移动一个槽位 if (--empty_visits == 0) return 1; // 访问空桶次数达到最大,退出 } // 寻找到非空槽位 de = d->ht[0].table[d->rehashidx]; // 将旧哈希表 ht[0] 该槽位中所有元素放入新哈希表 ht[1] while(de) { uint64_t h; nextde = de->next; // 获取该槽位的 entry 链表在新哈希表中的索引位置 h = dictHashKey(d, de->key) & d->ht[1].sizemask; // 头插到新哈希表 de->next = d->ht[1].table[h]; d->ht[1].table[h] = de; d->ht[0].used--; d->ht[1].used++; de = nextde; } d->ht[0].table[d->rehashidx] = NULL; d->rehashidx++; } // rehash 是否完成 if (d->ht[0].used == 0) { // rehash 完成 zfree(d->ht[0].table); // 释放 ht[0] 的内存 d->ht[0] = d->ht[1]; // 将 ht[1] 交给 ht[0] _dictReset(&d->ht[1]); // ht[1] 初始化为空 d->rehashidx = -1; // 标识 rehash 结束 return 0; } return 1; }
redis 中提供两种 rehash 的方式
- rehash 1 次:对 dict 的增删改查操作,平摊到每次客户端连接的操作中。
- rehash 1 ms:reactor 一次事件循环需要处理网络事件和定时事件。在处理定时事件的时候(例如:rehash),没有网络事件,不会造成其他事件的延时处理。
// rehash 1次 static void _dictRehashStep(dict *d) { if (d->iterators == 0) dictRehash(d,1); } // rehash 1ms int dictRehashMilliseconds(dict *d, int ms) { long long start = timeInMilliseconds(); int rehashes = 0; // 每次步长: rehash 100次 while(dictRehash(d,100)) { rehashes += 100; // 超时,则退出 if (timeInMilliseconds()-start > ms) break; } return rehashes; }
scan 命令
keys 命令的问题
- 一次性吐出所有满足条件的 key,没有办法筛选
- 数据量大的遍历,导致 redis 服务器卡顿。redis 单线程,其他命令必须等待 key 命令结束后才能执行。
# 迭代数据库中的键 scan cursor [MATCH pattern] [COUNT count] [TYPE type]
scan 遍历顺序
scan 遍历顺序
从遍历开始的那一刻,对 redis 已经存在的数据进行遍历,若因扩容或缩容造成映射算法发生改变,键的槽位可能会发生改变,继续遍历会发生错误。
因此 scan 采用高位进位加法的遍历顺序,这样 rehash 后的槽位在遍历顺序上是相邻的,对 sacn 那刻起已经存在的元素遍历不会出现重复和遗漏。例外:在scan过程当中,发生两次缩容的时候,会发生数据重复。
2.2、skiplist
跳表的特点
- 多层级有序链表
- 最底层包含所有的元素
- 支持二分查找,快速定位边界,然后在最底层找到范围内所有元素(区别红黑树)。
- 增删改查的时间复杂度都是 O(log2n)。
2.2.1、理想跳表
理想跳表是多层级有序链表,采取空间换时间的方法,每隔一个节点生成一个层级节点,模拟二叉树结构,最底层包含所有的元素。
理想跳表
但是如果对理想跳表结构进行增删操作,很可能改变跳表结构。若重构链表,代价极大。考虑用概率的方法来优化。每次增加节点的时候,1/2 的概率增加一个层级,1/4 的概率增加两个层级,以此类推。经过证明,当数据量足够大时,通过概率构造的跳表趋向于理想跳表,并且此时如果删除节点,无需重构跳表结构,此时依然趋向于理想跳表。
2.2.2、redis 跳表
从节约内存角度出发,redis 考虑牺牲一些时间性能让跳表结构变得更加扁平。以循环双向链表结构实现,每次增加节点时,1/4 的概率增加一个层级,跳表的最高层级为 32。当节点数量大于 128 或者有一个字符串长度大于 64,则使用跳表结构。
例:插入17,先比较第 4 层,(6, nil), 从 6 节点往下跳。比较第 3 层,(6, 25),从 6 节点往下跳。比较第 2 层,(9, 25),从 9 节点往下跳。比较第1层,(12, 19),在 12 节点后插入 节点17。
redis 跳表
redis 跳表结构如图所示
skiplist
#define ZSKIPLIST_MAXLEVEL 32 // 跳表的层级 #define ZSKIPLIST_P 0.25 // 每个节点增加层级的概率 typedef struct zskiplistNode { sds ele; // 节点存储的数据 double score; // 节点分数,排序使用 struct zskiplistNode *backward; // 前一个节点指针 struct zskiplistLevel { // 多级索引数组 struct zskiplistNode *forward; // 下一个节点指针 unsigned long span; // 索引跨度 } level[]; } zskiplistNode; typedef struct zskiplist { struct zskiplistNode *header, *tail; // 头尾节点指针 unsigned long length; // 节点数量 int level; // 最大的索引层,默认是1 } zskiplist;
2.3、ziplist
2.3.1、ziplist 结构
ziplist 压缩列表是内存连续的双向链表,可以在任意一端进行增删操作,且该操作的时间复杂度为 O(1)。节点间通过记录上一节点和当前节点长度来寻址。
ziplist
各个字段的意义是:
- zlbytes:uint32_t,整个压缩列表的长度
- zltail:uint32_t,记录压缩列表尾节点距离的 ziplist 首地址的偏移,以此确定尾节点的位置。
- zllen:uint16_t,记录压缩列表包含的节点数量。
- entry:压缩列表包含的列表节点
- zlend:标记压缩列表的末端
源码使用宏定义实现的
// uint32_t类型: zlbytes,zltail; uint16_t类型: zllen; uint8_t类型: zlend // zlbytes:压缩列表占用的总字节数 #define ZIPLIST_BYTES(zl) (*((uint32_t*)(zl))) // zltail:尾节点 zlentry 的偏移量 #define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t)))) // zllen: 压缩列表包含的 zlentry 节点数量 #define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2))) // 压缩列表的 header 大小: zlbytes + zltail + zlen #define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t)) // zlend:压缩列表结束标志 #define ZIPLIST_END_SIZE (sizeof(uint8_t)) // 指向头节点 zlentry 的起始位置 #define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE) // 指向尾节点 zlentry 的起始位置 #define ZIPLIST_ENTRY_TAIL(zl) ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) // 指向压缩列表的结束位置 #define ZIPLIST_ENTRY_END(zl) ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)
2.3.2、zlentry
ziplist 中的 entry 不采用双向链表的前后指针(因为这样会占用16个字节,浪费内存),而是记录前一个节点和当前节点的长度,实现双向遍历。
前一节点的长度,占 1 个或 5 个字节。
- 如果前一节点的长度小于 254 字节,则采用 1 个字节来保存这个长度值
- 如果前一节点的节点大于 254 字节,则采用 5 个字节来保存这个长度值,第一个字节为0xfe,后 4 个字节才是真实长度数据
typedef struct zlentry { unsigned int prevrawlensize; // 编码 prevrawlen 的大小,占1个或5个字节 unsigned int prevrawlen; // 前一个 entry 的长度 unsigned int lensize; // 编码 len 的大小 unsigned int len; // 当前节点长度 unsigned int headersize; // 当前节点的 header 大小 unsigned char encoding; // 编码方式,记录节点的数据类型(字符串或整数) unsigned char *p; // 指向节点的数据(字符串或整数) } zlentry;
2.3.3、连锁更新
现在,假设我们有N个连续的、长度为 250~253 字节之间的 entry。当新增节点的字节数大于254。entry 的 previous_entry_length
属性需要由 1 个字节扩展到 5 个字节。
ziplist 连锁更新
ziplist 这种特殊情况下产生的连续多次空间扩展操作称之为连锁更新(Cascade Update)。新增、删除都可能导致连锁更新的发生。
2.4、quicklist
双向链表 quicklist
quicklist
源码如下:
// 压缩的阈值:元素长度小于48,不压缩;元素压缩前后长度差不超过8,不压缩 #define MIN_COMPRESS_BYTES 48 // 双向链表中的节点 typedef struct quicklistNode { struct quicklistNode *prev; // 指向前一个节点 struct quicklistNode *next; // 指向后一个节点 unsigned char *zl; // 指向ziplist unsigned int sz; // 该 ziplist 占用空间大小 unsigned int count : 16; // 该 ziplist 存储的元素个数,16bit,最大65536 unsigned int encoding : 2; // 是否采用LZF压缩算法: RAW=1 LZF=2 unsigned int container : 2; // 存储方式,ziplist=2,未来可能支持其他数据结构 unsigned int recompress : 1; // was this node previous compressed? unsigned int attempted_compress : 1; // node can't compress; too small unsigned int extra : 10; //预留 } quicklistNode; // 管理双向链表 typedef struct quicklist { quicklistNode *head; // 指向头节点 quicklistNode *tail; // 指向尾节点 unsigned long count; // 元素总数,即所有ziplist中的元素总数 unsigned long len; // 双向链表中的节点个数 int fill : QL_FILL_BITS; // ziplist 大小设置 unsigned int compress : QL_COMP_BITS; // 节点压缩深度 unsigned int bookmark_count: QL_BM_BITS; quicklistBookmark bookmarks[]; } quicklist; // 被压缩过的ziplist typedef struct quicklistLZF { unsigned int sz; // 压缩后的ziplist的大小 char compressed[]; // 柔性数组,存储压缩后的ziplist } quicklistLZF;
2.5、sds
2.5.1、SDS 特点
- 获取字符串长度的时间复杂度为
O(1)
- 支持动态扩容
- 减少内存分配次数
- 二进制安全:可以存储图片,二进制协议等数据
2.5.2、SDS 结构体
struct __attribute__ ((__packed__)) sdshdr8 { // header uint8_t len; // 已经保存的字符串字节数 uint8_t alloc; // buf 申请的总字节数 unsigned char flags; // 不同SDS的头部类型,用来控制SDS头的大小 // body char buf[]; // 存储字符串,柔性数组,不占用空间 };
申请过程如下
// 申请动态字符串的空间 s = malloc(sizeof(struct sdshdr8) + alloc); sds = s + sizeof(struct sdshdr8) // 释放空间 free(sds-sizeof(struct sdshdr8))
2.5.3、动态扩容
redis 字符串的最大长度为 512M
- 若新字符串小于1M,加倍扩容 + 1
'/0'
; - 若新字符串大于1M,每次至多扩容1M + 1
'/0'
。称为内存预分配。
sds
2.6、intset
2.6.1、inset 结构
intset 是基于整数数组来实现的 set 集合,具有长度可变、有序等特征。
特点
- 元素唯一、有序
- 具备类型升级机制,可以节省内存空间
- 底层采用二分查找方式来查询
typedef struct intset { uint32_t encoding; // 编码方式:支持16位,32位,64位整数 uint32_t length; // 元素个数 int8_t contents[]; // 整数数组,保存集合数据 } intset;
为了方便查找,inset 中所有整数升序依次保存在 contents 数组中
intset contents
编码方式:INTSET_ENC_INT16
,数组中每个整数不超过int16_t
的范围,占两个字节。
2.6.2、intset 升级
超出了 int16_t
范围,intset 会自动升级编码方式到合适的大小。
例,添加数字 50000,超出了int16_t
的范围,升级编码方式如下:
- 升级编码为
INTSET_ENC_INT32
, 每个整数占4字节,并按照新的编码方式及元素个数扩容数组 - 逆序依次将数组中的元素拷贝到扩容后的正确位置(首尾 * 2)
- 将待添加的元素放入数组末尾
- 最后,将 inset 的 encoding 属性改为
INTSET_ENC_INT32
,将length
属性改为4
/* Insert an integer in the intset */ // 参数:待插入的 *is, 待插入的值 value, 是否插入成功 *success intset *intsetAdd(intset *is, int64_t value, uint8_t *success) { // 获取当前值对应的编码方式 uint8_t valenc = _intsetValueEncoding(value); // 要插入的位置 uint32_t pos; if (success) *success = 1; // 判断编码是否超过了当前 intset 的编码 if (valenc > intrev32ifbe(is->encoding)) { // 超出编码,需要升级,intsetUpgradeAndAdd 函数 return intsetUpgradeAndAdd(is,value); } else { // 在当前 intset 中查找值与 value 一样的元素的下标 pos if (intsetSearch(is,value,&pos)) { // 如果找到了,则无需插入,直接结束并返回失败 if (success) *success = 0; return is; } // 数组扩容 is = intsetResize(is,intrev32ifbe(is->length)+1); // 移动数组中 pos 之后的元素到 pos + 1,给新元素腾出空间 if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1); } // 插入新元素 _intsetSet(is,pos,value); // 重置元素长度 is->length = intrev32ifbe(intrev32ifbe(is->length)+1); return is; } /* Upgrades the intset to a larger encoding and inserts the given integer. */ static intset *intsetUpgradeAndAdd(intset *is, int64_t value) { // 获取当前 insert 编码 uint8_t curenc = intrev32ifbe(is->encoding); // 获取新编码 uint8_t newenc = _intsetValueEncoding(value); // 获取元素个数 int length = intrev32ifbe(is->length); // 判断元素值的正负(超出表示范围正负均可),小于0插入队首,大于0插入队尾 int prepend = value < 0 ? 1 : 0; // 重置编码为新编码 is->encoding = intrev32ifbe(newenc); // 重置数组的大小 is = intsetResize(is,intrev32ifbe(is->length)+1); // 倒序,逐个元素搬到新的位置 while(length--) // _intsetGetEncoded 按照旧编码方式查找旧元素 // _intsetSet按照新编码方式插入新元素 _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc)); // 插入新元素,prepend 决定是队首还是队尾 if (prepend) _intsetSet(is,0,value); else _intsetSet(is,intrev32ifbe(is->length),value); // 修改数组长度 is->length = intrev32ifbe(intrev32ifbe(is->length)+1); return is; } static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) { // 初始化二分查找需要的 min、max、mid int min = 0, max = intrev32ifbe(is->length)-1, mid = -1; int64_t cur = -1; // 数组为空,则不用查找 if (intrev32ifbe(is->length) == 0) { if (pos) *pos = 0; return 0; } else { // 数组不为空,判断 value 是否大于最大值,小于最小值 // 大于最大值,不用找了,插入队尾 if (value > _intsetGet(is,max)) { if (pos) *pos = intrev32ifbe(is->length); return 0; } else if (value < _intsetGet(is,0)) { // 小于最小值,不用找了,插入队首 if (pos) *pos = 0; return 0; } } // 二分查找 while(max >= min) { mid = ((unsigned int)min + (unsigned int)max) >> 1; cur = _intsetGet(is,mid); if (value > cur) { min = mid+1; } else if (value < cur) { max = mid-1; } else { break; } } if (value == cur) { if (pos) *pos = mid; return 1; } else { if (pos) *pos = min; return 0; } }