一. PageCache介绍
页缓存(PageCache)是在中心缓存(CentralCache)下面的一层缓存机制,它存储的是以页为单位的未切分大块跨度内存。中心缓存没有Span对象时,从PageCache分配出一定数量页(一页是8KB)的Span,把它们切割成定长大小的小块内存,分配给中心缓存。当中心缓存中一个Span的所有小块定长内存都回收以后,PageCache会回收中心缓存的Span对象,拿回来合并相邻页,组成更大的页,以缓解内存碎片的问题。
二. PageCache基本框架
下面是PageCache的基本框架,其核心成员是SpanList对象组成的哈希桶:
// 饿汉的单例模式 class PageCache { public: // 返回PageCache的单例对象 static PageCache* GetInstance() { return &_sInst; } // 返回给CentralCache一个k页的Span Span* NewSpan(size_t k); // 获取表锁 std::mutex& GetPageMutex(); private: std::mutex _pageMtx;// 表锁 SpanList _spanLists[NPAGES];// 存储未切分的span private: PageCache() {} PageCache(const PageCache&) = delete; static PageCache _sInst; };
虽然CentralCache和PageCache的核心结构都是SpanList对象组成的哈希桶,但是它们是有区别的:
CentralCache中的哈希桶,跟ThreadCache一样是按照对象大小对齐后映射小块定长内存大小的,它的每一个SpanList中挂的Span都按映射的值切成一个个小块定长内存存储在自由链表中。
PageCache中的SpanList是按照Span存储的页的数量直接定址映射的,也就是说第i号桶中挂的Span都存储有i页的连续内存空间,且这些连续页空间是未被切分成小块定长内存的。
我们规定PageCache中的Span对象最多存储128页的连续内存。因为下标从0开始,所以我们创建129个桶,这样桶的下标和页数就是完全一对一的直接映射:
// 把这个写到Common.h中 static const size_t NPAGES = 129;// PageCache可申请的最大页数
另外和CentralCache不同的是,PageCache不再使用SpanList中定义的桶锁,而是在自己类中封装一个针对其成员PageCache::_spanLists[NPAGES]整个哈希表的表锁,这样设计的原因从两个不同的角度来看:
首先CentralCache为什么使用桶锁呢?因为每一次不论是CentralCache向上供给ThreadCache一批小块定长内存,还是向下(PageCache)申请一个特定页大小的Span对象都只需访问CentralCache特定的一个桶就够了,这个时候如果用表锁,反而会降低效率。
然后是PageCache,现在它的功能是拨给CentralCache一个特定数量页的Span对象,比如CentralCache向PageCache申请4页的Span对象,PageCache发现自己的4号桶后面没有挂Span,则向后面的桶中去寻找更大页的Span,假设在10号桶位置找到一个Span,则将链表中的第一个10页的Span分裂为一个4页Span和一个6页Span,4页的Span对象返回给CentralCache,另外一个6页的Span挂到PageCache自己的6号桶中。即我们对PageCache进行一次操作不是仅仅使用它的一个桶,这个时候如果使用桶锁就需要频繁地申请、释放锁,这样做固然不会有线程安全的问题,但是对性能有消耗,不如干脆定义一个表锁更好。
初步设计的PageCache只有三个成员函数:
PageCache::GetInstance():获取进程中唯一的PageCache的单例对象。
PageCache::GetPageMutex():获取PageCache的表锁。
PageCache::NewSpan(size_t k):返回给CentralCache一个k页的Span对象。
三. 其它接口和数据的补充
1. 直接向堆申请页为单位的大块内存
各个系统通过系统调用直接向堆申请内存空间的方法:
Windows:VirtualAlloc是一个Windows API函数,该函数的功能是在调用进程的虚地址空间,预定或者提交一部分页。
Linuxs:使用brk、mmap。
针对所有平台,我们统一封装一个SystemAlloc()函数放到Common.h中,该函数只需传入需要向堆申请的页的数量即可,内部完成后会返回申请到的大块连续页内存的起始地址:
// PS:目前下面只写了Windows平台的 // Linux平台的后面在完善 // 条件编译不同系统的系统头文件 #ifdef _WIN32 #include <windows.h> #else // Linux #endif // 条件编译不同系统到堆上以页为单位申请空间 inline static void* SystemAlloc(size_t kpage) { #ifdef _WIN32 void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); #else // linux下brk mmap等 #endif if (ptr == nullptr) throw std::bad_alloc(); return ptr; }
2. SpanList类中的补充
在上一篇文章中有说明SpanList类,它是管理Span的双向带头循环链表,下面是它的基本框架:
// 管理span的带头双向循环链表 class SpanList { public: // 构造函数中new一个哨兵位的头结点 SpanList() { _head = new Span; _head->_prev = _head; _head->_next = _head; } // 在pos位置插入一个新的大块跨度内存 void Insert(Span* pos, Span* newSpan); // 移除pos位置的一个大块跨度内存 void Erase(Span* pos); // 获取桶锁 std::mutex& GetSpanListMutex(); private: Span* _head; // 哨兵位的头结点 std::mutex _mtx; // 桶锁 };
这次我们要在SpanList类中补充5个成员函数,下面在类里的成员函数都是新补充的:
// 管理span的带头双向循环链表 class SpanList { public: // 获取第一个Span Span* Begin() { return _head->_next; } // 获取哨兵位头结点 Span* End() { return _head; } // 判空 bool Empty() { return Begin() == _head; } // 头删一个span对象并返回给外部 Span* PopFront() { assert(!Empty()); Span* ret = Begin(); Erase(Begin()); return ret; } // 头插一个span对象 void PushFront(Span* span) { assert(span); Insert(Begin(), span); } private: Span* _head; // 哨兵位的头结点 std::mutex _mtx; // 桶锁 };
3. SizeClass类中的补充
我们来梳理申请内存的顺序:
首先是线程向自己的ThreadCache申请一个小于256KB的对象空间,ThreadCache发现映射后的自由链表桶中没有对齐后的小块定长内存了,这时它向下一层CentralCache通过慢启动反馈调节法去批量要小块定长内存。
CentralCache发现自己对应桶中的Span没有切分好的小块定长内存,或者桶里直接就没有Span对象,这时CentralCache继续向下一层PageCache去要一个特定数量页的Span过来,然后CentralCache把这个Span切分成一个个小块定长内存,把它们一部分交给ThreadCache,一部分挂到自己桶里。
CentralCache向下一层PageCache去要一个特定数量页的Span过来,这个特定数量指的是多少呢?至少我们得保证这个连续页的大内存切分成小块定长内存后能满足CentralCache向上批发给ThreadCache的数量,最好还能剩余部分小块定长内存给CentralCache,基于这个思想我们综合设计了下面的方法:
// 计算一次向系统获取几个页合适 // 单个对象 8byte --- 分给CentralCache一个1页的Span // ... // 单个对象 256KB --- 分给CentralCache一个64页的Span static size_t SizeClass::NumMovePage(size_t alignBytes) { // LimitSize函数的定义在下面 size_t num = LimitSize(alignBytes); size_t npage = num * alignBytes; npage >>= PAGE_SHIFT; if (npage == 0) npage = 1; return npage; } // PS:这个LimitSize函数在上一篇有讲过 // ThreadCache一次从中心缓存批量获取小块定长内存数量的上、下限 static size_t LimitSize(size_t alignBytes) { assert(alignBytes != 0); // [2, 512],一次批量移动多少个对象的(慢启动)上限值 // 大对象(最大256KB)一次批量下限为2个 // 小对象(最小8字节)一次批量上限为512个 size_t num = MAX_BYTES / alignBytes; if (num < 2) num = 2; if (num > 512) num = 512; return num; }
补充说明:
PAGE_SHIFT的值是无符号整型数字13
static const size_t PAGE_SHIFT = 13;
四. PageCache成员函数的实现
在这里要着重说明PageCache::NewSpan(size_t k)这个函数,它的作用是返回一个k页的Span对象给上一层的CentralCache。
首先我们通过传入的参数k直接定址找到PageCache对应存储k页Span的SpanList桶PageCache::_spanLists[k],看看桶中是否挂有Span,有的话头删这个桶中的Span并返回给外部:
// 1、根据k直接定址映射,看对应SpanList桶中是否挂有Span if (!_spanLists[k].Empty()) { return _spanLists[k].PopFront(); }
如果k页的SpanList桶中没有挂Span,那么继续往下找更大页的Span,找到之后切分这个更大页的Span,切出k页Span给外部,剩下n-k页的Span挂到下标为n-k的SpanList桶中:
// 2、走到这里说明定址映射的桶中没有Span,那么看更大页的桶是否有Span for (size_t n = k + 1; n < NPAGES; ++n) { // 更大页的桶有Span的话就对其进行切分 if (!_spanLists[n].Empty()) { Span* kSpan = new Span(); Span* nSpan = _spanLists[n].PopFront(); kSpan->_n = k; kSpan->_pageId = nSpan->_pageId; nSpan->_n -= k; nSpan->_pageId += k; _spanLists[nSpan->_n].PushFront(nSpan); return kSpan; } }
PS:我们可以通过起始页的编号来计算起始页的地址,反过来也能计算:
如果更大页的Span都没有,那么就向堆中申请一个128页的连续大块未切分内存,然后从中切出需要的k也Span:
// 3、走到这一步说明整个PageCache中找不到一个>=k页的Span,这时向堆申请一个128页的Span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_n = NPAGES - 1; bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; _spanLists[NPAGES - 1].PushFront(bigSpan); return NewSpan(k);
PS:申请到128页大块内存后为了代码简洁就递归再次调用NewSpan(...)函数去执行第二步切分更大Span的逻辑,既然我们都知道要重新切了,再这样递归调用是否会消耗性能呢?答案是不会的,递归后从头开始执行到第二步的切分逻辑最多也不过128次循环而已,计算机处理128次循环须臾之间就能完成,所以为了代码简洁,我们直接递归调用就行。
函数完整代码实现:
// 返回给CentralCache一个k页的Span Span* PageCache::NewSpan(size_t k) { assert(k > 0 && k < NPAGES); // 1、根据k直接定址映射,看对应SpanList桶中是否挂有Span if (!_spanLists[k].Empty()) { return _spanLists[k].PopFront(); } // 2、走到这里说明定址映射的桶中没有Span,那么看更大页的桶是否有Span for (size_t n = k + 1; n < NPAGES; ++n) { // 更大页的桶有Span的话就对其进行切分 if (!_spanLists[n].Empty()) { Span* kSpan = new Span(); Span* nSpan = _spanLists[n].PopFront(); kSpan->_n = k; kSpan->_pageId = nSpan->_pageId; nSpan->_n -= k; nSpan->_pageId += k; _spanLists[nSpan->_n].PushFront(nSpan); return kSpan; } } // 3、走到这一步说明整个PageCache中找不到一个>=k页的Span,这时向堆申请一个128页的Span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_n = NPAGES - 1; bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; _spanLists[NPAGES - 1].PushFront(bigSpan); return NewSpan(k); }
五. CentralCache申请一个非空的span
CentralCache中有一个函数CentralCache::GetOneSpan(...)用于申请一个非空的span,我们现在可以实现这个函数了:
// 从特定的SpanList中获取一个非空的Span Span* CentralCache::GetOneSpan(SpanList& list, size_t alignBytes) { assert(alignBytes >= 8); // 1、先到链表中找非空的Span Span* it = list.Begin(); while (it != list.End()) { if (it->_freeList != nullptr) { return it; } else { it = it->_next; } } // 2、走到这里说明没有list中没有非空的Span,那么就需要到PageCache申请特定页大小的Span // 这里可以释放桶锁,后面逻辑不再访问该桶 list.GetSpanListMutex().unlock(); // 申请PageCache的表锁,到PageCache中申请特定页的Span过来 PageCache::GetInstance()->GetPageMutex().lock(); Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(alignBytes)); PageCache::GetInstance()->GetPageMutex().unlock(); // 把从PageCache申请到的Span切成一个个的小块定长内存 char* start = (char*)(span->_pageId << PAGE_SHIFT);// 起始页地址 size_t bytes = span->_n << PAGE_SHIFT;// 连续页的字节数 char* end = start + bytes;// 最后一页的最后地址 span->_freeList = start; start += alignBytes; void* tail = span->_freeList; while (start < end) { NextObj(tail) = start; tail = start; start += alignBytes; } NextObj(tail) = nullptr; // 3、重新申请桶锁,把切好的Span挂到桶上 list.GetSpanListMutex().lock(); list.PushFront(span); return span; }


