高并发内存池实战:用C++构建高性能服务器(上)

简介: 高并发内存池实战:用C++构建高性能服务器

前言:(Memory Pool)是一种内存分配方式,又被称为固定大小区块规划(fixed-size-blocks allocation)。通常我们习惯直接使用new、malloc等API申请分配内存,这样做的缺点在于:由于所申请内存块的大小不定,当频繁使用时会造成大量的内存碎片并进而降低性能。

秋招可以写进简历的6个实战项目:

当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。

这个项目的要求的知识储备?

这个项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等等方面的知识。

一、基础概念

1.1什么是内存池

所谓池化技术,就是程序先向系统申请过量的资源,然后自己管理,当程序中需要申请内存时,不是直接向操作系统申请,而是直接从内存池中获取,释放内存时也不是将内存返回给操作系统,而是返回内存池中。

因为每次申请该资源都有较大的开销,这样提前申请好了,使用时就会非常快捷,能够大大提高程序运行效率。在计算机中有很多使用这种池技术的地方,例如线程池、连接池等。

动态内存申请malloc

C++中动态申请内存都是通过malloc去申请的,但实际上我们并不是直接去堆中获取内存的,而malloc就是一个内存池。

malloc() 相当于向系统 “批发” 了一块较大的内存空间,然后“零售” 给程序使用,当全部使用完或者程序有大量内存需求时,再根据需求向操作系统申请内存。

1.2内存池

内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。

内存池主要解决的问题

内存池解决

  • 效率(主要)
  • 内存碎片。(系统的内存分配器的角度)

那么什么是内存碎片呢?

外部碎片一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求

内部碎片:由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。

1.3malloc

C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的。而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。

malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的一套,linux gcc用的glibc中的ptmalloc。

640.png

原理图

二、定长内存池设计

2.1概述

作为程序员(C/C++)我们知道申请内存使用的是malloc,malloc其实就是一个通用的大众货,什么场景下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能,下面我们就先来设计一个定长内存池。

2.2设计思路

开辟内存:

  • 使用malloc开辟一大块内存,让_memory指针指向这个大块内存
  • _memory 设置为char* 类型,是为了方便切割时_memory向后移动多少字节数。

申请内存:

  • 将_memory强转为对应类型,然后赋值给对方,_memory指针向后移动对应字节数即可。
  • 如果有存在已经切割好的小块内存,则优先使用小块内存。

释放内存:

  • 用类型链表的结构来进行存储。
  • 用当前小块内存的头4字节存储下一个小块内存的地址,最后用_freeList指针指向第一个小块内存的地址(并不是将内存释放给操作系统)
  • 所以开辟内存时,开辟的内存大小必须大于或等于一个指针类型的大小。

代码位置:高并发内存池项目中的ObjectPool.h 文件。

2.3内容讲解

注意:核心三变量

char* _memory = nullptr; // 指向大块内存的指针
size_t _remainBytes = 0; // 大块内存分割过程中剩余的字节数
void* _freeList = nullptr; // 还回来的内存块用自由链接表的头指针链接起来

剩余内存字节数必须大于我们要申请的类型内存空间,回收内存块,我们只需要借助内存块头四个或八个字节(平台不同,指针字节数不同)储存下个内存块的地址。

链表链接内存块方式:数据结构头插思想

存储方式

*(void**)obj = _freeList;

程序结束申请的内存自动回收(内存池,我们不需要关心;回收问题)

实现思路:先从freeList(回收链表)中查找是否还有内存块

有就返回内存块地址,没有— 接下来从 _memory(这个大的内存块)中取;如果内存不足,再申请一大块内存。

2.4代码实现

.h文件

#pragma once
#include <iostream>
#include <vector>
#include <time.h>
using std::cout;
using std::endl;
#ifdef _WIN32
  #include<windows.h>
#else
#endif
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
  void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
  // linux下brk mmap等
