内存“银行”(三)

简介: 内存“银行”(三)

threadcache回收内存

       当某个线程申请的对象不用了,可以将其释放给thread cache,然后thread cache将该对象插入到对应哈希桶的自由链表当中即可。

  但是随着线程不断的释放,对应自由链表的长度也会越来越长,这些内存堆积在一个thread cache中就是一种浪费,我们应该将这些内存还给central cache,这样一来,这些内存对其他线程来说也是可申请的,因此当thread cache某个桶当中的自由链表太长时我们可以进行一些处理。

  如果thread cache某个桶当中自由链表的长度超过它一次批量向central cache申请的对象个数,那么此时我们就要把该自由链表当中的这些对象还给central cache。

//释放内存对象
void ThreadCache::Deallocate(void* ptr, size_t size)
{
    assert(ptr);
    assert(size <= MAX_BYTES);
    //找出对应的自由链表桶将对象插入
    size_t index = SizeClass::Index(size);
    _freeLists[index].Push(ptr);
    //当自由链表长度大于一次批量申请的对象个数时就开始还一段list给central cache
    if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
    {
        ListTooLong(_freeLists[index], size);
    }
}

       当自由链表的长度大于一次批量申请的对象时,我们具体的做法就是,从该自由链表中取出一次批量个数的对象,然后将取出的这些对象还给central cache中对应的span即可。

//释放对象导致链表过长,回收内存到中心缓存
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
    void* start = nullptr;
    void* end = nullptr;
    //从list中取出一次批量个数的对象
    list.PopRange(start, end, list.MaxSize());
    //将取出的对象还给central cache中对应的span
    CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

       从上述代码可以看出,FreeList类需要支持用Size函数获取自由链表中对象的个数,还需要支持用PopRange函数从自由链表中取出指定个数的对象。因此我们需要给FreeList类增加一个对应的PopRange函数,然后再增加一个_size成员变量,该成员变量用于记录当前自由链表中对象的个数,当我们向自由链表插入或删除对象时,都应该更新_size的值。

//管理切分好的小对象的自由链表
class FreeList
{
public:
    //将释放的对象头插到自由链表
    void Push(void* obj)
    {
        assert(obj);
        //头插
        NextObj(obj) = _freeList;
        _freeList = obj;
        _size++;
    }
    //从自由链表头部获取一个对象
    void* Pop()
    {
        assert(_freeList);
        //头删
        void* obj = _freeList;
        _freeList = NextObj(_freeList);
        _size--;
        return obj;
    }
    //插入一段范围的对象到自由链表
    void PushRange(void* start, void* end, size_t n)
    {
        assert(start);
        assert(end);
        //头插
        NextObj(end) = _freeList;
        _freeList = start;
        _size += n;
    }
    //从自由链表获取一段范围的对象
    void PopRange(void*& start, void*& end, size_t n)
    {
        assert(n <= _size);
        //头删
        start = _freeList;
        end = start;
        for (size_t i = 0; i < n - 1;i++)
        {
            end = NextObj(end);
        }
        _freeList = NextObj(end); //自由链表指向end的下一个对象
        NextObj(end) = nullptr; //取出的一段链表的表尾置空
        _size -= n;
    }
    bool Empty()
    {
        return _freeList == nullptr;
    }
    size_t& MaxSize()
    {
        return _maxSize;
    }
    size_t Size()
    {
        return _size;
    }
private:
    void* _freeList = nullptr; //自由链表
    size_t _maxSize = 1;
    size_t _size = 0;
};

       而对于FreeList类当中的PushRange成员函数,我们最好也像PopRange一样给它增加一个参数,表示插入对象的个数,不然我们这时还需要通过遍历统计插入对象的个数。

  因此之前在调用PushRange的地方就需要修改一下,而我们实际就在一个地方调用过PushRange函数,并且此时插入对象的个数也是很容易知道的。当时thread cache从central cache获取了actualNum个对象,将其中的一个返回给了申请对象的线程,剩下的actualNum-1个挂到了thread cache对应的桶当中,所以这里插入对象的个数就是actualNum-1。

_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);

说明一下:

  当thread cache的某个自由链表过长时,我们实际就是把这个自由链表当中全部的对象都还给central cache了,但这里在设计PopRange接口时还是设计的是取出指定个数的对象,因为在某些情况下当自由链表过长时,我们可能并不一定想把链表中全部的对象都取出来还给central cache,这样设计就是为了增加代码的可修改性。

  其次,当我们判断thread cache是否应该还对象给central cache时,还可以综合考虑每个thread cache整体的大小。比如当某个thread cache的总占用大小超过一定阈值时,我们就将该thread cache当中的对象还一些给central cache,这样就尽量避免了某个线程的thread cache占用太多的内存。对于这一点,在tcmalloc当中就是考虑到了的。

 

centralcache回收内存

       当thread cache中某个自由链表太长时,会将自由链表当中的这些对象还给central cache中的span。

  但是需要注意的是,还给central cache的这些对象不一定都是属于同一个span的。central cache中的每个哈希桶当中可能都不止一个span,因此当我们计算出还回来的对象应该还给central cache的哪一个桶后,还需要知道这些对象到底应该还给这个桶当中的哪一个span。

