高并发内存池实战:用C++构建高性能服务器(下)

简介: 高并发内存池实战:用C++构建高性能服务器

五、完善整体项目释放流程

5.1thread cache

  • 当链表的长度过长,则回收一部分内存对象到central cache。
  • Size函数:记录(_freeList)挂接内存的个数–(插入和删除在加一行计算代码)

ThreadCache.h文件中的ThreadCache类

增加一个类函数ListTooLong声明

注意:

PopRange:common文件中FreeList类的函数
CentralCache.h文件中的ReleaseListToSpans函数

ListTooLong函数定义实现

void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
  void* start = nullptr;
  void* end = nullptr;
  list.PopRange(start, end, list.MaxSize());  //取消挂接。
  // 将一定数量的对象释放到span跨度(CentralCache)
  CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
  void PopRange(void*& start, void*& end, size_t n)
  {
    assert(n <= _size);
    //  取走连续的链表空间
    start = _freeList;  // 起始地址
    end = start;    // 末尾地址
    for (size_t i = 0; i < n - 1; ++i)
    {
      end = NextObj(end);
    }
    _freeList = NextObj(end); //  剩余空间继续挂接
    NextObj(end) = nullptr;
    _size -= n;
  }

5.2central cache

释放回来时 use_count-=1。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。

  • PageCache.h文件中ReleaseSpanToPageCache函数(释放空闲span回到Pagecache,并合并相邻的span)
  • PageCache类中的unordered_map映射函数
//  将一定数量的对象释放到span跨度(CentralCache)
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
  size_t index = SizeClass::Index(size);  //  确定桶的位置
  _spanLists[index]._mtx.lock();      //  上桶锁
  while (start)
  {
    void* next = NextObj(start);
    Span* span = PageCache::GetInstance()->MapObjectToSpan(start);  //  通过映射找到span的位置
    NextObj(start) = span->_freeList; //  头插
    span->_freeList = start;
    span->_useCount -= 1; //  内存块借出去个数
    if (span->_useCount == 0) //  说明小块内存全回来了; 这个span就可以回收合并了
    {
      _spanLists[index].Erase(span);
      span->_freeList = nullptr;
      span->_next = nullptr;
      span->_prev = nullptr;
      //释放span给page cache时,使用page cache的锁就可以了
      //这时把桶锁解掉
      _spanLists[index]._mtx.unlock();    //  解锁
      PageCache::GetInstance()->_pageMtx.lock();
      PageCache::GetInstance()->ReleaseSpanToPageCache(span);   // 释放空闲span回到Pagecache,并合并相邻的span
      PageCache::GetInstance()->_pageMtx.unlock();
      _spanLists[index]._mtx.lock();      //  上桶锁
    }
    start = next;
  }
  _spanLists[index]._mtx.unlock();    //  解锁
}


// 获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
  PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
  std::unique_lock<std::mutex> lock(_pageMtx);  //RAII风格的锁,把pagecache的锁给这个类。
  auto ret = _idSpanMap.find(id);
  if (ret != _idSpanMap.end())
  {
    return ret->second;
  }
  else
  {
    assert(false);
    return nullptr;
  }
  return nullptr;
}

5.3page cache

如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。

// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
  //  向前合并
  while (true)
  {
    //  对span前的页尝试进行合并.缓解内存碎片(外碎片)问题
    PAGE_ID prevId = span->_pageId - 1; //  前面一页
    auto ret = _idSpanMap.find(prevId); //  查找是否存在
    //  查找前后的相邻页尝试合并
    if (ret == _idSpanMap.end())
    {
      break;  //前面的页号没有;不合并了
    }
    Span* prevSpan = ret->second;
    if (prevSpan->_isUse == true)
    {
      break;  //  span正在使用;不能合并
    }
    if (prevSpan->_n + span->_n > NPAGES - 1)
    {
      break;  //  内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了
    }
    //  可以往前合并了
    span->_pageId = prevSpan->_pageId;  //  起始地址往前
    span->_n += prevSpan->_n; //  页数增加了
    _spanLists[prevSpan->_n].Erase(prevSpan); //  合并后,取消挂节;防止野指针出现
    delete prevSpan;  //  合并后;处理掉前面相邻的span
  }
  //  向后合并
  while (true)
  {
    //  对span后的页尝试进行合并.缓解内存碎片(外碎片)问题
    PAGE_ID backId = span->_pageId + span->_n;  //  后面span的一页
    auto ret = _idSpanMap.find(backId); //  查找是否存在
    //  查找前后的相邻页尝试合并
    if (ret == _idSpanMap.end())
    {
      break;  //后面的页号没有;不合并了
    }
    Span* backSpan = ret->second;
    if (backSpan->_isUse == true)
    {
      break;  //  span正在使用;不能合并
    }
    if (backSpan->_n + span->_n > NPAGES - 1)
    {
      break;  //  内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了
    }
    //  可以往后合并了
    span->_n += backSpan->_n; //  页数增加了
    _spanLists[backSpan->_n].Erase(backSpan); //  合并后,取消挂节;防止野指针出现
    delete backSpan;  //  合并后;处理掉后面相邻的span
  }
  //  挂节到PageCache
  _spanLists[span->_n].PushFront(span);
  span->_isUse = false;
  //  记录page中头和尾内存的映射
  _idSpanMap[span->_pageId] = span;
  _idSpanMap[span->_pageId + span->_n - 1] = span;
}

用定长内存池替代new和delete提高效率

我们之前每申请/释放一个Span结点就需要new/delete一个

解决方案:把之前写的定长内存池利用上;定义在pagecache中声明。

高并发内存池-大于256KB的大块内存申请与释放问题

1)概述

  • 1 内存 <= 256KB ->三层缓存(走内存池)
  • 2 内存>256KB
  • a. 128*8K >= size >32 * 8K -> page cacheb
  • b. size > 128 * 8k ->找系统堆

申请流程

ConcurrentAlloc函数完善

static void* ConcurrentAlloc(size_t size)
{
  if (size > MAX_BYTES) // 申请内存大于32页直接去PageCache申请内存。
  {
    size_t alignSize = SizeClass::RoundUp(size);  //  内存大小对齐数
    size_t kPage = alignSize >> PAGE_SHIFT;   //  算出页数
    Span* span = PageCache::GetInstance()->NewSpan(kPage);
    span->_objSize = size;
    void* ptr = (void*)(span->_pageId << PAGE_SHIFT); //  页内存的起始地址
    return ptr;
  }
  else
  {
    // 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
    if (pTLSThreadCache == nullptr)
    {
      static myObject< ThreadCache> tcPool;
      //pTLSThreadCache = new ThreadCache;
      pTLSThreadCache = tcPool.New();
    }
    //cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
    return pTLSThreadCache->Allocate(size);
  }
}

