【高并发内存池】第二篇:ThreadCache初步设计

简介: 线程缓存(ThreadCache)是每个线程独有的,用于完成线程内部小于256KB的空间申请场景。每个线程独享一个ThreadCache,线程从这里申请内存不需要加锁。

一. ThreadCache介绍


线程缓存(ThreadCache)是每个线程独有的,用于完成线程内部小于256KB的空间申请场景。每个线程独享一个ThreadCache,线程从这里申请内存不需要加锁。

图片.png

ThreadCache是开散列的哈希表结构,每个桶是一个定长大小内存块连接而成的自由链表:在这里图片.png


二. ThreadCache基本功能


1. 申请内存(Pop)


   当要申请的对象空间大小(记为bytes) <= 256KB时,先获取到线程本地存储的ThreadCache对象,计算bytes映射的自由链表桶的下标index。

   如果自由链表桶_freeLists[index]中有对象,则直接Pop一个小块定长内存返回。

   如果_freeLists[index]中没有对象,则批量从CentralCache中获取一定数量的小块定长内存,再把它们插入到相应的自由链表桶中,最后从这个桶中返回给外部一个小块定长内存。


2. 释放内存(Push)


   当要释放的对象空间(bytes)小于等于256KB时,将它们释放回ThreadCache。

   先计算bytes映射自由链表桶的位置index,然后将这块对象空间Push到_freeLists[index]中。

   当自由链表的长度过长,则回收一部分内存对象到CentralCahce中。


三. ThreadCache初步设计


1. ThreadCache基本框架


// ThreadCache本质是由一个哈希映射的已切分好的小块定长内存组成的自由链表的集合
class ThreadCache
{
public:
  // 申请一个小块定长内存
  void* Allocate(size_t bytes);
  // 释放一个小块定长内存
  void Deallocate(void* ptr, size_t bytes);
private:
  // 从中心缓存获取批量小块定长内存,并返回其中一个
  void* FetchFromCentralCache(size_t index, size_t alignBytes);
  FreeList _freeLists[NFREELIST]; // 哈希桶
};


ThreadCache只有一个成员变量ThreadCache::_freeLists[NFREELIST],它是一个哈希桶结构,元素是存储小块定长内存的自由链表,不同自由链表管理不同大小的小块定长内存,其中NFREELIST是桶的数量,具体它的值后面会讲。


下面是自由链表类(class FreeList)的声明:


// 返回传入空间的头4个或8个字节内容的引用
static inline void*& NextObj(void* obj)
{
  assert(obj);
  return *(void**)obj;
}
// 管理切分好的小块定长内存的自由链表
class FreeList
{
public:
  // 头插一个小块定长内存
  void PushFront(void* obj)
  {
  assert(obj);
  NextObj(obj) = _freeList;
  _freeList = obj;
  ++_size;
  }
  // 头删一个小块定长内存,并把它返还给外部
  void* PopFront()
  {
  assert(!Empty());
  void* obj = _freeList;
  _freeList = NextObj(obj);
  --_size;
  return obj;
  }
  // 判断自由链表是否为空
  bool Empty()
  {
  return _size == 0;
  }
private:
  size_t _size = 0;     // 小块定长内存的数量
  void* _freeList = nullptr; // 存储小块定长内存的自由链表
};

ThreadCache的两个成员函数ThreadCache::Allocate(...)和ThreadCache::Deallocate(...)分别用于线程内部自己申请、释放空间范围为:1byte ~ 256KB的内存空间,这两个函数的内部实现都是先通过传入的对象空间大小(bytes),去映射找到哈希桶中对应的下标为index的自由链表桶,然后再调用自由链表的成员函数FreeList::PopFront()和FreeList::PushFront(void* obj)来申请、释放小块定长空间。


2. 对象空间和哈希桶定长块空间的映射、对齐关系


当线程内部申请内存的大小 <= 256KB时,我们都是从线程专属的ThreadCache中去申请,每一个自由链表桶中存储的都是一个个固定大小的小块定长内存。


如果以字节为单位每个字节都去创建一个自由链表桶的话总共需要256 * 1024个桶,这样分得太细但利用率却不高,很多桶用不着,导致空间浪费:

图片.png

下面设计了一套关于传入的对象空间(bytes)和自由链表桶中存储的小块定长内存(alignBytes)的对齐规则:

图片.png


