多核分布式队列的实现:“偷”与“自私”的运用(4)

简介:
在设计CDistributeQueue类时,通常有两种方案值得考虑:
  • 1、 本地队列预先创建好,当有线程访问时就可以直接根据线程编号去访问对应的本地队列。
  • 2、 不预先创建本地队列,当线程第一次访问分布式队列时,由于获取不到线程编号,由此可以断定本线程是第1次访问分布式队列,此时才创建本地队列。
方案1和方案2可以说各有优缺点。方案1中,在事先不知道有多少线程会访问分布式队列的情况下,预先创建好本地队列会造成程序初始化时间过长,并且可能有一些创建好的队列得不到使用。
方案2中,采用线程访问分布式队列时才创建本地队列,初始化时比较简单,并且不会造成多创建了本地队列的情况。缺点是编程时,队列的操作代码会变复杂一些,效率会有所降低。
下面的代码中,给出的是方案2的实现。
  • 1) 类的原型定义和成员函数
//获取线程Id回调函数定义
typedef int (*GetThreadIdFunc)(void *pArg);
 
template <class T, class LocalQueue, class SharedQueue>
class  CDistributedQueue {
private:
    LocalQueue **   m_ppLocalQueue;    // 本地队列数组
    SharedQueue *   m_pSharedQueue;    // 共享队列池或共享队列
 
    int             m_nLocalQueueSize;
    int             m_nSharedQueueSize;
    int             m_nLocalQueueCount;
    int             m_nSharedQueueCount;
    DWORD       m_dwTlsIndex;        //线程本地存储索引
    LONG volatile   m_lThreadIdIndex;     //线程编号最大值
    GetThreadIdFunc m_GetThreadIdFunc;   //获取线程编号回调函数,如果由外面
                                                                  //的线程池提供编号时,需要传入回调函数
    void *          m_pThreadIdFuncArg;  //获取线程编号回调函数的参数
 
 CFastLock       m_LocalQueueResizeLock; //专为下面的ResizeLocalQueue函数使用
 void ResizeLocalQueue();             //将m_ppLocalQueue数组的大小扩大一倍
public:
    CDistributedQueue(){
        m_GetThreadIdFunc = NULL;
        m_pThreadIdFuncArg = NULL;
        m_lThreadIdIndex = 0;
    };
    void Create( int nLocalQueueSize, int nLocalQueueCount,
        int nSharedQueueSize, int nSharedQueueCount);
    void Create( int nLocalQueueSize, int nLocalQueueCount,
        int nSharedQueueSize, int nSharedQueueCount,
        GetThreadIdFunc GetThreadId, void * pThreadIdFuncArg);
 
    virtual ~CDistributedQueue();
 
    LONG ThreadIdGet();
 
    void EnQueue(T &Data);
    int  DeQueue(T &Data);
 
    void PushToLocalQueue(T &Data);
    void PushToLocalQueue(T &Data, int nIndex);
    int PopFromLocalQueue(T &Data);
 
    SharedQueue *GetSharedQueue() { return m_pSharedQueue; };
    int PrivatizeSharedQueue(int nSharedQueueIndex);
};
 
说明一下:CDistributedQueue类中有三个模板参数,第1个模板参数T是表示数据类型;第2个模板参数是表示本地队列类的类型,为一 个不需要使用锁的普通队列,比如环形队列等;第3个模板参数是表示一个需要使用锁的共享队列类,可以是一个队列池类,也可以是普通的使用锁的共享队列类。
 
  • 1) 构造函数和析构函数代码