2)NewSpan函数完善

//  获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
  assert(k > 0);
  if (k > NPAGES - 1)   // 大于128页去堆上申请
  {
    void* ptr = SystemAlloc(k);
    //Span* span = new Span;
    Span* span = _spanPool.New(); //  定长内存池,代替new;提高效率
    span->_pageId = ((PAGE_ID)ptr >> PAGE_SHIFT);
    span->_n = k;
    _idSpanMap[span->_pageId] = span; // 起始页号和span映射
    return span;
  }
  //  先检查第k个桶里面有没有span
  if (!_spanLists[k].Empty())
  {
    Span* kSpan= _spanLists[k].PopFront();  
    //  建立id和span的映射; 方便回收小块内存
    for (size_t i = 0; i < kSpan->_n; ++i)
    {
      _idSpanMap[kSpan->_pageId + i] = kSpan;
    }
    return kSpan; 
  }
  //  检查一下后面的桶里面有没有span, 如果有可以把他它进行切分
  for (size_t i = k + 1; i < NPAGES; ++i)
  {
    if (!_spanLists[i].Empty())
    {
      //  进行切分
      Span* nSpan = _spanLists[i].PopFront(); //  i:位置取出它头结点
      //Span* kSpan = new Span;   //  创k一个结点
      Span* kSpan = _spanPool.New();  //  定长内存池,代替new;提高效率
      kSpan->_pageId = nSpan->_pageId;  //  页的起始位置
      nSpan->_pageId += k;  //  分出k页;n页位置向后走(相当与一大块内存取出头一小块内存)
      kSpan->_n = k;  //  得到k页
      nSpan->_n -= k;
      //  剩余空间; 挂接到nSpan->_n即(i-k)页的位置
      _spanLists[nSpan->_n].PushFront(nSpan);
      // 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时
      //进行的合并查找
      _idSpanMap[nSpan->_pageId] = nSpan;
      _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan; // 例如:20 ~24(5页)
      //  建立id和span的映射; 方便回收小块内存
      for (size_t i = 0; i < kSpan->_n; ++i)
      {
        _idSpanMap[kSpan->_pageId + i] = kSpan;
      }
      return kSpan;
    }
  }
  //走到这个位置就说明后面没有大页的span了
  //这时就去找堆要一个128页(最大页---NPAGES-1)的span
  //Span* bigSpan = new Span;
  Span* bigSpan = _spanPool.New();  //  定长内存池,代替new;提高效率
  void* ptr = SystemAlloc(NPAGES - 1);
  bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;  // 地址除以13;就是页号
  bigSpan->_n = NPAGES - 1;
  // 内存挂接到相应位置
  _spanLists[bigSpan->_n].PushFront(bigSpan);
  //  函数递归调用自己;减少代码冗余(上面的空间切分)在时间稍微影响,基本上没影响
  return NewSpan(k);
}

3)释放流程

ConcurrentFree函数完善

我们要给一个内存块函数自己自动推演内存大小;并进行内存释放和回收。

解决方案:

a. 通过hash映射法(size与span)

b. span结构中增加一个成员变量size

我们采用b方案;在GetOneSpan函数内来添加大小

static void ConcurrentFree(void* ptr)
{
  Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);  // 通过地址,找到span的映射位置
  size_t size = span->_objSize;
  if (size > MAX_BYTES)
  {
    PageCache::GetInstance()->_pageMtx.lock();
    PageCache::GetInstance()->ReleaseSpanToPageCache(span);   // 释放空闲span回到Pagecache,并合并相邻的span
    PageCache::GetInstance()->_pageMtx.unlock();
  }
  else
  {
    assert(pTLSThreadCache);
    pTLSThreadCache->Deallocate(ptr, size);
  }
}

ReleaseSpanToPageCache函数完善

// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
  if (span->_n > NPAGES - 1)  //  大于128 页直接向堆释放掉
  {
    void* ptr = (void*)(span->_pageId << PAGE_SHIFT); //通过页号起始,算出起始页的地址
    SystemFree(ptr);  //  释放
    //  delete span;
    _spanPool.Delete(span); //  定长内存池代替delete;提高效率
    return;
  }
  //  向前合并
  while (true)
  {
    //  对span前的页尝试进行合并.缓解内存碎片(外碎片)问题
    PAGE_ID prevId = span->_pageId - 1; //  前面一页
    auto ret = _idSpanMap.find(prevId); //  查找是否存在
    //  查找前后的相邻页尝试合并
    if (ret == _idSpanMap.end())
    {
      break;  //前面的页号没有;不合并了
    }
    Span* prevSpan = ret->second;
    if (prevSpan->_isUse == true)
    {
      break;  //  span正在使用;不能合并
    }
    if (prevSpan->_n + span->_n > NPAGES - 1)
    {
      break;  //  内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了
    }
    //  可以往前合并了
    span->_pageId = prevSpan->_pageId;  //  起始地址往前
    span->_n += prevSpan->_n; //  页数增加了
    _spanLists[prevSpan->_n].Erase(prevSpan); //  合并后,取消挂节;防止野指针出现
    //delete prevSpan;  //  合并后;处理掉前面相邻的span
    _spanPool.Delete(prevSpan); //  定长内存池代替delete;提高效率
  }
  //  向后合并
  while (true)
  {
    //  对span后的页尝试进行合并.缓解内存碎片(外碎片)问题
    PAGE_ID backId = span->_pageId + span->_n;  //  后面span的一页
    auto ret = _idSpanMap.find(backId); //  查找是否存在
    //  查找前后的相邻页尝试合并
    if (ret == _idSpanMap.end())
    {
      break;  //后面的页号没有;不合并了
    }
    Span* backSpan = ret->second;
    if (backSpan->_isUse == true)
    {
      break;  //  span正在使用;不能合并
    }
    if (backSpan->_n + span->_n > NPAGES - 1)
    {
      break;  //  内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了
    }
    //  可以往后合并了
    span->_n += backSpan->_n; //  页数增加了
    _spanLists[backSpan->_n].Erase(backSpan); //  合并后,取消挂节;防止野指针出现
    //delete backSpan;  //  合并后;处理掉后面相邻的span
    _spanPool.Delete(backSpan); //  定长内存池代替delete;提高效率
  }
  //  挂节到PageCache
  _spanLists[span->_n].PushFront(span);
  span->_isUse = false;
  //  记录page中头和尾内存的映射
  _idSpanMap[span->_pageId] = span;
  _idSpanMap[span->_pageId + span->_n - 1] = span;
}