下面举例关于8字节对齐方式的理解:


   外部申请的是1字节空间,我们给它8字节空间,有7字节的空间浪费。

   外部申请的是4字节空间,我们给它8字节空间,有4字节的空间浪费。

   外部申请的是8字节空间,我们给它8字节空间,没有空间浪费。

   外部申请的是10字节空间,我们给它16字节空间,有6字节的空间浪费。

   外部申请的是15字节空间,我们给它16字节空间,有1字节的空间浪费。


根据上面对齐规则得到的哈希桶结构如下图所示:

图片.png

一开始以8字节对齐的原因是指针大小最大在64位平台下是8字节,所以一开始以8字节对齐,这样确保每一个未被使用的小块定长内存能够存储它下一个小块定长内存的地址。


虽然这种对齐方法会造成内碎片空间浪费的问题,但整体控制在最多11%左右的内碎片,而且最终桶的数量也较少,便于管理。


最多11%左右的内碎片这个是怎么的得来的呢?


需要说明的是,1~128这个区间太小了我们不做讨论,想象一下:1字节就算是对齐到2字节也有百分之五十的浪费率,所以我们就从第二个区间开始进行计算。


内碎片占用率 = (浪费的空间 / 总空间大小) * 100%


   对于 [128+1, 1024] 这个范围,内碎片占用率最大是 15/(128+16) ≈ 11%

   对于 [1024+1, 8*1024] 这个范围,内碎片占用率最大是 127/(1024+128) ≈ 11%

   对于 [81024+1, 641024] 这个范围,内碎片占用率最大是 1023/(8*1024+1024) ≈ 11%

   对于 [641024+1, 2561024 ]这个范围,内碎片占用率最大是 (8 * 1024-1)/(64 * 1024+8*1024) ≈ 11%


下面我们专门封装一个类来计算传入的对象空间(bytes)和自由链表桶存储的小块定长内存(alignBytes)之间的映射、对齐关系:


// 计算传入对象大小和小块定长内存的对齐和映射规则
class SizeClass
{
public:
  // 根据传入对象空间大小(bytes)
  // 来计算对齐后得到的小块定长内存的大小(alignBytes)
  static inline size_t RoundUp(size_t bytes)
  {
  if (bytes <= 128)
  {
    return _RoundUp(bytes, 8);
  }
  else if (bytes <= 1024)
  {
    return _RoundUp(bytes, 16);
  }
  else if (bytes <= 8 * 1024)
  {
    return _RoundUp(bytes, 128);
  }
  else if (bytes <= 64 * 1024)
  {
    return _RoundUp(bytes, 1024);
  }
  else if (bytes <= 256 * 1024)
  {
    return _RoundUp(bytes, 8 * 1024);
  }
  else
  {
    assert("ThreadCache over 256k\n");
    return -1;
  }
  }
  // 根据传入对象空间大小(bytes)
  // 计算映射到哪一个自由链表桶
  static inline size_t Index(size_t bytes)
  {
  // 记录每一个对齐区间有多少个桶
  static int group_array[4] = { 16, 56, 56, 56 };
  // 根据传入的对象大小映射得到自由链表桶对应的下标
  if (bytes <= 128) {
    return _Index(bytes, 3);
  }
  else if (bytes <= 1024) {
    return _Index(bytes - 128, 4) + group_array[0];
  }
  else if (bytes <= 8 * 1024) {
    return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
  }
  else if (bytes <= 64 * 1024) {
    return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
  }
  else if (bytes <= 256 * 1024) {
    return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
  }
  else {
    assert("ThreadCache over 256k\n");
    return -1;
  }
  }
private:
  // 子函数
  static inline size_t _RoundUp(size_t bytes, size_t alignNum)
  {
  return ((bytes + alignNum - 1) & ~(alignNum - 1));
  }
  static inline size_t _Index(size_t bytes, size_t align_shift)
  {
  return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
  }
};


   调用SizeClass::RoundUp(size_t bytes)函数,只需传入想要申请的对象空间的大小(bytes),就会返回对齐后申请到的实际小块定长内存的大小(alignBytes)。(其中:对象空间大小 <= 申请到的小块定长内存大小)

   调用SizeClass::Index(size_t bytes)函数,只需传入想要申请的对象空间的大小(bytes),就会返回映射到的自由链表桶的下标index。


   关于 _RoundUp(…) 子函数


该子函数通过对齐数计算出对象空间对齐后的字节数,最容易想到的就是下面这种写法:


static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
  size_t alignSize = 0;
  if (bytes%alignNum != 0)
  {
  alignSize = (bytes / alignNum + 1)*alignNum;
  }
  else
  {
  alignSize = bytes;
  }
  return alignSize;
}