如何根据对象的地址得到对象所在的页号?

  首先我们必须理解的是,某个页当中的所有地址除以页的大小都等该页的页号。比如我们这里假设一页的大小是100,那么地址0~99都属于第0页,它们除以100都等于0,而地址100~199都属于第1页,它们除以100都等于1。

如何找到一个对象对应的span?

  虽然我们现在可以通过对象的地址得到其所在的页号,但是我们还是不能知道这个对象到底属于哪一个span。因为一个span管理的可能是多个页。

  为了解决这个问题,我们可以建立页号和span之间的映射。由于这个映射关系在page cache进行span的合并时也需要用到,因此我们直接将其存放到page cache里面。这时我们就需要在PageCache类当中添加一个映射关系了,这里可以用C++当中的unordered_map进行实现,并且添加一个函数接口,用于让central cache获取这里的映射关系。(下面代码中只展示了PageCache类当中新增的成员)

//单例模式
class PageCache
{
public:
    //获取从对象到span的映射
    Span* MapObjectToSpan(void* obj);
private:
    std::unordered_map<PAGE_ID, Span*> _idSpanMap;
};

       每当page cache分配span给central cache时,都需要记录一下页号和span之间的映射关系。此后当thread cache还对象给central cache时,才知道应该具体还给哪一个span。

  因此当central cache在调用NewSpan接口向page cache申请k页的span时,page cache在返回这个k页的span给central cache之前,应该建立这k个页号与该span之间的映射关系。

//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
    assert(k > 0 && k < NPAGES);
    //先检查第k个桶里面有没有span
    if (!_spanLists[k].Empty())
    {
        Span* kSpan = _spanLists[k].PopFront();
        //建立页号与span的映射,方便central cache回收小块内存时查找对应的span
        for (PAGE_ID i = 0; i < kSpan->_n; i++)
        {
            _idSpanMap[kSpan->_pageId + i] = kSpan;
        }
        return kSpan;
    }
    //检查一下后面的桶里面有没有span,如果有可以将其进行切分
    for (size_t i = k + 1; i < NPAGES; i++)
    {
        if (!_spanLists[i].Empty())
        {
            Span* nSpan = _spanLists[i].PopFront();
            Span* kSpan = new Span;
            //在nSpan的头部切k页下来
            kSpan->_pageId = nSpan->_pageId;
            kSpan->_n = k;
            nSpan->_pageId += k;
            nSpan->_n -= k;
            //将剩下的挂到对应映射的位置
            _spanLists[nSpan->_n].PushFront(nSpan);
            //建立页号与span的映射,方便central cache回收小块内存时查找对应的span
            for (PAGE_ID i = 0; i < kSpan->_n; i++)
            {
                _idSpanMap[kSpan->_pageId + i] = kSpan;
            }
            return kSpan;
        }
    }
    //走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
    Span* bigSpan = new Span;
    void* ptr = SystemAlloc(NPAGES - 1);
    bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
    bigSpan->_n = NPAGES - 1;
    _spanLists[bigSpan->_n].PushFront(bigSpan);
    //尽量避免代码重复,递归调用自己
    return NewSpan(k);
}

       此时我们就可以通过对象的地址找到该对象对应的span了,直接将该对象的地址除以页的大小得到页号,然后在unordered_map当中找到其对应的span即可。

//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
    PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号
    auto ret = _idSpanMap.find(id);
    if (ret != _idSpanMap.end())
    {
        return ret->second;
    }
    else
    {
        assert(false);
        return nullptr;
    }
}

       注意一下,当我们要通过某个页号查找其对应的span时,该页号与其span之间的映射一定是建立过的,如果此时我们没有在unordered_map当中找到,则说明我们之前的代码逻辑有问题,因此当没有找到对应的span时可以直接用断言结束程序,以表明程序逻辑出错。

central cache回收内存

  这时当thread cache还对象给central cache时,就可以依次遍历这些对象,将这些对象插入到其对应span的自由链表当中,并且及时更新该span的_usseCount计数即可。

  在thread cache还对象给central cache的过程中,如果central cache中某个span的_useCount减到0时,说明这个span分配出去的对象全部都还回来了,那么此时就可以将这个span再进一步还给page cache。