/**   分布式队列的创建函数
 
       @param  int nLocalQueueSize - 本地子队列的大小 
       @param  int nLocalQueueCount - 本地队列的个数(数组的大小)   
       @param  int nSharedQueueSize - 共享子队列的大小      
       @param  int nSharedQueueCount - 共享子队列的个数    
       @return  void - 无
*/
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::Create(
                   int nLocalQueueSize, int nLocalQueueCount,
                   int nSharedQueueSize, int nSharedQueueCount)
{
    m_nLocalQueueSize = nLocalQueueSize;
    m_nSharedQueueSize = nSharedQueueSize;
    if ( nLocalQueueCount != 0 )
    {
        m_nLocalQueueCount = nLocalQueueCount;
    }
    else
    {
        m_nLocalQueueCount = omp_get_num_procs();
    }
 
    if ( nSharedQueueCount != 0 )
    {
        m_nSharedQueueCount = nSharedQueueCount;
    }
    else
    {
        m_nSharedQueueCount = omp_get_num_procs();
    }
 
    m_ppLocalQueue =  new LocalQueue *[m_nLocalQueueCount];
    int i;
    for ( i = 0; i < m_nLocalQueueCount; i++ )
    {
        m_ppLocalQueue[i] = NULL;
    }
    m_pSharedQueue = new SharedQueue(m_nSharedQueueCount, m_nSharedQueueSize);
    m_dwTlsIndex = TlsAlloc();
    m_lThreadIdIndex = 0;
}
 
 
/**   分布式队列的创建函数
 
       @param  int nLocalQueueSize - 本地子队列的大小 
       @param  int nLocalQueueCount - 本地队列的个数(数组的大小)   
       @param  int nSharedQueueSize - 共享子队列的大小      
       @param  int nSharedQueueCount - 共享子队列的个数    
       @param  GetThreadIdFunc GetThreadId - 获取线程Id回调函数  
       @param  void * pThreadIdFuncArg - GetThreadId回调函数的参数
       @return  void - 无
*/
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::Create(
    int nLocalQueueSize, int nLocalQueueCount,
    int nSharedQueueSize, int nSharedQueueCount,
    GetThreadIdFunc GetThreadId, void * pThreadIdFuncArg)
{
    m_GetThreadIdFunc = GetThreadId;
    m_pThreadIdFuncArg = pThreadIdFuncArg;
    Create(nLocalQueueSize, nLocalQueueCount, nSharedQueueSize, nSharedQueueCount);
}
 
/**   分布式队列的析构函数
 
       @return  - 无     
*/
template <class T, class LocalQueue, class SharedQueue>
CDistributedQueue<T, LocalQueue, SharedQueue>::~CDistributedQueue()
{
    int i;
    for ( i = 0; i < m_nLocalQueueCount; i++ )
    {
        if ( m_ppLocalQueue[i] != NULL )
        {
            delete m_ppLocalQueue[i];
        }
    }
    delete [] m_ppLocalQueue;
    delete m_pSharedQueue;
    TlsFree(m_dwTlsIndex);
}
 
  • 2) 将本地队列数组扩大一倍的内部成员函数代码
这个函数主要是考虑有可能程序升级后,访问的线程数量可能大于本地队列数组的大小的情况,此时采取将本地队列数组扩大一倍的策略。
/**   分布式队列的将本地队列数组扩大一倍的内部成员函数
 
       @return  void - 无
*/
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::ResizeLocalQueue()
{
    //将本地队列数组扩大一倍, 防止线程数量多于队列数量,以保证程序安全
    int i;
 
    LocalQueue **ppQueue = new LocalQueue *[m_nLocalQueueCount * 2];
    for ( i = 0; i < m_nLocalQueueCount; i++ )
    {
        ppQueue[i] = m_ppLocalQueue[i];
    }
    for ( i = m_nLocalQueueCount; i < m_nLocalQueueCount * 2; i++ )
    {
        ppQueue[i] = NULL;
    }
    delete [] m_ppLocalQueue;
    m_ppLocalQueue = ppQueue;
 
    //使用原子操作避免m_nLocalQueueCount的数据竞争问题
    AtomicWrite((LONG volatile *)&m_nLocalQueueCount, m_nLocalQueueCount * 2);
}
 
  • 3) 获取线程Id成员函数代码