除了上述写法,我们还可以通过位运算的方式来进行计算,虽然位运算可能并没有上面的写法容易理解,但计算机执行位运算的速度是比执行乘法和除法更快的:


//位运算写法
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
  return ((bytes + alignNum - 1)&~(alignNum - 1));
}

 

   关于 _Index(…) 子函数


该子函数通过对齐数和对象空间的大小计算出要申请的小块定长内存所对应的哈希桶下标,下面是它的一般写法:


static inline size_t _Index(size_t bytes, size_t alignNum)
{
  size_t index = 0;
  if (bytes%alignNum != 0)
  {
  index = bytes / alignNum;
  }
  else
  {
  index = bytes / alignNum - 1;
  }
  return index;
}

当然,为了提高效率下面也提供了一个用位运算来解决的方法,需要注意的是,此时我们并不是传入该字节数的对齐数,而是将对齐数写成2的n次方的形式后,将这个n值进行传入。比如对齐数是8,传入的就是3。


//位运算写法
static inline size_t _Index(size_t bytes, size_t alignShift)
{
  return ((bytes + (1 << alignShift) - 1) >> alignShift) - 1;
}

 

   为什么SizeClass类中的成员函数都要设置为static和inline?


SizeClass类中包括如下四个成员函数:


   size_t RoundUp(size_t bytes)

   size_t _RoundUp(size_t bytes, size_t alignNum)

   size_t Index(size_t bytes)

   size_t _Index(size_t bytes, size_t alignShift)


我们把它们都设置为static,这样就可以在外部通过类名直接调用函数,而不用再去创建一个SizeClass类的对象去调用;把它们都设置为inline,因为它们内部代码比较短而且逻辑简单,调用时直接展开,避免再去创建函数栈帧,达到提高效率的效果。


四. ThreadCahe初步设计的代码整合


文件分装:

图片.png

Common.h

主要用于存放:


   公共头文件、公共函数、const常量、展开的命名空间等。

   管理切分好的小块定长内存的自由链表桶类:class FreeList。

   处理对象空间对齐、映射规则的类:calss SizeClass。


#pragma once
#include <thread>
#include <assert.h>
#include <iostream>
using std::cout;
using std::endl;
static const size_t NFREELIST = 208;        // 小块定长内存自由链表桶的数量
static const size_t MAX_BYTES = 256 * 1024; // ThreadCache可申请的最大内存空间
// 返回传入空间的头4个或8个字节内容的引用
static inline void*& NextObj(void* obj)
{
  assert(obj);
  return *(void**)obj;
}
// 管理切分好的小块定长内存的自由链表
class FreeList
{
public:
  // 头插一个小块定长内存
  void PushFront(void* obj)
  {
  assert(obj);
  NextObj(obj) = _freeList;
  _freeList = obj;
  ++_size;
  }
  // 头删一个小块定长内存
  void* PopFront()
  {
  assert(!Empty());
  void* obj = _freeList;
  _freeList = NextObj(obj);
  --_size;
  return obj;
  }
  // 判断自由链表是否为空
  bool Empty()
  {
  return _size == 0;
  }
private:
  size_t _size = 0;     // 小块定长内存的数数量
  void* _freeList = nullptr; // 存储小块定长内存的自由链表
};
// 计算传入对象大小和小块定长内存的对齐映射规则
class SizeClass
{
public:
  // 传入对象空间大小(bytes)来计算对齐后得到的小块定长内存的大小(alignBytes)
  static inline size_t RoundUp(size_t bytes)
  {
  if (bytes <= 128)
  {
    return _RoundUp(bytes, 8);
  }
  else if (bytes <= 1024)
  {
    return _RoundUp(bytes, 16);
  }
  else if (bytes <= 8 * 1024)
  {
    return _RoundUp(bytes, 128);
  }
  else if (bytes <= 64 * 1024)
  {
    return _RoundUp(bytes, 1024);
  }
  else if (bytes <= 256 * 1024)
  {
    return _RoundUp(bytes, 8 * 1024);
  }
  else
  {
    assert("ThreadCache over 256k\n");
    return -1;
  }
  }
  // 传入对象空间大小(bytes)来计算映射到哪一个自由链表桶
  static inline size_t Index(size_t bytes)
  {
  // 记录每一个对齐区间有多少个桶
  static int group_array[4] = { 16, 56, 56, 56 };
  // 根据传入的对象大小映射得到自由链表桶对应的下标
  if (bytes <= 128) {
    return _Index(bytes, 3);
  }
  else if (bytes <= 1024) {
    return _Index(bytes - 128, 4) + group_array[0];
  }
  else if (bytes <= 8 * 1024) {
    return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
  }
  else if (bytes <= 64 * 1024) {
    return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
  }
  else if (bytes <= 256 * 1024) {
    return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
  }
  else {
    assert("ThreadCache over 256k\n");
    return -1;
  }
  }
private:
  // 子函数
  static inline size_t _RoundUp(size_t bytes, size_t alignNum)
  {
  return ((bytes + alignNum - 1) & ~(alignNum - 1));
  }
  static inline size_t _Index(size_t bytes, size_t align_shift)
  {
  return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
  }
};