//将一定数量的对象还给对应的span
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
    size_t index = SizeClass::Index(size);
    _spanLists[index]._mtx.lock(); //加锁
    while (start)
    {
        void* next = NextObj(start); //记录下一个
        Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
        //将对象头插到span的自由链表
        NextObj(start) = span->_freeList;
        span->_freeList = start;
        span->_useCount--; //更新被分配给thread cache的计数
        if (span->_useCount == 0) //说明这个span分配出去的对象全部都回来了
        {
            //此时这个span就可以再回收给page cache,page cache可以再尝试去做前后页的合并
            _spanLists[index].Erase(span);
            span->_freeList = nullptr; //自由链表置空
            span->_next = nullptr;
            span->_prev = nullptr;
            //释放span给page cache时,使用page cache的锁就可以了,这时把桶锁解掉
            _spanLists[index]._mtx.unlock(); //解桶锁
            PageCache::GetInstance()->_pageMtx.lock(); //加大锁
            PageCache::GetInstance()->ReleaseSpanToPageCache(span);
            PageCache::GetInstance()->_pageMtx.unlock(); //解大锁
            _spanLists[index]._mtx.lock(); //加桶锁
        }
        start = next;
    }
    _spanLists[index]._mtx.unlock(); //解锁
}

       需要注意,如果要把某个span还给page cache,我们需要先将这个span从central cache对应的双链表中移除,然后再将该span的自由链表置空,因为page cache中的span是不需要切分成一个个的小对象的,以及该span的前后指针也都应该置空,因为之后要将其插入到page cache对应的双链表中。但span当中记录的起始页号以及它管理的页数是不能清除的,否则对应内存块就找不到了。

  并且在central cache还span给page cache时也存在锁的问题,此时需要先将central cache中对应的桶锁解掉,然后再加上page cache的大锁之后才能进入page cache进行相关操作,当处理完毕回到central cache时,除了将page cache的大锁解掉,还需要立刻获得central cache对应的桶锁,然后将还未还完对象继续还给central cache中对应的span。

pagecache回收内存

       如果central cache中有某个span的_useCount减到0了,那么central cache就需要将这个span还给page cache了。

  这个过程看似是非常简单的,page cache只需将还回来的span挂到对应的哈希桶上就行了。但实际为了缓解内存碎片的问题,page cache还需要尝试将还回来的span与其他空闲的span进行合并。

page cache进行前后页的合并

  合并的过程可以分为向前合并和向后合并。如果还回来的span的起始页号是num,该span所管理的页数是n。那么在向前合并时,就需要判断第num-1页对应span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向前尝试进行合并,直到不能进行合并为止。而在向后合并时,就需要判断第num+n页对应的span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向后尝试进行合并,直到不能进行合并为止。

  因此page cache在合并span时,是需要通过页号获取到对应的span的,这就是我们要把页号与span之间的映射关系存储到page cache的原因。

  但需要注意的是,当我们通过页号找到其对应的span时,这个span此时可能挂在page cache,也可能挂在central cache。而在合并时我们只能合并挂在page cache的span,因为挂在central cache的span当中的对象正在被其他线程使用。

  可是我们不能通过span结构当中的_useCount成员,来判断某个span到底是在central cache还是在page cache。因为当central cache刚向page cache申请到一个span时,这个span的_useCount就是等于0的,这时可能当我们正在对该span进行切分的时候,page cache就把这个span拿去进行合并了,这显然是不合理的。

  鉴于此,我们可以在span结构中再增加一个_isUse成员,用于标记这个span是否正在被使用,而当一个span结构被创建时我们默认该span是没有被使用的。

//管理以页为单位的大块内存
struct Span
{
    PAGE_ID _pageId = 0;        //大块内存起始页的页号
    size_t _n = 0;              //页的数量
    Span* _next = nullptr;      //双链表结构
    Span* _prev = nullptr;
    size_t _useCount = 0;       //切好的小块内存,被分配给thread cache的计数
    void* _freeList = nullptr;  //切好的小块内存的自由链表
    bool _isUse = false;        //是否在被使用
};

       因此当central cache向page cache申请到一个span时,需要立即将该span的_isUse改为true。

span->_isUse = true;

       而当central cache将某个span还给page cache时,也就需要将该span的_isUse改成false。

span->_isUse = false;

       由于在合并page cache当中的span时,需要通过页号找到其对应的span,而一个span是在被分配给central cache时,才建立的各个页号与span之间的映射关系,因此page cache当中的span也需要建立页号与span之间的映射关系。

  与central cache中的span不同的是,在page cache中,只需建立一个span的首尾页号与该span之间的映射关系。因为当一个span在尝试进行合并时,如果是往前合并,那么只需要通过一个span的尾页找到这个span,如果是向后合并,那么只需要通过一个span的首页找到这个span。也就是说,在进行合并时我们只需要用到span与其首尾页之间的映射关系就够了。

  因此当我们申请k页的span时,如果是将n页的span切成了一个k页的span和一个n-k页的span,我们除了需要建立k页span中每个页与该span之间的映射关系之外,还需要建立剩下的n-k页的span与其首尾页之间的映射关系。