高并发内存池-针对性能瓶颈使用基数树进行优化

1)概述

通过性能检测,我们明显发现;内存池性能很差???

通过性能检测;我们发现我们映射中的lock(加锁)消耗性能(映射加锁是为了保证多线程安全)

三层空间

不同平台下,存储页号需要位数不同

  • 32- - -19位
  • 64- - -51位

一层

本质是id与span*的映射

使用tcmalloc源码中实现基数树进行优化

利用hash映射的部分全部改成基树(读写分离)

#pragma once
#include"Common.h"
// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
  static const int LENGTH = 1 << BITS;
  void** array_;
public:
  typedef uintptr_t Number;
  //explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
  explicit TCMalloc_PageMap1() {
    //array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
    size_t size = sizeof(void*) << BITS;
    size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
    array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
    memset(array_, 0, sizeof(void*) << BITS);
  }
  // Return the current value for KEY.  Returns NULL if not yet set,
  // or if k is out of range.
  void* get(Number k) const {
    if ((k >> BITS) > 0) {
      return NULL;
    }
    return array_[k];
  }
  // REQUIRES "k" is in range "[0,2^BITS-1]".
  // REQUIRES "k" has been ensured before.
  //
  // Sets the value 'v' for key 'k'.
  void set(Number k, void* v) {
    array_[k] = v;
  }
};
// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
  // Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
  static const int ROOT_BITS = 5;
  static const int ROOT_LENGTH = 1 << ROOT_BITS;
  static const int LEAF_BITS = BITS - ROOT_BITS;
  static const int LEAF_LENGTH = 1 << LEAF_BITS;
  // Leaf node
  struct Leaf {
    void* values[LEAF_LENGTH];
  };
  Leaf* root_[ROOT_LENGTH];             // Pointers to 32 child nodes
  void* (*allocator_)(size_t);          // Memory allocator
public:
  typedef uintptr_t Number;
  //explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
  explicit TCMalloc_PageMap2() {
    //allocator_ = allocator;
    memset(root_, 0, sizeof(root_));
    PreallocateMoreMemory();
  }
  void* get(Number k) const {
    const Number i1 = k >> LEAF_BITS;
    const Number i2 = k & (LEAF_LENGTH - 1);
    if ((k >> BITS) > 0 || root_[i1] == NULL) {
      return NULL;
    }
    return root_[i1]->values[i2];
  }
  void set(Number k, void* v) {
    const Number i1 = k >> LEAF_BITS;
    const Number i2 = k & (LEAF_LENGTH - 1);
    assert(i1 < ROOT_LENGTH);
    root_[i1]->values[i2] = v;
  }
  bool Ensure(Number start, size_t n) {
    for (Number key = start; key <= start + n - 1;) {
      const Number i1 = key >> LEAF_BITS;
      // Check for overflow
      if (i1 >= ROOT_LENGTH)
        return false;
      // Make 2nd level node if necessary
      if (root_[i1] == NULL) {
        //Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
        //if (leaf == NULL) return false;
        static myObject<Leaf> leafPool;
        Leaf* leaf = (Leaf*)leafPool.New();
        memset(leaf, 0, sizeof(*leaf));
        root_[i1] = leaf;
      }
      // Advance key past whatever is covered by this leaf node
      key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
    }
    return true;
  }
  void PreallocateMoreMemory() {
    // Allocate enough to keep track of all possible pages
    Ensure(0, 1 << BITS);
  }
};
// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
  // How many bits should we consume at each interior level
  static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
  static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
  // How many bits should we consume at leaf level
  static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
  static const int LEAF_LENGTH = 1 << LEAF_BITS;
  // Interior node
  struct Node {
    Node* ptrs[INTERIOR_LENGTH];
  };
  // Leaf node
  struct Leaf {
    void* values[LEAF_LENGTH];
  };
  Node* root_;                          // Root of radix tree
  void* (*allocator_)(size_t);          // Memory allocator
  Node* NewNode() {
    Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
    if (result != NULL) {
      memset(result, 0, sizeof(*result));
    }
    return result;
  }
public:
  typedef uintptr_t Number;
  explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
    allocator_ = allocator;
    root_ = NewNode();
  }
  void* get(Number k) const {
    const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
    const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
    const Number i3 = k & (LEAF_LENGTH - 1);
    if ((k >> BITS) > 0 ||
      root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
      return NULL;
    }
    return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
  }
  void set(Number k, void* v) {
    ASSERT(k >> BITS == 0);
    const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
    const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
    const Number i3 = k & (LEAF_LENGTH - 1);
    reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
  }
  bool Ensure(Number start, size_t n) {
    for (Number key = start; key <= start + n - 1;) {
      const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
      const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
      // Check for overflow
      if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
        return false;
      // Make 2nd level node if necessary
      if (root_->ptrs[i1] == NULL) {
        Node* n = NewNode();
        if (n == NULL) return false;
        root_->ptrs[i1] = n;
      }
      // Make leaf node if necessary
      if (root_->ptrs[i1]->ptrs[i2] == NULL) {
        Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
        if (leaf == NULL) return false;
        memset(leaf, 0, sizeof(*leaf));
        root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
      }
      // Advance key past whatever is covered by this leaf node
      key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
    }
    return true;
  }
  void PreallocateMoreMemory() {
  }
};

基树与hash对比

1、只有在这两个函数中回去建立id和span的映射,也就是说回去写

2、基数树,写之前会提前开好空间,写数据过程中,不会动结构。

3、读写是分离的。线程1对一个位置读写的时候,线程2不640.jpg可能对这个位置读写


5.4框架代码和代码总结

common.h共享文件

#pragma once
#include <iostream>
#include <vector>
#include <thread>
#include <algorithm>
#include <mutex>
#include <unordered_map>
#include <time.h>
#include <assert.h>
using std::cout;
using std::endl;
static const size_t MAX_BYTES = 256 * 1024; // 一页8kb; 32页
static const size_t NFREELIST = 208;    //  central and thread总共桶的个数
static const size_t NPAGES = 129;   //  page总共桶的个数; 数组下标从1开始总共128页
static const size_t PAGE_SHIFT = 13;    //  一页字节数2^13
//  不同平台下页数不同;可能导致存页数变量范围不够
#ifdef _WIN64
  typedef unsigned long long PAGE_ID;