#endif
  if (ptr == nullptr)
    throw std::bad_alloc();
  return ptr;
}
template <class T>
class myObject
{
public: 
  T* New()
  {
    // 优先把还回来内存块对象,再次重复利用
    T* obj = nullptr;
    if (_freeList)
    {
      //  头删
      void* next= *(void**)_freeList;
      obj = (T*)_freeList;
      _freeList = next;
    }
    else
    {
      // 剩余内存不够一个对象大小时,则重新开大块空间
      if (_remainBytes < sizeof(T))
      {
        _remainBytes = 1024 * 128;
        //_memory = (char*)malloc(_remainBytes);
        _memory = (char*)SystemAlloc(_remainBytes >> 13); // 按页申请内存
        if (_memory == nullptr)
        {
          throw std::bad_alloc();
        }
      }
      obj = (T*)_memory;
      size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); //申请内存空间必须大于或等于一个指针的空间
      _memory += objSize;
      _remainBytes -= objSize;
    }
    // 定位new,显示调用T的构造函数初始化
    new(obj)T;
    return obj;
  }
  void Delete(T* obj)
  {
    // 显示调用析构函数清理对象
    obj->~T();
    //  头插
    *(void**)obj = _freeList; //  取前四或八个字节
    _freeList = obj;
  }
private:
  char* _memory = nullptr;  //  指向大块内存的指针
  size_t _remainBytes = 0;  //  大块内存分割过程中剩余的字节数
  void* _freeList = nullptr;  //  还回来的内存块用自由链接表的头指针链接起来
};
【文章福利】小编推荐自己的Linux内核技术交流群:【 865977150】整理了一些个人觉得比较好的学习书籍、视频资料共享在群文件里面,有需要的可以自行添加哦!!!

2.5效率(malloc与定长内存池)

测试代码:.cpp文件

struct TreeNode
{
  int _val;
  TreeNode* _left;
  TreeNode* _right;
  TreeNode()
    :_val(0)
    , _left(nullptr)
    , _right(nullptr)
  {}
};
void TestObjectPool()
{
  // 申请释放的轮次
  const size_t Rounds = 100;
  // 每轮申请释放多少次
  const size_t N = 1000000;
  std::vector<TreeNode*> v1;
  v1.reserve(N);
  size_t begin1 = clock();
  for (size_t j = 0; j < Rounds; ++j)
  {
    for (int i = 0; i < N; ++i)
    {
      v1.push_back(new TreeNode);
    }
    for (int i = 0; i < N; ++i)
    {
      delete v1[i];
    }
    v1.clear();
  }
  size_t end1 = clock();
  std::vector<TreeNode*> v2;
  v2.reserve(N);
  myObject<TreeNode> TNPool;
  size_t begin2 = clock();
  for (size_t j = 0; j < Rounds; ++j)
  {
    for (int i = 0; i < N; ++i)
    {
      v2.push_back(TNPool.New());
    }
    for (int i = 0; i < N; ++i)
    {
      TNPool.Delete(v2[i]);
    }
    v2.clear();
  }
  size_t end2 = clock();
  cout << "new cost time:" << end1 - begin1 << endl;
  cout << "object pool cost time:" << end2 - begin2 << endl;
}

测试结果图:

结论:明显内存池效率更快。

三、高并发内存池整体设计框架

现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要解决以下几方面的问题。

  • 1.性能问题。
  • 2.多线程环境下,锁竞争问题。
  • 3.内存碎片问题。

3.1三层设计思路

第一层:thread cache(线程缓存):

每个线程独享一个thread cache,用于小于256k的内存分配情况,线程从这里申请内存不需要加锁(因为其他线程无法访问当前线程的 thread cache,没有竞争关系)

第二层:central cache(中心缓存):

所有线程共享一个central cache,thread cache 是按需从central cache中获取对象,central cache在合适的时候会收回thread cache中的对象,避免一个线程占用太多资源。

central cache 是所有线程共享的,所以存在竞争关系,需要加锁;这里使用的锁是桶锁,并且因为只有thread cache没有内存时才会申请内存,所以这里竞争不会太激烈。

第三层:page cache(页缓存):

页缓存存储的内存是以页为单位进行存储的及分配的。central cache没有内存时,则会从page cache中申请一定数量的page,并切割成定长大小的小内存块。