//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
    assert(k > 0 && k < NPAGES);
    //先检查第k个桶里面有没有span
    if (!_spanLists[k].Empty())
    {
        Span* kSpan = _spanLists[k].PopFront();
        //建立页号与span的映射,方便central cache回收小块内存时查找对应的span
        for (PAGE_ID i = 0; i < kSpan->_n; i++)
        {
            _idSpanMap[kSpan->_pageId + i] = kSpan;
        }
        return kSpan;
    }
    //检查一下后面的桶里面有没有span,如果有可以将其进行切分
    for (size_t i = k + 1; i < NPAGES; i++)
    {
        if (!_spanLists[i].Empty())
        {
            Span* nSpan = _spanLists[i].PopFront();
            Span* kSpan = new Span;
            //在nSpan的头部切k页下来
            kSpan->_pageId = nSpan->_pageId;
            kSpan->_n = k;
            nSpan->_pageId += k;
            nSpan->_n -= k;
            //将剩下的挂到对应映射的位置
            _spanLists[nSpan->_n].PushFront(nSpan);
            //存储nSpan的首尾页号与nSpan之间的映射,方便page cache合并span时进行前后页的查找
            _idSpanMap[nSpan->_pageId] = nSpan;
            _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
            //建立页号与span的映射,方便central cache回收小块内存时查找对应的span
            for (PAGE_ID i = 0; i < kSpan->_n; i++)
            {
                _idSpanMap[kSpan->_pageId + i] = kSpan;
            }
            return kSpan;
        }
    }
    //走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
    Span* bigSpan = new Span;
    void* ptr = SystemAlloc(NPAGES - 1);
    bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
    bigSpan->_n = NPAGES - 1;
    _spanLists[bigSpan->_n].PushFront(bigSpan);
    //尽量避免代码重复,递归调用自己
    return NewSpan(k);
}

       此时page cache当中的span就都与其首尾页之间建立了映射关系,现在我们就可以进行span的合并了,其合并逻辑如下:

//释放空闲的span回到PageCache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
    //对span的前后页,尝试进行合并,缓解内存碎片问题
    //1、向前合并
    while (1)
    {
        PAGE_ID prevId = span->_pageId - 1;
        auto ret = _idSpanMap.find(prevId);
        //前面的页号没有(还未向系统申请),停止向前合并
        if (ret == _idSpanMap.end())
        {
            break;
        }
        //前面的页号对应的span正在被使用,停止向前合并
        Span* prevSpan = ret->second;
        if (prevSpan->_isUse == true)
        {
            break;
        }
        //合并出超过128页的span无法进行管理,停止向前合并
        if (prevSpan->_n + span->_n > NPAGES - 1)
        {
            break;
        }
        //进行向前合并
        span->_pageId = prevSpan->_pageId;
        span->_n += prevSpan->_n;
        //将prevSpan从对应的双链表中移除
        _spanLists[prevSpan->_n].Erase(prevSpan);
        delete prevSpan;
    }
    //2、向后合并
    while (1)
    {
        PAGE_ID nextId = span->_pageId + span->_n;
        auto ret = _idSpanMap.find(nextId);
        //后面的页号没有(还未向系统申请),停止向后合并
        if (ret == _idSpanMap.end())
        {
            break;
        }
        //后面的页号对应的span正在被使用,停止向后合并
        Span* nextSpan = ret->second;
        if (nextSpan->_isUse == true)
        {
            break;
        }
        //合并出超过128页的span无法进行管理,停止向后合并
        if (nextSpan->_n + span->_n > NPAGES - 1)
        {
            break;
        }
        //进行向后合并
        span->_n += nextSpan->_n;
        //将nextSpan从对应的双链表中移除
        _spanLists[nextSpan->_n].Erase(nextSpan);
        delete nextSpan;
    }
    //将合并后的span挂到对应的双链表当中
    _spanLists[span->_n].PushFront(span);
    //建立该span与其首尾页的映射
    _idSpanMap[span->_pageId] = span;
    _idSpanMap[span->_pageId + span->_n - 1] = span;
    //将该span设置为未被使用的状态
    span->_isUse = false;
}

需要注意的是,在向前或向后进行合并的过程中:

  • 如果没有通过页号获取到其对应的span,说明对应到该页的内存块还未申请,此时需要停止合并。
  • 如果通过页号获取到了其对应的span,但该span处于被使用的状态,那我们也必须停止合并。
  • 如果合并后大于128页则不能进行本次合并,因为page cache无法对大于128页的span进行管理。

       在合并span时,由于这个span是在page cache的某个哈希桶的双链表当中的,因此在合并后需要将其从对应的双链表中移除,然后再将这个被合并了的span结构进行delete。

       除此之外,在合并结束后,除了将合并后的span挂到page cache对应哈希桶的双链表当中,还需要建立该span与其首位页之间的映射关系,便于此后合并出更大的span。

释放内存过程联调

ConcurrentFree函数

  至此我们将thread cache、central cache以及page cache的释放流程也都写完了,此时我们就可以向外提供一个ConcurrentFree函数,用于释放内存块,释放内存块时每个线程通过自己的thread cache对象,调用thread cache中释放内存对象的接口即可。

static void ConcurrentFree(void* ptr, size_t size/*暂时*/)
{
    assert(pTLSThreadCache);
    pTLSThreadCache->Deallocate(ptr, size);
}

释放内存过程联调测试

  之前我们在测试申请流程时,让单个线程进行了三次内存申请,现在我们再将这三个对象再进行释放,看看这其中的释放流程是如何进行的。