#elif _WIN32
  typedef size_t PAGE_ID;
#else
  // Linux
#endif  
#ifdef _WIN32
  #include<windows.h>
#else
  //  linux
#endif
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
  void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
  // linux下brk mmap等
#endif
  if (ptr == nullptr)
    throw std::bad_alloc();
  return ptr;
}
//  释放内存
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
  VirtualFree(ptr, 0, MEM_RELEASE);
#else
  // sbrk unmmap等
#endif
}
static void*& NextObj(void* obj)  //  前四个或八个字节(下一个结点的地址)
{
  return *(void**)obj;
}
// 管理小对象的自由链表
class FreeList
{
public:
  void Push(void* obj)
  {
    //  头插
    NextObj(obj) = _freeList;
    _freeList = obj;
    ++_size;
  }
  void* Pop()
  {
    assert(_freeList);
    //  头删
    void* obj = _freeList;
    _freeList = NextObj(obj);
    --_size;
    return obj;
  }
  void PushRange(void* start, void* end, size_t n)
  {
    //  一连串空间存储
    NextObj(end) = _freeList;
    _freeList = start;
    _size += n;
  }
  void PopRange(void*& start, void*& end, size_t n)
  {
    assert(n <= _size);
    //  取走连续的链表空间
    start = _freeList;  // 起始地址
    end = start;    // 末尾地址
    for (size_t i = 0; i < n - 1; ++i)
    {
      end = NextObj(end);
    }
    _freeList = NextObj(end); //  剩余空间继续挂接
    NextObj(end) = nullptr;
    _size -= n;
  }
  bool Empty()
  {
    return _freeList == nullptr;
  }
  size_t& MaxSize()
  {
    return _maxSize;
  }
  size_t Size()
  {
    return _size;
  }
private:
  void* _freeList = nullptr;
  size_t _maxSize = 1;  // 表示向CentrealCache要空间数量的上限
  size_t _size = 0;   // 记录(_freeList)挂接内存的个数
};
// 计算对象大小的对齐映射规则
struct SizeClass
{
  // 整体控制在最多10%左右的内碎片浪费
  // [1,128]          8byte对齐     freelist[0,16)
  // [128+1,1024]       16byte对齐      freelist[16,72)
  // [1024+1,8*1024]      128byte对齐     freelist[72,128)
  // [8*1024+1,64*1024]   1024byte对齐     freelist[128,184)
  // [64*1024+1,256*1024]   8*1024byte对齐   freelist[184,208)
  static inline size_t _RoundUp(size_t bytes, size_t alignNum)  //  算出桶的大小(即size的对齐数)
  {
    return ((bytes + alignNum - 1) & ~(alignNum - 1));
  }
  // 对齐大小计算
  static inline size_t RoundUp(size_t size)
  {
    //  确定每个区间的对齐数
    if (size <= 128)
    {
      return _RoundUp(size, 8);
    }
    else if (size <= 1024)
    {
      return _RoundUp(size, 16);
    }
    else if (size <= 8 * 1024)
    {
      return _RoundUp(size, 128);
    }
    else if (size <= 64 * 1024)
    {
      return _RoundUp(size, 1024);
    }
    else if (size <= 256 * 1024)
    {
      return _RoundUp(size, 8 * 1024);
    }
    else
    {
      return _RoundUp(size, 8 * 1024);
    }
  }
  static inline size_t _Index(size_t bytes, size_t align_shift) 
  {
    return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1; //  桶下标从0开始计数
  }
  // 计算映射的哪一个自由链表桶
  static inline size_t Index(size_t bytes)
  {
    assert(bytes <= MAX_BYTES);
    // 每个区间有多少个链
    static int group_array[4] = { 16, 56, 56, 56 };
    if (bytes <= 128) {
      return _Index(bytes, 3);
    }
    else if (bytes <= 1024) {
      return _Index(bytes - 128, 4) + group_array[0];
    }
    else if (bytes <= 8 * 1024) {
      return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
    }
    else if (bytes <= 64 * 1024) {
      return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
    }
    else if (bytes <= 256 * 1024) {
      return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
    }
    else {
      assert(false);
    }
    return -1;
  }
  // 一次从中心缓存获取多少个
  static size_t NumMoveSize(size_t size)
  {
    assert(size > 0);
    // [2, 512],一次批量移动多少个对象的(慢启动)上限值
    // 小对象一次批量上限高
    // 小对象一次批量上限低
    int num = MAX_BYTES / size;
    if (num < 2)
      num = 2;
    if (num > 512)
      num = 512;
    return num;
  }
  // 计算一次向系统获取几个页
  // 单个对象 8byte
  // ...
  // 单个对象 256KB
  static size_t NumMovePage(size_t size)
  {
    size_t num = NumMoveSize(size); // 要几页
    size_t npage = num * size;  //总字节数
    npage >>= PAGE_SHIFT;   // 计算多少页,一页8k(2^13);则PAGE_SHIFT为13
    if (npage == 0)
      npage = 1;
    return npage;
  }
};
// Span管理一个跨度的大块内存
// 管理以页为单位的大块内存
struct Span
{ 
  PAGE_ID _pageId = 0;  //  页号
  size_t _n = 0;      //  页数
  //  双向链表的结构
  Span* _next = nullptr;
  Span* _prev = nullptr;
  size_t _objSize = 0;    //  切好的小对象大小
  size_t _useCount = 0;   //  使用计数,切好的小块内存分配给ThreadCache
  void* _freeList = nullptr;  //  大块内存切小链接起来,这样回收回来的内存也方便链接
  bool _isUse = false;    //  是否在被使用
};
// 带头双向循环链表
class SpanList
{
public:
  SpanList()
  {
    _head = new Span;
    _head->_next = _head;
    _head->_prev = _head;
  }
  Span* Begin()
  {
    return _head->_next;
  }
  Span* End()
  {
    return _head;
  }
  bool Empty()
  {
    return _head->_next == _head;
  }
  void PushFront(Span* span)
  {
    Insert(Begin(), span);
  }
  Span* PopFront()
  {
    Span* front = _head->_next;
    Erase(front);
    return front;
  }
  void Insert(Span* pos, Span* newSpan)
  {
    assert(pos);
    assert(newSpan);
    //  头插
    Span* prev = pos->_prev;
    prev->_next = newSpan;
    newSpan->_prev = prev;
    newSpan->_next = pos;
    pos->_prev = newSpan;
  }
  void Erase(Span* pos)
  {
    assert(pos);
    assert(pos != _head);
    Span* prev = pos->_prev;
    Span* next = pos->_next;
    prev->_next = next;
    next->_prev = prev;
  }
private:
  Span* _head;
public:
  std::mutex _mtx;  //  桶锁: 数组_spanLists中每一个位置的桶都有一把锁
};