获取线程Id成员函数中,这个函数中完成本地队列的创建和分派工作。先是判断获取的线程Id是否为0,如果为0则表明还没有创建本地队列,此时需要给线程进行编号,并创建一个新的本地队列放到数组中下标等于线程编号的位置上。
/**   分布式队列的获取线程Id函数
       如果m_GetThreadIdFunc回调函数不为空,则使用它获取Id
       否则根据分布式队列内部的编号机制获取线程Id
 
       @return  LONG - 返回线程的编号   
*/
template <class T, class LocalQueue, class SharedQueue>
LONG CDistributedQueue<T, LocalQueue, SharedQueue>::ThreadIdGet()
{
    LONG Id;
    LocalQueue *pQueue = NULL;
 
    if ( m_GetThreadIdFunc != NULL )
    {
        Id = (*m_GetThreadIdFunc)(m_pThreadIdFuncArg);
        if ( Id >= m_nLocalQueueCount )
        {
            CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);
            if ( Id >= m_nLocalQueueCount )
            {
                ResizeLocalQueue();
            }
        }
        if ( m_ppLocalQueue[Id] == NULL )
        {
            m_ppLocalQueue[Id] = new LocalQueue(m_nLocalQueueSize);
        }
        return Id;
    }
    else
    {
        Id = (LONG )TlsGetValue(m_dwTlsIndex);
        if ( Id == 0 )
        {
            Id = AtomicIncrement(&m_lThreadIdIndex);
            TlsSetValue(m_dwTlsIndex, (void *)Id);
            pQueue = new LocalQueue(m_nLocalQueueSize);
        }
        --Id;
    }
 
    if ( Id >= m_nLocalQueueCount)
    {
        CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);
        if ( Id >= m_nLocalQueueCount )
        {
            ResizeLocalQueue();
        }
    }
    if ( pQueue != NULL )
    {
        m_ppLocalQueue[Id] = pQueue;
    }
return Id;
}
 
  • 4) 进队操作策略1的进队操作代码
/**   分布式队列的进队操作函数
       这里假定了本地队列可以无限进队
       进队策略按以下优先级进行:
       1、本地队列空时进入本地队列,、共享队列未满时进入共享队列
       3、共享队列满时进入本地队列
 
       @param  T &Data - 要进队的数据    
       @return  void - 无
*/
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::EnQueue(T &Data)
{
    int nId = ThreadIdGet();
    if ( m_ppLocalQueue[nId]->IsEmpty() )
    {
        m_ppLocalQueue[nId]->EnQueue(Data);
    }
    else if ( m_pSharedQueue->Push(Data) != CAPI_SUCCESS )
    {
        int nId = ThreadIdGet();
        m_ppLocalQueue[nId]->EnQueue(Data);
    }
    else
    {
        //这个分支不需要做任何事
    }
    return;
}
 
  • 5) 本地队列的操作代码
/**   分布式队列的本地队列进队函数
       将数据进入到当前线程的本地队列中
 
       @param  T &Data - 要进队的数据    
       @return  void - 无
*/
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::PushToLocalQueue(
T &Data)
{
    int nId = ThreadIdGet();
    m_ppLocalQueue[nId]->EnQueue(Data);
    return;
}
 
/**   分布式队列的指定序号本地队列进队函数
    这是一个为特殊需求而设计的函数
    使用这个函数要特别小心,必须保证不会发生数据竞争问题
 
       @param  T &Data - 要进队的数据    
       @param  int nIndex - 本地队列的序号     
       @return  void - 无
*/
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::PushToLocalQueue(
T &Data, int nIndex)
{
    if ( nIndex >= m_nLocalQueueCount * 2)
    {
        return;
    }
 
    if ( nIndex >= m_nLocalQueueCount )
    {
        CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);
        if ( nIndex >= m_nLocalQueueCount )
        {
            ResizeLocalQueue();
        }
    }
 
    if ( m_ppLocalQueue[nIndex] == NULL )
    {
        m_ppLocalQueue[nIndex] = new LocalQueue(m_nLocalQueueSize);
    }
 
    m_ppLocalQueue[nIndex]->EnQueue(Data);
    return;
}
 
/**   分布式队列的本地队列出队函数
 
       @param  T &Data - 接收出队的数据
       @return  int - 出队成功返回CAPI_SUCCESS, 失败(队列为空)返回CAPI_FAILED.      
*/
template <class T, class LocalQueue, class SharedQueue>
int CDistributedQueue<T, LocalQueue, SharedQueue>::PopFromLocalQueue(
T &Data)
{
    int nId = ThreadIdGet();
    return m_ppLocalQueue[nId]->DeQueue(Data);
}
 
  • 6) 出队操作代码
