五、完善整体项目释放流程
5.1thread cache
- 当链表的长度过长,则回收一部分内存对象到central cache。
- Size函数:记录(_freeList)挂接内存的个数–(插入和删除在加一行计算代码)
ThreadCache.h文件中的ThreadCache类
增加一个类函数ListTooLong声明
注意:
PopRange:common文件中FreeList类的函数 CentralCache.h文件中的ReleaseListToSpans函数
ListTooLong函数定义实现
void ThreadCache::ListTooLong(FreeList& list, size_t size) { void* start = nullptr; void* end = nullptr; list.PopRange(start, end, list.MaxSize()); //取消挂接。 // 将一定数量的对象释放到span跨度(CentralCache) CentralCache::GetInstance()->ReleaseListToSpans(start, size); } void PopRange(void*& start, void*& end, size_t n) { assert(n <= _size); // 取走连续的链表空间 start = _freeList; // 起始地址 end = start; // 末尾地址 for (size_t i = 0; i < n - 1; ++i) { end = NextObj(end); } _freeList = NextObj(end); // 剩余空间继续挂接 NextObj(end) = nullptr; _size -= n; }
5.2central cache
释放回来时 use_count-=1。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。
- PageCache.h文件中ReleaseSpanToPageCache函数(释放空闲span回到Pagecache,并合并相邻的span)
- PageCache类中的unordered_map映射函数
// 将一定数量的对象释放到span跨度(CentralCache) void CentralCache::ReleaseListToSpans(void* start, size_t size) { size_t index = SizeClass::Index(size); // 确定桶的位置 _spanLists[index]._mtx.lock(); // 上桶锁 while (start) { void* next = NextObj(start); Span* span = PageCache::GetInstance()->MapObjectToSpan(start); // 通过映射找到span的位置 NextObj(start) = span->_freeList; // 头插 span->_freeList = start; span->_useCount -= 1; // 内存块借出去个数 if (span->_useCount == 0) // 说明小块内存全回来了; 这个span就可以回收合并了 { _spanLists[index].Erase(span); span->_freeList = nullptr; span->_next = nullptr; span->_prev = nullptr; //释放span给page cache时,使用page cache的锁就可以了 //这时把桶锁解掉 _spanLists[index]._mtx.unlock(); // 解锁 PageCache::GetInstance()->_pageMtx.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); // 释放空闲span回到Pagecache,并合并相邻的span PageCache::GetInstance()->_pageMtx.unlock(); _spanLists[index]._mtx.lock(); // 上桶锁 } start = next; } _spanLists[index]._mtx.unlock(); // 解锁 }
// 获取从对象到span的映射 Span* PageCache::MapObjectToSpan(void* obj) { PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT); std::unique_lock<std::mutex> lock(_pageMtx); //RAII风格的锁,把pagecache的锁给这个类。 auto ret = _idSpanMap.find(id); if (ret != _idSpanMap.end()) { return ret->second; } else { assert(false); return nullptr; } return nullptr; }
5.3page cache
如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
// 释放空闲span回到Pagecache,并合并相邻的span void PageCache::ReleaseSpanToPageCache(Span* span) { // 向前合并 while (true) { // 对span前的页尝试进行合并.缓解内存碎片(外碎片)问题 PAGE_ID prevId = span->_pageId - 1; // 前面一页 auto ret = _idSpanMap.find(prevId); // 查找是否存在 // 查找前后的相邻页尝试合并 if (ret == _idSpanMap.end()) { break; //前面的页号没有;不合并了 } Span* prevSpan = ret->second; if (prevSpan->_isUse == true) { break; // span正在使用;不能合并 } if (prevSpan->_n + span->_n > NPAGES - 1) { break; // 内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了 } // 可以往前合并了 span->_pageId = prevSpan->_pageId; // 起始地址往前 span->_n += prevSpan->_n; // 页数增加了 _spanLists[prevSpan->_n].Erase(prevSpan); // 合并后,取消挂节;防止野指针出现 delete prevSpan; // 合并后;处理掉前面相邻的span } // 向后合并 while (true) { // 对span后的页尝试进行合并.缓解内存碎片(外碎片)问题 PAGE_ID backId = span->_pageId + span->_n; // 后面span的一页 auto ret = _idSpanMap.find(backId); // 查找是否存在 // 查找前后的相邻页尝试合并 if (ret == _idSpanMap.end()) { break; //后面的页号没有;不合并了 } Span* backSpan = ret->second; if (backSpan->_isUse == true) { break; // span正在使用;不能合并 } if (backSpan->_n + span->_n > NPAGES - 1) { break; // 内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了 } // 可以往后合并了 span->_n += backSpan->_n; // 页数增加了 _spanLists[backSpan->_n].Erase(backSpan); // 合并后,取消挂节;防止野指针出现 delete backSpan; // 合并后;处理掉后面相邻的span } // 挂节到PageCache _spanLists[span->_n].PushFront(span); span->_isUse = false; // 记录page中头和尾内存的映射 _idSpanMap[span->_pageId] = span; _idSpanMap[span->_pageId + span->_n - 1] = span; }
用定长内存池替代new和delete提高效率
我们之前每申请/释放一个Span结点就需要new/delete一个
解决方案:把之前写的定长内存池利用上;定义在pagecache中声明。
高并发内存池-大于256KB的大块内存申请与释放问题
1)概述
- 1 内存 <= 256KB ->三层缓存(走内存池)
- 2 内存>256KB
- a. 128*8K >= size >32 * 8K -> page cacheb
- b. size > 128 * 8k ->找系统堆
申请流程
ConcurrentAlloc函数完善
static void* ConcurrentAlloc(size_t size) { if (size > MAX_BYTES) // 申请内存大于32页直接去PageCache申请内存。 { size_t alignSize = SizeClass::RoundUp(size); // 内存大小对齐数 size_t kPage = alignSize >> PAGE_SHIFT; // 算出页数 Span* span = PageCache::GetInstance()->NewSpan(kPage); span->_objSize = size; void* ptr = (void*)(span->_pageId << PAGE_SHIFT); // 页内存的起始地址 return ptr; } else { // 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象 if (pTLSThreadCache == nullptr) { static myObject< ThreadCache> tcPool; //pTLSThreadCache = new ThreadCache; pTLSThreadCache = tcPool.New(); } //cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl; return pTLSThreadCache->Allocate(size); } }
2)NewSpan函数完善
// 获取一个k页的span Span* PageCache::NewSpan(size_t k) { assert(k > 0); if (k > NPAGES - 1) // 大于128页去堆上申请 { void* ptr = SystemAlloc(k); //Span* span = new Span; Span* span = _spanPool.New(); // 定长内存池,代替new;提高效率 span->_pageId = ((PAGE_ID)ptr >> PAGE_SHIFT); span->_n = k; _idSpanMap[span->_pageId] = span; // 起始页号和span映射 return span; } // 先检查第k个桶里面有没有span if (!_spanLists[k].Empty()) { Span* kSpan= _spanLists[k].PopFront(); // 建立id和span的映射; 方便回收小块内存 for (size_t i = 0; i < kSpan->_n; ++i) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } // 检查一下后面的桶里面有没有span, 如果有可以把他它进行切分 for (size_t i = k + 1; i < NPAGES; ++i) { if (!_spanLists[i].Empty()) { // 进行切分 Span* nSpan = _spanLists[i].PopFront(); // i:位置取出它头结点 //Span* kSpan = new Span; // 创k一个结点 Span* kSpan = _spanPool.New(); // 定长内存池,代替new;提高效率 kSpan->_pageId = nSpan->_pageId; // 页的起始位置 nSpan->_pageId += k; // 分出k页;n页位置向后走(相当与一大块内存取出头一小块内存) kSpan->_n = k; // 得到k页 nSpan->_n -= k; // 剩余空间; 挂接到nSpan->_n即(i-k)页的位置 _spanLists[nSpan->_n].PushFront(nSpan); // 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时 //进行的合并查找 _idSpanMap[nSpan->_pageId] = nSpan; _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan; // 例如:20 ~24(5页) // 建立id和span的映射; 方便回收小块内存 for (size_t i = 0; i < kSpan->_n; ++i) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //走到这个位置就说明后面没有大页的span了 //这时就去找堆要一个128页(最大页---NPAGES-1)的span //Span* bigSpan = new Span; Span* bigSpan = _spanPool.New(); // 定长内存池,代替new;提高效率 void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; // 地址除以13;就是页号 bigSpan->_n = NPAGES - 1; // 内存挂接到相应位置 _spanLists[bigSpan->_n].PushFront(bigSpan); // 函数递归调用自己;减少代码冗余(上面的空间切分)在时间稍微影响,基本上没影响 return NewSpan(k); }
3)释放流程
ConcurrentFree函数完善
我们要给一个内存块函数自己自动推演内存大小;并进行内存释放和回收。
解决方案:
a. 通过hash映射法(size与span)
b. span结构中增加一个成员变量size
我们采用b方案;在GetOneSpan函数内来添加大小
static void ConcurrentFree(void* ptr) { Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr); // 通过地址,找到span的映射位置 size_t size = span->_objSize; if (size > MAX_BYTES) { PageCache::GetInstance()->_pageMtx.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); // 释放空闲span回到Pagecache,并合并相邻的span PageCache::GetInstance()->_pageMtx.unlock(); } else { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); } }
ReleaseSpanToPageCache函数完善
// 释放空闲span回到Pagecache,并合并相邻的span void PageCache::ReleaseSpanToPageCache(Span* span) { if (span->_n > NPAGES - 1) // 大于128 页直接向堆释放掉 { void* ptr = (void*)(span->_pageId << PAGE_SHIFT); //通过页号起始,算出起始页的地址 SystemFree(ptr); // 释放 // delete span; _spanPool.Delete(span); // 定长内存池代替delete;提高效率 return; } // 向前合并 while (true) { // 对span前的页尝试进行合并.缓解内存碎片(外碎片)问题 PAGE_ID prevId = span->_pageId - 1; // 前面一页 auto ret = _idSpanMap.find(prevId); // 查找是否存在 // 查找前后的相邻页尝试合并 if (ret == _idSpanMap.end()) { break; //前面的页号没有;不合并了 } Span* prevSpan = ret->second; if (prevSpan->_isUse == true) { break; // span正在使用;不能合并 } if (prevSpan->_n + span->_n > NPAGES - 1) { break; // 内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了 } // 可以往前合并了 span->_pageId = prevSpan->_pageId; // 起始地址往前 span->_n += prevSpan->_n; // 页数增加了 _spanLists[prevSpan->_n].Erase(prevSpan); // 合并后,取消挂节;防止野指针出现 //delete prevSpan; // 合并后;处理掉前面相邻的span _spanPool.Delete(prevSpan); // 定长内存池代替delete;提高效率 } // 向后合并 while (true) { // 对span后的页尝试进行合并.缓解内存碎片(外碎片)问题 PAGE_ID backId = span->_pageId + span->_n; // 后面span的一页 auto ret = _idSpanMap.find(backId); // 查找是否存在 // 查找前后的相邻页尝试合并 if (ret == _idSpanMap.end()) { break; //后面的页号没有;不合并了 } Span* backSpan = ret->second; if (backSpan->_isUse == true) { break; // span正在使用;不能合并 } if (backSpan->_n + span->_n > NPAGES - 1) { break; // 内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了 } // 可以往后合并了 span->_n += backSpan->_n; // 页数增加了 _spanLists[backSpan->_n].Erase(backSpan); // 合并后,取消挂节;防止野指针出现 //delete backSpan; // 合并后;处理掉后面相邻的span _spanPool.Delete(backSpan); // 定长内存池代替delete;提高效率 } // 挂节到PageCache _spanLists[span->_n].PushFront(span); span->_isUse = false; // 记录page中头和尾内存的映射 _idSpanMap[span->_pageId] = span; _idSpanMap[span->_pageId + span->_n - 1] = span; }
高并发内存池-针对性能瓶颈使用基数树进行优化
1)概述
通过性能检测,我们明显发现;内存池性能很差???
通过性能检测;我们发现我们映射中的lock(加锁)消耗性能(映射加锁是为了保证多线程安全)
三层空间
不同平台下,存储页号需要位数不同
- 32- - -19位
- 64- - -51位
一层
本质是id与span*的映射
使用tcmalloc源码中实现基数树进行优化
利用hash映射的部分全部改成基树(读写分离)
#pragma once #include"Common.h" // Single-level array template <int BITS> class TCMalloc_PageMap1 { private: static const int LENGTH = 1 << BITS; void** array_; public: typedef uintptr_t Number; //explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) { explicit TCMalloc_PageMap1() { //array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS)); size_t size = sizeof(void*) << BITS; size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT); array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); memset(array_, 0, sizeof(void*) << BITS); } // Return the current value for KEY. Returns NULL if not yet set, // or if k is out of range. void* get(Number k) const { if ((k >> BITS) > 0) { return NULL; } return array_[k]; } // REQUIRES "k" is in range "[0,2^BITS-1]". // REQUIRES "k" has been ensured before. // // Sets the value 'v' for key 'k'. void set(Number k, void* v) { array_[k] = v; } }; // Two-level radix tree template <int BITS> class TCMalloc_PageMap2 { private: // Put 32 entries in the root and (2^BITS)/32 entries in each leaf. static const int ROOT_BITS = 5; static const int ROOT_LENGTH = 1 << ROOT_BITS; static const int LEAF_BITS = BITS - ROOT_BITS; static const int LEAF_LENGTH = 1 << LEAF_BITS; // Leaf node struct Leaf { void* values[LEAF_LENGTH]; }; Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes void* (*allocator_)(size_t); // Memory allocator public: typedef uintptr_t Number; //explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) { explicit TCMalloc_PageMap2() { //allocator_ = allocator; memset(root_, 0, sizeof(root_)); PreallocateMoreMemory(); } void* get(Number k) const { const Number i1 = k >> LEAF_BITS; const Number i2 = k & (LEAF_LENGTH - 1); if ((k >> BITS) > 0 || root_[i1] == NULL) { return NULL; } return root_[i1]->values[i2]; } void set(Number k, void* v) { const Number i1 = k >> LEAF_BITS; const Number i2 = k & (LEAF_LENGTH - 1); assert(i1 < ROOT_LENGTH); root_[i1]->values[i2] = v; } bool Ensure(Number start, size_t n) { for (Number key = start; key <= start + n - 1;) { const Number i1 = key >> LEAF_BITS; // Check for overflow if (i1 >= ROOT_LENGTH) return false; // Make 2nd level node if necessary if (root_[i1] == NULL) { //Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf))); //if (leaf == NULL) return false; static myObject<Leaf> leafPool; Leaf* leaf = (Leaf*)leafPool.New(); memset(leaf, 0, sizeof(*leaf)); root_[i1] = leaf; } // Advance key past whatever is covered by this leaf node key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; } return true; } void PreallocateMoreMemory() { // Allocate enough to keep track of all possible pages Ensure(0, 1 << BITS); } }; // Three-level radix tree template <int BITS> class TCMalloc_PageMap3 { private: // How many bits should we consume at each interior level static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS; // How many bits should we consume at leaf level static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; static const int LEAF_LENGTH = 1 << LEAF_BITS; // Interior node struct Node { Node* ptrs[INTERIOR_LENGTH]; }; // Leaf node struct Leaf { void* values[LEAF_LENGTH]; }; Node* root_; // Root of radix tree void* (*allocator_)(size_t); // Memory allocator Node* NewNode() { Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node))); if (result != NULL) { memset(result, 0, sizeof(*result)); } return result; } public: typedef uintptr_t Number; explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) { allocator_ = allocator; root_ = NewNode(); } void* get(Number k) const { const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); const Number i3 = k & (LEAF_LENGTH - 1); if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) { return NULL; } return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]; } void set(Number k, void* v) { ASSERT(k >> BITS == 0); const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); const Number i3 = k & (LEAF_LENGTH - 1); reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v; } bool Ensure(Number start, size_t n) { for (Number key = start; key <= start + n - 1;) { const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS); const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1); // Check for overflow if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH) return false; // Make 2nd level node if necessary if (root_->ptrs[i1] == NULL) { Node* n = NewNode(); if (n == NULL) return false; root_->ptrs[i1] = n; } // Make leaf node if necessary if (root_->ptrs[i1]->ptrs[i2] == NULL) { Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf))); if (leaf == NULL) return false; memset(leaf, 0, sizeof(*leaf)); root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf); } // Advance key past whatever is covered by this leaf node key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; } return true; } void PreallocateMoreMemory() { } };
基树与hash对比
1、只有在这两个函数中回去建立id和span的映射,也就是说回去写
2、基数树,写之前会提前开好空间,写数据过程中,不会动结构。
3、读写是分离的。线程1对一个位置读写的时候,线程2不可能对这个位置读写
5.4框架代码和代码总结
common.h共享文件
#pragma once #include <iostream> #include <vector> #include <thread> #include <algorithm> #include <mutex> #include <unordered_map> #include <time.h> #include <assert.h> using std::cout; using std::endl; static const size_t MAX_BYTES = 256 * 1024; // 一页8kb; 32页 static const size_t NFREELIST = 208; // central and thread总共桶的个数 static const size_t NPAGES = 129; // page总共桶的个数; 数组下标从1开始总共128页 static const size_t PAGE_SHIFT = 13; // 一页字节数2^13 // 不同平台下页数不同;可能导致存页数变量范围不够 #ifdef _WIN64 typedef unsigned long long PAGE_ID; #elif _WIN32 typedef size_t PAGE_ID; #else // Linux #endif #ifdef _WIN32 #include<windows.h> #else // linux #endif // 直接去堆上按页申请空间 inline static void* SystemAlloc(size_t kpage) { #ifdef _WIN32 void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); #else // linux下brk mmap等 #endif if (ptr == nullptr) throw std::bad_alloc(); return ptr; } // 释放内存 inline static void SystemFree(void* ptr) { #ifdef _WIN32 VirtualFree(ptr, 0, MEM_RELEASE); #else // sbrk unmmap等 #endif } static void*& NextObj(void* obj) // 前四个或八个字节(下一个结点的地址) { return *(void**)obj; } // 管理小对象的自由链表 class FreeList { public: void Push(void* obj) { // 头插 NextObj(obj) = _freeList; _freeList = obj; ++_size; } void* Pop() { assert(_freeList); // 头删 void* obj = _freeList; _freeList = NextObj(obj); --_size; return obj; } void PushRange(void* start, void* end, size_t n) { // 一连串空间存储 NextObj(end) = _freeList; _freeList = start; _size += n; } void PopRange(void*& start, void*& end, size_t n) { assert(n <= _size); // 取走连续的链表空间 start = _freeList; // 起始地址 end = start; // 末尾地址 for (size_t i = 0; i < n - 1; ++i) { end = NextObj(end); } _freeList = NextObj(end); // 剩余空间继续挂接 NextObj(end) = nullptr; _size -= n; } bool Empty() { return _freeList == nullptr; } size_t& MaxSize() { return _maxSize; } size_t Size() { return _size; } private: void* _freeList = nullptr; size_t _maxSize = 1; // 表示向CentrealCache要空间数量的上限 size_t _size = 0; // 记录(_freeList)挂接内存的个数 }; // 计算对象大小的对齐映射规则 struct SizeClass { // 整体控制在最多10%左右的内碎片浪费 // [1,128] 8byte对齐 freelist[0,16) // [128+1,1024] 16byte对齐 freelist[16,72) // [1024+1,8*1024] 128byte对齐 freelist[72,128) // [8*1024+1,64*1024] 1024byte对齐 freelist[128,184) // [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208) static inline size_t _RoundUp(size_t bytes, size_t alignNum) // 算出桶的大小(即size的对齐数) { return ((bytes + alignNum - 1) & ~(alignNum - 1)); } // 对齐大小计算 static inline size_t RoundUp(size_t size) { // 确定每个区间的对齐数 if (size <= 128) { return _RoundUp(size, 8); } else if (size <= 1024) { return _RoundUp(size, 16); } else if (size <= 8 * 1024) { return _RoundUp(size, 128); } else if (size <= 64 * 1024) { return _RoundUp(size, 1024); } else if (size <= 256 * 1024) { return _RoundUp(size, 8 * 1024); } else { return _RoundUp(size, 8 * 1024); } } static inline size_t _Index(size_t bytes, size_t align_shift) { return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1; // 桶下标从0开始计数 } // 计算映射的哪一个自由链表桶 static inline size_t Index(size_t bytes) { assert(bytes <= MAX_BYTES); // 每个区间有多少个链 static int group_array[4] = { 16, 56, 56, 56 }; if (bytes <= 128) { return _Index(bytes, 3); } else if (bytes <= 1024) { return _Index(bytes - 128, 4) + group_array[0]; } else if (bytes <= 8 * 1024) { return _Index(bytes - 1024, 7) + group_array[1] + group_array[0]; } else if (bytes <= 64 * 1024) { return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0]; } else if (bytes <= 256 * 1024) { return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0]; } else { assert(false); } return -1; } // 一次从中心缓存获取多少个 static size_t NumMoveSize(size_t size) { assert(size > 0); // [2, 512],一次批量移动多少个对象的(慢启动)上限值 // 小对象一次批量上限高 // 小对象一次批量上限低 int num = MAX_BYTES / size; if (num < 2) num = 2; if (num > 512) num = 512; return num; } // 计算一次向系统获取几个页 // 单个对象 8byte // ... // 单个对象 256KB static size_t NumMovePage(size_t size) { size_t num = NumMoveSize(size); // 要几页 size_t npage = num * size; //总字节数 npage >>= PAGE_SHIFT; // 计算多少页,一页8k(2^13);则PAGE_SHIFT为13 if (npage == 0) npage = 1; return npage; } }; // Span管理一个跨度的大块内存 // 管理以页为单位的大块内存 struct Span { PAGE_ID _pageId = 0; // 页号 size_t _n = 0; // 页数 // 双向链表的结构 Span* _next = nullptr; Span* _prev = nullptr; size_t _objSize = 0; // 切好的小对象大小 size_t _useCount = 0; // 使用计数,切好的小块内存分配给ThreadCache void* _freeList = nullptr; // 大块内存切小链接起来,这样回收回来的内存也方便链接 bool _isUse = false; // 是否在被使用 }; // 带头双向循环链表 class SpanList { public: SpanList() { _head = new Span; _head->_next = _head; _head->_prev = _head; } Span* Begin() { return _head->_next; } Span* End() { return _head; } bool Empty() { return _head->_next == _head; } void PushFront(Span* span) { Insert(Begin(), span); } Span* PopFront() { Span* front = _head->_next; Erase(front); return front; } void Insert(Span* pos, Span* newSpan) { assert(pos); assert(newSpan); // 头插 Span* prev = pos->_prev; prev->_next = newSpan; newSpan->_prev = prev; newSpan->_next = pos; pos->_prev = newSpan; } void Erase(Span* pos) { assert(pos); assert(pos != _head); Span* prev = pos->_prev; Span* next = pos->_next; prev->_next = next; next->_prev = prev; } private: Span* _head; public: std::mutex _mtx; // 桶锁: 数组_spanLists中每一个位置的桶都有一把锁 };
六、高并发内存池三大框架代码
6.1threadcache框架(声明)
#pragma once #include "common.h" // thread cache本质是由一个哈希映射的对象自由链表构成 class ThreadCache { public: // 申请和释放内存对象 void* Allocate(size_t size); void Deallocate(void* ptr, size_t size); // 从中心缓存获取对象 void* FetchFromCentralCache(size_t index, size_t size); // 释放对象时,链表过长时,回收内存回到中心缓存 void ListTooLong(FreeList& list, size_t size); private: FreeList _freeLists[NFREELIST]; }; // TLS thread local storage static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
6.2CentralCache框架(声明)
#pragma once #include "common.h" // 单例模式---饿汉模式 class CentralCache { public: static CentralCache* GetInstance() { return &_sInst; } // 获取一个非空的span Span* GetOneSpan(SpanList& list, size_t byte_size); // list: 表示_spanLists中span的_freeList连续挂接空间的数量 // 从中心缓存获取一定数量的对象给thread cache size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size); // size:内存空间大小;batchNum:内存空间个数 // start 和 end 表示内存空间地址起始到终末范围 // 将一定数量的对象释放到span跨度 void ReleaseListToSpans(void* start, size_t size); private: SpanList _spanLists[NFREELIST]; private: CentralCache() {}; CentralCache(const CentralCache& t) = delete; static CentralCache _sInst; };
6.3pagecache框架(声明)
#pragma once #include "common.h" #include "ObjectPool.h" #include "PageMap.h" class PageCache { public: static PageCache* GetInstance() { return &_sInst; } // 获取一个k页的span Span* NewSpan(size_t k); // 获取从对象到span的映射 Span* MapObjectToSpan(void* obj); // 释放空闲span回到Pagecache,并合并相邻的span void ReleaseSpanToPageCache(Span* span); std::mutex _pageMtx; private: SpanList _spanLists[NPAGES]; PageCache() {}; PageCache(const PageCache& t) = delete; myObject<Span> _spanPool; //std::unordered_map<PAGE_ID, Span*> _idSpanMap; TCMalloc_PageMap2<32 - PAGE_SHIFT>_idSpanMap; // 基树 static PageCache _sInst; };
6.4threadcache框架(定义)
#pragma once #include "ThreadCache.h" #include "CentralCache.h" // 慢开始反馈调节算法 // 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完 // 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限 // 3、size越大,一次向central cache要的batchNum就越小 // 4、size越小,一次向central cache要的batchNum就越大 void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) { // 慢开始反馈调节法 不会开始要太多内存 size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size)); if (batchNum == _freeLists[index].MaxSize()) { _freeLists[index].MaxSize() += 1; } void* start = nullptr; void* end = nullptr; size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size); // 获取连续实际内存的数目 assert(actualNum >= 1); // 至少一个 if (actualNum == 1) { assert(start == end); } else { _freeLists[index].PushRange(NextObj(start), end, actualNum - 1); } return start; } // 申请内存对象, 空间先到_freeLists链表中去取,如果没有去中心缓存获取空间 void* ThreadCache::Allocate(size_t size) { assert(size <= MAX_BYTES); size_t alignSize = SizeClass::RoundUp(size); size_t index = SizeClass::Index(size); if (!_freeLists[index].Empty()) { return _freeLists[index].Pop(); } else { return FetchFromCentralCache(index, alignSize); } } // 释放内存对象,空间挂起到_freeLists链表中去 void ThreadCache::Deallocate(void* ptr, size_t size) { assert(ptr); assert(size <= MAX_BYTES); // 找对映射的自由链表桶,对象插入进入 size_t index = SizeClass::Index(size); _freeLists[index].Push(ptr); //当链表长度大于一次批量申请的内存时就开始还一段list给central cache if (_freeLists[index].Size() >= _freeLists[index].MaxSize()) { // 释放对象时,链表过长时,回收内存回到中心缓存 ListTooLong(_freeLists[index], size); } } void ThreadCache::ListTooLong(FreeList& list, size_t size) { void* start = nullptr; void* end = nullptr; list.PopRange(start, end, list.MaxSize()); //取消挂接。 // 将一定数量的对象释放到span跨度(CentralCache) CentralCache::GetInstance()->ReleaseListToSpans(start, size); }
6.5CentralCache框架(定义)
#pragma once #include"CentralCache.h" #include"PageCache.h" CentralCache CentralCache::_sInst; //定义 // 获取一个非空的span Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size) { // 先从桶的span中找内存(查看当前的spanlist中是否有还有未分配对象的span) Span* it = list.Begin(); while (it != list.End()) { if (it->_freeList != nullptr) { return it; } else { it = it->_next; } } // 先把central 桶锁解除:这样如果某他线程释放内存对象回来,不会阻塞 list._mtx.unlock(); // 访问page时 需要加锁 PageCache::GetInstance()->_pageMtx.lock(); // 没有空闲的span去PageCache中要 Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(byte_size)); // 获取一个k页的span span->_isUse = true; // 内存span已经使用了 span->_objSize = byte_size; // 切分内存大小记录下来;便于改良回收 PageCache::GetInstance()->_pageMtx.unlock(); //对获取span进行切分,不需要加锁,因为其他线程拿不到这个span char* start = (char*)(span->_pageId << PAGE_SHIFT); // k页的span的起始地址 size_t bytes = span->_n << PAGE_SHIFT; // 内存字节数 char* end = start + bytes; // 把大块内存切成自由链表链表 // 1. 先切一块作头,然后尾插(这样保证物理空间顺序与逻辑顺序保持一致) span->_freeList = start; start += byte_size; void* tail = span->_freeList; while (start < end) { NextObj(tail) = start; tail = NextObj(tail); start += byte_size; } NextObj(tail) = nullptr; // 最后一个内存块下个内存地址置空 // 挂节的时候恢复cengtral的锁 list._mtx.lock(); list.PushFront(span); return span; } // 从中心缓存获取一定数量的对象给thread cache size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size) { // 每个桶加锁 size_t index = SizeClass::Index(size); _spanLists[index]._mtx.lock(); Span* span = GetOneSpan(_spanLists[index], size); assert(span); assert(span->_freeList); // 从span中获取batchNum个对象 // 如果不够batchNum个,有多少拿多少 start = span->_freeList; end = start; size_t i = 0, actualNum = 1; while (i < batchNum - 1 && NextObj(end) != nullptr) // 取连续内存块,当batchNum超过原有时,按拥有的数量给 { end = NextObj(end); ++i; ++actualNum; } span->_freeList = NextObj(end); // 剩余空间仍挂在Span中的_freeList去 NextObj(end) = nullptr; // 最后end记录下一个空间地址置空 span->_useCount += actualNum; _spanLists[index]._mtx.unlock(); // 解锁 return actualNum; } void CentralCache::ReleaseListToSpans(void* start, size_t size) { size_t index = SizeClass::Index(size); // 确定桶的位置 _spanLists[index]._mtx.lock(); // 上桶锁 while (start) { void* next = NextObj(start); Span* span = PageCache::GetInstance()->MapObjectToSpan(start); // 通过映射找到span的位置 NextObj(start) = span->_freeList; // 头插 span->_freeList = start; span->_useCount -= 1; // 内存块借出去个数 if (span->_useCount == 0) // 说明小块内存全回来了; 这个span就可以回收合并了 { _spanLists[index].Erase(span); span->_freeList = nullptr; span->_next = nullptr; span->_prev = nullptr; //释放span给page cache时,使用page cache的锁就可以了 //这时把桶锁解掉 _spanLists[index]._mtx.unlock(); // 解锁 PageCache::GetInstance()->_pageMtx.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); // 释放空闲span回到Pagecache,并合并相邻的span PageCache::GetInstance()->_pageMtx.unlock(); _spanLists[index]._mtx.lock(); // 上桶锁 } start = next; } _spanLists[index]._mtx.unlock(); // 解锁 }
6.6pagecache框架(定义)
#pragma once #include "PageCache.h" PageCache PageCache::_sInst; // 获取一个k页的span Span* PageCache::NewSpan(size_t k) { assert(k > 0); if (k > NPAGES - 1) // 大于128页去堆上申请 { void* ptr = SystemAlloc(k); //Span* span = new Span; Span* span = _spanPool.New(); // 定长内存池,代替new;提高效率 span->_pageId = ((PAGE_ID)ptr >> PAGE_SHIFT); span->_n = k; //_idSpanMap[span->_pageId] = span; // 起始页号和span映射 _idSpanMap.set(span->_pageId, span); return span; } // 先检查第k个桶里面有没有span if (!_spanLists[k].Empty()) { Span* kSpan= _spanLists[k].PopFront(); // 建立id和span的映射; 方便回收小块内存 for (size_t i = 0; i < kSpan->_n; ++i) { //_idSpanMap[kSpan->_pageId + i] = kSpan; _idSpanMap.set(kSpan->_pageId + i, kSpan); } return kSpan; } // 检查一下后面的桶里面有没有span, 如果有可以把他它进行切分 for (size_t i = k + 1; i < NPAGES; ++i) { if (!_spanLists[i].Empty()) { // 进行切分 Span* nSpan = _spanLists[i].PopFront(); // i:位置取出它头结点 //Span* kSpan = new Span; // 创k一个结点 Span* kSpan = _spanPool.New(); // 定长内存池,代替new;提高效率 kSpan->_pageId = nSpan->_pageId; // 页的起始位置 nSpan->_pageId += k; // 分出k页;n页位置向后走(相当与一大块内存取出头一小块内存) kSpan->_n = k; // 得到k页 nSpan->_n -= k; // 剩余空间; 挂接到nSpan->_n即(i-k)页的位置 _spanLists[nSpan->_n].PushFront(nSpan); // 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时 //进行的合并查找 //_idSpanMap[nSpan->_pageId] = nSpan; _idSpanMap.set(nSpan->_pageId, nSpan); //_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan; // 例如:20 ~24(5页) _idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan); // 建立id和span的映射; 方便回收小块内存 for (size_t i = 0; i < kSpan->_n; ++i) { //_idSpanMap[kSpan->_pageId + i] = kSpan; _idSpanMap.set(kSpan->_pageId + i, kSpan); } return kSpan; } } //走到这个位置就说明后面没有大页的span了 //这时就去找堆要一个128页(最大页---NPAGES-1)的span //Span* bigSpan = new Span; Span* bigSpan = _spanPool.New(); // 定长内存池,代替new;提高效率 void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; // 地址除以13;就是页号 bigSpan->_n = NPAGES - 1; // 内存挂接到相应位置 _spanLists[bigSpan->_n].PushFront(bigSpan); // 函数递归调用自己;减少代码冗余(上面的空间切分)在时间稍微影响,基本上没影响 return NewSpan(k); } // 获取从对象到span的映射 Span* PageCache::MapObjectToSpan(void* obj) { PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT); //std::unique_lock<std::mutex> lock(_pageMtx); //RAII风格的锁,把pagecache的锁给这个类。 //auto ret = _idSpanMap.find(id); //if (ret != _idSpanMap.end()) //{ // return ret->second; //} //else //{ // assert(false); // return nullptr; //} Span* ret = (Span*)_idSpanMap.get(id); assert(ret != nullptr); return ret; } // 释放空闲span回到Pagecache,并合并相邻的span void PageCache::ReleaseSpanToPageCache(Span* span) { if (span->_n > NPAGES - 1) // 大于128 页直接向堆释放掉 { void* ptr = (void*)(span->_pageId << PAGE_SHIFT); //通过页号起始,算出起始页的地址 SystemFree(ptr); // 释放 // delete span; _spanPool.Delete(span); // 定长内存池代替delete;提高效率 return; } // 向前合并 while (true) { // 对span前的页尝试进行合并.缓解内存碎片(外碎片)问题 PAGE_ID prevId = span->_pageId - 1; // 前面一页 //auto ret = _idSpanMap.find(prevId); // 查找是否存在 // 查找前后的相邻页尝试合并 //if (ret == _idSpanMap.end()) //{ // break; //前面的页号没有;不合并了 //} Span* ret = (Span*)_idSpanMap.get(prevId); if (ret == nullptr) { break; //前面的页号没有;不合并了 } //Span* prevSpan = ret->second; Span* prevSpan = ret; if (prevSpan->_isUse == true) { break; // span正在使用;不能合并 } if (prevSpan->_n + span->_n > NPAGES - 1) { break; // 内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了 } // 可以往前合并了 span->_pageId = prevSpan->_pageId; // 起始地址往前 span->_n += prevSpan->_n; // 页数增加了 _spanLists[prevSpan->_n].Erase(prevSpan); // 合并后,取消挂节;防止野指针出现 //delete prevSpan; // 合并后;处理掉前面相邻的span _spanPool.Delete(prevSpan); // 定长内存池代替delete;提高效率 } // 向后合并 while (true) { // 对span后的页尝试进行合并.缓解内存碎片(外碎片)问题 PAGE_ID backId = span->_pageId + span->_n; // 后面span的一页 //auto ret = _idSpanMap.find(backId); // 查找是否存在 // 查找前后的相邻页尝试合并 //if (ret == _idSpanMap.end()) //{ // break; //后面的页号没有;不合并了 //} Span* ret = (Span*)_idSpanMap.get(backId); if (ret == nullptr) { break; //后面的页号没有;不合并了 } Span* backSpan = ret; if (backSpan->_isUse == true) { break; // span正在使用;不能合并 } if (backSpan->_n + span->_n > NPAGES - 1) { break; // 内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了 } // 可以往后合并了 span->_n += backSpan->_n; // 页数增加了 _spanLists[backSpan->_n].Erase(backSpan); // 合并后,取消挂节;防止野指针出现 //delete backSpan; // 合并后;处理掉后面相邻的span _spanPool.Delete(backSpan); // 定长内存池代替delete;提高效率 } // 挂节到PageCache _spanLists[span->_n].PushFront(span); span->_isUse = false; // 记录page中头和尾内存的映射 //_idSpanMap[span->_pageId] = span; //_idSpanMap[span->_pageId + span->_n - 1] = span; _idSpanMap.set(span->_pageId, span); _idSpanMap.set(span->_pageId + span->_n - 1, span); }
定长内存池
(ObjectPool.h文件)
作用替代高并发内存池中的new和delete
#pragma once #include"common.h" template <class T> class myObject { public: T* New() { // 优先把还回来内存块对象,再次重复利用 T* obj = nullptr; if (_freeList) { // 头删 void* next= *(void**)_freeList; obj = (T*)_freeList; _freeList = next; } else { // 剩余内存不够一个对象大小时,则重新开大块空间 if (_remainBytes < sizeof(T)) { _remainBytes = 1024 * 128; //_memory = (char*)malloc(_remainBytes); _memory = (char*)SystemAlloc(_remainBytes >> 13); // 按页申请内存 if (_memory == nullptr) { throw std::bad_alloc(); } } obj = (T*)_memory; size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); //申请内存空间必须大于或等于一个指针的空间 _memory += objSize; _remainBytes -= objSize; } // 定位new,显示调用T的构造函数初始化 new(obj)T; return obj; } void Delete(T* obj) { // 显示调用析构函数清理对象 obj->~T(); // 头插 *(void**)obj = _freeList; // 取前四或八个字节 _freeList = obj; } private: char* _memory = nullptr; // 指向大块内存的指针 size_t _remainBytes = 0; // 大块内存分割过程中剩余的字节数 void* _freeList = nullptr; // 还回来的内存块用自由链接表的头指针链接起来 };
线程局部存储
- 1 内存 <= 256KB ->三层缓存(走内存池)
- 2 内存>256KB
- a. 128*8K >= size >32 * 8K -> page cacheb
- b. size > 128 * 8k ->找系统堆
(ConcurrentAlloc.h)文件
#pragma once #include "Common.h" #include "ThreadCache.h" #include "PageCache.h" #include "ObjectPool.h" static void* ConcurrentAlloc(size_t size) { if (size > MAX_BYTES) // 申请内存大于32页直接去PageCache申请内存。 { size_t alignSize = SizeClass::RoundUp(size); // 内存大小对齐数 size_t kPage = alignSize >> PAGE_SHIFT; // 算出页数 Span* span = PageCache::GetInstance()->NewSpan(kPage); span->_objSize = size; void* ptr = (void*)(span->_pageId << PAGE_SHIFT); // 页内存的起始地址 return ptr; } else { // 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象 if (pTLSThreadCache == nullptr) { static myObject< ThreadCache> tcPool; //pTLSThreadCache = new ThreadCache; pTLSThreadCache = tcPool.New(); } //cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl; return pTLSThreadCache->Allocate(size); } } static void ConcurrentFree(void* ptr) { Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr); // 通过地址,找到span的映射位置 size_t size = span->_objSize; if (size > MAX_BYTES) { PageCache::GetInstance()->_pageMtx.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); // 释放空闲span回到Pagecache,并合并相邻的span PageCache::GetInstance()->_pageMtx.unlock(); } else { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); } }
测试性能(系统的malloc与我们写的高并发内存池)
(Benchmark.cpp文件)
注意:
- 测试代码中出现了lamda
- 测试代码借鉴某大佬的(^ v ^)
#include "ConcurrentAlloc.h" // nworks:创建线程的次数 // rounds:经历多少轮次 // ntimes:一轮申请释放的次数 void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds) { std::vector<std::thread> vthread(nworks); std::atomic<size_t> malloc_costtime = 0; std::atomic<size_t> free_costtime = 0; for (size_t k = 0; k < nworks; ++k) { vthread[k] = std::thread([&, k]() { std::vector<void*> v; v.reserve(ntimes); for (size_t j = 0; j < rounds; ++j) { size_t begin1 = clock(); for (size_t i = 0; i < ntimes; i++) { //v.push_back(malloc(16)); v.push_back(malloc((16 + i) % 8192 + 1)); } size_t end1 = clock(); size_t begin2 = clock(); for (size_t i = 0; i < ntimes; i++) { free(v[i]); } size_t end2 = clock(); v.clear(); malloc_costtime += (end1 - begin1); free_costtime += (end2 - begin2); } }); } for (auto& t : vthread) { t.join(); } cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc" << ntimes << "次: 花费:" << malloc_costtime << "ms" << endl; cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc" << ntimes << "次: 花费:" << free_costtime << "ms" << endl; cout << nworks << "个线程并发malloc & free" << nworks * rounds * ntimes << "次,总计花费:" << malloc_costtime + free_costtime << "ms" << endl; } // 单轮次申请释放次数 线程数 轮次 void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds) { std::vector<std::thread> vthread(nworks); std::atomic<size_t> malloc_costtime = 0; std::atomic<size_t> free_costtime = 0; for (size_t k = 0; k < nworks; ++k) { vthread[k] = std::thread([&]() { std::vector<void*> v; v.reserve(ntimes); for (size_t j = 0; j < rounds; ++j) { size_t begin1 = clock(); for (size_t i = 0; i < ntimes; i++) { //v.push_back(ConcurrentAlloc(16)); v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1)); } size_t end1 = clock(); size_t begin2 = clock(); for (size_t i = 0; i < ntimes; i++) { ConcurrentFree(v[i]); } size_t end2 = clock(); v.clear(); malloc_costtime += (end1 - begin1); free_costtime += (end2 - begin2); } }); } for (auto& t : vthread) { t.join(); } cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc" << ntimes << "次: 花费:" << malloc_costtime << "ms" << endl; cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc" << ntimes << "次: 花费:" << free_costtime << "ms" << endl; cout << nworks << "个线程并发malloc & free" << nworks * rounds * ntimes << "次,总计花费:" << malloc_costtime + free_costtime << "ms" << endl; } int main() { size_t n = 1000; cout << "==========================================================" << endl; BenchmarkConcurrentMalloc(n, 4, 10); // 自己写的内存池 cout << endl << endl; BenchmarkMalloc(n, 4, 10); // 系统malloc cout << "==========================================================" << endl; return 0; }
七、性能测试结果图
明显效率大大滴提高了
当前项目实现的不足
不同平台替换方式不同。 基于unix的系统上的glibc,使用了weak alias的方式替换。具体来说是因为这些入口函数都被定义成了weak symbols,再加上gcc支持
alias attribute,所以替换就变成了这种通用形式。