六、高并发内存池三大框架代码

6.1threadcache框架(声明)

#pragma once
#include "common.h"
// thread cache本质是由一个哈希映射的对象自由链表构成
class ThreadCache
{
public:
  // 申请和释放内存对象
  void* Allocate(size_t size);
  void Deallocate(void* ptr, size_t size);
  // 从中心缓存获取对象
  void* FetchFromCentralCache(size_t index, size_t size);
  // 释放对象时,链表过长时,回收内存回到中心缓存
  void ListTooLong(FreeList& list, size_t size);
private:
  FreeList _freeLists[NFREELIST];
};
// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

6.2CentralCache框架(声明)

#pragma once
#include "common.h"
//  单例模式---饿汉模式
class CentralCache
{
public:
  static CentralCache* GetInstance()
  {
    return &_sInst;
  }
  // 获取一个非空的span
  Span* GetOneSpan(SpanList& list, size_t byte_size);   //  list: 表示_spanLists中span的_freeList连续挂接空间的数量
  // 从中心缓存获取一定数量的对象给thread cache
  size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size); //  size:内存空间大小;batchNum:内存空间个数
  //  start 和 end 表示内存空间地址起始到终末范围
  // 将一定数量的对象释放到span跨度
  void ReleaseListToSpans(void* start, size_t size);
private:
  SpanList _spanLists[NFREELIST];
private:
  CentralCache() {};
  CentralCache(const CentralCache& t) = delete;
  static CentralCache _sInst;
};

6.3pagecache框架(声明)

#pragma once
#include "common.h"
#include "ObjectPool.h"
#include "PageMap.h"
class PageCache
{
public:
  static PageCache* GetInstance()
  {
    return &_sInst;
  }
  //  获取一个k页的span
  Span* NewSpan(size_t k);
  // 获取从对象到span的映射
  Span* MapObjectToSpan(void* obj);
  // 释放空闲span回到Pagecache,并合并相邻的span
  void ReleaseSpanToPageCache(Span* span);
  std::mutex _pageMtx;
private:
  SpanList _spanLists[NPAGES];
  PageCache() {};
  PageCache(const PageCache& t) = delete;
  myObject<Span> _spanPool;
  //std::unordered_map<PAGE_ID, Span*> _idSpanMap;
  TCMalloc_PageMap2<32 - PAGE_SHIFT>_idSpanMap; //  基树
  static PageCache _sInst;
};

6.4threadcache框架(定义)

#pragma once
#include "ThreadCache.h"
#include "CentralCache.h"
// 慢开始反馈调节算法
// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
// 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
// 3、size越大,一次向central cache要的batchNum就越小
// 4、size越小,一次向central cache要的batchNum就越大
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
  //  慢开始反馈调节法    不会开始要太多内存
  size_t  batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
  if (batchNum == _freeLists[index].MaxSize())
  {
    _freeLists[index].MaxSize() += 1;
  }
  void* start = nullptr;
  void* end = nullptr;
  size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);  //  获取连续实际内存的数目
  assert(actualNum >= 1); //  至少一个
  if (actualNum == 1)
  {
    assert(start == end);
  }
  else
  {
    _freeLists[index].PushRange(NextObj(start), end, actualNum - 1);
  }
  return start;
}
// 申请内存对象, 空间先到_freeLists链表中去取,如果没有去中心缓存获取空间
void* ThreadCache::Allocate(size_t size)
{
  assert(size <= MAX_BYTES);
  size_t alignSize = SizeClass::RoundUp(size);
  size_t index = SizeClass::Index(size);
  if (!_freeLists[index].Empty())
  {
    return _freeLists[index].Pop();
  }
  else
  {
    return FetchFromCentralCache(index, alignSize);
  }
}
  // 释放内存对象,空间挂起到_freeLists链表中去
void ThreadCache::Deallocate(void* ptr, size_t size)
{
  assert(ptr);
  assert(size <= MAX_BYTES);
  // 找对映射的自由链表桶,对象插入进入
  size_t index = SizeClass::Index(size);
  _freeLists[index].Push(ptr);
  //当链表长度大于一次批量申请的内存时就开始还一段list给central cache
  if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
  {
    // 释放对象时,链表过长时,回收内存回到中心缓存
    ListTooLong(_freeLists[index], size);
  }
}
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
  void* start = nullptr;
  void* end = nullptr;
  list.PopRange(start, end, list.MaxSize());  //取消挂接。
  // 将一定数量的对象释放到span跨度(CentralCache)
  CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

6.5CentralCache框架(定义)