/**   分布式队列的出队函数
    出队操作策略为,先从本地队列中出队,如果失败则从共享队列中出队
 
       @param  T &Data - 接收出队的数据
       @return  int - 成功返回CAPI_SUCCESS, 失败返回CAPI_FAILED.    
*/
template <class T, class LocalQueue, class SharedQueue>
int CDistributedQueue<T, LocalQueue, SharedQueue>::DeQueue(T &Data)
{
    int nRet;
 
    int nId = ThreadIdGet();
   
    nRet = m_ppLocalQueue[nId]->DeQueue(Data);
    if ( nRet == CAPI_FAILED )
    {
        nRet = m_pSharedQueue->Pop(Data);
    }
    return nRet;  
}

本文转自Intel_ISN 51CTO博客,原文链接:http://blog.51cto.com/intelisn/130452,如需转载请自行联系原作者
相关文章
|
28天前
|
消息中间件 存储 NoSQL
一文读懂python分布式任务队列-celery
# 一文读懂Python分布式任务队列-Celery Celery是一个分布式任务执行框架,支持大量并发任务。它采用生产者-消费者模型,由Broker、Worker和Backend组成。生产者提交任务到队列,Worker异步执行,结果存储在Backend。适用于异步任务、大规模实时任务和定时任务。5月更文挑战第17天
38 1
|
1月前
|
消息中间件 监控 NoSQL
在Windows下设置分布式队列Celery的心跳轮询
在Windows下设置分布式队列Celery的心跳轮询
448 0
|
1月前
|
消息中间件 监控 NoSQL
一文读懂python分布式任务队列-celery
celery是一个简单,灵活、可靠的分布式任务执行框架,可以支持大量任务的并发执行。celery采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行【2月更文挑战第11天】
55834 5
|
7月前
|
消息中间件 NoSQL Go
Golang微服务框架Kratos应用分布式计划任务队列Asynq
Asynq是一个使用Go语言实现的分布式任务队列和异步处理库,它由Redis提供支持,它提供了轻量级的、易于使用的API,并且具有高可扩展性和高可定制化性。其作者Ken Hibino,任职于Google。
142 0
|
7月前
|
消息中间件 存储 NoSQL
Golang微服务框架Kratos应用分布式任务队列Machinery
go machinery是一个基于分布式消息分发的异步任务队列框架,类似python中常用celery框架,主要用于异步任务和定时任务。
158 0
|
10月前
|
NoSQL Go Redis
Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库
Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库
345 0
|
存储 分布式计算 安全
「分布式架构」最终一致性:暗示的切换队列
「分布式架构」最终一致性:暗示的切换队列
|
缓存 运维 负载均衡
Redis连环炮:内存淘汰?事务?分布式锁?分步式限流?异步队列?延时队列?高可用?如何部署?哈希槽?数据库和缓存的数据一致性?
Redis连环炮:内存淘汰?事务?分布式锁?分步式限流?异步队列?延时队列?高可用?如何部署?哈希槽?数据库和缓存的数据一致性?
195 0
Redis连环炮:内存淘汰?事务?分布式锁?分步式限流?异步队列?延时队列?高可用?如何部署?哈希槽?数据库和缓存的数据一致性?
|
消息中间件 存储 NoSQL
Kratos微服务框架下实现分布式任务队列
提起分布式任务队列(Distributed Task Queue),就不得不提Python的Celery。而Asynq和Machinery就是GO当中类似于Celery的分布式任务队列。
2276 0
Kratos微服务框架下实现分布式任务队列
一个高性能、轻量级的分布式内存队列系统--beanstalk
 Beanstalk是一个高性能、轻量级的、分布式的、内存型的消息队列系统。最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟。其实Beanstalkd是典型的类Memcached设计,协议和使用方式都是同样的风格。其基本设计思想很简单:高性能离不开异步,异步离不开队列,而内部都是生产者-消费者模式的。
一个高性能、轻量级的分布式内存队列系统--beanstalk

热门文章

最新文章