void* p1 = ConcurrentAlloc(6);
void* p2 = ConcurrentAlloc(8);
void* p3 = ConcurrentAlloc(1);
ConcurrentFree(p1, 6);
ConcurrentFree(p2, 8);
ConcurrentFree(p3, 1);

       首先,这三次申请和释放的对象大小进行对齐后都是8字节,因此对应操作的就是thread cache和central cache的第0号桶,以及page cache的第1号桶。

  由于第三次对象申请时,刚好将thread cache第0号桶当中仅剩的一个对象拿走了,因此在三次对象申请后thread cache的第0号桶当中是没有对象的。

  通过监视窗口可以看到,此时thread cache第0号桶中自由链表的_maxSize已经慢增长到了3,而当我们释放完第一个对象后,该自由链表当中对象的个数只有一个,因此不会将该自由链表当中的对象进一步还给central cache。

  当第二个对象释放给thread cache的第0号桶后,该桶对应自由链表当中对象的个数变成了2,也是不会进行ListTooLong操作的。

        直到第三个对象释放给thread cache的第0号桶时,此时该自由链表的_size的值变为3,与_maxSize的值相等,现在thread cache就需要将对象给central cache了。

        thread cache先是将第0号桶当中的对象弹出MaxSize个,在这里实际上就是全部弹出,此时该自由链表_size的值变为0,然后继续调用central cache当中的ReleaseListToSpans函数,将这三个对象还给central cache当中对应的span。

        在进入central cache的第0号桶还对象之前,先把第0号桶对应的桶锁加上,然后通过查page cache中的映射表找到其对应的span,最后将这个对象头插到该span的自由链表中,并将该span的_useCount进行--。当第一个对象还给其对应的span时,可以看到该span的_useCount减到了2。

        而由于我们只进行了三次对象申请,并且这些对象大小对齐后大小都是8字节,因此我们申请的这三个对象实际都是同一个span切分出来的。当我们将这三个对象都还给这个span时,该span的_useCount就减为了0。

        现在central cache就需要将这个span进一步还给page cache,而在将该span交给page cache之前,会将该span的自由链表以及前后指针都置空。并且在进入page cache之前会先将central cache第0号桶的桶锁解掉,然后再加上page cache的大锁,之后才能进入page cache进行相关操作。

        由于这个一页的span是从128页的span的头部切下来的,在向前合并时由于前面的页还未向系统申请,因此在查映射关系时是无法找到的,此时直接停止了向前合并。

(说明一下:由于下面是重新另外进行的一次调试,因此监视窗口显示的span的起始页号与之前的不同,实际应该与上面一致)

        而在向后合并时,由于page cache没有将该页后面的页分配给central cache,因此在向后合并时肯定能够找到一个127页的span进行合并。合并后就变成了一个128页的span,这时我们将原来127页的span从第127号桶删除,然后还需要将该127页的span结构进行delete,因为它管理的127页已经与1页的span进行合并了,不再需要它来管理了。

        紧接着将这个128页的span插入到第128号桶,然后建立该span与其首尾页的映射,便于下次被用于合并,最后再将该span的状态设置为未被使用的状态即可。

        当从page cache回来后,除了将page cache的大锁解掉,还需要立刻加上central cache中对应的桶锁,然后继续将对象还给central cache中的span,但此时实际上是还完了,因此再将central cache的桶锁解掉就行了。

        至此我们便完成了这三个对象的申请和释放流程。

大于256KB的大块内存申请问题

申请过程

  之前说到,每个线程的thread cache是用于申请小于等于256KB的内存的,而对于大于256KB的内存,我们可以考虑直接向page cache申请,但page cache中最大的页也就只有128页,因此如果是大于128页的内存申请,就只能直接向堆申请了。

  而我们之前实现RoundUp函数时,对传入字节数大于256KB的情况直接做了断言处理,因此这里需要对RoundUp函数稍作修改。

//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes)
{
    if (bytes <= 128)
    {
        return _RoundUp(bytes, 8);
    }
    else if (bytes <= 1024)
    {
        return _RoundUp(bytes, 16);
    }
    else if (bytes <= 8 * 1024)
    {
        return _RoundUp(bytes, 128);
    }
    else if (bytes <= 64 * 1024)
    {
        return _RoundUp(bytes, 1024);
    }
    else if (bytes <= 256 * 1024)
    {
        return _RoundUp(bytes, 8 * 1024);
    }
    else
    {
        //大于256KB的按页对齐
        return _RoundUp(bytes, 1 << PAGE_SHIFT);
    }
}

       现在对于之前的申请逻辑就需要进行修改了,当申请对象的大小大于256KB时,就不用向thread cache申请了,这时先计算出按页对齐后实际需要申请的页数,然后通过调用NewSpan申请指定页数的span即可。