#pragma once
#include"CentralCache.h"
#include"PageCache.h"
CentralCache CentralCache::_sInst;  //定义
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size)
{
  //  先从桶的span中找内存(查看当前的spanlist中是否有还有未分配对象的span)
  Span* it = list.Begin();
  while (it != list.End())
  {
    if (it->_freeList != nullptr)
    {
      return it;
    }
    else
    {
      it = it->_next;
    }
  }
  // 先把central 桶锁解除:这样如果某他线程释放内存对象回来,不会阻塞
  list._mtx.unlock();
  //  访问page时 需要加锁
  PageCache::GetInstance()->_pageMtx.lock();
  //  没有空闲的span去PageCache中要
  Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(byte_size));  //  获取一个k页的span
  span->_isUse = true;  //  内存span已经使用了
  span->_objSize = byte_size; //  切分内存大小记录下来;便于改良回收
  PageCache::GetInstance()->_pageMtx.unlock();
  //对获取span进行切分,不需要加锁,因为其他线程拿不到这个span
  char* start = (char*)(span->_pageId << PAGE_SHIFT); // k页的span的起始地址
  size_t bytes = span->_n << PAGE_SHIFT;  // 内存字节数
  char* end = start + bytes;
  //  把大块内存切成自由链表链表
  //  1. 先切一块作头,然后尾插(这样保证物理空间顺序与逻辑顺序保持一致)
  span->_freeList = start;
  start += byte_size;
  void* tail = span->_freeList;
  while (start < end)
  {
    NextObj(tail) = start;
    tail = NextObj(tail);
    start += byte_size;
  }
  NextObj(tail) = nullptr;  //  最后一个内存块下个内存地址置空
  //  挂节的时候恢复cengtral的锁
  list._mtx.lock();
  list.PushFront(span);
  return span;
}
// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
  //  每个桶加锁
  size_t index = SizeClass::Index(size);
  _spanLists[index]._mtx.lock();
  Span* span = GetOneSpan(_spanLists[index], size);
  assert(span);
  assert(span->_freeList);
  // 从span中获取batchNum个对象
  // 如果不够batchNum个,有多少拿多少
  start = span->_freeList;
  end = start;
  size_t i = 0, actualNum = 1;
  while (i < batchNum - 1 && NextObj(end) != nullptr)   //  取连续内存块,当batchNum超过原有时,按拥有的数量给
  {
    end = NextObj(end);
    ++i;
    ++actualNum;
  }
  span->_freeList = NextObj(end);   // 剩余空间仍挂在Span中的_freeList去
  NextObj(end) = nullptr;       //  最后end记录下一个空间地址置空
  span->_useCount += actualNum;
  _spanLists[index]._mtx.unlock();  //  解锁
  return actualNum;
}
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
  size_t index = SizeClass::Index(size);  //  确定桶的位置
  _spanLists[index]._mtx.lock();      //  上桶锁
  while (start)
  {
    void* next = NextObj(start);
    Span* span = PageCache::GetInstance()->MapObjectToSpan(start);  //  通过映射找到span的位置
    NextObj(start) = span->_freeList; //  头插
    span->_freeList = start;
    span->_useCount -= 1; //  内存块借出去个数
    if (span->_useCount == 0) //  说明小块内存全回来了; 这个span就可以回收合并了
    {
      _spanLists[index].Erase(span);
      span->_freeList = nullptr;
      span->_next = nullptr;
      span->_prev = nullptr;
      //释放span给page cache时,使用page cache的锁就可以了
      //这时把桶锁解掉
      _spanLists[index]._mtx.unlock();    //  解锁
      PageCache::GetInstance()->_pageMtx.lock();
      PageCache::GetInstance()->ReleaseSpanToPageCache(span);   // 释放空闲span回到Pagecache,并合并相邻的span
      PageCache::GetInstance()->_pageMtx.unlock();
      _spanLists[index]._mtx.lock();      //  上桶锁
    }
    start = next;
  }
  _spanLists[index]._mtx.unlock();    //  解锁
}

6.6pagecache框架(定义)

#pragma once
#include "PageCache.h"
PageCache PageCache::_sInst;
//  获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
  assert(k > 0);
  if (k > NPAGES - 1)   // 大于128页去堆上申请
  {
    void* ptr = SystemAlloc(k);
    //Span* span = new Span;
    Span* span = _spanPool.New(); //  定长内存池,代替new;提高效率
    span->_pageId = ((PAGE_ID)ptr >> PAGE_SHIFT);
    span->_n = k;
    //_idSpanMap[span->_pageId] = span; // 起始页号和span映射
    _idSpanMap.set(span->_pageId, span);
    return span;
  }
  //  先检查第k个桶里面有没有span
  if (!_spanLists[k].Empty())
  {
    Span* kSpan= _spanLists[k].PopFront();  
    //  建立id和span的映射; 方便回收小块内存
    for (size_t i = 0; i < kSpan->_n; ++i)
    {
      //_idSpanMap[kSpan->_pageId + i] = kSpan;
      _idSpanMap.set(kSpan->_pageId + i, kSpan);
    }
    return kSpan; 
  }
  //  检查一下后面的桶里面有没有span, 如果有可以把他它进行切分
  for (size_t i = k + 1; i < NPAGES; ++i)
  {
    if (!_spanLists[i].Empty())
    {
      //  进行切分
      Span* nSpan = _spanLists[i].PopFront(); //  i:位置取出它头结点
      //Span* kSpan = new Span;   //  创k一个结点
      Span* kSpan = _spanPool.New();  //  定长内存池,代替new;提高效率
      kSpan->_pageId = nSpan->_pageId;  //  页的起始位置
      nSpan->_pageId += k;  //  分出k页;n页位置向后走(相当与一大块内存取出头一小块内存)
      kSpan->_n = k;  //  得到k页
      nSpan->_n -= k;
      //  剩余空间; 挂接到nSpan->_n即(i-k)页的位置
      _spanLists[nSpan->_n].PushFront(nSpan);
      // 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时
      //进行的合并查找
      //_idSpanMap[nSpan->_pageId] = nSpan;
      _idSpanMap.set(nSpan->_pageId, nSpan);
      //_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan; // 例如:20 ~24(5页)
      _idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);
      //  建立id和span的映射; 方便回收小块内存
      for (size_t i = 0; i < kSpan->_n; ++i)
      {
        //_idSpanMap[kSpan->_pageId + i] = kSpan;
        _idSpanMap.set(kSpan->_pageId + i, kSpan);
      }
      return kSpan;
    }
  }
  //走到这个位置就说明后面没有大页的span了
  //这时就去找堆要一个128页(最大页---NPAGES-1)的span
  //Span* bigSpan = new Span;
  Span* bigSpan = _spanPool.New();  //  定长内存池,代替new;提高效率
  void* ptr = SystemAlloc(NPAGES - 1);
  bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;  // 地址除以13;就是页号
  bigSpan->_n = NPAGES - 1;
  // 内存挂接到相应位置
  _spanLists[bigSpan->_n].PushFront(bigSpan);
  //  函数递归调用自己;减少代码冗余(上面的空间切分)在时间稍微影响,基本上没影响
  return NewSpan(k);
}
// 获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
  PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
  //std::unique_lock<std::mutex> lock(_pageMtx);  //RAII风格的锁,把pagecache的锁给这个类。
  //auto ret = _idSpanMap.find(id);
  //if (ret != _idSpanMap.end())
  //{
  //  return ret->second;
  //}
  //else
  //{
  //  assert(false);
  //  return nullptr;
  //}
  Span* ret = (Span*)_idSpanMap.get(id);
  assert(ret != nullptr);
  return ret;
}
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
  if (span->_n > NPAGES - 1)  //  大于128 页直接向堆释放掉
  {
    void* ptr = (void*)(span->_pageId << PAGE_SHIFT); //通过页号起始,算出起始页的地址
    SystemFree(ptr);  //  释放
    //  delete span;
    _spanPool.Delete(span); //  定长内存池代替delete;提高效率
    return;
  }
  //  向前合并
  while (true)
  {
    //  对span前的页尝试进行合并.缓解内存碎片(外碎片)问题
    PAGE_ID prevId = span->_pageId - 1; //  前面一页
    //auto ret = _idSpanMap.find(prevId); //  查找是否存在
    //  查找前后的相邻页尝试合并
    //if (ret == _idSpanMap.end())
    //{
    //  break;  //前面的页号没有;不合并了
    //}
    Span* ret = (Span*)_idSpanMap.get(prevId);
    if (ret == nullptr)
    {
      break;  //前面的页号没有;不合并了
    }
    //Span* prevSpan = ret->second;
    Span* prevSpan = ret;
    if (prevSpan->_isUse == true)
    {
      break;  //  span正在使用;不能合并
    }
    if (prevSpan->_n + span->_n > NPAGES - 1)
    {
      break;  //  内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了
    }
    //  可以往前合并了
    span->_pageId = prevSpan->_pageId;  //  起始地址往前
    span->_n += prevSpan->_n; //  页数增加了
    _spanLists[prevSpan->_n].Erase(prevSpan); //  合并后,取消挂节;防止野指针出现
    //delete prevSpan;  //  合并后;处理掉前面相邻的span
    _spanPool.Delete(prevSpan); //  定长内存池代替delete;提高效率
  }
  //  向后合并
  while (true)
  {
    //  对span后的页尝试进行合并.缓解内存碎片(外碎片)问题
    PAGE_ID backId = span->_pageId + span->_n;  //  后面span的一页
    //auto ret = _idSpanMap.find(backId); //  查找是否存在
    //  查找前后的相邻页尝试合并
    //if (ret == _idSpanMap.end())
    //{
    //  break;  //后面的页号没有;不合并了
    //}
    Span* ret = (Span*)_idSpanMap.get(backId);
    if (ret == nullptr)
    {
      break;  //后面的页号没有;不合并了
    }
    Span* backSpan = ret;
    if (backSpan->_isUse == true)
    {
      break;  //  span正在使用;不能合并
    }
    if (backSpan->_n + span->_n > NPAGES - 1)
    {
      break;  //  内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了
    }
    //  可以往后合并了
    span->_n += backSpan->_n; //  页数增加了
    _spanLists[backSpan->_n].Erase(backSpan); //  合并后,取消挂节;防止野指针出现
    //delete backSpan;  //  合并后;处理掉后面相邻的span
    _spanPool.Delete(backSpan); //  定长内存池代替delete;提高效率
  }
  //  挂节到PageCache
  _spanLists[span->_n].PushFront(span);
  span->_isUse = false;
  //  记录page中头和尾内存的映射
  //_idSpanMap[span->_pageId] = span;
  //_idSpanMap[span->_pageId + span->_n - 1] = span;
  _idSpanMap.set(span->_pageId, span);
  _idSpanMap.set(span->_pageId + span->_n - 1, span);
}

