五、申请内存
5.1 ThreadCache
5.1.1 ThreadCache整体设计
定长内存池只需支持固定大小内存块的申请释放,因此定长内存池中只需一个自由链表管理释放回来的内存块。现在要支持申请和释放不同大小的内存块,那么就需要多个自由链表来管理释放回来的内存块。ThreadCache实际上是一个哈希桶结构,每个桶中存放的都是一个自由链表
ThreadCache支持小于等于256KB内存的申请,若将每种字节数的内存块都用一个自由链表进行管理的话,那么就需要20多万个(256*1024)自由链表,光是存储这些自由链表的头指针就需要消耗大量内存,这显然是得不偿失的
此时可以选择做一些平衡的牺牲,让这些字节数按照某种规则进行对齐,例如让这些字节数都按照8字节进行向上对齐。譬如当线程申请1~8字节的内存时会直接给出8字节,而当线程申请9~16字节的内存时会直接给出16字节,以此类推
因此当线程要申请某一大小的内存块时,就需要经过对齐规则计算得到对齐后的字节数,进而找到对应的哈希桶,若该哈希桶中的自由链表中有内存块,那就从自由链表中头删一个内存块进行返回;若该自由链表已经为空了,那么就需要向下一层的CentralCache进行获取
但由于对齐的原因,就会产生一些碎片化的内存无法被利用,比如线程只申请了6Byte的内存,而ThreadCache却直接给了8Byte的内存,多给出的2Byte就无法被利用,导致了一定程度的空间浪费,这些因为某些对齐原因导致无法被利用的内存,就是内存碎片中的内碎片问题
5.1.2 ThreadCache哈希桶映射与对齐规则
内存块是会被链接到自由链表上的,因此一开始肯定是按8字节进行对齐是最合适的,因为必须保证这些内存块,无论是在32位平台下还是64位平台下,都至少能够存储得下一个指针
但若所有的字节数都按照8字节进行对齐的话,那么就需要建立256 * 1024 ÷ 8 = 32768 个桶,这个数量还是比较多的,实际上可以让不同范围的字节数按照不同的对齐数进行对齐
虽然对齐产生的内碎片会引起一定程度上的空间浪费,但按照上面的对齐规则,可以将浪费率控制到百分之十左右。
需要说明的是,1~128这个区间不做讨论,因为1字节就算是对齐到2字节也有百分之五十的浪费率,并且小区间就算浪费率较高也并不会产生太大的浪费,这里从第二个区间开始进行计算
根据上面的公式,要得到某个区间的最大浪费率,就应该让分子取到最大,让分母取到最小。
比如 129~1024 这个区间的对齐数是16,那么最大浪费的字节数就是15,而最小对齐后的字节数就是这个区间内的前16个数所对齐到的字节数,即144。那么该区间的最大浪费率也就是15 ÷ 144 ≈ 10.42%。同样的道理,后面两个区间的最大浪费率分别是127 ÷ 1152 ≈ 11.02% 和1023 ÷ 9216 ≈ 11.10%。
对齐函数的编写
关于这个函数可以封装到一个DataHandleRules类中,但当中的成员函数最好设置为静态成员函数,否则在调用这些函数时就需要通过对象去调用。并且对于这些可能会频繁调用的函数,可以考虑将其设置为内联函数
在获取某一字节数向上对齐后的字节数时,可以先判断该字节数属于哪一个区间,然后再通过调用一个子函数进行进一步处理
static inline size_t AlignUp(size_t size) { if (size < 128) return _AlignUp(size, 8); else if (size < 1024) return _AlignUp(size, 16); else if (size < 8 * 1024) return _AlignUp(size, 128); else if (size < 64 * 1024) return _AlignUp(size, 1024); else if (size < 256 * 1024) return _AlignUp(size, 8 * 1024); else { assert(false); return -1; } }
此时就需要编写子函数,该子函数需要通过对齐数计算出某一字节数对齐后的字节数
//一般写法 static inline size_t _AlignUp(size_t bytes, size_t alignNum) { size_t alignSize = 0; if (bytes%alignNum != 0) { alignSize = (bytes / alignNum + 1)*alignNum; } else { alignSize = bytes; } return alignSize; }
除了上述写法还可以通过位运算的方式来进行计算,虽然位运算并不容易理解,但计算机执行位运算的速度是比执行乘法和除法更快的
static inline size_t _AlignUp(size_t bytes, size_t alignNum) { return ((bytes + alignNum - 1) & ~(alignNum - 1)); }
对于上述位运算,以10字节按8字节对齐为例进行分析。8 − 1 = 7,7就是一个低三位为1其余位为0的二进制序列,将10与7相加,相当于将10字节当中不够8字节的剩余字节数补上了
然后再将该值与7按位取反后的值(11000)进行与运算,而7按位取反后是一个低三位为0其余位为1的二进制序列,该操作进行后相当于屏蔽了该值的低三位而该值的其余位保持不变,此时得到的值就是10字节按8字节对齐后的值,即16字节
映射函数的编写
在获取某一字节数对应的哈希桶下标时,也是先判断该字节数属于哪一个区间,然后再通过调用子函数进一步处理
static inline size_t Index(size_t bytes) { assert(bytes <= MAX_BYTES); static int group_array[4] = { 16, 56, 56, 56 }; if (bytes <= 128) { return _Index(bytes, 3);//8 等于 2的3次方 } else if (bytes <= 1024) { return _Index(bytes - 128, 4) + group_array[0]; } else if (bytes <= 8 * 1024) { return _Index(bytes - 1024, 7) + group_array[1] + group_array[0]; } else if (bytes <= 64 * 1024) { return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0]; } else if (bytes <= 256 * 1024) { return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0]; } else { assert(false); return -1; } }
为了提高效率同样使用位运算来解决,但是此时传入的并不是该字节数的对齐数,而是将对齐数写成2的n次方的形式后,将这个n值进行传入。比如对齐数是8,传入的就是3
static inline size_t _Index(size_t bytes, size_t align_shift) { return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1; }
以10字节按8字节对齐为例进行分析。此时传入的alignShift就是3,将1左移3位后得到的实际上就是对齐数8,8 − 1 = 7 ,即还是让10与7相加。
之后再将该值向右移3位,实际上就是让17除以8,此时相当于屏蔽了该值二进制的低三位,因为除以8得到的值与其二进制的低三位无关,所以我们可以说是将10对齐后的字节数除以了8,此时得到了2,而最后还需要减一是因为数组的下标是从0开始的
5.1.3 TSL无锁访问
每个线程都有一个各自独享的ThreadCache,那应该如何创建这个ThreadCache?显然不能将这个ThreadCache创建为全局属性,因为全局变量是所有线程共享的,这样就不可避免的需要使用锁来进行控制,会增加了控制成本和代码复杂度,并且效率也会有所降低
要实现每个线程无锁的访问属于独自的ThreadCache,可以使用线程局部存储TLS(Thread Local Storage)。这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性
// TLS Thread Local Storage static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
通过TLS,每个线程可以无锁的获取各自专属的ThreadCache对象
if (pTLSThreadCache == nullptr) { pTLSThreadCache = new ThreadCache; }
5.1.4 ThreadCache核心设计
按照上述的映射对齐规则,ThreadCache中桶的个数即自由链表的个数是208,以及ThreadCache允许申请的最大内存大小256KB,可以将这些数据按照如下方式进行定义:
static const size_t MAX_BYTES = 256 * 1024;//能在threadcache申请的最大字节数 static const size_t NFREELIST = 208;//thread_cache && central_cache 桶数
ThreadCache本质是一个存储208个自由链表的数组,目前ThreadCache就先提供一个Allocate()函数用于申请对即可,后面随着不断编写增加即可
class ThreadCache { public: void* Allocate(size_t size); private: FreeList _freeLists[NFREELIST]; };
在ThreadCache申请对象时,通过所给字节数计算出对应的哈希桶下标。若桶中自由链表不为空,则从该自由链表中取出一个对象进行返回;但若此时自由链表为空,那么就需从CentralCache获取,FetchFromCentralCache()函数就是ThreadCache类中的一个成员函数
void* ThreadCache::Allocate(size_t size) { assert(size <= MAX_BYTES); size_t alignSize = DataHandleRules::AlignUp(size); size_t bucketIndex = DataHandleRules::Index(size); if (!_freeLists[bucketIndex].IsEmpty()){ return _freeLists[bucketIndex].Pop(); } else { return FetchFromCentralCache(bucketIndex, alignSize); } }
慢开始反馈调节算法
当ThreadCache向CentralCache申请内存时,应该向CentralCach申请多少个小内存块呢?若申请的太少,那么ThreadCache在短时间内用完了又需要申请;但若一次性申请的太多,可能用不完就浪费了
鉴于此,这里采用慢开始反馈调节算法。当ThreadCache向CentralCache申请内存时,若申请的是较小的对象,那么可以多给一点,但若申请的是较大的对象,就可以少给一点
通过下面这个函数,就可以根据所需申请的内存块的大小计算出具体给出的内存块个数的上限值,并且将该上限值控制到2~512个之间。就算ThreadCache要申请的对象再小,最多CentralCache一次性给出512个内存块
static size_t MoveSize(size_t size) { assert(size > 0); // [2, 512] 一次批量移动多少个对象的(慢启动)上限值 int num = MAX_BYTES / size; if (num < 2) num = 2; //大对象一次批量上限低 if (num > 512) num = 512; //小对象一次批量上限高 return num; }
既然计算的是上限值,那么具体该给出多少呢?
在FreeList结构中增加一个叫做_maxSize的成员变量,该变量的初始值设置为1,并且提供一个公有成员函数MaxSize()用于获取这个变量。即现在ThreadCache中的每个自由链表都会有一个各自的_maxSize
class FreeList//自由链表:用于管理切分过的小块内存 { public: void Push(void* obj) { assert(obj != nullptr); NextObj(obj) = _freeList; _freeList = obj; ++_size; } void* Pop() { assert(_freeList != nullptr); void* obj = _freeList; _freeList = NextObj(obj); --_size; return obj; } void PushRange(void* start, void* end, size_t n)//头插一段内存块 { NextObj(end) = _freeList; _freeList = start; _size += n; } void PopRange(void*& start, void*& end, size_t n) { assert(n <= _size); start = _freeList; end = start; for (size_t i = 0; i < n - 1; ++i) { end = NextObj(end); } _freeList = NextObj(end); NextObj(end) = nullptr; _size -= n; } bool IsEmpty() { return _freeList == nullptr; } size_t& MaxSize() { return _maxSize; } size_t Size() { return _size; } private: void* _freeList = nullptr; size_t _maxSize = 1; size_t _size = 0; };
此时当ThreadCache申请对象时,会比较_maxSize和计算得出的值,取出其中的较小值作为本次申请对象的个数。若本次采用的是_maxSize的值,那么会将ThreadCache中该自由链表的_maxSize的值增加
ThreadCache第一次向CentralCache申请某大小的内存块时,申请到的都是一个,但下一次申请同样大小的对象时,因为该自由链表中的_maxSize增加了,就会申请到三个。直到该自由链表中_maxSize的值,超过上限值后就不会继续增长了,此后申请到的内存块数都是计算出的上限值
//慢开始反馈调节算法 //并不会一开始一批量向central_cache索要太多,可能使用不完 size_t batchNum = min(_freeLists[index].MaxSize(), DataHandleRules::MoveSize(size)); if (batchNum == _freeLists[index].MaxSize()) { _freeLists[index].MaxSize() += 2;//若不断需要size大小的内存,那么batchNum就会不断增长直至上限 }
ThreadCache向CentralCache申请内存块
每次ThreadCache向CentralCache申请对象时,先通过慢开始反馈调节算法计算出本次应申请的小内存块的个数,然后再向CentralCache进行申请
若ThreadCache最终申请到小内存块的个数为1,那么直接将该内存块返回即可。为什么需要返回一个申请到的内存呢?因为ThreadCache要向CentralCache申请内存块,其实是由于某个线程向ThreadCache申请但ThreadCache当中没有,才导致ThreadCache向CentralCache申请内存块。因此CentralCache将内存块返回给ThreadCache后,ThreadCache会将该内存块返回给申请对象的线程
但若ThreadCache最终申请到多个内存块,那么除了将第一个内存块返回之外,还需要将剩下的内存块挂入ThreadCache对应的哈希桶中
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) { //慢开始反馈调节算法 //并不会一开始一批量向central_cache索要太多,可能使用不完 size_t batchNum = min(_freeLists[index].MaxSize(), DataHandleRules::MoveSize(size)); if (batchNum == _freeLists[index].MaxSize()) { _freeLists[index].MaxSize() += 2;//若不断需要size大小的内存,那么batchNum就会不断增长直至上限 } void* start = nullptr, * end = nullptr; size_t actualNum = CentralCache::GetInstance()->FetchMemoryBlock(start, end, batchNum, size); assert(actualNum > 0);//至少分配一个 if (actualNum == 1) { assert(start == end); return start; } else{ _freeLists[index].PushRange(NextObj(start), end, actualNum - 1);//将后面的内存头插thread_cache自由链表中 return start;//将第一个内存块返回给外面使用 } }
5.2 CentralCache
5.2.1 CentralCache整体设计
CentralCache与ThreadCache的相同之处
CentralCache结构与ThreadCache是基本类似的,都为哈希桶结构,并且遵循的对齐映射规则相同。这样的好处就是:当ThreadCache的某个桶中没有内存了,就可以直接到CentralCache中相同位置的哈希桶里去索取内存
CentralCache与ThreadCache的不同之处
CentralCache与ThreadCache有两个明显不同的地方:
1.ThreadCache是每个线程独享的,但是CentralCache是所有线程共享的。每个线程的Thread Cache没有内存了都会去找CentralCache,因此在访问CentralCache时是需要加锁的。但在加锁时并不是将整个CentralCache全部锁上,而是使用桶锁,即每个桶都有一个锁。只有当多个线程同时访问CentralCache的同一个桶时才会存在锁竞争,若是多个线程同时访问CentralCache的不同桶就不会存在锁竞争,这也使得锁竞争并不是十分激烈
2.ThreadCache的每个桶中挂的是一个个切好的内存块,而CentralCache的每个桶中挂的是一个个的Span(跨度)
每个span管理的都是一个以页为单位的大块内存,每个桶里面的若干span是按照双链表的形式链接起来的,并且每个span里面还有一个自由链表,这个自由链表里面挂的就是一个个切好了的内存块,根据其所在的哈希桶被切成了对应的大小
5.2.2 CentralCache结构设计
页号的类型
每个程序运行起来后都有自己的进程地址空间,在32位平台下,进程地址空间的大小是字节;而在64位平台下,进程地址空间的大小就是字节
页的大小一般是4K或者8K,以8K为例。在32位平台下,进程地址空间就可以被分成 ÷= 个页;在64位平台下,进程地址空间就可以被分成÷=个页。页号本质与地址一样,都是一个编号,只不过地址是以一个字节为一个单位,而页是以多个字节为一个单位
由于页号在64位平台下的取值范围是[0,) ,因此不能简单的用一个无符号整型来存储页号(只使用32位环境下),这时需要借助条件编译来解决该问题
//Win64环境下_WIN64和_WIN32都存在,Win32环境下只存在_WIN32 #ifdef _WIN64 typedef unsigned long long PAGE_ID; #elif _WIN32 typedef size_t PAGE_ID; #else//Linux //... #endif
Span的结构
CentralCache的每个桶里挂的是一个个的Span,Span是管理以页为单位的大块内存的,其结构如下:
//管理多个页的跨度结构 struct Span { Span* _prev = nullptr;//双向链表中的结构 Span* _next = nullptr; PAGE_ID _pageId = 0;//页号 size_t _num = 0;//页的数量 void* _freeList = nullptr;//自由链表 size_t _use_count = 0;//记录已分配给ThreadCache的小块内存的数量 };
对于Span管理的以页为单位的大块内存,需要知道这块内存具体在哪一个位置,以便于之后PageCache进行前后页的合并缓解内存碎片问题,因此Span结构当中会记录所管理大块内存起始页的页号 (具体如何合并在后面讲解)
每一个Span管理多少页并不是固定的,由后面的算法来控制,因此span结构中有一个_num成员来代表着该Span管理的页的数量
每个Span管理的大块内存,都会被切成小内存块挂到当前Span的自由链表中,比如8Byte哈希桶中的Span,会被切成一个个8Byte大小的内存块挂到当前Span的自由链表中,因此Span结构中需要自由链表_freeList来存储小块内存块
Span结构中的_use_count成员记录的是,当前Span中已经分配给TreadCache的小块内存块,当某个Span的_use_count计数变为0时,代表当前Span分配出去的小内存块已经全部还回来了,此CentralCache就可以将这个Span再还给PageCache
每个桶当中的Span是以双链表的形式组织起来的,当需要将某个Span归还给PageCache时,就可以很方便的将该Span从双链表结构中移出。若用单链表结构的话则较为麻烦,因为单链表在删除时需要知道当前结点的前一个结点
双链表结构
CentralCache的每个哈希桶中存储的都是一个双链表结构,对于该双链表结构可以进行封装:
class SpanList { public: SpanList() { _head = new Span; assert(_head != nullptr); _head->_next = _head; _head->_prev = _head; } Span* Begin() { return _head->_next; } Span* End() { return _head; } bool IsEmpty() { return _head == _head->_next; } void PushFront(Span* span) { Insert(Begin(), span); } Span* PopFront() { Span* front = _head->_next; Erase(front); return front; } void Insert(Span* pos, Span* newSpan) { assert(pos); assert(newSpan); Span* prev = pos->_prev; newSpan->_next = pos; prev->_next = newSpan; newSpan->_prev = prev; pos->_prev = newSpan; } void Erase(Span* pos) { assert(pos != nullptr); assert(pos != _head); Span* prev = pos->_prev; Span* next = pos->_next; prev->_next = next; next->_prev = prev; } private: Span* _head = nullptr; public: std::mutex _mtx;//桶锁 };
注意:从双链表删除的Span会还给PageCache,相当于只是把这个Span从双链表中移除,不需要对删除的Span进行delete操作
5.2.3 CentralCaChe核心设计
所有线程使用的都是同一个CentralCache,即在整个项目中只需要有一个CentralCache对象即可,那么可以使用单例模式进行CentralCache的编写
单例模式可以保证项目中该类只有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享。单例模式又分为饿汉模式和懒汉模式,懒汉模式较为复杂,这里使用饿汉模式即可
//饿汉单例模式 class CentralCache { public: static CentralCache* GetInstance(); private: SpanList _spanLists[NFREELIST]; private: CentralCache() {} CentralCache(const CentralCache&) = delete; static CentralCache _sInst;//声明 };
CentralCache CentralCache::_sInst; CentralCache* CentralCache::GetInstance() { return &_sInst; }
CentralCache向ThreadCache提供内存块
要从CentralCache获取batchNum个指定大小的内存块,这些内存块肯定都是从CentralCache对应哈希桶中的某个Span中取出来的,因此取出来的这batchNum个内存块是链接在一起的,只需要得到这段链表的头和尾即可,可以采用输出型参数进行获取
size_t CentralCache::FetchMemoryBlock(void*& start, void*& end, size_t batchNum, size_t size) { size_t index = DataHandleRules::Index(size); _spanLists[index]._mtx.lock(); Span* span = GetOneSpan(_spanLists[index], size); assert(span); assert(span->_freeList); //从span中获取batchNum块内存块,若不够则有多少获取多少 start = span->_freeList; end = span->_freeList; size_t count = 0, actualNum = 1; while (NextObj(end) != nullptr && count < batchNum - 1) { end = NextObj(end); ++count; ++actualNum; } span->_freeList = NextObj(end); NextObj(end) = nullptr; span->_use_count += actualNum; _spanLists[index]._mtx.unlock(); return actualNum; }
由于CentralCache是所有线程共享的,所以在访问CentralCache中的哈希桶时,需要先给对应的哈希桶加上桶锁,在获取到对象后再将桶锁解掉
在向CentralCache获取内存块时,先是在CentralCache对应的哈希桶中获取到一个非空的Span,然后从这个Span的自由链表中取出batchNum个对象即可,但可能这个非空的span的自由链表当中对象的个数不足batchNum个,这时该自由链表当中有多少个对象就给多少即可
ThreadCache实际从CentralCache获得的对象的个数可能与传入的batchNum值是不一样的,因此需要统计本次过程中实际ThreadCache获取到的内存块个数,并根据该值及时更新_use_count
虽然实际申请到对象的个数可能比batchNum要小,但这并不会产生任何影响。因为ThreadCache的本意就是向CentralCache申请一个内存块,之所以一次多申请一些内存块,是因为制定了这样的策略来提高效率,使得下次线程再申请相同大小的对象时就可以直接在ThreadCache中获取,而不用再向CentralCache申请对象
CentralCache从PageCache中获取一个非空的Span
ThreadCache向CentralCache申请内存块时,CentralCache需要先从对应的哈希桶中获取到一个非空的Span,然后从这个非空的Span中取出若干内存块给ThreadCache
首先遍历CentralCache对应哈希桶中的双链表,若该双链表中有非空的Span,那么直接将该Span进行返回即可。但若遍历双链表后发现双链表中没有空闲的Span,那么此时CentralCache就需要向PageCache申请内存块
但是,该向PageCache申请多大的内存块呢?可以根据具体所需内存块的大小来决定,之前就根据所需内存块的大小计算出ThreadCache一次向CentralCache申请内存块的个数上限,现在则是根据所需内存块的大小计算出CentralCache一次应该向PageCache申请几页的Span
先根据所需内存块的大小计算出ThreadCache一次向CentralCache申请内存块的个数上限,然后将这个上限值乘以单个内存块的大小,就算出了具体需要多少字节,最后再将这个算出来的字节数转换为页数。若转换后不够一页,那么就申请一页,否则转换出来是几页就申请几页。即Central Cache向PageCache申请内存时,要求申请到的内存尽量能够满足ThreadCache向CentralCache申请时的上限
//一次central_cache向page_cache获取多少页的Span static size_t NumMovePage(size_t size) { size_t num = MoveSize(size); size_t npage = num * size; npage >>= PAGE_SHIFT; if (npage == 0) npage = 1; return npage; }
代码中的PAGE_SHIFT代表页大小转换偏移。以页的大小为8K为例,PAGE_SHIFT的值为13
//页大小转换偏移 一页为2^13,即8KB static const size_t PAGE_SHIFT = 13;
当CentralCache申请到若干页的Span后,还需要将这个Span切成一个个对应大小的小内存块挂到该Span的自由链表中
找到一个Span所管理的大块内存块呢?首先需要计算出该Span的起始地址,即用这个Span的起始页号乘以一页的大小即可得到这个Span的起始地址,然后用这个Span的页数乘以一页的大小就可以得到这个Span所管理的内存块的大小,用起始地址加上内存块的大小即可得到这块内存块的结束位置
明确了这块内存的起始和结束位置后,就可以进行切分了。根据所需内存块的大小,每次从大块内存切出一块固定大小的内存块尾插到Span的自由链表中即可
为什么是尾插呢?因为若是将切好的内存块尾插到自由链表,这些内存块看起来是按照链式结构链接起来的,而实际其在物理空间上是连续的,这时当把这些连续内存分配给某个线程使用时,可以提高该线程的CPU缓存命中率
Span* CentralCache::GetOneSpan(SpanList& list, size_t size) { //查看当前CentralCache中的spanlist中是否有还有尚未分配的span Span* it = list.Begin(); while (it != list.End()) { if (it->_freeList != nullptr) { return it; } else { it = it->_next; } } list._mtx.unlock();//将central_cache桶锁释放,此时若其他线程释放内存回来并不会导致阻塞 //运行到此处时即没有空闲span,只能向Page_cache索取 PageCache::GetInstance()->_pageMutex.lock(); Span* span = PageCache::GetInstance()->NewSpan(DataHandleRules::NumMovePage(size)); PageCache::GetInstance()->_pageMutex.unlock(); //计算span的大块内存的起始地址和大块内存的大小(字节数) char* start = (char*)(span->_pageId << PAGE_SHIFT); size_t bytes = span->_num << PAGE_SHIFT; char* end = start + bytes; //将大块内存切成自由链表并链接起来 span->_freeList = start; start += size; void* tail = span->_freeList; while (start < end) { NextObj(tail) = start; tail = NextObj(tail); start += size; } NextObj(tail) = nullptr; list._mtx.lock(); list.PushFront(span);//将span挂入对应的桶中 return span; }
在访问PageCache前,应先把CentralCache对应的桶锁解开。虽然此时CentralCache的这个桶当中是没有内存供其他ThreadCache申请的,但ThreadCache除了申请内存还会归还内存,若在访问PageCache前将CentralCache对应的桶锁解开,那么此时其他ThreadCache想要归还内存到CentralCache的这个桶时就不会发生阻塞
因此在调用NewSpan()函数之前,应先将CentralCache对应的桶锁解掉,然后再将PageCache的大锁加上,当申请到k页的Span后将PageCache的大锁解开,但此时不需要立刻加上桶锁。因为CentralCache拿到k页的Span后还需进行切分操作,此时别的线程访问不到该Span,可以在Span切好后需要将其挂入Central Cache对应的桶上时,再加上对应的桶锁
5.3 PageCache
5.3.1 PageCache整体设计
PageCache与CentralCache结构的相同之处
PageCache与CentralCache一样,都是哈希桶结构,并且PageCache的每个哈希桶中挂的也是一个个的Span,并且也是按照双链表的结构链接起来的
PageCache与CentralCache结构的不同之处
CentralCache的映射规则与ThreadCache保持一致,而PageCache的映射规则与它们都不相同PageCache的映射规则采用的是直接定址法,如1号桶挂的都是1页的Span,2号桶挂的都是2页的span,以此类推
CentralCache每个桶中的Span被切成了一个个对应大小的内存块,以供ThreadCache申请。而PageCache当中的Span是没有被进一步切小的,因为PageCache服务的是CentralCache,当CentralCache没有Span时,向PageCache申请某一固定页数的Span,而切分申请到的这个Span由CentralCache完成
PageCache中有多少个桶由编写自行决定,本博客中采用的就是最大128页的方案。为了让桶号与页号对应,将第0号桶空出,因此需要将哈希桶的个数设置为129
//page_cache的桶数+1 || page_cache的最大页数+1 (下标为0位置空出) static const size_t NPAGES = 129;
本博客为什么采用最大128页的方案呢?因为线程申请单个内存块最大是256KB,而128页可以正好被切成4个256KB的内存块,因此是足够的。若是在采用更大页的方案也是可以的,根据具体的需求进行设置即可
PageCache类设计
当每个线程的ThreadCache没有内存时都会向CentralCache申请,此时多个线程的ThreadCache若访问的不是同一个桶,那么这些线程是可以同时进行访问的。这时CentralCache的多个桶就可能同时向PageCache申请内存的,所以PageCache是存在线程安全问题的,因此在访问PageCache时是必须要加锁的
但是在PageCache中不能使用桶锁,因为当CentralCache向PageCache申请内存时,PageCache可能会将其他桶当中大页的Span切小后再给CentralCache。此外,当CentralCache将某个Span归还给PageCache时,PageCache也会尝试将该Span与其他桶当中的Span进行合并
即在访问PageCache时,可能需要访问PageCache中的多个桶,若PageCache用桶锁就会出现大量频繁的加锁和解锁,导致程序的效率低下。因此在访问PageCache时使用一个大锁将整个Page Cache锁住
PageCache对象在整个进程中也是只能存在一个的,因此需要将其设计为单例模式
//饿汉单例模式 class PageCache { public: static PageCache* GetInstance();//提供一个全局访问接口 private: PageCache() {} PageCache(const PageCache&) = delete; static PageCache _sInst;//声明 SpanList _spanLists[NPAGES]; public: std::mutex _pageMutex;//整个page_cache的锁 };
PageCache PageCache::_sInst; PageCache* PageCache::GetInstance() { return &_sInst; }
5.3.2 PageCache获取Span
当调用上述的GetOneSpan()尝试从CentralCache的某个哈希桶获取一个非空的Span时,若遍历哈希桶中的双链表后发现双链表中没有Span,或该双链表中的Span都为空,那么CentralCache就需向PageCache申请若干页的Span了,PageCache获取一个k页的Span并提供给CentralCache呢?
PageCache是直接按照页数进行映射的,若CentralCache要获取一个k页的Span,在PageCache的第k号桶中取出一个Span返回给CentralCache即可。但若第k号桶中没有Span了,这时并不是直接转而向堆区申请一个k页的Span,而是要继续在后面更大的桶中寻找Span
直接向堆申请以页为单位的内存时,应尽量申请大块一点的内存块,因为此时申请到的内存是连续的,当线程需要内存时可以将其切小后分配给线程,而当线程将内存释放后又可以将其合并成大块的连续内存。若向堆申请内存时申请的是小块内存,而需申请多次,那么申请到的内存就不一定是连续的了
当第k号桶中没有空闲的Span时,可以继续找第k+1号桶,因为可以将k+1页的Span切分成一个k页的Span和一个1页的Span,这时可以将k页的Span返回,而将切分后1页的Span挂到1号桶中。但若后面的桶当中都没有空闲的Span,此时就只能向堆申请一个128页的内存块,并将其用一个Span结构管理,然后将128页的Span切分成k页的Span和128-k页的Span,其中k页的Span返回给CentralCache,而128-k页的Span就挂到第128-k号桶中
即每次向堆申请的都是128页大小的内存块,CentralCache中的Span实际都是由128页的Span切分而成的
Span* PageCache::NewSpan(size_t k)//获取一个k页的span { assert(k > 0); //检查第k个桶中是否有可用的span if (!_spanLists[k].IsEmpty()) { return _spanLists[k].PopFront(); } //检查后续的桶中是否有可用的span,若有则进行切分 for (int i = k + 1; i < NPAGES; ++i) { if (!_spanLists[i].IsEmpty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = _spanPool.New(); kSpan->_pageId = nSpan->_pageId; nSpan->_pageId += k; kSpan->_num = k; nSpan->_num -= k; _spanLists[nSpan->_num].PushFront(nSpan); return kSpan; } } //运行此处则说明PageCache中没有可用的span,需向堆中申请128页的span Span* newSpan = new Span; void* address = SystemAlloc(NPAGES - 1); newSpan->_pageId = (PAGE_ID)address >> PAGE_SHIFT; newSpan->_num = NPAGES - 1; _spanLists[newSpan->_num].PushFront(newSpan); return NewSpan(k);//通过递归将大span分为小span }