static void* ConcurrentAlloc(size_t size)
{
    if (size > MAX_BYTES) //大于256KB的内存申请
    {
        //计算出对齐后需要申请的页数
        size_t alignSize = SizeClass::RoundUp(size);
        size_t kPage = alignSize >> PAGE_SHIFT;
        //向page cache申请kPage页的span
        PageCache::GetInstance()->_pageMtx.lock();
        Span* span = PageCache::GetInstance()->NewSpan(kPage);
        PageCache::GetInstance()->_pageMtx.unlock();
        void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
        return ptr;
    }
    else
    {
        //通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
        if (pTLSThreadCache == nullptr)
        {
            pTLSThreadCache = new ThreadCache;
        }
        cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
        return pTLSThreadCache->Allocate(size);
    }
}

       也就是说,申请大于256KB的内存时,会直接调用page cache当中的NewSpan函数进行申请,因此这里我们需要再对NewSpan函数进行改造,当需要申请的内存页数大于128页时,就直接向堆申请对应页数的内存块。而如果申请的内存页数是小于128页的,那就在page cache中进行申请,因此当申请大于256KB的内存调用NewSpan函数时也是需要加锁的,因为我们可能是在page cache中进行申请的。

//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
    assert(k > 0);
    if (k > NPAGES - 1) //大于128页直接找堆申请
    {
        void* ptr = SystemAlloc(k);
        Span* span = new Span;
        span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
        span->_n = k;
        //建立页号与span之间的映射
        _idSpanMap[span->_pageId] = span;
        return span;
    }
    //先检查第k个桶里面有没有span
    if (!_spanLists[k].Empty())
    {
        Span* kSpan = _spanLists[k].PopFront();
        //建立页号与span的映射,方便central cache回收小块内存时查找对应的span
        for (PAGE_ID i = 0; i < kSpan->_n; i++)
        {
            _idSpanMap[kSpan->_pageId + i] = kSpan;
        }
        return kSpan;
    }
    //检查一下后面的桶里面有没有span,如果有可以将其进行切分
    for (size_t i = k + 1; i < NPAGES; i++)
    {
        if (!_spanLists[i].Empty())
        {
            Span* nSpan = _spanLists[i].PopFront();
            Span* kSpan = new Span;
            //在nSpan的头部切k页下来
            kSpan->_pageId = nSpan->_pageId;
            kSpan->_n = k;
            nSpan->_pageId += k;
            nSpan->_n -= k;
            //将剩下的挂到对应映射的位置
            _spanLists[nSpan->_n].PushFront(nSpan);
            //存储nSpan的首尾页号与nSpan之间的映射,方便page cache合并span时进行前后页的查找
            _idSpanMap[nSpan->_pageId] = nSpan;
            _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
            //建立页号与span的映射,方便central cache回收小块内存时查找对应的span
            for (PAGE_ID i = 0; i < kSpan->_n; i++)
            {
                _idSpanMap[kSpan->_pageId + i] = kSpan;
            }
            return kSpan;
        }
    }
    //走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
    Span* bigSpan = new Span;
    void* ptr = SystemAlloc(NPAGES - 1);
    bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
    bigSpan->_n = NPAGES - 1;
    _spanLists[bigSpan->_n].PushFront(bigSpan);
    //尽量避免代码重复,递归调用自己
    return NewSpan(k);
}

释放过程

当释放对象时,我们需要判断释放对象的大小:

        因此当释放对象时,我们需要先找到该对象对应的span,但是在释放对象时我们只知道该对象的起始地址。这也就是我们在申请大于256KB的内存时,也要给申请到的内存建立span结构,并建立起始页号与该span之间的映射关系的原因。此时我们就可以通过释放对象的起始地址计算出起始页号,进而通过页号找到该对象对应的span。

static void ConcurrentFree(void* ptr, size_t size)
{
    if (size > MAX_BYTES) //大于256KB的内存释放
    {
        Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
        PageCache::GetInstance()->_pageMtx.lock();
        PageCache::GetInstance()->ReleaseSpanToPageCache(span);
        PageCache::GetInstance()->_pageMtx.unlock();
    }
    else
    {
        assert(pTLSThreadCache);
        pTLSThreadCache->Deallocate(ptr, size);
    }
}

       因此page cache在回收span时也需要进行判断,如果该span的大小是小于等于128页的,那么直接还给page cache进行了,page cache会尝试对其进行合并。而如果该span的大小是大于128页的,那么说明该span是直接向堆申请的,我们直接将这块内存释放给堆,然后将这个span结构进行delete就行了。

