六、回收内存
6.1 ThreadCache
当某个线程申请的内存不使用了,可以将其归还给ThreadCache。ThreadCache将该内存块插入到对应哈希桶的自由链表中即可
但是随着线程不断的释放,对应自由链表的长度也会越来越长,这些内存堆积在一个ThreadCache中却没被使用就会浪费,可以这些内存还给CentralCache。归还后这些内存对其他线程来说也是可申请的,因此当ThreadCache某个桶当中的自由链表太长时可以进行一些处理
若ThreadCache某个桶当中自由链表的长度 大于等于 其一次批量向CentralCache申请的内存块上限值_maxSize,那么此时就把该自由链表当中的这些内存块还给CentralCache
void ThreadCache::Deallocate(void* ptr, size_t size) { assert(ptr != nullptr); assert(size <= MAX_BYTES); size_t bucketIndex = DataHandleRules::Index(size); _freeLists[bucketIndex].Push(ptr); //当链表长度大于一次批量申请的内存时就归还一段自由链表上的内存给CentralCache if (_freeLists[bucketIndex].Size() == _freeLists[bucketIndex].MaxSize()) ListTooLong(_freeLists[bucketIndex], size); }
当自由链表的长度大于一次批量申请的对象时,从该自由链表中取出一次批量个数的对象,然后将取出的这些对象还给CentralCache中对应的Span即可
void ThreadCache::ListTooLong(FreeList& list, size_t size) { void* start = nullptr, * end = nullptr; list.PopRange(start, end, list.MaxSize()); CentralCache::GetInstance()->ReleaseListToSpans(start, size); }
6.2 CentralCache
当ThreadCache中某个自由链表太长时,会将自由链表当中的这些内存块还给CentralCache中的Span,但是还给CentralCache的内存块不一定都是属于同一个Span的。CentralCache中的每个哈希桶中可能都不止一个Span,因此当计算出还回的内存应该还给CentralCache的哪一个桶后,还需要知道这些对象到底应该还给这个桶当中的哪一个Span
根据小内存块的地址得到其所在的页号
某个页当中的所有地址除以页的大小都等该页的页号。比如假设一页的大小是100,那么地址0~99都属于第0页,并且除以100都等于0,而地址100~199都属于第1页,它们除以100都等于1
找到小内存块对应的Span
虽然可以通过内存块的地址得到其所在的页号,但还是不能知道这个内存块属于哪个Span。因为一个Span管理的可能是多个页
为了解决这个问题,可以建立页号和Span之间的映射。由于这个映射关系在PageCache进行Span的合并时也需要用到,因此直接将其存放到PageCache中。
所以就需要在PageCache类中添加一个映射关系了,可以用C++当中的unordered_map进行实现,并且添加一个函数接口,用于让CentralCache获取这里的映射关系。(下面代码中只展示了PageCache类当中新增的成员)
//单例模式 class PageCache { public: Span* MapObjectToSpan(void* obj);//获取从对象到span的映射 private: std::unordered_map<PAGE_ID, Span*> _idSpanMap; };
每当PageCache分配Span给CentralCache时,都需要记录一下页号和Span之间的映射关系。此后当ThreadCache还对象给CentralCache时,才知道应该还给哪一个Span
因此当CentralCache在调用NewSpan()接口向PageCache申请k页的Span时,PageCache在返回这个k页的Span给CentralCache之前,应该建立这k个页号与其Span之间的映射关系
Span* PageCache::NewSpan(size_t k)//获取一个k页的span { assert(k > 0); //检查第k个桶中是否有可用的span if (!_spanLists[k].IsEmpty()) { Span* kSpan = _spanLists[k].PopFront(); for (PAGE_ID i = 0; i < kSpan->_num; ++i) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } //检查后续的桶中是否有可用的span,若有则进行切分 for (int i = k + 1; i < NPAGES; ++i) { if (!_spanLists[i].IsEmpty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = _spanPool.New(); kSpan->_pageId = nSpan->_pageId; nSpan->_pageId += k; kSpan->_num = k; nSpan->_num -= k; _spanLists[nSpan->_num].PushFront(nSpan); //建立id和span的映射,方便central cache回收小块内存时,查找对应的span for (PAGE_ID i = 0; i < kSpan->_num; ++i) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //运行此处则说明PageCache中没有可用的span,需向堆中申请128页的span Span* newSpan = _spanPool.New(); void* address = SystemAlloc(NPAGES - 1); newSpan->_pageId = (PAGE_ID)address >> PAGE_SHIFT; newSpan->_num = NPAGES - 1; _spanLists[newSpan->_num].PushFront(newSpan); return NewSpan(k);//通过递归将大span分为小span }
此时就可以通过小内存块的地址 找到其对应页号 再找到其对应的Span了,直接将该内存块的地址除以页的大小得到页号,然后在unordered_map当中找到其对应的Span即可
//获取从小内存块到span的映射 Span* PageCache::MapObjectToSpan(void* obj) { PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号 auto ret = _idSpanMap.find(id); if (ret != _idSpanMap.end()) { return ret->second; } else { assert(false); return nullptr; } }
此时当ThreadCache还对象给CentralCache时,就可以依次遍历这些内存块,将这些内存块插入到其对应Span的自由链表当中,并且及时更新该Span的_useCount计数即可
在ThreadCache还内存块给CentralCache的过程中,若CentralCache中某个Span的_useCount减到0时,说明这个Span分配出去的内存块全部都回来了,那么此时就可以将这个Span再进一步还给PageCache
void CentralCache::ReleaseListToSpans(void* start, size_t size) { size_t bucketIndex = DataHandleRules::Index(size); _spanLists[bucketIndex]._mtx.lock(); while (start) { void* next = NextObj(start); Span* span = PageCache::GetInstance()->MapObjectToSpan(start); NextObj(start) = span->_freeList; span->_freeList = start; span->_use_count--; if (span->_use_count == 0) //说明该span切分的内存块都已经归还了,该span可以归还给PageCache { _spanLists[bucketIndex].Erase(span); span->_freeList = nullptr;//看作整体,可使用页号转换为地址来找到内存 span->_next = nullptr; span->_prev = nullptr; _spanLists[bucketIndex]._mtx.unlock(); PageCache::GetInstance()->_pageMutex.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); PageCache::GetInstance()->_pageMutex.unlock(); _spanLists[bucketIndex]._mtx.lock(); } start = next; } _spanLists[bucketIndex]._mtx.unlock(); }
若把某个Span还给PageCache,需先将这个Span从CentralCache对应的双链表中移除,然后再将该Span的自由链表置空,因为PageCache中的Span是不需要切分成一个个的小对象的,以及该Span的前后指针也都应该置空,因为之后要将其插入到PageCache对应的双链表中。但Span当中记录的起始页号以及它管理的页数是不能清除的,否则对应内存块就找不到了
在CentralCache还Span给PageCache时也存在锁的问题,此时需要先将CentralCache中对应的桶锁解开,然后加上PageCache的大锁之后再进入PageCache进行相关操作(此时别的线程可以在桶中进行申请,并不会影响该线程归还,解开桶锁之前已经将要归还给PageCache的Span从CentralCache中的双链表中移除,其他线程无法找到该Span)
当处理完毕回到CentralCache时,除了将PageCache的大锁解开,还需立刻获得CentralCache对应的桶锁,然后将还未还完对象继续还给CentralCache中对应的Span
6.3 PageCache
若CentralCache中有某个Span的_useCount减到0了,那么CentralCache就需将这个Span还给PageCache
这个过程看似是非常简单的,PageCache只需将还回来的Span挂到对应的哈希桶上即可。但实际为了缓解内存外碎片问题,PageCache还需尝试将还回来的Span与其他空闲的Span进合并
PageCache前后页的合并
合并的过程可以分为向前合并和向后合并:
若还回来的Span的起始页号是num,该Span所管理的页数是n。那么在向前合并时,就需要判断第num-1页对应Span是否空闲,若空闲则可以将其进行合并,并且合并后还需要继续向前尝试进行合并,直到不能进行合并为止
而在向后合并时,就需要判断第num+n页对应的Span是否空闲,若空闲则可以将其进行合并,并且合并后还需要继续向后尝试进行合并,直到不能进行合并为止
因此PageCache在合并Span时需要通过页号获取到对应的Span的,这就是把页号与Span之间的映射关系存储到page cache的原因
但当通过页号找到其对应的Span时,这个Span此时可能挂在PageCache,也可能挂在Central Cache。而在只能合并挂在PageCache的Span,因为挂在CentralCache的Span中的内存块正在被其他线程使用
可是并不能通过Span结构当中的_useCount成员,来判断某个Span是在CentralCache还是在Page Cache。因为当CentralCache刚向PageCache申请到一个Span时,这个Span的_useCount就是等于0的,这时可能正在对该Span进行切分时,PageCache就把这个Span拿去进行合并了,这显然是不合理的
于是,可以在Span结构中增加一个_isUse成员,用于标记这个Span是否正在被使用,而当一个Span结构被创建时默认该Span是没有被使用的
//管理多个页的跨度结构 struct Span { Span* _prev = nullptr;//双向链表中的结构 Span* _next = nullptr; PAGE_ID _pageId = 0;//页号 size_t _num = 0;//页的数量 void* _freeList = nullptr;//自由链表 size_t _use_count = 0;//记录已分配给ThreadCache的小块内存的数量 bool _IsUse = false;//是否被使用 };
当CentralCache向PageCache申请到一个span时,需立即将该Span中的_isUse改为true
span->_isUse = true;
当CentralCache将某个Span还给PageCache时,也需将该Span的_isUse改为false
span->_isUse = false;
由于在合并PageCache当中的Span时,需要通过页号找到其对应的Span,因此PageCache中的Span也需要建立页号与Span之间的映射关系
与CentralCache中的Span不同的是,在PageCache中的Span只需建立Span的首尾页号与该Span之间的映射关系。因为当一个Span在尝试进行合并时,若是往前合并,那么只需要通过Span的尾页找到这个Span;若是向后合并,那么只需要通过Span的首页找到这个Span。即在进行合并时只需要用到Span与其首尾页之间的映射关系
因此获取k页的Span时,若是将n页的Span切成了一个k页的Span和一个n-k页的Span,除了需要建立k页Span中每个页与该Span之间的映射关系之外,还需要建立剩下的n-k页的span与其首尾页之间的映射关系
Span* PageCache::NewSpan(size_t k)//获取一个k页的span { assert(k > 0); //检查第k个桶中是否有可用的span if (!_spanLists[k].IsEmpty()) { Span* kSpan = _spanLists[k].PopFront(); for (PAGE_ID i = 0; i < kSpan->_num; ++i) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } //检查后续的桶中是否有可用的span,若有则进行切分 for (int i = k + 1; i < NPAGES; ++i) { if (!_spanLists[i].IsEmpty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = new Span; kSpan->_pageId = nSpan->_pageId; nSpan->_pageId += k; kSpan->_num = k; nSpan->_num -= k; _spanLists[nSpan->_num].PushFront(nSpan); //存储nSpan的首尾页号与nSpan建立映射关系,利于PageCache回收Central_Cache中span时进行合并页 _idSpanMap[nSpan->_pageId] = nSpan; _idSpanMap[nSpan->_pageId + nSpan->_num - 1] = nSpan; //建立id和span的映射,方便central cache回收小块内存时,查找对应的span for (PAGE_ID i = 0; i < kSpan->_num; ++i) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //运行此处则说明PageCache中没有可用的span,需向堆中申请128页的span Span* newSpan = _spanPool.New(); void* address = SystemAlloc(NPAGES - 1); newSpan->_pageId = (PAGE_ID)address >> PAGE_SHIFT; newSpan->_num = NPAGES - 1; _spanLists[newSpan->_num].PushFront(newSpan); return NewSpan(k);//通过递归将大span分为小span }
此时PageCache当中的Span就都与其首尾页之间建立了映射关系,合并代码如下:
void PageCache::ReleaseSpanToPageCache(Span* span) { while (1)//向前合并 { PAGE_ID prevId = span->_pageId - 1; auto ret = _idSpanMap.find(prevId); if (ret == _idSpanMap.end()) { break; } //前面相邻页的span正在使用中 Span* prevSpan = ret->second; if (prevSpan->_IsUse == true) break; if (prevSpan->_num + span->_num > NPAGES - 1) break;//若合并后大于128页则无法管理 span->_pageId = prevSpan->_pageId; span->_num += prevSpan->_num; _spanLists[prevSpan->_num].Erase(prevSpan); delete prevSpan; } while (1)//向后合并 { PAGE_ID nextId = span->_pageId + span->_num; auto ret = _idSpanMap.find(nextId ); if (ret == _idSpanMap.end()) { break; } //后面相邻页的span正在使用中 Span* nextSpan = ret->second; if (nextSpan->_IsUse == true) break; if (nextSpan->_num + span->_num > NPAGES - 1) break;//若合并后大于128页则无法管理 span->_num += nextSpan->_num; _spanLists[nextSpan->_num].Erase(nextSpan); delete nextSpan; } _spanLists[span->_num].PushFront(span); _idSpanMap[span->_pageId] = span; _idSpanMap[span->_pageId + span->_num - 1] = span span->_IsUse = false; }
若没有通过页号获取到对应的Span,说明对应到该页的内存块还未申请,此时需停止合并
若通过页号获取到了其对应的Span,但该Span处于被使用的状态,也必须停止合并
若合并后大于128页则不能进行合并,因为PageCache无法对大于128页的Span进行管理
在合并Span时,由于这个Span是在PageCache的某个哈希桶的双链表当中,因此在合并后需要将其从对应的双链表中移除,然后再将这个被合并了的Span结构进行delete操作
在合并结束后,将合并后的Span挂入PageCache对应哈希桶的双链表中,并且需要建立该Span与其首位页之间的映射关系,便于以后能合并出更大的Span
七、存在问题以及解决
7.1 大于256KB的大块内存问题
申请过程
每个线程的ThreadCache是用于申请小于等于256KB的内存的,而对于大于256KB的内存,可以考虑直接向PageCache申请,但PageCache中最大的页也只有128页,因此若是大于128页的内存申请,则只能直接向堆申请
当申请大于256KB时,若然不能向ThreadCache申请,但分配内存时仍需向上对齐。而之前实现AlignUp()函数时,对传入字节数大于256KB的情况直接做了断言处理,因此需要对RoundUp函数稍作修改
static inline size_t AlignUp(size_t size) { if (size < 128) return _AlignUp(size, 8); else if (size < 1024) return _AlignUp(size, 16); else if (size < 8 * 1024) return _AlignUp(size, 128); else if (size < 64 * 1024) return _AlignUp(size, 1024); else if (size < 256 * 1024) return _AlignUp(size, 8 * 1024); else return _AlignUp(size, 1 << PAGE_SHIFT); }
之前的申请逻辑也需要进行修改,当申请内存的大小大于256KB时,就不向ThreadCache申请了,这时先计算出按页对齐后实际需申请的页数,然后通过调用NewSpan()申请指定页数的Span即可
static void* hcmalloc(size_t size) { if (size > MAX_BYTES) { size_t alignSize = DataHandleRules::AlignUp(size); size_t kPage = alignSize >> PAGE_SHIFT; PageCache::GetInstance()->_pageMutex.lock(); Span* span = PageCache::GetInstance()->NewSpan(kPage); PageCache::GetInstance()->_pageMutex.unlock(); void* address = (void*)(span->_pageId << PAGE_SHIFT); return address; } else { static ObjectPool<ThreadCache> threadCachePool; if (pTLSThreadCache == nullptr) pTLSThreadCache = new TreadCache; //cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl; return pTLSThreadCache->Allocate(size); } }
即申请大于256KB的内存时,会直接调用PageCache当中的NewSpan()函数进行申请
因此需要再对NewSpan()函数进行改造,当需要申请的内存页数大于128页时,就直接向堆区申请对应页数的内存块。而如果申请的内存页数是小于128页的,那就在PageCache中进行申请。因此当申请大于256KB的内存调用NewSpan()函数时也是需要加锁的,因为可能是在PageCache中进行申请的
Span* PageCache::NewSpan(size_t k)//获取一个k页的span { assert(k > 0); //大于128页的需求直接向堆区申请 if (k > NPAGES - 1) { void* address = SystemAlloc(k); Span* span = _spanPool.New(); span->_pageId = (PAGE_ID)address >> PAGE_SHIFT; span->_num = k; _idSpanMap[span->_pageId] = span;//便于归还内存时,通过地址找到对应的span return span; } //检查第k个桶中是否有可用的span if (!_spanLists[k].IsEmpty()) { Span* kSpan = _spanLists[k].PopFront(); for (PAGE_ID i = 0; i < kSpan->_num; ++i) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } //检查后续的桶中是否有可用的span,若有则进行切分 for (int i = k + 1; i < NPAGES; ++i) { if (!_spanLists[i].IsEmpty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = new Span; kSpan->_pageId = nSpan->_pageId; nSpan->_pageId += k; kSpan->_num = k; nSpan->_num -= k; _spanLists[nSpan->_num].PushFront(nSpan); //存储nSpan的首尾页号与nSpan建立映射关系,利于PageCache回收Central_Cache中span时进行合并页 _idSpanMap[nSpan->_pageId] = nSpan; _idSpanMap[nSpan->_pageId + nSpan->_num - 1] = nSpan; //建立id和span的映射,方便central cache回收小块内存时,查找对应的span for (PAGE_ID i = 0; i < kSpan->_num; ++i) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //运行此处则说明PageCache中没有可用的span,需向堆中申请128页的span Span* newSpan = _spanPool.New(); void* address = SystemAlloc(NPAGES - 1); newSpan->_pageId = (PAGE_ID)address >> PAGE_SHIFT; newSpan->_num = NPAGES - 1; _spanLists[newSpan->_num].PushFront(newSpan); return NewSpan(k);//通过递归将大span分为小span }
释放过程
当释放对象时也需要判断释放对象的大小:
当释放内存块时,需先找到该内存块对应的Span。之前在申请大于256KB的内存时,已经给申请到的内存建立Span结构,并建立了起始页号与该Span之间的映射关系。此时就可以通过内存块的起始地址计算出起始页号,进而通过页号找到该对象对应的Span
static void hcfree(void* ptr,size_t size) { if (size > MAX_BYTES) { Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr); PageCache::GetInstance()->_pageMutex.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); PageCache::GetInstance()->_pageMutex.unlock(); } else { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); } }
PageCache在回收Span时也需要进行判断,若该Span的大小是小于等于128页的,那么直接还给PageCache即可,PageCache会尝试对其进行合并。而若该Span的大小是大于128页的,那么说明该Span是直接向堆区申请的,直接将这块内存释放给堆区,然后将这个Span结构进行delete
void PageCache::ReleaseSpanToPageCache(Span* span) { //大于128页的内存直接归还给系统堆区 if (span->_num > NPAGES - 1) { void* address = (void*)(span->_pageId << PAGE_SHIFT); SystemFree(address); delete span; return; } //...... }
向区堆申请内存时调用的系统接口是VirtualAlloc(),则将内存释放的接口为VirtualFree(),而Linux下的brk和mmap对应的释放系统接口为sbrk和unmmap。此时可以将这些释放接口封装成一个叫做SystemFree()的接口,当需要将内存释放给堆时直接调用SystemFree()即可
inline static void SystemFree(void* ptr) { #ifdef _WIN32 VirtualFree(ptr, 0, MEM_RELEASE); #else // sbrk unmmap等 #endif }
7.2 未完全脱离使用new
tcmalloc是要在高并发场景下替代malloc进行内存申请的,因此tcmalloc在实现的时,其内部是不可能调用malloc函数的,但当前的代码中存在通过new获取到的内存,而new在底层实际上就是封装了malloc函数
为了完全脱离掉malloc函数,之前实现的定长对象内存池就该发挥作用了,代码中使用new时基本都是为Span结构的对象申请空间,而Span对象基本都是在PageCache层创建的,因此可以在PageCache类当中定义一个_spanPool,用于Span对象的申请和释放
ObjectPool<Span> _spanPool;
然后将代码中使用new的地方替换为调用定长内存池当中的New函数,将代码中使用delete的地方替换为调用定长内存池当中的Delete函数
//申请span对象 Span* span = _spanPool.New(); //释放span对象 _spanPool.Delete(span);
每个线程第一次申请内存时也都会创建其专属的ThreadCache对象,而这个ThreadCache目前也是new出来的,也需要对其进行替换
static ObjectPool<ThreadCache> threadCachePool; static std::mutex hcMtx; if (pTLSThreadCache == nullptr) { hcMtx.lock(); pTLSThreadCache = threadCachePool.New(); hcMtx.unlock(); }
将用于申请ThreadCache类对象的定长内存池定义为静态的,保持全局只有一个,让所有线程创建ThreadCache时,都在该定长内存池中申请内存即可
但在该定长内存池中申请内存时需要加锁,防止多个线程申请各自的ThreadCache对象时同时访问定长内存池而导致线程安全问题
7.3 释放对象时优化为不传对象大小
当使用malloc()函数申请内存时,需要指明申请内存的大小;当使用free()函数释放内存时,只需要传入指向这块内存的指针即可
而本博客目前讲述的内存池,在释放对象时除了需要传入指向该对象的指针,还需要传入该对象的大小
若也想在释放对象时不用传入对象的大小,那么就需要建立内存块地址与对象大小之间的关系。由于现在可以通过内存块的地址找到其对应的Span,而Span的自由链表中挂的都是相同大小的小内存块,因此可以在Span结构中再增加一个_objSize成员,该成员代表着这个Span管理的小内存块的大小
//管理多个页的跨度结构 struct Span { Span* _prev = nullptr;//双向链表中的结构 Span* _next = nullptr; PAGE_ID _pageId = 0;//页号 size_t _num = 0;//页的数量 void* _freeList = nullptr;//自由链表 size_t _use_count = 0;//记录已分配给ThreadCache的小块内存的数量 bool _IsUse = false;//是否被使用 size_t _objSize = 0;//切好的小内存块的大小 };
而所有的Span都是从PageCache中获取的,因此每当调用NewSpan()获取到一个k页的Span时,就应该将这个Span的_objSize保存下来
Span* span = PageCache::GetInstance()->NewSpan(DataHandleRules::NumMovePage(size)); span->_objSize = size;
代码中有两处:
在CentralCache获取非空Span时,若CentralCache对应的桶中没有非空的Span,此时会调用NewSpan()获取一个k页的span
另一处是当申请大于256KB内存时,会直接调用NewSpan()获取一个k页的Span
当释放小内存块时,就可以直接从内存块对应的Span中获取到该内存块的大小,准确来说获取到的是对齐以后的大小
static void hcfree(void* ptr) { Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr); size_t size = span->_objSize; if (size > MAX_BYTES) { PageCache::GetInstance()->_pageMutex.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); PageCache::GetInstance()->_pageMutex.unlock(); } else { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); } }
7.4 读取映射关系时的加锁问题
页号与Span之间的映射关系是存储在PageCache类中的unordered_map的,当访问这个映射关系时是需要加锁的,因为STL容器是不保证线程安全的
若是在CentralCache访问这个unordered_map,或是在调用hcfree()函数释放内存时访问这个容器,那么就存在线程安全的问题。因为可能存在某个线程在PageCache中进行某些操作导致unordered_map修改,而其他线程可能也正在访问这个映射关系,因此当在PageCache外部访问这个映射关系时是需要加锁的
实际就是在调用PageCache对外提供访问映射关系的函数时需要加锁,可以考虑使用C++中的unique_lock
//获取从对象到span的映射 Span* PageCache::MapObjectToSpan(void* obj) { PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号 std::unique_lock<std::mutex> lock(_pageMutex); //构造时加锁,析构时自动解锁 auto ret = _idSpanMap.find(id); if (ret != _idSpanMap.end()) { return ret->second; } else { assert(false); return nullptr; } }
八、性能瓶颈及其优化
8.1 多线程场景下性能对比
在多线程场景下对比malloc进行测试:
// ntimes 一轮申请和释放内存的次数 // rounds 轮次 void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds) { std::vector<std::thread> vthread(nworks); std::atomic<size_t> malloc_costtime = 0; std::atomic<size_t> free_costtime = 0; for (size_t k = 0; k < nworks; ++k) { vthread[k] = std::thread([&, k]() { std::vector<void*> v; v.reserve(ntimes); for (size_t j = 0; j < rounds; ++j) { size_t begin1 = clock(); for (size_t i = 0; i < ntimes; i++) { v.push_back(malloc(16)); //v.push_back(malloc((16 + i) % 8192 + 1)); } size_t end1 = clock(); size_t begin2 = clock(); for (size_t i = 0; i < ntimes; i++) { free(v[i]); } size_t end2 = clock(); v.clear(); malloc_costtime += (end1 - begin1); free_costtime += (end2 - begin2); } }); } for (auto& t : vthread) { t.join(); } cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc" << ntimes << "次: 花费:" << malloc_costtime << "ms" << endl; cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次free" << ntimes << "次: 花费:" << free_costtime << "ms" << endl; cout << nworks << "个线程并发 malloc && free" << nworks * rounds * ntimes << "轮次,总计花费" << malloc_costtime + free_costtime << "ms" << endl; } // 单轮次申请释放次数 线程数 轮次 void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds) { std::vector<std::thread> vthread(nworks); std::atomic<size_t> malloc_costtime = 0; std::atomic<size_t> free_costtime = 0; for (size_t k = 0; k < nworks; ++k) { vthread[k] = std::thread([&]() { std::vector<void*> v; v.reserve(ntimes); for (size_t j = 0; j < rounds; ++j) { size_t begin1 = clock(); for (size_t i = 0; i < ntimes; i++) { v.push_back(hcmalloc(16)); //v.push_back(hcmalloc((16 + i) % 8192 + 1)); } size_t end1 = clock(); size_t begin2 = clock(); for (size_t i = 0; i < ntimes; i++) { hcfree(v[i]); } size_t end2 = clock(); v.clear(); malloc_costtime += (end1 - begin1); free_costtime += (end2 - begin2); } }); } for (auto& t : vthread) { t.join(); } cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次hcmalloc" << ntimes << "次: 花费:" << malloc_costtime << "ms" << endl; cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次hcfree" << ntimes << "次: 花费:" << free_costtime << "ms" << endl; cout << nworks << "个线程并发 hcmalloc && hcfree" << nworks * rounds * ntimes << "轮次,总计花费" << malloc_costtime + free_costtime << "ms" << endl; } int main() { size_t n = 1000; cout << "==========================================================" << endl; BenchmarkConcurrentMalloc(n, 4, 10); cout << endl << endl; BenchmarkMalloc(n, 4, 10); cout << "==========================================================" << endl; return 0; }
测试函数中,通过clock(0函数分别获取到每轮次申请和释放所花费的时间,然后将其对应累加到malloc_costtime和free_costtime上。最后就得到了nworks个线程跑rounds轮,每轮申请和释放ntimes次,这个过程申请所消耗的时间、释放所消耗的时间、申请和释放总共消耗的时间
创建线程时让线程执行的是lambda表达式,以值传递的方式捕捉了变量k,以引用传递的方式捕捉了其他父作用域中的变量,因此可以将各个线程消耗的时间累加到一起
将所有线程申请内存消耗的时间都累加到malloc_costtime上, 将释放内存消耗的时间都累加到free_costtime上,此时malloc_costtime和free_costtime可能被多个线程同时进行累加操作的,所以存在线程安全的问题。于是在定义这两个变量时使用了atomic类模板,这时对这两个变量的操作就是原子的了
固定大小内存的申请和释放
v.push_back(hcmalloc(16)); v.push_back(malloc(16));
由于此时申请释放的都是固定大小的对象,每个线程申请释放时访问的都是各自ThreadCache中的同一个桶,当ThreadCache的这个桶中没有内存块或内存块太多要归还时,也都会访问Central Cache的同一个桶,此时CentralCache中的桶锁的策略并没有起到作用
不同大小内存的申请和释放
v.push_back(hcmalloc((16 + i) % 8192 + 1)); v.push_back(malloc((16 + i) % 8192 + 1));
由于申请和释放内存的大小是不同的,此时CentralCache当中的桶锁就起作用了,hcmalloc的效率也有了较大增长,但总体来说还是差一点点
8.2 性能瓶颈分析
该项目此时与malloc之间还是有差距的,下面用VS中性能分析的工具来进行分析:
不难发现,性能的瓶颈点主要是MapObjectToSpan()中的锁问题
tcmalloc中针对这一点使用了基数树进行优化,使得在读取这个映射关系时可以做到不加锁
8.3 基数树优化
基数树实际上就是一个分层的哈希表,根据所分层数不同可分为单层基数树、二层基数树、三层基数树等
单层基数树
单层基数树实际采用的就是直接定址法,每一个页号对应Span的地址就存储数组中在以该页号为下标的位置
最坏的情况下需要建立所有页号与其Span之间的映射关系,因此这个数组中元素个数应该与页号的数目相同,数组中每个位置存储的就是对应Span的指针
// Single-level array template <int BITS> class HCMalloc_PageMap1 { private: static const int LENGTH = 1 << BITS; void** array_; public: typedef uintptr_t Number; explicit HCMalloc_PageMap1() { size_t size = sizeof(void*) << BITS; size_t alignSize = DataHandleRules::_AlignUp(size, 1 << PAGE_SHIFT); array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); memset(array_, 0, sizeof(void*) << BITS); } // Return the current value for KEY. Returns NULL if not yet set, // or if k is out of range. void* get(Number k) const { if ((k >> BITS) > 0) { return NULL; } return array_[k]; } // REQUIRES "k" is in range "[0,2^BITS-1]". // REQUIRES "k" has been ensured before. // // Sets the value 'v' for key 'k'. void set(Number k, void* v) { array_[k] = v; } };
代码中的非类型模板参数BITS表示存储页号最多需要bit位的个数。在32位下传入的是32-PAGE_SHIFT,在64位下传入的是64-PAGE_SHIFT。而其中的LENGTH成员代表的就是页号的数目,即
如32位平台下,以一页大小为8K为例,此时页的数目就是 ,因此存储页号最多需要19个比特位,此时传入非类型模板参数的值就是32 − 13 = 19 。由于32位平台下指针的大小是4字节,因此该数组的大小就是 * 4 = =2M,内存消耗不大,是可行的。但若是在64位平台下,此时该数组的大小是 * 8 = = G,明显是不可行的,对于64位的平台需使用三层基数树
二层基数树
以32位平台下,一页的大小为8K为例来说明,此时存储页号最多需要19个bit位
而二层基数树实际上就是把这19个比特位分为两次进行映射。如用前5个bit位在基数树的第一层进行映射,映射后得到对应的第二层,然后用剩下的bit位在基数树的第二层进行映射,映射后最终得到该页号对应的Span指针
在二层基数树中,第一层的数组占用 * 4 = Byte空间,第二层的数组最多占用 * * 4 = = 2M。二层基数树相比一层基数树的好处就是,一层基数树必须一开始就把2M的数组开辟出来,而二层基数树一开始时只需将第一层的数组开辟出来,当需要进行某一页号映射时再开辟对应的第二层的数组即可
// Two-level radix tree template <int BITS> class HCMalloc_PageMap2 { private: // Put 32 entries in the root and (2^BITS)/32 entries in each leaf. static const int ROOT_BITS = 5; static const int ROOT_LENGTH = 1 << ROOT_BITS; static const int LEAF_BITS = BITS - ROOT_BITS; static const int LEAF_LENGTH = 1 << LEAF_BITS; // Leaf node struct Leaf { void* values[LEAF_LENGTH]; }; Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes void* (*allocator_)(size_t); // Memory allocator public: typedef uintptr_t Number; explicit HCMalloc_PageMap2() { memset(root_, 0, sizeof(root_)); PreallocateMoreMemory(); } void* get(Number k) const { const Number i1 = k >> LEAF_BITS; const Number i2 = k & (LEAF_LENGTH - 1); if ((k >> BITS) > 0 || root_[i1] == NULL) { return NULL; } return root_[i1]->values[i2]; } void set(Number k, void* v) { const Number i1 = k >> LEAF_BITS; const Number i2 = k & (LEAF_LENGTH - 1); assert(i1 < ROOT_LENGTH); root_[i1]->values[i2] = v; } bool Ensure(Number start, size_t n) { for (Number key = start; key <= start + n - 1;) { const Number i1 = key >> LEAF_BITS; // Check for overflow if (i1 >= ROOT_LENGTH) return false; // Make 2nd level node if necessary if (root_[i1] == NULL) { static ObjectPool<Leaf> leafPool; Leaf* leaf = (Leaf*)leafPool.New(); memset(leaf, 0, sizeof(*leaf)); root_[i1] = leaf; } // Advance key past whatever is covered by this leaf node key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; } return true; } void PreallocateMoreMemory() { // Allocate enough to keep track of all possible pages Ensure(0, 1 << BITS); } };
因此在二层基数树中有一个Ensure(0函数,当需要建立某一页号与其Span之间的映射关系时,需先调用该Ensure()函数确保用于映射该页号的空间是开辟了的,若没有开辟则会立即开辟。
而在32位平台下,就算将二层基数树第二层的数组全部开辟出来也就消耗了2M的空间,内存消耗也不算太多,因此可以在构造二层基数树时就把第二层的数组全部开辟出来
三层基数树
上述一层基数树和二层基数树都适用于32位平台,而对于64位的平台就需要用三层基数树
三层基数树与二层基数树类似,三层基数树实际上就是把存储页号的若干bit位分为三次进行映射
只有当要建立某一页号的映射关系时再开辟对应的数组空间,而没有建立映射的页号就可以不用开辟其对应的数组空间,可以在一定程度上节省内存空间,因此也存在Ensure()函数
//Three-level radix tree template <int BITS> class HCMalloc_PageMap3 { private: // How many bits should we consume at each interior level static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS; // How many bits should we consume at leaf level static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; static const int LEAF_LENGTH = 1 << LEAF_BITS; // Interior node struct Node { Node* ptrs[INTERIOR_LENGTH]; }; // Leaf node struct Leaf { void* values[LEAF_LENGTH]; }; Node* root_; // Root of radix tree void* (*allocator_)(size_t); // Memory allocator Node* NewNode() { Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node))); if (result != NULL) { memset(result, 0, sizeof(*result)); } return result; } public: typedef uintptr_t Number; explicit HCMalloc_PageMap3(void* (*allocator)(size_t)) { allocator_ = allocator; root_ = NewNode(); } void* get(Number k) const { const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); const Number i3 = k & (LEAF_LENGTH - 1); if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) { return NULL; } return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]; } void set(Number k, void* v) { ASSERT(k >> BITS == 0); const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); const Number i3 = k & (LEAF_LENGTH - 1); reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v; } bool Ensure(Number start, size_t n) { for (Number key = start; key <= start + n - 1;) { const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS); const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1); // Check for overflow if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH) return false; // Make 2nd level node if necessary if (root_->ptrs[i1] == NULL) { Node* n = NewNode(); if (n == NULL) return false; root_->ptrs[i1] = n; } // Make leaf node if necessary if (root_->ptrs[i1]->ptrs[i2] == NULL) { Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf))); if (leaf == NULL) return false; memset(leaf, 0, sizeof(*leaf)); root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf); } // Advance key past whatever is covered by this leaf node key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; } return true; } void PreallocateMoreMemory() { } };
8.4 代码实现
用基数树对代码进行优化,此时将PageCache类当中的unorder_map用基数树进行替换即可,由于当前是32位平台,因此这里用几层基数树都可以(本博客以两层基数树为例)
//单例模式 class PageCache { public: //... private: //std::unordered_map<PAGE_ID, Span*> _idSpanMap; HCMalloc_PageMap2<32 - PAGE_SHIFT> _idSpanMap; };
需要建立页号与span的映射时,调用基数树中的set函数
_idSpanMap.set(span->_pageId, span);
需要读取某一页号对应的Span时,调用基数树中的get函数
Span* ret = (Span*)_idSpanMap.get(id);
用于读取映射关系的MapObjectToSpan函数内部就不需要加锁了
//获取从小内存块到span的映射 Span* PageCache::MapObjectToSpan(void* obj) { PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT); auto ret = (Span*)_idSpanMap.get(id); assert(ret != nullptr); return ret; }
为什么读取基数树映射关系时不需要加锁?
当某个线程在读取映射关系时,可能另外一个线程正在建立其他页号的映射关系,而此时无论我们用的是C++当中的map还是unordered_map,在读取映射关系时都是需要加锁的
因为C++中map的底层数据结构是红黑树,unordered_map的底层数据结构是哈希表,而无论是红黑树还是哈希表,当在插入数据时其底层的结构都有可能会发生变化。如红黑树在插入数据时可能会引起树的旋转,而哈希表在插入数据时可能会引起哈希表扩容。此时要避免出现数据不一致的问题,就不能让插入操作和读取操作同时进行,因此在读取映射关系的时候是需要加锁的
而对于基数树来说就不一样了,基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定的位置进行读取的。并且不会同时对同一个页进行读取映射和建立映射的操作,因为只有在释放对象时才需要读取映射,而建立映射的操作都是在PageCache中进行的。即读取映射时读取的都是对应Span的_useCount不等于0的页,而建立映射时建立的都是对应Span的_useCount等于0的页,所以不会同时对同一个页进行读取映射和建立映射的操作
优化后对比
固定大小内存的申请和释放
申请释放不同大小的内存
九、项目源码
GG-Bruse/High-concurrency-memory-pool: Study the high concurrency memory pool implemented by Google's open source project tcmalloc (Thread-Caching Malloc) (github.com)
https://github.com/GG-Bruse/High-concurrency-memory-pool