当一个span的几个跨度页的对象都回收后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。

3.2thread cache 设计思路(线程缓存)

thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。

线程申请内存:

线程申请内存时,会有不同规格的内存申请(4字节、5字节等),根据范围划定不同的自由链表,设计多个自由链表管理不同规格的内存小块。实质就相当于使用多个定长内存池的自由链表,每个内存小块采用向上对齐原则(可能会出现多申请内存的情况,这就是内碎片)

例如:

  • 需要9字节,则开辟一个大小为2个8字节的空间的节点
  • 需要100字节,则开辟一个大小为13个8字节的空间的节点。

线程对齐规则:

整体控制在最多10%左右的内碎片浪费,总计设计208个桶,[0,15]个桶,每个桶的对齐数相差8字节(最高128字节对齐),[16,71]个桶,每个桶的对齐数相差16字节(最高1024字节对齐)以此类推。

注意:_freeLists是一个数组,每个元素都是自由链表类型(即存储自由链表的头结点)

线程释放内存

  • 释放内存后:采用自由链表的结构来管理切好的小块内存(每一个切分好的小块内存就是一个节点)
  • 具体方法是:用切分好的小块内存的前4字节或8字节来存储下一个小块内存的地址。
  • 插入节点时,采用头插的方式。

thread cache代码框架

本质:就相当于一个数组,每个数组挂个桶;而每个桶就类似于上面那个定长内存池一样。

相关数据:

static const size_t MAX_BYTES = 256 * 1024; // 一页8kb
static const size_t NFREELIST = 208;    //  总共桶的个数
  • index:表示_freeLists数组的下标也就是桶的位置
  • size:表示一块内存的字节数
  • ptr:表示需要挂起的内存地址

(ThreadCache.h文件)

// thread cache本质是由一个哈希映射的对象自由链表构成
class ThreadCache
{
public:
  // 申请和释放内存对象
  void* Allocate(size_t size);
  void Deallocate(void* ptr, size_t size);
  // 从中心缓存获取对象
  void* FetchFromCentralCache(size_t index, size_t size); 
private:
  FreeList _freeLists[NFREELIST];
};
// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
  • RoundUp函数:表示内存对齐数(例如:对齐数为8,size是7或者9对齐后实际分配内存大小为8或16)
  • Index函数:根据size大小计算出_freeLists数组下标位置(也就是桶的位置)
  • Empty函数:判断_freeLists数组下标位置挂接内存是否为空
  • Pop函数:_freeLists数组下标位置内存块头删取出来并返回该内存块地址
  • Push函数:_freeLists数组下标位置内存块头插挂接该内存块

(ThreadCache.cpp文件)

// 申请内存对象, 空间先到_freeLists链表中去取,如果没有去中心缓存获取空间
void* ThreadCache::Allocate(size_t size)
{
  assert(size <= MAX_BYTES);
  size_t alignSize = SizeClass::RoundUp(size);
  size_t index = SizeClass::Index(size);
  if (!_freeLists[index].Empty())
  {
    return _freeLists[index].Pop();
  }
  else
  {
    return FetchFromCentralCache(index, alignSize);
  }
}
  // 释放内存对象,空间挂起到_freeLists链表中去
void ThreadCache::Deallocate(void* ptr, size_t size)
{
  assert(ptr);
  assert(size <= MAX_BYTES);
  // 找对映射的自由链表桶,对象插入进入
  size_t index = SizeClass::Index(size);
  _freeLists[index].Push(ptr);
}

(common.h文件)

static void*& NextObj(void* obj)  //  前四个或八个字节(下一个结点的地址)
{
  return *(void**)obj;
}
// 管理小对象的自由链表
class FreeList
{
public:
  void Push(void* obj)
  {
    //  头插
    NextObj(obj) = _freeList;
    _freeList = obj;
  }
  void* Pop()
  {
    //  头删
    void* obj = _freeList;
    _freeList = NextObj(obj);
    return obj;
  }
  bool Empty()
  {
    return _freeList == nullptr;
  }
private:
  void* _freeList = nullptr;
};