//释放空闲的span回到PageCache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
    if (span->_n > NPAGES - 1) //大于128页直接释放给堆
    {
        void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
        SystemFree(ptr);
        delete span;
        return;
    }
    //对span的前后页,尝试进行合并,缓解内存碎片问题
    //1、向前合并
    while (1)
    {
        PAGE_ID prevId = span->_pageId - 1;
        auto ret = _idSpanMap.find(prevId);
        //前面的页号没有(还未向系统申请),停止向前合并
        if (ret == _idSpanMap.end())
        {
            break;
        }
        //前面的页号对应的span正在被使用,停止向前合并
        Span* prevSpan = ret->second;
        if (prevSpan->_isUse == true)
        {
            break;
        }
        //合并出超过128页的span无法进行管理,停止向前合并
        if (prevSpan->_n + span->_n > NPAGES - 1)
        {
            break;
        }
        //进行向前合并
        span->_pageId = prevSpan->_pageId;
        span->_n += prevSpan->_n;
        //将prevSpan从对应的双链表中移除
        _spanLists[prevSpan->_n].Erase(prevSpan);
        delete prevSpan;
    }
    //2、向后合并
    while (1)
    {
        PAGE_ID nextId = span->_pageId + span->_n;
        auto ret = _idSpanMap.find(nextId);
        //后面的页号没有(还未向系统申请),停止向后合并
        if (ret == _idSpanMap.end())
        {
            break;
        }
        //后面的页号对应的span正在被使用,停止向后合并
        Span* nextSpan = ret->second;
        if (nextSpan->_isUse == true)
        {
            break;
        }
        //合并出超过128页的span无法进行管理,停止向后合并
        if (nextSpan->_n + span->_n > NPAGES - 1)
        {
            break;
        }
        //进行向后合并
        span->_n += nextSpan->_n;
        //将nextSpan从对应的双链表中移除
        _spanLists[nextSpan->_n].Erase(nextSpan);
        delete nextSpan;
    }
    //将合并后的span挂到对应的双链表当中
    _spanLists[span->_n].PushFront(span);
    //建立该span与其首尾页的映射
    _idSpanMap[span->_pageId] = span;
    _idSpanMap[span->_pageId + span->_n - 1] = span;
    //将该span设置为未被使用的状态
    span->_isUse = false;
}

       说明一下,直接向堆申请内存时我们调用的接口是VirtualAlloc,与之对应的将内存释放给堆的接口叫做VirtualFree,而Linux下的brk和mmap对应的释放接口叫做sbrk和unmmap。此时我们也可以将这些释放接口封装成一个叫做SystemFree的接口,当我们需要将内存释放给堆时直接调用SystemFree即可。

//直接将内存还给堆
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
    VirtualFree(ptr, 0, MEM_RELEASE);
#else
    //linux下sbrk unmmap等
#endif
}

简单测试

下面我们对大于256KB的申请释放流程进行简单的测试:

//找page cache申请
void* p1 = ConcurrentAlloc(257 * 1024); //257KB
ConcurrentFree(p1, 257 * 1024);
//找堆申请
void* p2 = ConcurrentAlloc(129 * 8 * 1024); //129页
ConcurrentFree(p2, 129 * 8 * 1024);

       当申请257KB的内存时,由于257KB的内存按页向上对齐后是33页,并没有大于128页,因此不会直接向堆进行申请,会向page cache申请内存,但此时page cache当中实际是没有内存的,最终page cache就会向堆申请一个128页的span,将其切分成33页的span和95页的span,并将33页的span进行返回。

        而在释放内存时,由于该对象的大小大于了256KB,因此不会将其还给thread cache,而是直接调用的page cache当中的释放接口。

        由于该对象的大小是33页,不大于128页,因此page cache也不会直接将该对象还给堆,而是尝试对其进行合并,最终就会把这个33页的span和之前剩下的95页的span进行合并,最终将合并后的128页的span挂到第128号桶中。

  当申请129页的内存时,由于是大于256KB的,于是还是调用的page cache对应的申请接口,但此时申请的内存同时也大于128页,因此会直接向堆申请。在申请后还会建立该span与其起始页号之间的映射,便于释放时可以通过页号找到该span。

        在释放内存时,通过对象的地址找到其对应的span,从span结构中得知释放内存的大小大于128页,于是会将该内存直接还给堆。

多线程环境下对比malloc测试

       之前我们只是对代码进行了一些基础的单元测试,下面我们在多线程场景下对比malloc进行测试。

void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
    std::vector<std::thread> vthread(nworks);
    std::atomic<size_t> malloc_costtime = 0;
    std::atomic<size_t> free_costtime = 0;
    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = std::thread([&, k]() {
            std::vector<void*> v;
            v.reserve(ntimes);
            for (size_t j = 0; j < rounds; ++j)
            {
                size_t begin1 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    v.push_back(malloc(16));
                    //v.push_back(malloc((16 + i) % 8192 + 1));
                }
                size_t end1 = clock();
                size_t begin2 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    free(v[i]);
                }
                size_t end2 = clock();
                v.clear();
                malloc_costtime += (end1 - begin1);
                free_costtime += (end2 - begin2);
            }
        });
    }
    for (auto& t : vthread)
    {
        t.join();
    }
    printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
        nworks, rounds, ntimes, malloc_costtime);
    printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
        nworks, rounds, ntimes, free_costtime);
    printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
        nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
    std::vector<std::thread> vthread(nworks);
    std::atomic<size_t> malloc_costtime = 0;
    std::atomic<size_t> free_costtime = 0;
    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = std::thread([&]() {
            std::vector<void*> v;
            v.reserve(ntimes);
            for (size_t j = 0; j < rounds; ++j)
            {
                size_t begin1 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    v.push_back(ConcurrentAlloc(16));
                    //v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
                }
                size_t end1 = clock();
                size_t begin2 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    ConcurrentFree(v[i]);
                }
                size_t end2 = clock();
                v.clear();
                malloc_costtime += (end1 - begin1);
                free_costtime += (end2 - begin2);
            }
        });
    }
    for (auto& t : vthread)
    {
        t.join();
    }
    printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
        nworks, rounds, ntimes, malloc_costtime);
    printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
        nworks, rounds, ntimes, free_costtime);
    printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
        nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}
