前言:(Memory Pool)是一种内存分配方式,又被称为固定大小区块规划(fixed-size-blocks allocation)。通常我们习惯直接使用new、malloc等API申请分配内存,这样做的缺点在于:由于所申请内存块的大小不定,当频繁使用时会造成大量的内存碎片并进而降低性能。
秋招可以写进简历的6个实战项目:
- 深入理解内存管理:优化你的C++代码
- 高效C++项目实战:秋招简历项目解析(提供源码下载)
- 基于Qt框架实战:MP3音乐播放器搜索引擎构
- 建高性能云原生应用:使用Golang的实践指南
- 开启全新存储时代:SPDK文件系统项目实战指南
- 高速读写、负载均衡:基础架构KV存储项目最佳实践
- FFmpeg+SDL播放器开发实践:解析、解码、渲染全流程详解
- 项目介绍
当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。
这个项目的要求的知识储备?
这个项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等等方面的知识。
一、基础概念
1.1什么是内存池
所谓池化技术,就是程序先向系统申请过量的资源,然后自己管理,当程序中需要申请内存时,不是直接向操作系统申请,而是直接从内存池中获取,释放内存时也不是将内存返回给操作系统,而是返回内存池中。
因为每次申请该资源都有较大的开销,这样提前申请好了,使用时就会非常快捷,能够大大提高程序运行效率。在计算机中有很多使用这种池技术的地方,例如线程池、连接池等。
动态内存申请malloc
C++中动态申请内存都是通过malloc去申请的,但实际上我们并不是直接去堆中获取内存的,而malloc就是一个内存池。
malloc() 相当于向系统 “批发” 了一块较大的内存空间,然后“零售” 给程序使用,当全部使用完或者程序有大量内存需求时,再根据需求向操作系统申请内存。
1.2内存池
内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
内存池主要解决的问题
内存池解决
- 效率(主要)
- 内存碎片。(系统的内存分配器的角度)
那么什么是内存碎片呢?
外部碎片:一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求
内部碎片:由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。
1.3malloc
C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的。而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。
malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的一套,linux gcc用的glibc中的ptmalloc。
原理图
二、定长内存池设计
2.1概述
作为程序员(C/C++)我们知道申请内存使用的是malloc,malloc其实就是一个通用的大众货,什么场景下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能,下面我们就先来设计一个定长内存池。
2.2设计思路
开辟内存:
- 使用malloc开辟一大块内存,让_memory指针指向这个大块内存
- _memory 设置为char* 类型,是为了方便切割时_memory向后移动多少字节数。
申请内存:
- 将_memory强转为对应类型,然后赋值给对方,_memory指针向后移动对应字节数即可。
- 如果有存在已经切割好的小块内存,则优先使用小块内存。
释放内存:
- 用类型链表的结构来进行存储。
- 用当前小块内存的头4字节存储下一个小块内存的地址,最后用_freeList指针指向第一个小块内存的地址(并不是将内存释放给操作系统)
- 所以开辟内存时,开辟的内存大小必须大于或等于一个指针类型的大小。
代码位置:高并发内存池项目中的ObjectPool.h 文件。
2.3内容讲解
注意:核心三变量
char* _memory = nullptr; // 指向大块内存的指针 size_t _remainBytes = 0; // 大块内存分割过程中剩余的字节数 void* _freeList = nullptr; // 还回来的内存块用自由链接表的头指针链接起来
剩余内存字节数必须大于我们要申请的类型内存空间,回收内存块,我们只需要借助内存块头四个或八个字节(平台不同,指针字节数不同)储存下个内存块的地址。
链表链接内存块方式:数据结构头插思想
存储方式
*(void**)obj = _freeList;
程序结束申请的内存自动回收(内存池,我们不需要关心;回收问题)
实现思路:先从freeList(回收链表)中查找是否还有内存块
有就返回内存块地址,没有— 接下来从 _memory(这个大的内存块)中取;如果内存不足,再申请一大块内存。
2.4代码实现
.h文件
#pragma once #include <iostream> #include <vector> #include <time.h> using std::cout; using std::endl; #ifdef _WIN32 #include<windows.h> #else #endif // 直接去堆上按页申请空间 inline static void* SystemAlloc(size_t kpage) { #ifdef _WIN32 void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); #else // linux下brk mmap等 #endif if (ptr == nullptr) throw std::bad_alloc(); return ptr; } template <class T> class myObject { public: T* New() { // 优先把还回来内存块对象,再次重复利用 T* obj = nullptr; if (_freeList) { // 头删 void* next= *(void**)_freeList; obj = (T*)_freeList; _freeList = next; } else { // 剩余内存不够一个对象大小时,则重新开大块空间 if (_remainBytes < sizeof(T)) { _remainBytes = 1024 * 128; //_memory = (char*)malloc(_remainBytes); _memory = (char*)SystemAlloc(_remainBytes >> 13); // 按页申请内存 if (_memory == nullptr) { throw std::bad_alloc(); } } obj = (T*)_memory; size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); //申请内存空间必须大于或等于一个指针的空间 _memory += objSize; _remainBytes -= objSize; } // 定位new,显示调用T的构造函数初始化 new(obj)T; return obj; } void Delete(T* obj) { // 显示调用析构函数清理对象 obj->~T(); // 头插 *(void**)obj = _freeList; // 取前四或八个字节 _freeList = obj; } private: char* _memory = nullptr; // 指向大块内存的指针 size_t _remainBytes = 0; // 大块内存分割过程中剩余的字节数 void* _freeList = nullptr; // 还回来的内存块用自由链接表的头指针链接起来 };
【文章福利】小编推荐自己的Linux内核技术交流群:【 865977150】整理了一些个人觉得比较好的学习书籍、视频资料共享在群文件里面,有需要的可以自行添加哦!!!
2.5效率(malloc与定长内存池)
测试代码:.cpp文件
struct TreeNode { int _val; TreeNode* _left; TreeNode* _right; TreeNode() :_val(0) , _left(nullptr) , _right(nullptr) {} }; void TestObjectPool() { // 申请释放的轮次 const size_t Rounds = 100; // 每轮申请释放多少次 const size_t N = 1000000; std::vector<TreeNode*> v1; v1.reserve(N); size_t begin1 = clock(); for (size_t j = 0; j < Rounds; ++j) { for (int i = 0; i < N; ++i) { v1.push_back(new TreeNode); } for (int i = 0; i < N; ++i) { delete v1[i]; } v1.clear(); } size_t end1 = clock(); std::vector<TreeNode*> v2; v2.reserve(N); myObject<TreeNode> TNPool; size_t begin2 = clock(); for (size_t j = 0; j < Rounds; ++j) { for (int i = 0; i < N; ++i) { v2.push_back(TNPool.New()); } for (int i = 0; i < N; ++i) { TNPool.Delete(v2[i]); } v2.clear(); } size_t end2 = clock(); cout << "new cost time:" << end1 - begin1 << endl; cout << "object pool cost time:" << end2 - begin2 << endl; }
测试结果图:
结论:明显内存池效率更快。
三、高并发内存池整体设计框架
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要解决以下几方面的问题。
- 1.性能问题。
- 2.多线程环境下,锁竞争问题。
- 3.内存碎片问题。
3.1三层设计思路
第一层:thread cache(线程缓存):
每个线程独享一个thread cache,用于小于256k的内存分配情况,线程从这里申请内存不需要加锁(因为其他线程无法访问当前线程的 thread cache,没有竞争关系)
第二层:central cache(中心缓存):
所有线程共享一个central cache,thread cache 是按需从central cache中获取对象,central cache在合适的时候会收回thread cache中的对象,避免一个线程占用太多资源。
central cache 是所有线程共享的,所以存在竞争关系,需要加锁;这里使用的锁是桶锁,并且因为只有thread cache没有内存时才会申请内存,所以这里竞争不会太激烈。
第三层:page cache(页缓存):
页缓存存储的内存是以页为单位进行存储的及分配的。central cache没有内存时,则会从page cache中申请一定数量的page,并切割成定长大小的小内存块。
当一个span的几个跨度页的对象都回收后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
3.2thread cache 设计思路(线程缓存)
thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
线程申请内存:
线程申请内存时,会有不同规格的内存申请(4字节、5字节等),根据范围划定不同的自由链表,设计多个自由链表管理不同规格的内存小块。实质就相当于使用多个定长内存池的自由链表,每个内存小块采用向上对齐原则(可能会出现多申请内存的情况,这就是内碎片)
例如:
- 需要9字节,则开辟一个大小为2个8字节的空间的节点
- 需要100字节,则开辟一个大小为13个8字节的空间的节点。
线程对齐规则:
整体控制在最多10%左右的内碎片浪费,总计设计208个桶,[0,15]个桶,每个桶的对齐数相差8字节(最高128字节对齐),[16,71]个桶,每个桶的对齐数相差16字节(最高1024字节对齐)以此类推。
注意:_freeLists是一个数组,每个元素都是自由链表类型(即存储自由链表的头结点)
线程释放内存
- 释放内存后:采用自由链表的结构来管理切好的小块内存(每一个切分好的小块内存就是一个节点)
- 具体方法是:用切分好的小块内存的前4字节或8字节来存储下一个小块内存的地址。
- 插入节点时,采用头插的方式。
thread cache代码框架
本质:就相当于一个数组,每个数组挂个桶;而每个桶就类似于上面那个定长内存池一样。
相关数据:
static const size_t MAX_BYTES = 256 * 1024; // 一页8kb static const size_t NFREELIST = 208; // 总共桶的个数
- index:表示_freeLists数组的下标也就是桶的位置
- size:表示一块内存的字节数
- ptr:表示需要挂起的内存地址
(ThreadCache.h文件)
// thread cache本质是由一个哈希映射的对象自由链表构成 class ThreadCache { public: // 申请和释放内存对象 void* Allocate(size_t size); void Deallocate(void* ptr, size_t size); // 从中心缓存获取对象 void* FetchFromCentralCache(size_t index, size_t size); private: FreeList _freeLists[NFREELIST]; }; // TLS thread local storage static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
- RoundUp函数:表示内存对齐数(例如:对齐数为8,size是7或者9对齐后实际分配内存大小为8或16)
- Index函数:根据size大小计算出_freeLists数组下标位置(也就是桶的位置)
- Empty函数:判断_freeLists数组下标位置挂接内存是否为空
- Pop函数:_freeLists数组下标位置内存块头删取出来并返回该内存块地址
- Push函数:_freeLists数组下标位置内存块头插挂接该内存块
(ThreadCache.cpp文件)
// 申请内存对象, 空间先到_freeLists链表中去取,如果没有去中心缓存获取空间 void* ThreadCache::Allocate(size_t size) { assert(size <= MAX_BYTES); size_t alignSize = SizeClass::RoundUp(size); size_t index = SizeClass::Index(size); if (!_freeLists[index].Empty()) { return _freeLists[index].Pop(); } else { return FetchFromCentralCache(index, alignSize); } } // 释放内存对象,空间挂起到_freeLists链表中去 void ThreadCache::Deallocate(void* ptr, size_t size) { assert(ptr); assert(size <= MAX_BYTES); // 找对映射的自由链表桶,对象插入进入 size_t index = SizeClass::Index(size); _freeLists[index].Push(ptr); }
(common.h文件)
static void*& NextObj(void* obj) // 前四个或八个字节(下一个结点的地址) { return *(void**)obj; } // 管理小对象的自由链表 class FreeList { public: void Push(void* obj) { // 头插 NextObj(obj) = _freeList; _freeList = obj; } void* Pop() { // 头删 void* obj = _freeList; _freeList = NextObj(obj); return obj; } bool Empty() { return _freeList == nullptr; } private: void* _freeList = nullptr; };
自由链表的哈希桶跟对象大小的映射关系
(common.h文件)
// 计算对象大小的对齐映射规则 class SizeClass { public: // 整体控制在最多10%左右的内碎片浪费 // [1,128] 8byte对齐 freelist[0,16) // [128+1,1024] 16byte对齐 freelist[16,72) // [1024+1,8*1024] 128byte对齐 freelist[72,128) // [8*1024+1,64*1024] 1024byte对齐 freelist[128,184) // [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208) static inline size_t _RoundUp(size_t bytes, size_t alignNum) // 算出桶的大小(即size的对齐数) { return ((bytes + alignNum - 1) & ~(alignNum - 1)); } // 对齐大小计算 static inline size_t RoundUp(size_t size) { // 确定每个区间的对齐数 if (size <= 128) { return _RoundUp(size, 8); } else if (size <= 1024) { return _RoundUp(size, 16); } else if (size <= 8 * 1024) { return _RoundUp(size, 128); } else if (size <= 64 * 1024) { return _RoundUp(size, 1024); } else if (size <= 256 * 1024) { return _RoundUp(size, 8 * 1024); } else { assert(false); return -1; } } static inline size_t _Index(size_t bytes, size_t align_shift) { return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1; // 桶下标从0开始计数 } // 计算映射的哪一个自由链表桶 static inline size_t Index(size_t bytes) { assert(bytes <= MAX_BYTES); // 每个区间有多少个链 static int group_array[4] = { 16, 56, 56, 56 }; if (bytes <= 128) { return _Index(bytes, 3); } else if (bytes <= 1024) { return _Index(bytes - 128, 4) + group_array[0]; } else if (bytes <= 8 * 1024) { return _Index(bytes - 1024, 7) + group_array[1] + group_array[0]; } else if (bytes <= 64 * 1024) { return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0]; } else if (bytes <= 256 * 1024) { return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0]; } else { assert(false); } return -1; } };
3.3线程缓存无锁设计(TLS)
(ConcurrentAlloc.h文件)
#pragma once #include "Common.h" #include "ThreadCache.h" static void* ConcurrentAlloc(size_t size) { // 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象 if (pTLSThreadCache == nullptr) { pTLSThreadCache = new ThreadCache; } cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl; return pTLSThreadCache->Allocate(size); } static void ConcurrentFree(void* ptr, size_t size) { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); }
- TLS:thread local storage 线程本地存储(linux和Windows下有各自的TLS)
- TLS是一种变量的存储方式,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问,这样就保证了线程的独立性。
- 静态TLS使用方法:
- _declspec(thread) DWORD data=0;
- 声明了一个 _declspec(thread) 类型的变量,会为每一个线程创建一个单独的拷贝。
原理:
- 在x86 CPU上,将为每次引用的静态TLS变量生成3个辅助机器指令
- 如果在进程中创建子线程,那么系统将会捕获它并且自动分配一另一个内存块,以便存放新线程的静态TLS变量。
//每个线程刚创建时,就会获得这个指针,每个线程的指针是不同的。 static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr; //static 修饰,可以保证变量只在当前文件可见。
简单测试一下:
void Alloc1() { for (size_t i = 0; i < 5; ++i) { void* ptr = ConcurrentAlloc(16); } } void Alloc2() { for (size_t i = 0; i < 5; ++i) { void* ptr = ConcurrentAlloc(17); } } void TLSTest() { std::thread t1(Alloc1); t1.join(); std::thread t2(Alloc2); t2.join(); } int main() { //TestObjectPool(); TLSTest(); return 0; }
3.4central cache设计思路(中心缓存)
central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。
注意:
- span是双向链表,而span下挂载的小块内存对象是单链表。
- 中心缓存需要加桶锁。
- _spanLists 是一个数组,数组中每个元素都是一个span自由链表的_head头指针。
- 每个span又是一个单向自由链表。
内存申请:
- 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,从对应的span中取出小块内存对象给thread cache,这个过程是需要加锁的(加桶锁)
- 这里批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法。
- 如果所有的span都为空了(即central cache使用完了),则将空的span链在一起,向page cache申请一个span对象,span对象中是一些以页为单位的内存,需要切成对应的小块内存对象,并链接起来,挂到span中。
- central cache中每一个span都有一个use_count,分配一个对象给thread cache,就加加。
内存释放:
- 当thread_cache将内存释放回central cache中的时,释放回来就减减use_count。
- 当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。
CentralCache 代码框架:
- 注意:
- CentrealCache 是多个线程都可以进行访问的。我们设计时需要考虑下面两点
- 单例模式 2. 临界资源加锁
(CentralCache.h文件)
#pragma once #include "common.h" // 单例模式---饿汉模式 class CentralCache { public: static CentralCache* GetInstance() { return &_sInst; } // 获取一个非空的span Span* GetOneSpan(SpanList& list, size_t byte_size); // list: 表示_spanLists中span的_freeList连续挂接空间的数量 // 从中心缓存获取一定数量的对象给thread cache size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size); // size:内存空间大小;batchNum:内存空间个数 // start 和 end 表示内存空间地址起始到终末范围 private: SpanList _spanLists[NFREELIST]; private: CentralCache() {}; CentralCache(const CentralCache& t) = delete; static CentralCache _sInst; // 声明 };
GetOneSpan函数:需要到PageCache模块去讲解(CentralCache.cpp文件)
CentralCache CentralCache::_sInst; //定义 // 获取一个非空的span Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size) { // ... return nullptr; } // 从中心缓存获取一定数量的对象给thread cache size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size) { // 每个桶加锁 size_t index = SizeClass::Index(size); _spanLists[index]._mtx.lock(); Span* span = GetOneSpan(_spanLists[index], size); assert(span); assert(span->_freeList); // 从span中获取batchNum个对象 // 如果不够batchNum个,有多少拿多少 start = span->_freeList; end = start; size_t i = 0, actualNum = 1; while (i < batchNum - 1 && NextObj(end) != nullptr) // 取连续内存块,当batchNum超过原有时,按拥有的数量给 { end = NextObj(end); ++i; ++actualNum; } span->_freeList = NextObj(end); // 剩余空间仍挂在Span中的_freeList去 NextObj(end) = nullptr; // 最后end记录下一个空间地址置空 span->_useCount += actualNum; _spanLists[index]._mtx.unlock(); // 解锁 return actualNum; }
以页为单位的大内存管理span的定义及spanlist定义(common.h文件)
// Span管理一个跨度的大块内存 // 管理以页为单位的大块内存 struct Span { PAGE_ID _pageId = 0; // 页号 size_t n = 0; // 页数 // 双向链表的结构 Span* next = nullptr; Span* prev = nullptr; size_t _useCount = 0; // 使用计数,切好的小块内存分配给ThreadCache void* _freeList = nullptr; // 大块内存切小链接起来,这样回收回来的内存也方便链接 }; // 带头双向循环链表 class SpanList { public: SpanList() { _head = new Span; _head->next = _head; _head->prev = _head; } Span* Begin() { return _head->next; } Span* End() { return _head; } void Insert(Span* pos, Span* newSpan) { // 头插 Span* prev = pos->prev; prev->next = newSpan; newSpan->prev = prev; newSpan->next = pos; pos->prev = newSpan; } void Erase(Span* pos) { assert(pos != _head); Span* prev = pos->prev; Span* next = pos->next; prev->next = next; next->prev = prev; } private: Span* _head; public: std::mutex _mtx; // 桶锁: 数组_spanLists中每一个位置的桶都有一把锁 };
慢开始反馈调节算法
补全hreadCache.h中FetchFromCentralCache函数只声明未实现
完善common.h文件
1)FreeList中成员变量和函数:
void PushRange(void* start, void* end) { // 一连串空间存储 NextObj(end) = _freeList; _freeList = start; }
MaxSize函数调节页数:
2)类SizeClass的静态成员函数
// 一次从中心缓存获取多少个 static size_t NumMoveSize(size_t size) { assert(size > 0); // [2, 512],一次批量移动多少个对象的(慢启动)上限值 // 小对象一次批量上限高 // 小对象一次批量上限低 int num = MAX_BYTES / size; if (num < 2) num = 2; if (num > 512) num = 512; return num; }
(ThreadCache.cpp文件)
// 慢开始反馈调节算法 // 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完 // 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限 // 3、size越大,一次向central cache要的batchNum就越小 // 4、size越小,一次向central cache要的batchNum就越大 void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) { // 慢开始反馈调节法 不会开始要太多内存 size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size)); if (batchNum == _freeLists[index].MaxSize()) { ++_freeLists[index].MaxSize(); } void* start = nullptr; void* end = nullptr; size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size); // 获取连续实际内存的数目 assert(actualNum >= 1); // 至少一个 if (actualNum == 1) { assert(start == end); } else { _freeLists[index].PushRange(NextObj(start), end); } return start; }
3.5page cache设计思路(页缓存)
page cache中也是哈希桶结构,但是每个节点存储的都是span,因为页缓存是所有线程共享的,只需要定义一个对象,所以这里将 page cache 设计为单例模式(这里采用饿汉模式的设计方法)。
注意:page cache需要加整体锁(因为是所有线程共享的)
申请内存:
- 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。
- 例如:申请的是4page,4page后面没有挂span,则向后面寻找更大的span,假设在10page位置找到一个span,则将10page span分裂为一个4page span和一个6page span,把4page的span返回给central cache,把6page的span挂载到对应的6号桶中去。
- 如果找到128 page都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请一个128page 的span挂在自由链表中,再重复1中的过程。
- 第一次申请内存时,page cache是空的,经过上面的过程后会向堆申请一个128page的span,然后重复上面的过程,将128page的span进行分裂和重新挂载。
释放内存:
如果central cache释放回一个span,则依次寻找span的前后page id的span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
PageCache 代码框架:
- page cache是一个以页为单位的span自由链表
- 为了保证全局只有唯一的page cache,这个类被设计成了单例模式。
- 注意线程安全,需要锁(此处我们采用整体锁)
相关数据:
static const size_t MAX_BYTES = 256 * 1024; // 一页8kb static const size_t NFREELIST = 208; // central and thread总共桶的个数 static const size_t NPAGES = 129; // page总共桶的个数; 数组下标从1开始总共128页 static const size_t PAGE_SHIFT = 13; // 一页字节数2^13
(PageCache.h文件)
#pragma once #include "common.h" class PageCache { public: static PageCache* GetInstance() { return &_sInst; } // 获取一个k页的span Span* NewSpan(size_t k); std::mutex _pageMtx; private: SpanList _spanLists[NPAGES]; std::unordered_map<PAGE_ID, Span*> _idSpanMap; // 页号于span映射;便于回收内存 PageCache() {}; PageCache(const PageCache& t) = delete; static PageCache _sInst; };
完善一下common.h文件:类SpanList
void PushFront(Span* span) // 头插 { Insert(Begin(), span); } Span* PopFront() // 头删 { Span* front = _head->_next; Erase(front); return front; } bool Empty() { return _head->_next == _head; }
(PageCache.cpp文件)
注意:地址除以一页的字节数就是页号(Page中_pageId)Page中_n 表示几页(_n<<13表示这块内存的大小)
// 获取一个k页的span Span* PageCache::NewSpan(size_t k) { assert(k > 0 && k < NPAGES); // 先检查第k个桶里面有没有span if (!_spanLists[k].Empty()) { return _spanLists[k].PopFront(); // 有就直接pop头结点并返回 } // 检查一下后面的桶里面有没有span, 如果有可以把他它进行切分 for (size_t i = k + 1; i < NPAGES; ++i) { if (!_spanLists[i].Empty()) { // 进行切分 Span* nSpan = _spanLists[i].PopFront(); // i:位置取出它头结点 Span* kSpan = new Span; // 创k一个结点 kSpan->_pageId = nSpan->_pageId; // 页的起始位置 nSpan->_pageId += k; // 分出k页;n页位置向后走(相当与一大块内存取出头一小块内存) kSpan->_n = k; // 得到k页 nSpan->_n -= k; // 剩余空间; 挂接到nSpan->_n即(i-k)页的位置 _spanLists[nSpan->_n].PushFront(nSpan); // 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时 //进行的合并查找 _idSpanMap[nSpan->_pageId] = nSpan; _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan; // 例如:20 ~24(5页) // 建立id和span的映射; 方便(centralcache)回收小块内存 for (size_t i = 0; i < kSpan->_n; ++i) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //走到这个位置就说明后面没有大页的span了 //这时就去找堆要一个128页(最大页---NPAGES-1)的span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; // 地址除以13;就是页号 bigSpan->_n = NPAGES - 1; // 内存挂接到相应位置 _spanLists[bigSpan->_n].PushFront(bigSpan); // 函数递归调用自己;减少代码冗余(上面的空间切分)在时间稍微影响,基本上没影响 return NewSpan(k); }
windows和Linux下如何直接向堆申请页为单位的大块内存:
VirtualAlloc
brk和mmap
我们按一页8k(8*1024)也就是2^13(1<<13)
(common.h文件)
#ifdef _WIN32 #include<windows.h> #else // linux #endif // 直接去堆上按页申请空间 inline static void* SystemAlloc(size_t kpage) { #ifdef _WIN32 void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); #else // linux下brk mmap等 #endif if (ptr == nullptr) throw std::bad_alloc(); return ptr; }
获取一个非空的span
完善一下common.h文件
- 类SizeClass
// 计算一次向系统获取几个页 // 单个对象 8byte // ... // 单个对象 256KB static size_t NumMovePage(size_t size) { size_t num = NumMoveSize(size); // 要几页 size_t npage = num * size; //总字节数 npage >>= PAGE_SHIFT; // 计算多少页,一页8k(2^13);则PAGE_SHIFT为13 if (npage == 0) npage = 1; return npage; }
Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size) { // 先从桶的span中找内存 Span* it = list.Begin(); while (it != list.End()) { if (it->_freeList != nullptr) { return it; } it = it->_next; } // 先把central 桶锁解除:这样如果某他线程释放内存对象回来,不会阻塞 list._mtx.unlock(); // 访问page时 需要加锁 PageCache::GetInstance()->_pageMtx.lock(); // 没有空闲的span去PageCache中要 Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(byte_size)); // 获取一个k页的span PageCache::GetInstance()->_pageMtx.unlock(); //对获取span进行切分,不需要加锁,因为其他线程拿不到这个span char* start = (char*)(span->_pageId << PAGE_SHIFT); // k页的span的起始地址 size_t bytes = span->_n << PAGE_SHIFT; // 内存字节数 char* end = start + bytes; // 把大块内存切成自由链表链表 // 1. 先切一块作头,然后尾插(这样保证物理空间顺序与逻辑顺序保持一致) span->_freeList = start; start += byte_size; void* tail = span->_freeList; while (start < end) { NextObj(tail) = start; tail = NextObj(tail); start += byte_size; } NextObj(tail)=nullptr; // 挂节的时候恢复cengtral的锁 list._mtx.lock(); list.PushFront(span); return span; }