自由链表的哈希桶跟对象大小的映射关系

(common.h文件)

// 计算对象大小的对齐映射规则
class SizeClass
{
public:
  // 整体控制在最多10%左右的内碎片浪费
  // [1,128]          8byte对齐     freelist[0,16)
  // [128+1,1024]       16byte对齐      freelist[16,72)
  // [1024+1,8*1024]      128byte对齐     freelist[72,128)
  // [8*1024+1,64*1024]   1024byte对齐     freelist[128,184)
  // [64*1024+1,256*1024]   8*1024byte对齐   freelist[184,208)
  static inline size_t _RoundUp(size_t bytes, size_t alignNum)  //  算出桶的大小(即size的对齐数)
  {
    return ((bytes + alignNum - 1) & ~(alignNum - 1));
  }
  // 对齐大小计算
  static inline size_t RoundUp(size_t size)
  {
    //  确定每个区间的对齐数
    if (size <= 128)
    {
      return _RoundUp(size, 8);
    }
    else if (size <= 1024)
    {
      return _RoundUp(size, 16);
    }
    else if (size <= 8 * 1024)
    {
      return _RoundUp(size, 128);
    }
    else if (size <= 64 * 1024)
    {
      return _RoundUp(size, 1024);
    }
    else if (size <= 256 * 1024)
    {
      return _RoundUp(size, 8 * 1024);
    }
    else
    {
      assert(false);
      return -1;
    }
  }
  static inline size_t _Index(size_t bytes, size_t align_shift) 
  {
    return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1; //  桶下标从0开始计数
  }
  // 计算映射的哪一个自由链表桶
  static inline size_t Index(size_t bytes)
  {
    assert(bytes <= MAX_BYTES);
    // 每个区间有多少个链
    static int group_array[4] = { 16, 56, 56, 56 };
    if (bytes <= 128) {
      return _Index(bytes, 3);
    }
    else if (bytes <= 1024) {
      return _Index(bytes - 128, 4) + group_array[0];
    }
    else if (bytes <= 8 * 1024) {
      return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
    }
    else if (bytes <= 64 * 1024) {
      return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
    }
    else if (bytes <= 256 * 1024) {
      return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
    }
    else {
      assert(false);
    }
    return -1;
  }
};

3.3线程缓存无锁设计(TLS)

(ConcurrentAlloc.h文件)

#pragma once
#include "Common.h"
#include "ThreadCache.h"
static void* ConcurrentAlloc(size_t size)
{
  // 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
  if (pTLSThreadCache == nullptr)
  {
    pTLSThreadCache = new ThreadCache;
  }
  cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
  return pTLSThreadCache->Allocate(size);
}
static void ConcurrentFree(void* ptr, size_t size)
{
  assert(pTLSThreadCache);
  pTLSThreadCache->Deallocate(ptr, size);
}
  • TLS:thread local storage 线程本地存储(linux和Windows下有各自的TLS)
  • TLS是一种变量的存储方式,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问,这样就保证了线程的独立性。
  • 静态TLS使用方法:
  • _declspec(thread) DWORD data=0;
  • 声明了一个 _declspec(thread) 类型的变量,会为每一个线程创建一个单独的拷贝。

原理:

  • 在x86 CPU上,将为每次引用的静态TLS变量生成3个辅助机器指令
  • 如果在进程中创建子线程,那么系统将会捕获它并且自动分配一另一个内存块,以便存放新线程的静态TLS变量。
//每个线程刚创建时,就会获得这个指针,每个线程的指针是不同的。
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
//static 修饰,可以保证变量只在当前文件可见。

简单测试一下:

void Alloc1()
{
  for (size_t i = 0; i < 5; ++i)
  {
    void* ptr = ConcurrentAlloc(16);
  }
}
void Alloc2()
{
  for (size_t i = 0; i < 5; ++i)
  {
    void* ptr = ConcurrentAlloc(17);
  }
}
void TLSTest()
{
  std::thread t1(Alloc1);
  t1.join();
  std::thread t2(Alloc2);
  t2.join();
}
int main()
{
  //TestObjectPool();
  TLSTest();
  return 0;
}