int main()
{
    size_t n = 10000;
    cout << "==========================================================" <<
        endl;
    BenchmarkConcurrentMalloc(n, 4, 10);
    cout << endl << endl;
    BenchmarkMalloc(n, 4, 10);
    cout << "==========================================================" <<
        endl;
    return 0;
}

其中测试函数各个参数的含义如下:

  • ntimes:单轮次申请和释放内存的次数。
  • nworks:线程数。
  • rounds:轮次。

  在测试函数中,我们通过clock函数分别获取到每轮次申请和释放所花费的时间,然后将其对应累加到malloc_costtime和free_costtime上。最后我们就得到了,nworks个线程跑rounds轮,每轮申请和释放ntimes次,这个过程申请所消耗的时间、释放所消耗的时间、申请和释放总共消耗的时间。

  注意,我们创建线程时让线程执行的是lambda表达式,而我们这里在使用lambda表达式时,以值传递的方式捕捉了变量k,以引用传递的方式捕捉了其他父作用域中的变量,因此我们可以将各个线程消耗的时间累加到一起。

  我们将所有线程申请内存消耗的时间都累加到malloc_costtime上, 将释放内存消耗的时间都累加到free_costtime上,此时malloc_costtime和free_costtime可能被多个线程同时进行累加操作的,所以存在线程安全的问题。鉴于此,我们在定义这两个变量时使用了atomic类模板,这时对它们的操作就是原子操作了。

固定大小内存的申请和释放

我们先来测试一下固定大小内存的申请和释放:

1. v.push_back(malloc(16));
2. v.push_back(ConcurrentAlloc(16));

       此时4个线程执行10轮操作,每轮申请释放10000次,总共申请释放了40万次,运行后可以看到,malloc的效率还是更高的。

  由于此时我们申请释放的都是固定大小的对象,每个线程申请释放时访问的都是各自thread cache的同一个桶,当thread cache的这个桶中没有对象或对象太多要归还时,也都会访问central cache的同一个桶。此时central cache中的桶锁就不起作用了,因为我们让central cache使用桶锁的目的就是为了,让多个thread cache可以同时访问central cache的不同桶,而此时每个thread cache访问的却都是central cache中的同一个桶。

不同大小内存的申请和释放

下面我们再来测试一下不同大小内存的申请和释放:

1. v.push_back(malloc((16 + i) % 8192 + 1));
2. v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));

       运行后可以看到,由于申请和释放内存的大小是不同的,此时central cache当中的桶锁就起作用了,ConcurrentAlloc的效率也有了较大增长,但相比malloc来说还是差一点点。

项目源码

Gitee:高并发内存池项目: 本项目实现的是一个高并发的内存池,它的原型是Google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替换系统的内存分配相关函数malloc和free。

相关文章
|
9月前
|
存储 缓存 算法
内存“银行”(二)
内存“银行”(二)
|
9月前
|
存储 缓存 算法
|
3天前
|
存储
浮点数在内存中的存储
浮点数在内存中的存储
25 0
|
3天前
|
存储
数据在内存中的存储之整数存储
数据在内存中的存储之整数存储
21 0
|
3天前
|
存储 C语言
C语言第二十九弹---浮点数在内存中的存储
C语言第二十九弹---浮点数在内存中的存储
|
2天前
|
存储 小程序 编译器
数据在内存中的存储(探索内存的秘密)
数据在内存中的存储(探索内存的秘密)
9 0
|
3天前
|
存储 监控 NoSQL
Redis处理大量数据主要依赖于其内存存储结构、高效的数据结构和算法,以及一系列的优化策略
【5月更文挑战第15天】Redis处理大量数据依赖内存存储、高效数据结构和优化策略。选择合适的数据结构、利用批量操作减少网络开销、控制批量大小、使用Redis Cluster进行分布式存储、优化内存使用及监控调优是关键。通过这些方法,Redis能有效处理大量数据并保持高性能。
22 0
|
1天前
|
存储 算法 关系型数据库
实时计算 Flink版产品使用合集之在Flink Stream API中,可以在任务启动时初始化一些静态的参数并将其存储在内存中吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
15 4
|
3天前
|
存储 编译器 程序员
C语言:数据在内存中的存储
C语言:数据在内存中的存储
15 2
|
3天前
|
存储
整数和浮点数在内存中存储
整数的2进制表⽰⽅法有三种,即原码、反码和补码。
18 0