ThreadCache.h


存放ThreadCache类的声明,它成员函数的实现单独放到另外一个文件里,因为ThreadCache.h这个文件后面还会被其它文件包含,而其它文件只需要用它的声明就够了。


#pragma once 
#include "Common.h"
// ThreadCache本质是由一个哈希映射的已切分好的小块定长内存组成的自由链表集合
class ThreadCache
{
public:
  // 申请一个小块定长内存
  void* Allocate(size_t bytes);
  // 释放一个小块定长内存
  void Deallocate(void* ptr, size_t bytes);
private:
  // 从中心缓存获取批量小块定长内存,并返回其中一个
  void* FetchFromCentralCache(size_t index, size_t alignBytes);
  FreeList _freeLists[NFREELIST]; // 哈希桶
};
// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;


ThreadCache.cpp


ThreadCache类的成员函数的具体实现放到这里:


#include "ThreadCache.h"
// 申请一个小块定长内存
void* ThreadCache::Allocate(size_t bytes)
{
  // ThreadCache只能申请小于等于256KB的对象空间
  assert(bytes <= MAX_BYTES);
  // 1、确定对齐后的小块定长内存大小以及它映射到那个自由链表桶
  size_t index = SizeClass::Index(bytes);
  size_t alignBytes = SizeClass::RoundUp(bytes);
  // 2、如果自由链表桶不空的话,把第一个小块定长内存取出来
  //    如果桶中没有小块定长内存,就到CentralCache中去批量申请
  if (!_freeLists[index].Empty())
  {
  return _freeLists[index].PopFront();
  }
  else
  {
  return FetchFromCentralCache(index, alignBytes);
  }
}
// 释放一个小块定长内存
void ThreadCache::Deallocate(void* ptr, size_t bytes)
{
  assert(ptr);
  assert(bytes <= MAX_BYTES);
  // 1、计算映射到哪一个自由链表桶
  size_t index = SizeClass::Index(bytes);
  // 2、把小块定长内存头插到自由链表桶中
  _freeLists[index].PushFront(ptr);
}
// 从中心缓存获取“一批”小块定长内存,并返回给外部“一块”小块定长内存
void* ThreadCache::FetchFromCentralCache(size_t index, size_t alignBytes)
{
  //...
  return nullptr;
}



五. ThreadCaheTLS无锁访问测试


线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。通过TLS,可以每个线程可以无锁地获取自己的专属的ThreadCache对象。


测试TLS还需要新增两个文件,下图通过星号标识的就是这两个文件:

图片.png

ConcurrentAlloc.h


创建TLS专门封装的两个函数:


#pragma once
#include "Common.h"
#include "ThreadCache.h"
static void* ConcurrentAlloc(size_t bytes)
{
  // 通过TLS,每个线程无锁的获取自己的专属的ThreadCache对象
  if (pTLSThreadCache == nullptr)
  {
  pTLSThreadCache = new ThreadCache;
  }
  cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
  return pTLSThreadCache->Allocate(bytes);
}
static void ConcurrentFree(void* ptr, size_t bytes)
{
  assert(pTLSThreadCache);
  pTLSThreadCache->Deallocate(ptr, bytes);
}


Test.cpp

创建两个新线程,分别打印自己的TLS:

#include "ConcurrentAlloc.h"
void Alloc1()
{
  for (size_t i = 0; i < 5; ++i)
  {
  void* ptr = ConcurrentAlloc(6);
  }
}
void Alloc2()
{
  for (size_t i = 0; i < 5; ++i)
  {
  void* ptr = ConcurrentAlloc(7);
  }
}
void TLSTest()
{
  std::thread t1(Alloc1);
  t1.join();
  std::thread t2(Alloc2);
  t2.join();
}
int main()
{
  TLSTest();
  return 0;
}


编译运行,可以看到两个线程都有各自独立的TLS:

图片.png