640.jpg

3.4central cache设计思路(中心缓存)

central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。

640.jpg

注意:

  • span是双向链表,而span下挂载的小块内存对象是单链表。
  • 中心缓存需要加桶锁。
  • _spanLists 是一个数组,数组中每个元素都是一个span自由链表的_head头指针。
  • 每个span又是一个单向自由链表。

640.jpg

内存申请:

  1. 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,从对应的span中取出小块内存对象给thread cache,这个过程是需要加锁的(加桶锁)
  2. 这里批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法。
  3. 如果所有的span都为空了(即central cache使用完了),则将空的span链在一起,向page cache申请一个span对象,span对象中是一些以页为单位的内存,需要切成对应的小块内存对象,并链接起来,挂到span中。
  4. central cache中每一个span都有一个use_count,分配一个对象给thread cache,就加加。

内存释放:

  1. 当thread_cache将内存释放回central cache中的时,释放回来就减减use_count。
  2. 当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。

CentralCache 代码框架:

  • 注意:
  • CentrealCache 是多个线程都可以进行访问的。我们设计时需要考虑下面两点
  • 单例模式 2. 临界资源加锁

(CentralCache.h文件)

#pragma once
#include "common.h"
//  单例模式---饿汉模式
class CentralCache
{
public:
  static CentralCache* GetInstance()
  {
    return &_sInst;
  }
  // 获取一个非空的span
  Span* GetOneSpan(SpanList& list, size_t byte_size);   //  list: 表示_spanLists中span的_freeList连续挂接空间的数量
  // 从中心缓存获取一定数量的对象给thread cache
  size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size); //  size:内存空间大小;batchNum:内存空间个数
  //  start 和 end 表示内存空间地址起始到终末范围
private:
  SpanList _spanLists[NFREELIST];
private:
  CentralCache() {};
  CentralCache(const CentralCache& t) = delete;
  static CentralCache _sInst; //  声明
};

GetOneSpan函数:需要到PageCache模块去讲解(CentralCache.cpp文件)

CentralCache CentralCache::_sInst;  //定义
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size)
{
  //  ...
  return nullptr;
}
// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
  //  每个桶加锁
  size_t index = SizeClass::Index(size);
  _spanLists[index]._mtx.lock();
  Span* span = GetOneSpan(_spanLists[index], size);
  assert(span);
  assert(span->_freeList);
  // 从span中获取batchNum个对象
  // 如果不够batchNum个,有多少拿多少
  start = span->_freeList;
  end = start;
  size_t i = 0, actualNum = 1;
  while (i < batchNum - 1 && NextObj(end) != nullptr)   //  取连续内存块,当batchNum超过原有时,按拥有的数量给
  {
    end = NextObj(end);
    ++i;
    ++actualNum;
  }
  span->_freeList = NextObj(end);   // 剩余空间仍挂在Span中的_freeList去
  NextObj(end) = nullptr;       //  最后end记录下一个空间地址置空
  span->_useCount += actualNum;
  _spanLists[index]._mtx.unlock();  //  解锁
  return actualNum;
}

以页为单位的大内存管理span的定义及spanlist定义(common.h文件)

// Span管理一个跨度的大块内存
// 管理以页为单位的大块内存
struct Span
{
  PAGE_ID _pageId = 0;  //  页号
  size_t n = 0;     //  页数
  //  双向链表的结构
  Span* next = nullptr;
  Span* prev = nullptr;
  size_t _useCount = 0;   //  使用计数,切好的小块内存分配给ThreadCache
  void* _freeList = nullptr;  // 大块内存切小链接起来,这样回收回来的内存也方便链接
};
// 带头双向循环链表
class SpanList
{
public:
  SpanList()
  {
    _head = new Span;
    _head->next = _head;
    _head->prev = _head;
  }
  Span* Begin()
  {
    return _head->next;
  }
  Span* End()
  {
    return _head;
  }
  void Insert(Span* pos, Span* newSpan)
  {
    //  头插
    Span* prev = pos->prev;
    prev->next = newSpan;
    newSpan->prev = prev;
    newSpan->next = pos;
    pos->prev = newSpan;
  }
  void Erase(Span* pos)
  {
    assert(pos != _head);
    Span* prev = pos->prev;
    Span* next = pos->next;
    prev->next = next;
    next->prev = prev;
  }
private:
  Span* _head;
public:
  std::mutex _mtx;  //  桶锁: 数组_spanLists中每一个位置的桶都有一把锁
};