定长内存池

(ObjectPool.h文件)

作用替代高并发内存池中的new和delete

#pragma once
#include"common.h"
template <class T>
class myObject
{
public: 
  T* New()
  {
    // 优先把还回来内存块对象,再次重复利用
    T* obj = nullptr;
    if (_freeList)
    {
      //  头删
      void* next= *(void**)_freeList;
      obj = (T*)_freeList;
      _freeList = next;
    }
    else
    {
      // 剩余内存不够一个对象大小时,则重新开大块空间
      if (_remainBytes < sizeof(T))
      {
        _remainBytes = 1024 * 128;
        //_memory = (char*)malloc(_remainBytes);
        _memory = (char*)SystemAlloc(_remainBytes >> 13); // 按页申请内存
        if (_memory == nullptr)
        {
          throw std::bad_alloc();
        }
      }
      obj = (T*)_memory;
      size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); //申请内存空间必须大于或等于一个指针的空间
      _memory += objSize;
      _remainBytes -= objSize;
    }
    // 定位new,显示调用T的构造函数初始化
    new(obj)T;
    return obj;
  }
  void Delete(T* obj)
  {
    // 显示调用析构函数清理对象
    obj->~T();
    //  头插
    *(void**)obj = _freeList; //  取前四或八个字节
    _freeList = obj;
  }
private:
  char* _memory = nullptr;  //  指向大块内存的指针
  size_t _remainBytes = 0;  //  大块内存分割过程中剩余的字节数
  void* _freeList = nullptr;  //  还回来的内存块用自由链接表的头指针链接起来
};

线程局部存储

  • 1 内存 <= 256KB ->三层缓存(走内存池)
  • 2 内存>256KB
  • a. 128*8K >= size >32 * 8K -> page cacheb
  • b. size > 128 * 8k ->找系统堆

(ConcurrentAlloc.h)文件

#pragma once
#include "Common.h"
#include "ThreadCache.h"
#include "PageCache.h"
#include "ObjectPool.h"
static void* ConcurrentAlloc(size_t size)
{
  if (size > MAX_BYTES) // 申请内存大于32页直接去PageCache申请内存。
  {
    size_t alignSize = SizeClass::RoundUp(size);  //  内存大小对齐数
    size_t kPage = alignSize >> PAGE_SHIFT;   //  算出页数
    Span* span = PageCache::GetInstance()->NewSpan(kPage);
    span->_objSize = size;
    void* ptr = (void*)(span->_pageId << PAGE_SHIFT); //  页内存的起始地址
    return ptr;
  }
  else
  {
    // 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
    if (pTLSThreadCache == nullptr)
    {
      static myObject< ThreadCache> tcPool;
      //pTLSThreadCache = new ThreadCache;
      pTLSThreadCache = tcPool.New();
    }
    //cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
    return pTLSThreadCache->Allocate(size);
  }
}
static void ConcurrentFree(void* ptr)
{
  Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);  // 通过地址,找到span的映射位置
  size_t size = span->_objSize;
  if (size > MAX_BYTES)
  {
    PageCache::GetInstance()->_pageMtx.lock();
    PageCache::GetInstance()->ReleaseSpanToPageCache(span);   // 释放空闲span回到Pagecache,并合并相邻的span
    PageCache::GetInstance()->_pageMtx.unlock();
  }
  else
  {
    assert(pTLSThreadCache);
    pTLSThreadCache->Deallocate(ptr, size);
  }
}

测试性能(系统的malloc与我们写的高并发内存池)

(Benchmark.cpp文件)

注意:

  • 测试代码中出现了lamda
  • 测试代码借鉴某大佬的(^ v ^)
