前言
源码阅读方式建议:
VScode打开Redis源码,结合黄健宏老师的《Redis设计与实现》,先理解概念,再回归代码。
在阅读到数据库的具体实现时,建议在Linux系统下编译并运行Redis,对其的理解将更为直观。
源码阅读顺序:
- 阅读数据结构的源码
- 阅读内存编码数据结构实现
- 阅读数据类型的实现
- 阅读单机数据库相关代码
- 阅读客户端和服务器相关代码
- 阅读多机(Cluster)的实现代码
黄健宏老师的《Redis设计与实现》(第二版)
https://www.w3cschool.cn/hdclil/
部分资料及图片摘抄自网上,并附有参考链接,侵删。
基本数据结构
学习Redis源码建议自底向上,从底层数据结构入手,一步一步感受Redis的设计之巧妙,源码之美妙。
动态字符串SDS
源码文件:sds.h 和 sds.c。
Redis 构建了一种名为简单动态字符串(simple dynamic string,SDS)的抽象类型,并用作默认字符串表示。
PS:传统的C语言字符串在Redis中只用来作为字符串字面量(常量),如打印日志等等。
结构定义
先来看看SDS结构的定义:
// 类型别名,用于指向 sdshdr 的 buf 属性 typedef char *sds; //保存字符串对象的结构 struct sdshdr { // buf 中已占用空间的长度 int len; // buf 中剩余可用空间的长度 int free; // 数据空间 char buf[]; };
由此观之,Redis的SDS其实是基于C语言传统的字符串数组进行封装,以便能够对其更好的操作(后面解释为什么)。一个SDS实例的示意图如下:
值得注意的是,SDS也遵循C字符串的空字符结尾,并不计入len中,这么做的好处就是可以使用C相关的字符串库函数。
如:
printf("%s", s->buf);
对比C字符串
接着我们思考一个问题:这么封装C字符串有什么好处呢?
**其一、快速获取字符串长度。**通过len属性可以在O(1)的时间复杂度下获取一个字符串的长度,而C字符串需要O(n)。
**其二、杜绝缓冲区溢出。**C字符串由于没有记录自身长度,很可能在一些操作(如strcat等)时,造成缓冲区溢出,见下图:
对s1执行拼接操作之后:
而SDS的API在对SDS进行修改时,会检查其空间是否充足,如果不够,会先进行扩容,再进行相应的操作。
**其三、**减少修改字符串时带来的内存重分配次数。对于一个包含了 N 个字符的C字符串来说,这个C字符串的底层实现总是一个N+1个字符的数组(额外一个字符用于存空字符)。
如果执行拼接操作(append),那么在执行这个操作之前,程序需要先通过内存重分配来扩展底层数组的空间大小——如果忘了这一步就会产生缓冲区溢出。
如果执行截断操作(trim),那么在执行这个操作之后,程序需要通过内存重分配来释放字符串不再使用的那部分空间——如果忘了这一步就会产生内存泄漏。
内存重分配可能会造成系统调用,一般程序很少进行字符串长度修改,但是Redis作为数据库,会进行大量的字符串修改。为了避免这种缺陷,SDS通过未使用空间解除了字符串长度和底层数组长度之间的关联: 在SDS中,,buf 数组的长度不一定就是字符数量加一,数组里面可以包含未使用的字节,而这些字节的数量就由 SDS 的 free 属性记录。
通过未使用空间,SDS 实现了空间预分配和惰性空间释放两种优化策略。
**其四、空间预分配策略。**当API对SDS进行修改时,不仅会分配必要空间还会分配额外空间,具体策略如下:
当修改后SDS长度小于1M(默认最大长度),则额外分配len长度;
当大于1M,则额外分配1M;
这种预分配策略,将连续增长 N 次字符串所需的内存重分配次数从必定 N 次降低为最多 N 次。
**其五、惰性空间释放。**当SDS的API需要缩短SDS保存的字符串时, 程序并不立即使用内存重分配来回收缩短后多出来的字节,而是使用free属性将这些字节的数量记录起来,并等待将来使用。
**其六、支持二进制。**C字符串是以’\0’结尾的,这使得C字符串只能保存文本数据,而不能保存像图片、音频、视频、压缩文件这样的二进制数据。但是Redis需要、也具备这样的特点。
可以说正是诸如此类的小改进,造就了Redis的高效和稳定。
总结一下:
接口API
总览
函数 | 作用 | 时间复杂度 |
sdsnew | 创建一个包含给定 C 字符串的 SDS 。 | , N 为给定 C 字符串的长度。 |
sdsempty | 创建一个不包含任何内容的空 SDS 。 | |
sdsfree | 释放给定的 SDS 。 | |
sdslen | 返回 SDS 的已使用空间字节数。 | 读取 len 属性来直接获得, 。 |
sdsavail | 返回 SDS 的未使用空间字节数。 | 读取 free 属性来直接获得, 。 |
sdsdup | 创建一个给定 SDS 的副本(copy)。 | , N 为给定 SDS 的长度。 |
sdsclear | 清空 SDS 保存的字符串内容。 | 因为惰性空间释放策略,复杂度为 。 |
sdscat | 将给定 C 字符串拼接到 SDS 字符串的末尾。 | , N 为被拼接 C 字符串的长度。 |
sdscatsds | 将给定 SDS 字符串拼接到另一个 SDS 字符串的末尾。 | , N 为被拼接 SDS 字符串的长度。 |
sdscpy | 将给定的 C 字符串复制到 SDS 里面, 覆盖 SDS 原有的字符串。 | , N 为被复制 C 字符串的长度。 |
sdsgrowzero | 用空字符将 SDS 扩展至给定长度。 | , N 为扩展新增的字节数。 |
sdsrange | 保留 SDS 给定区间内的数据, 不在区间内的数据会被覆盖或清除。 | , N 为被保留数据的字节数。 |
sdstrim | 接受一个 SDS 和一个 C 字符串作为参数, 从 SDS 左右两端分别移除所有在 C 字符串中出现过的字符。 | , M 为 SDS 的长度, N 为给定 C 字符串的长度。 |
sdscmp | 对比两个 SDS 字符串是否相同。 | , N 为两个 SDS 中较短的那个 SDS 的长度。 |
挑几个API来看看源码。
sdslen
/* * 返回 sds 实际保存的字符串的长度 * * T = O(1) */ static inline size_t sdslen(const sds s) { struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr))); return sh->len; }
请大家结合sds的结构体定义,先想想,为什么这个API可以获取SDS长度?
在结构体里面, char* buf 和char buf[1]的效果差不多(对齐的情况),占4个字节;char buf[0] 和char buf[]是一样的,不占内存。
参考链接:https://www.cnblogs.com/hpplinux/archive/2012/07/06/2579502.html
所以(void*)(s-(sizeof(struct sdshdr)))其实就将内存地址转移到了SDS结构体地址,于是就可以通过这个地址直接访问其成员变量。
sdsnewlen初始化字符串
//根据初始化字符串(无符号)及长度建立sds //这是很多API所都需要的 sds sdsnewlen(const void *init, size_t initlen) { struct sdshdr *sh; // 根据是否有初始化内容,选择适当的内存分配方式 // T = O(N) if (init) { // zmalloc 不初始化所分配的内存(这个函数之后介绍) sh = zmalloc(sizeof(struct sdshdr)+initlen+1); } else { // zcalloc 将分配的内存全部初始化为 0 sh = zcalloc(sizeof(struct sdshdr)+initlen+1); } // 内存分配失败,返回 if (sh == NULL) return NULL; // 设置初始化长度 sh->len = initlen; // 新 sds 不预留任何空间 sh->free = 0; // 如果有指定初始化内容,将它们复制到 sdshdr 的 buf 中 // T = O(N) if (initlen && init) memcpy(sh->buf, init, initlen); // 以 \0 结尾 sh->buf[initlen] = '\0'; // 返回 buf 部分,而不是整个 sdshdr //因为我们要的是char return (char*)sh->buf; }
sdsclear惰性删除策略
//以O(1)的时间完成字符串的“清空” //只需要将终止符放在0即可 void sdsclear(sds s) { // 取出 sdshdr struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr))); // 重新计算属性 sh->free += sh->len; sh->len = 0; // 将结束符放到最前面(相当于惰性地删除 buf 中的内容) sh->buf[0] = '\0'; }
sdsMakeRoomFor
这个函数对sds的free进行扩充,2倍原大小或者加上1M的额外空间。
sds sdsMakeRoomFor(sds s, size_t addlen) { struct sdshdr *sh, *newsh; // 获取 s 目前的空余空间长度 size_t free = sdsavail(s); size_t len, newlen; // s 目前的空余空间已经足够,无须再进行扩展,直接返回 if (free >= addlen) return s; // 获取 s 目前已占用空间的长度 len = sdslen(s); sh = (void*) (s-(sizeof(struct sdshdr))); // s 最少需要的长度 newlen = (len+addlen); // 根据新长度,为 s 分配新空间所需的大小 if (newlen < SDS_MAX_PREALLOC) // 如果新长度小于 SDS_MAX_PREALLOC // 那么为它分配两倍于所需长度的空间 newlen *= 2; else // 否则,分配长度为目前长度加上 SDS_MAX_PREALLOC newlen += SDS_MAX_PREALLOC; // T = O(N) newsh = zrealloc(sh, sizeof(struct sdshdr)+newlen+1); // 内存不足,分配失败,返回 if (newsh == NULL) return NULL; // 更新 sds 的空余长度 newsh->free = newlen - len; // 返回 sds return newsh->buf; }
链表List
源码文件:adlist.h 和 adlist.c。
Redis实现的是双端链表,其被广泛用于实现 Redis 的各种功能,比如列表键,发布与订阅等等。通过将链表的void *value设置为不同的类型,Redis的链表可以用于保存各种不同类型的值。
结构定义
/* * 双端链表结构定义 */ typedef struct list { // 表头节点 listNode *head; // 表尾节点 listNode *tail; // 以下这些是函数指针,负责对应的功能 // 节点值复制函数 void *(*dup)(void *ptr); // 节点值释放函数 void (*free)(void *ptr); // 节点值对比函数 int (*match)(void *ptr, void *key); // 链表所包含的节点数量 unsigned long len; } list;
其中链表节点定义如下:
// 双端链表节点 typedef struct listNode { // 前置节点 struct listNode *prev; // 后置节点 struct listNode *next; // 节点的值 void *value; } listNode;
这里有一个迭代器值得注意,它被用作遍历双端链表,利用next返回遍历到的链表节点。
//双端链表迭代器 typedef struct listIter { // 当前迭代到的节点 listNode *next; // 迭代的方向(前向还是后向) int direction; } listIter;
一个链表的实例如下图所示:
接口API
总览:
listSetDupMethod | 将给定的函数设置为链表的节点值复制函数。 | 。 |
listGetDupMethod | 返回链表当前正在使用的节点值复制函数。 | 复制函数可以通过链表的 dup 属性直接获得, |
listSetFreeMethod | 将给定的函数设置为链表的节点值释放函数。 | 。 |
listGetFree | 返回链表当前正在使用的节点值释放函数。 | 释放函数可以通过链表的 free 属性直接获得, |
listSetMatchMethod | 将给定的函数设置为链表的节点值对比函数。 |
listGetMatchMethod | 返回链表当前正在使用的节点值对比函数。 | 对比函数可以通过链表的 match 属性直接获得, |
listLength | 返回链表的长度(包含了多少个节点)。 | 链表长度可以通过链表的 len 属性直接获得, 。 |
listFirst | 返回链表的表头节点。 | 表头节点可以通过链表的 head 属性直接获得, 。 |
listLast | 返回链表的表尾节点。 | 表尾节点可以通过链表的 tail 属性直接获得, 。 |
listPrevNode | 返回给定节点的前置节点。 | 前置节点可以通过节点的 prev 属性直接获得, 。 |
listNextNode | 返回给定节点的后置节点。 | 后置节点可以通过节点的 next 属性直接获得, 。 |
listNodeValue | 返回给定节点目前正在保存的值。 | 节点值可以通过节点的 value 属性直接获得, 。 |
listCreate | 创建一个不包含任何节点的新链表。 |
listAddNodeHead | 将一个包含给定值的新节点添加到给定链表的表头。 | |
listAddNodeTail | 将一个包含给定值的新节点添加到给定链表的表尾。 |
listInsertNode | 将一个包含给定值的新节点添加到给定节点的之前或者之后。 | |
listSearchKey | 查找并返回链表中包含给定值的节点。 | , N 为链表长度。 |
listIndex | 返回链表在给定索引上的节点。 | , N 为链表长度。 |
listDelNode | 从链表中删除给定节点。 | 。 |
listRotate | 将链表的表尾节点弹出,然后将被弹出的节点插入到链表的表头, 成为新的表头节点。 | |
listDup | 复制一个给定链表的副本。 | , N 为链表长度。 |
listRelease | 释放给定链表,以及链表中的所有节点。 | , N 为链表长度。 |
来挑几个API看看其具体实现:
**listAddNodeHead:**插入节点到头部
list *listAddNodeHead(list *list, void *value) { listNode *node; // 为节点分配内存 if ((node = zmalloc(sizeof(*node))) == NULL) return NULL; // 保存值指针 node->value = value; // 添加节点到空链表 if (list->len == 0) { list->head = list->tail = node; node->prev = node->next = NULL; } // 添加节点到非空链表 else { node->prev = NULL; node->next = list->head; list->head->prev = node; list->head = node; } // 更新链表节点数 list->len++; return list; }
**listGetIterator:**产生一个迭代器
/* 为给定链表创建一个迭代器, * 之后每次对这个迭代器调用 listNext 都返回被迭代到的链表节点 * direction 参数决定了迭代器的迭代方向: * AL_START_HEAD :从表头向表尾迭代 * AL_START_TAIL :从表尾想表头迭代 */ listIter *listGetIterator(list *list, int direction) { // 为迭代器分配内存 listIter *iter; if ((iter = zmalloc(sizeof(*iter))) == NULL) return NULL; // 根据迭代方向,设置迭代器的起始节点 if (direction == AL_START_HEAD) iter->next = list->head; else iter->next = list->tail; // 记录迭代方向 iter->direction = direction; return iter; }
**listSearchKey:**在链表中查找指定值的节点
listNode *listSearchKey(list *list, void *key) { listIter *iter; listNode *node; // 迭代整个链表 iter = listGetIterator(list, AL_START_HEAD); while((node = listNext(iter)) != NULL) { // 对比 if (list->match) { if (list->match(node->value, key)) { listReleaseIterator(iter); // 找到 return node; } } else { if (key == node->value) { listReleaseIterator(iter); // 找到 return node; } } } listReleaseIterator(iter); // 未找到 return NULL; }
- 话说list迭代器有什么用?
从源码上来看,list的迭代器主要是为了方便实现对list双向前进。迭代器作为一种设计模式,可以使得list适应不同的算法。
Redis源码(1)基本数据结构2:https://developer.aliyun.com/article/1508219