慢开始反馈调节算法

补全hreadCache.h中FetchFromCentralCache函数只声明未实现

完善common.h文件

1)FreeList中成员变量和函数:

void PushRange(void* start, void* end)
  {
    //  一连串空间存储
    NextObj(end) = _freeList;
    _freeList = start;
  }

MaxSize函数调节页数:

640.jpg

2)类SizeClass的静态成员函数

// 一次从中心缓存获取多少个
  static size_t NumMoveSize(size_t size)
  {
    assert(size > 0);
    // [2, 512],一次批量移动多少个对象的(慢启动)上限值
    // 小对象一次批量上限高
    // 小对象一次批量上限低
    int num = MAX_BYTES / size;
    if (num < 2)
      num = 2;
    if (num > 512)
      num = 512;
    return num;
  }

(ThreadCache.cpp文件)

// 慢开始反馈调节算法
// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
// 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
// 3、size越大,一次向central cache要的batchNum就越小
// 4、size越小,一次向central cache要的batchNum就越大
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
  //  慢开始反馈调节法    不会开始要太多内存
  size_t  batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
  if (batchNum == _freeLists[index].MaxSize())
  {
    ++_freeLists[index].MaxSize();
  }
  void* start = nullptr;
  void* end = nullptr;
  size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);  //  获取连续实际内存的数目
  assert(actualNum >= 1); //  至少一个
  if (actualNum == 1)
  {
    assert(start == end);
  }
  else
  {
    _freeLists[index].PushRange(NextObj(start), end);
  }
  return start;
}

3.5page cache设计思路(页缓存)

page cache中也是哈希桶结构,但是每个节点存储的都是span,因为页缓存是所有线程共享的,只需要定义一个对象,所以这里将 page cache 设计为单例模式(这里采用饿汉模式的设计方法)。

注意:page cache需要加整体锁(因为是所有线程共享的)

640.jpg

申请内存:

  • 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。
  • 例如:申请的是4page,4page后面没有挂span,则向后面寻找更大的span,假设在10page位置找到一个span,则将10page span分裂为一个4page span和一个6page span,把4page的span返回给central cache,把6page的span挂载到对应的6号桶中去。
  • 如果找到128 page都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请一个128page 的span挂在自由链表中,再重复1中的过程。
  • 第一次申请内存时,page cache是空的,经过上面的过程后会向堆申请一个128page的span,然后重复上面的过程,将128page的span进行分裂和重新挂载。

释放内存:

如果central cache释放回一个span,则依次寻找span的前后page id的span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。

640.jpg

PageCache 代码框架:

  1. page cache是一个以页为单位的span自由链表
  2. 为了保证全局只有唯一的page cache,这个类被设计成了单例模式。
  3. 注意线程安全,需要锁(此处我们采用整体锁)

相关数据:

static const size_t MAX_BYTES = 256 * 1024; // 一页8kb
static const size_t NFREELIST = 208;    //  central and thread总共桶的个数
static const size_t NPAGES = 129;   //  page总共桶的个数; 数组下标从1开始总共128页
static const size_t PAGE_SHIFT = 13;    //  一页字节数2^13

(PageCache.h文件)

#pragma once
#include "common.h"
class PageCache
{
public:
  static PageCache* GetInstance()
  {
    return &_sInst;
  }
  //  获取一个k页的span
  Span* NewSpan(size_t k);
  std::mutex _pageMtx;
private:
  SpanList _spanLists[NPAGES];
  std::unordered_map<PAGE_ID, Span*> _idSpanMap;
  //  页号于span映射;便于回收内存
  PageCache() {};
  PageCache(const PageCache& t) = delete;
  static PageCache _sInst;
};