相关文章
|
存储 缓存 安全
高并发内存池实战:用C++构建高性能服务器(下)
高并发内存池实战:用C++构建高性能服务器
高并发内存池实战:用C++构建高性能服务器(下)
|
13天前
|
监控 Java 数据库连接
线程池在高并发下如何防止内存泄漏?
线程池在高并发下如何防止内存泄漏?
|
3月前
|
存储 缓存 NoSQL
Redis内存管理揭秘:掌握淘汰策略,让你的数据库在高并发下也能游刃有余,守护业务稳定运行!
【8月更文挑战第22天】Redis的内存淘汰策略管理内存使用,防止溢出。主要包括:noeviction(拒绝新写入)、LRU/LFU(淘汰最少使用/最不常用数据)、RANDOM(随机淘汰)及TTL(淘汰接近过期数据)。策略选择需依据应用场景、数据特性和性能需求。可通过Redis命令行工具或配置文件进行设置。
72 2
|
5月前
|
存储 缓存 NoSQL
Redis是一种高性能的内存数据库,常用于高并发环境下的缓存解决方案
【6月更文挑战第18天】**Redis摘要:** 高性能内存数据库,擅长高并发缓存。数据存内存,访问迅速;支持字符串、列表等多元数据类型;具备持久化防止数据丢失;丰富命令集便于操作;通过节点集群实现数据分片与负载均衡,增强可用性和扩展性。理想的缓存解决方案。
75 1
|
4月前
|
设计模式 安全 NoSQL
Java面试题:设计一个线程安全的单例模式,并解释其内存占用和垃圾回收机制;使用生产者消费者模式实现一个并发安全的队列;设计一个支持高并发的分布式锁
Java面试题:设计一个线程安全的单例模式,并解释其内存占用和垃圾回收机制;使用生产者消费者模式实现一个并发安全的队列;设计一个支持高并发的分布式锁
64 0
|
4月前
|
设计模式 安全 Java
Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
56 0
|
4月前
|
设计模式 存储 缓存
Java面试题:结合建造者模式与内存优化,设计一个可扩展的高性能对象创建框架?利用多线程工具类与并发框架,实现一个高并发的分布式任务调度系统?设计一个高性能的实时事件通知系统
Java面试题:结合建造者模式与内存优化,设计一个可扩展的高性能对象创建框架?利用多线程工具类与并发框架,实现一个高并发的分布式任务调度系统?设计一个高性能的实时事件通知系统
51 0
|
4月前
|
存储 安全 Java
Java面试题:假设你正在开发一个Java后端服务,该服务需要处理高并发的用户请求,并且对内存使用效率有严格的要求,在多线程环境下,如何确保共享资源的线程安全?
Java面试题:假设你正在开发一个Java后端服务,该服务需要处理高并发的用户请求,并且对内存使用效率有严格的要求,在多线程环境下,如何确保共享资源的线程安全?
66 0
|
6月前
|
缓存 Java 程序员
【项目日记(一)】高并发内存池项目介绍
【项目日记(一)】高并发内存池项目介绍
【项目日记(一)】高并发内存池项目介绍
|
6月前
|
SQL 资源调度 关系型数据库
实时计算 Flink版产品使用合集之可以使用高并发大内存的方式读取存量数据吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
104 3

热门文章

最新文章

  • 1
    高并发场景下,到底先更新缓存还是先更新数据库?
    59
  • 2
    Java面试题:解释Java NIO与BIO的区别,以及NIO的优势和应用场景。如何在高并发应用中实现NIO?
    66
  • 3
    Java面试题:设计一个线程安全的单例模式,并解释其内存占用和垃圾回收机制;使用生产者消费者模式实现一个并发安全的队列;设计一个支持高并发的分布式锁
    64
  • 4
    Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
    56
  • 5
    Java面试题:结合建造者模式与内存优化,设计一个可扩展的高性能对象创建框架?利用多线程工具类与并发框架,实现一个高并发的分布式任务调度系统?设计一个高性能的实时事件通知系统
    51
  • 6
    Java面试题:假设你正在开发一个Java后端服务,该服务需要处理高并发的用户请求,并且对内存使用效率有严格的要求,在多线程环境下,如何确保共享资源的线程安全?
    66
  • 7
    在Java中实现高并发的数据访问控制
    39
  • 8
    使用Java构建一个高并发的网络服务
    26
  • 9
    微服务06----Eureka注册中心,微服务的两大服务,订单服务和用户服务,订单服务需要远程调用我们的用,户服务,消费者,如果环境改变,硬编码问题就会随之产生,为了应对高并发,我们可能会部署成一个集
    36
  • 10
    如何设计一个秒杀系统,(高并发高可用分布式集群)
    122