#include "ConcurrentAlloc.h"
//  nworks:创建线程的次数
//  rounds:经历多少轮次
//  ntimes:一轮申请释放的次数
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
  std::vector<std::thread> vthread(nworks);
  std::atomic<size_t> malloc_costtime = 0;
  std::atomic<size_t> free_costtime = 0;
  for (size_t k = 0; k < nworks; ++k)
  {
    vthread[k] = std::thread([&, k]() {
      std::vector<void*> v;
      v.reserve(ntimes);
      for (size_t j = 0; j < rounds; ++j)
      {
        size_t begin1 = clock();
        for (size_t i = 0; i < ntimes; i++)
        {
          //v.push_back(malloc(16));
          v.push_back(malloc((16 + i) % 8192 + 1));
        }
        size_t end1 = clock();
        size_t begin2 = clock();
        for (size_t i = 0; i < ntimes; i++)
        {
          free(v[i]);
        }
        size_t end2 = clock();
        v.clear();
        malloc_costtime += (end1 - begin1);
        free_costtime += (end2 - begin2);
      }
      });
  }
  for (auto& t : vthread)
  {
    t.join();
  }
  cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc" << ntimes << "次: 花费:" << malloc_costtime << "ms" << endl;
  cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc" << ntimes << "次: 花费:" << free_costtime << "ms" << endl;
  cout << nworks << "个线程并发malloc & free" << nworks * rounds * ntimes << "次,总计花费:" << malloc_costtime + free_costtime << "ms" << endl; 
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
  std::vector<std::thread> vthread(nworks);
  std::atomic<size_t> malloc_costtime = 0;
  std::atomic<size_t> free_costtime = 0;
  for (size_t k = 0; k < nworks; ++k)
  {
    vthread[k] = std::thread([&]() {
      std::vector<void*> v;
      v.reserve(ntimes);
      for (size_t j = 0; j < rounds; ++j)
      {
        size_t begin1 = clock();
        for (size_t i = 0; i < ntimes; i++)
        {
          //v.push_back(ConcurrentAlloc(16));
          v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
        }
        size_t end1 = clock();
        size_t begin2 = clock();
        for (size_t i = 0; i < ntimes; i++)
        {
          ConcurrentFree(v[i]);
        }
        size_t end2 = clock();
        v.clear();
        malloc_costtime += (end1 - begin1);
        free_costtime += (end2 - begin2);
      }
      });
  }
  for (auto& t : vthread)
  {
    t.join();
  }
  cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc" << ntimes << "次: 花费:" << malloc_costtime << "ms" << endl;
  cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc" << ntimes << "次: 花费:" << free_costtime << "ms" << endl;
  cout << nworks << "个线程并发malloc & free" << nworks * rounds * ntimes << "次,总计花费:" << malloc_costtime + free_costtime << "ms" << endl;
}
int main()
{
  size_t n = 1000;
  cout << "==========================================================" << endl;
  BenchmarkConcurrentMalloc(n, 4, 10);  //  自己写的内存池
  cout << endl << endl;
  BenchmarkMalloc(n, 4, 10);  // 系统malloc
  cout << "==========================================================" << endl;
  return 0;
}

七、性能测试结果图

明显效率大大滴提高了

640.jpg

当前项目实现的不足

不同平台替换方式不同。 基于unix的系统上的glibc,使用了weak alias的方式替换。具体来说是因为这些入口函数都被定义成了weak symbols,再加上gcc支持

alias attribute,所以替换就变成了这种通用形式。

相关文章
|
30天前
|
监控 中间件 Java
后端技术:构建高效、稳定的服务器端应用
【10月更文挑战第5天】后端技术:构建高效、稳定的服务器端应用
69 0
|
1月前
|
监控 关系型数据库 Serverless
探索后端技术:构建高效、可靠的服务器端应用
本文将深入探讨后端开发的核心概念和关键技术,从服务器架构到数据库管理,再到安全防护,为读者提供全面的后端技术指南。无论是初学者还是经验丰富的开发者,都能从中汲取灵感,提升自己的技术水平。
|
11天前
|
关系型数据库 API 数据库
后端开发的艺术:从零到一构建高效服务器
在数字化时代,后端开发是支撑现代互联网应用的基石。本文旨在探讨后端开发的核心概念、关键技术以及如何构建一个高效的服务器。我们将从基础的编程语言选择开始,逐步深入到数据库设计、API开发和性能优化等关键领域。通过实际案例分析,我们将揭示后端开发的复杂性和挑战性,同时提供实用的解决方案和最佳实践。无论你是初学者还是有经验的开发者,这篇文章都将为你提供宝贵的见解和启发。
|
22天前
|
存储 安全 Linux
【开源指南】用二叉树实现高性能共享内存管理
本文介绍了一种使用C++实现的共享内存管理方案,通过借鉴Android property的设计思路,采用二叉树结构存储键值对,提高了数据检索效率。该方案包括设置和获取接口,支持多进程/线程安全,并提供了一个简单的测试示例验证其有效性。
52 5
|
1月前
|
缓存 负载均衡 Java
c++写高性能的任务流线程池(万字详解!)
本文介绍了一种高性能的任务流线程池设计,涵盖多种优化机制。首先介绍了Work Steal机制,通过任务偷窃提高资源利用率。接着讨论了优先级任务,使不同优先级的任务得到合理调度。然后提出了缓存机制,通过环形缓存队列提升程序负载能力。Local Thread机制则通过预先创建线程减少创建和销毁线程的开销。Lock Free机制进一步减少了锁的竞争。容量动态调整机制根据任务负载动态调整线程数量。批量处理机制提高了任务处理效率。此外,还介绍了负载均衡、避免等待、预测优化、减少复制等策略。最后,任务组的设计便于管理和复用多任务。整体设计旨在提升线程池的性能和稳定性。
72 5
|
1月前
|
JSON JavaScript 前端开发
使用 Node.js 和 Express 构建 RESTful API 服务器
【10月更文挑战第3天】使用 Node.js 和 Express 构建 RESTful API 服务器
|
23天前
|
JSON JavaScript 前端开发
使用JavaScript和Node.js构建简单的RESTful API服务器
【10月更文挑战第12天】使用JavaScript和Node.js构建简单的RESTful API服务器
15 0
|
28天前
|
监控 Java 关系型数据库
构建高效可靠的服务器端应用
【10月更文挑战第6天】构建高效可靠的服务器端应用
|
22天前
|
存储 编译器 对象存储
【C++打怪之路Lv5】-- 类和对象(下)
【C++打怪之路Lv5】-- 类和对象(下)
21 4
|
22天前
|
编译器 C语言 C++
【C++打怪之路Lv4】-- 类和对象(中)
【C++打怪之路Lv4】-- 类和对象(中)
19 4

热门文章

最新文章