完善一下common.h文件:类SpanList

void PushFront(Span* span)  // 头插
  {
    Insert(Begin(), span);
  }
  Span* PopFront()  //  头删
  {
    Span* front = _head->_next;
    Erase(front);
    return front;
  }
  bool Empty()
  {
    return _head->_next == _head;
  }

(PageCache.cpp文件)

注意:地址除以一页的字节数就是页号(Page中_pageId)Page中_n 表示几页(_n<<13表示这块内存的大小)

//  获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
  assert(k > 0 && k < NPAGES);
  //  先检查第k个桶里面有没有span
  if (!_spanLists[k].Empty())
  {
    return _spanLists[k].PopFront();  //  有就直接pop头结点并返回
  }
  //  检查一下后面的桶里面有没有span, 如果有可以把他它进行切分
  for (size_t i = k + 1; i < NPAGES; ++i)
  {
    if (!_spanLists[i].Empty())
    {
      //  进行切分
      Span* nSpan = _spanLists[i].PopFront(); //  i:位置取出它头结点
      Span* kSpan = new Span;   //  创k一个结点
      kSpan->_pageId = nSpan->_pageId;  //  页的起始位置
      nSpan->_pageId += k;  //  分出k页;n页位置向后走(相当与一大块内存取出头一小块内存)
      kSpan->_n = k;  //  得到k页
      nSpan->_n -= k;
      //  剩余空间; 挂接到nSpan->_n即(i-k)页的位置
      _spanLists[nSpan->_n].PushFront(nSpan);
      // 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时
      //进行的合并查找
      _idSpanMap[nSpan->_pageId] = nSpan;
      _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan; // 例如:20 ~24(5页)
      //  建立id和span的映射; 方便(centralcache)回收小块内存
      for (size_t i = 0; i < kSpan->_n; ++i)
      {
        _idSpanMap[kSpan->_pageId + i] = kSpan;
      }
      return kSpan;
    }
  }
  //走到这个位置就说明后面没有大页的span了
  //这时就去找堆要一个128页(最大页---NPAGES-1)的span
  Span* bigSpan = new Span;
  void* ptr = SystemAlloc(NPAGES - 1);
  bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;  // 地址除以13;就是页号
  bigSpan->_n = NPAGES - 1;
  // 内存挂接到相应位置
  _spanLists[bigSpan->_n].PushFront(bigSpan);
  //  函数递归调用自己;减少代码冗余(上面的空间切分)在时间稍微影响,基本上没影响
  return NewSpan(k);
}

windows和Linux下如何直接向堆申请页为单位的大块内存:

VirtualAlloc

brk和mmap

我们按一页8k(8*1024)也就是2^13(1<<13)

(common.h文件)

#ifdef _WIN32
  #include<windows.h>
#else
  //  linux
#endif
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
  void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
  // linux下brk mmap等
#endif
  if (ptr == nullptr)
    throw std::bad_alloc();
  return ptr;
}

获取一个非空的span

完善一下common.h文件

  • 类SizeClass
// 计算一次向系统获取几个页
  // 单个对象 8byte
  // ...
  // 单个对象 256KB
  static size_t NumMovePage(size_t size)
  {
    size_t num = NumMoveSize(size); // 要几页
    size_t npage = num * size;  //总字节数
    npage >>= PAGE_SHIFT;   // 计算多少页,一页8k(2^13);则PAGE_SHIFT为13
    if (npage == 0)
      npage = 1;
    return npage;
  }


Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size)
{
  //  先从桶的span中找内存
  Span* it = list.Begin();
  while (it != list.End())
  {
    if (it->_freeList != nullptr)
    {
      return it;
    }
    it = it->_next;
  }
  // 先把central 桶锁解除:这样如果某他线程释放内存对象回来,不会阻塞
  list._mtx.unlock();
  //  访问page时 需要加锁
  PageCache::GetInstance()->_pageMtx.lock();
  //  没有空闲的span去PageCache中要
  Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(byte_size));  //  获取一个k页的span
  PageCache::GetInstance()->_pageMtx.unlock();
  //对获取span进行切分,不需要加锁,因为其他线程拿不到这个span
  char* start = (char*)(span->_pageId << PAGE_SHIFT); // k页的span的起始地址
  size_t bytes = span->_n << PAGE_SHIFT;  // 内存字节数
  char* end = start + bytes;
  //  把大块内存切成自由链表链表
  //  1. 先切一块作头,然后尾插(这样保证物理空间顺序与逻辑顺序保持一致)
  span->_freeList = start;
  start += byte_size;
  void* tail = span->_freeList;
  while (start < end)
  {
    NextObj(tail) = start;
    tail = NextObj(tail);
    start += byte_size;
  }
  NextObj(tail)=nullptr;
  //  挂节的时候恢复cengtral的锁
  list._mtx.lock();
  list.PushFront(span);
  return span;
}


相关文章
|
1月前
|
JSON 程序员 数据格式
深入探索 “JSON for Modern C++“:安装、构建与应用
深入探索 “JSON for Modern C++“:安装、构建与应用
40 0
|
1月前
|
存储 分布式计算 网络协议
阿里云服务器内存型r7、r8a、r8y实例区别参考
在阿里云目前的活动中,属于内存型实例规格的云服务器有内存型r7、内存型r8a、内存型r8y这几个实例规格,相比于活动内的经济型e、通用算力型u1实例来说,这些实例规格等性能更强,与计算型和通用型相比,它的内存更大,因此这些内存型实例规格主要适用于数据库、中间件和数据分析与挖掘,Hadoop、Spark集群等场景,本文为大家介绍内存型r7、r8a、r8y实例区别及最新活动价格,以供参考。
阿里云服务器内存型r7、r8a、r8y实例区别参考
|
1月前
|
开发工具 C语言 C++
CMake构建大型C/C++项目:跨平台设计与高级应用(二)
CMake构建大型C/C++项目:跨平台设计与高级应用
43 0
|
1月前
|
存储 C++ 开发者
C++程序设计基础:构建稳固的编程基石
C++程序设计基础:构建稳固的编程基石
23 1
|
13天前
|
数据采集 API 数据安全/隐私保护
畅游网络:构建C++网络爬虫的指南
本文介绍如何使用C++和cpprestsdk库构建高效网络爬虫,以抓取知乎热点信息。通过亿牛云爬虫代理服务解决IP限制问题,利用多线程提升数据采集速度。示例代码展示如何配置代理、发送HTTP请求及处理响应,实现多线程抓取。注意替换有效代理服务器参数,并处理异常。
畅游网络:构建C++网络爬虫的指南
|
24天前
|
存储 缓存 NoSQL
Redis 服务器指南:高性能内存数据库的完整使用指南
Redis 服务器指南:高性能内存数据库的完整使用指南
|
28天前
|
存储 缓存 PHP
阿里云服务器实例、CPU内存、带宽、操作系统选择参考
对于使用阿里云服务器的用户来说,云服务器的选择和使用非常重要,如果实例、内存、CPU、带宽等配置选择错误,可能会影响到自己业务在云服务器上的计算性能及后期运营状况,本文为大家介绍一下阿里云服务器实例、CPU内存、带宽、操作系统的选择注意事项,以供参考。
阿里云服务器实例、CPU内存、带宽、操作系统选择参考
|
1月前
|
算法 程序员 C语言
C++设计哲学:构建高效和灵活代码的艺术
C++设计哲学:构建高效和灵活代码的艺术
60 1
|
1月前
|
编译器 持续交付 项目管理
CMake构建大型C/C++项目:跨平台设计与高级应用(三)
CMake构建大型C/C++项目:跨平台设计与高级应用
44 0
|
1月前
|
编译器 Linux C语言
CMake构建大型C/C++项目:跨平台设计与高级应用(一)
CMake构建大型C/C++项目:跨平台设计与高级应用
70 0