多核分布式队列的实现:“偷”与“自私”的运用(2)

简介:
3. 本地化队列的实现思路
要给线程指定一个本地化队列,通常的做法是先将创建好的队列放入一个数组中,然后给线程编号,从0开始进行编号,编号为0的线程对应于数组下标为0位置上存放的队列,编号为1的线程对应于数组下标为1位置上存放的队列,...。
每个线程要获取自己的本地化队列时,只需要先获取线程编号,然后就可以通过线程编号去访问对应的队列,由于每个线程的编号都不相同,因此每个线程访问的队列都不相同,即每个队列只有一个线程访问它,这样就可以实现每个线程的本地化队列。
那么如何给线程编号从0开始编号呢,操作系统并没有直接提供这种功能。即使操作系统提供了线程从0开始编号的功能也没有用,因为并不一定所有的线程 都会访问分布式队列。例如有8个线程,其中编号为0,3,5,7的线程会访问分布式队列,那么在创建分布式队列时,就需要创建8个本地队列,否则线程编号 将无法和存放队列的数组下标对应起来。
看到这里,目标已经很明确了,那就是要给所有访问分布式队列的线程从0开始依次编号。比如有N个线程要访问分布式队列,那么需要给这N个线程依次编号为0,1,...N-1。下面就来讨论如何给线程编号的问题。
4. 给线程编号的方法
在操作系统中,通常提供了线程本地存储的API,通过API可以给每个线程设定一个数据(可以是指针,也可以是一个整数),同时也可以通过API来 取出当前线程设置的那个数据。比如给一个线程A设定一个整数0,那么线程A执行的任何地方都可以调用相应的API获取到整数0,这样就可以在程序的任何地 获取到线程A的编号为0。
在Windows系列操作系统中,提供了Tls_Alloc(),Tls_SetValue(),Tls_GetValue(),Tls_Free()这几个函数来实现线程本地存储操作。
pthread中,可以通过pthread_key_create(), pthread_setspecific(), pthread_getspecific()等函数来实现线程本地存储操作,其中pthread_create_key()和Tls_Alloc()功能 相同,只是参数有所不同,Tls_SetValue()和pthread_setspecific()功能等价,Tls_GetValue()和 pthread_getspecific()功能等价。
下面演示一下TlsAlloc(),Tls_SetValue(),Tls_GetValue(),Tls_Free()这几个函数的基本用法。
DWORD g_dwTlsIndex;
LONG volatile g_dwThreadId = 0;
 
int GetId()
{
//获取当前执行线程的由TlsSetValue()设置的值
int nId = (int)TlsGetValue(g_dwTlsIndex);
return (nId-1);
}
 
void ThreadFunc(void *args)
{
    LONG  Id = AtomicIncrement (&g_dwThreadId); //对g_dwThreadId进行原子加1操作
    TlsSetValue(g_dwTlsIndex, (void *)Id);  //给当前执行的线程设置一个值
 
    printf("ThreadFunc2: Thread Id = %ld\n", GetId());
}
 
int main(int argc, char* argv[])
{
    g_dwTlsIndex = TlsAlloc();  //分配一个线程本地存储索引,需要在创建线程前执行
 
    _beginthread(ThreadFunc, 0, NULL);
    _beginthread(ThreadFunc, 0, NULL);
 
Sleep(100); //延时等待上面两个线程执行完
TlsFree(g_dwTlsIndex);
return 0;
}
需要说明一下,在ThreadFunc()函数中,使用了一个AtomicIncrement()函数,这个函数相当于Windows操作系统中的InterlockedIncrement()函数。在Widnows系统中,可以使用以下宏定义来实现AtomicIncrement()函数:
#define AtomicIncrement(x)  InterlockedIncrement(x)
上面程序在运行后,会打印出以下结果:
ThreadFunc: Thread Id = 0
ThreadFunc: Thread Id = 1
 
从上面代码和执行结果可以看出,虽然GetValue()在ThreadFunc()函数中执行,但是两个线程执行GetValue()得到的值是 不同的,一个线程得到的是0,另外一个线程得到的是1。这主要是因为两个线程调用TlsSetValue()设置的值并不相同,一个为1,另一个为2。
需要注意的是,TlsGetValue()的返回值为0表示失败,所以使用TlsSetValue()函数时,应该从1开始设置,然后在GetId()函数中,返回的是TlsGetValue()的返回值减1。
采用上面的方法,就可以设计出分布式队列中的线程Id自动编号和获取功能了。下面是详细的实现代码:
class  CDistributedQueue {
private:
       DWORD m_dwTlsIndex;
       LONG volatile m_lThreadIdIndex;
public:
       CDistributedQueue();
       virtual ~CDistributedQueue();
       LONG ThreadIdGet();
       //可以添加其他成员函数在下面
};
 
CDistributedQueue::CDistributedQueue()
{
       m_dwTlsIndex = TlsAlloc();
       m_lThreadIdIndex = 0;
}
 
CDistributedQueue::~CDistributedQueue()
{
       TlsFree(m_dwTlsIndex);
}
 
LONG CDistributedQueue::ThreadIdGet()
{
       LONG Id = (LONG )TlsGetValue(m_dwTlsIndex);
if ( Id == 0 )
{
    Id = AtomicIncrement(&m_lThreadIdIndex);
    TlsSetValue(Id);
}
return (Id - 1);
}
上面的代码中,设置或获取线程编号都在ThreadIdGet()一个成员函数内完成,先判断获取的Id是否为0,如果为0,表明线程还没有被设置 Id,因此将m_lThreadIdIndex原子加1,然后再设置给对应的线程。每调用一次TlsSetValue()函数,其设置的Id值依次加1, 这样就可以得到一个1,2,3,...序列。每个线程调用了TlsSetValue()函数后,下一个调用TlsGetValue()函数时,获得的值一 定大于0,因此每个线程最多只能执行TlsSetValue()函数一次。
采用上面的方法来获取线程编号,必须保证创建的本地队列数量大于等于访问队列的线程数量,否则队列数量不足,将会造成没有足够的本地队列供线程使用,程序中可能会造成越界等不可预测的异常。常用的解决办法是将本地队列的数量扩大一倍。
上面这种线程编号方法,非常方便,任何访问分布式队列的线程都可以被自动编号,调用分布式队列的线程不需要为编号操心。
有了给线程自动编号的方法后,就可以实现分布式队列的各个具体操作如进队、出队等。当然在实现具体的操作代码前,有必要了解一下分布式队列中是如何进行进队和出队操作的。


本文转自Intel_ISN 51CTO博客,原文链接:http://blog.51cto.com/intelisn/130445,如需转载请自行联系原作者
相关文章
|
27天前
|
消息中间件 存储 NoSQL
一文读懂python分布式任务队列-celery
# 一文读懂Python分布式任务队列-Celery Celery是一个分布式任务执行框架,支持大量并发任务。它采用生产者-消费者模型,由Broker、Worker和Backend组成。生产者提交任务到队列,Worker异步执行,结果存储在Backend。适用于异步任务、大规模实时任务和定时任务。5月更文挑战第17天
38 1
|
1月前
|
消息中间件 监控 NoSQL
在Windows下设置分布式队列Celery的心跳轮询
在Windows下设置分布式队列Celery的心跳轮询
447 0
|
1月前
|
消息中间件 监控 NoSQL
一文读懂python分布式任务队列-celery
celery是一个简单,灵活、可靠的分布式任务执行框架,可以支持大量任务的并发执行。celery采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行【2月更文挑战第11天】
55834 5
|
7月前
|
消息中间件 NoSQL Go
Golang微服务框架Kratos应用分布式计划任务队列Asynq
Asynq是一个使用Go语言实现的分布式任务队列和异步处理库,它由Redis提供支持,它提供了轻量级的、易于使用的API,并且具有高可扩展性和高可定制化性。其作者Ken Hibino,任职于Google。
141 0
|
7月前
|
消息中间件 存储 NoSQL
Golang微服务框架Kratos应用分布式任务队列Machinery
go machinery是一个基于分布式消息分发的异步任务队列框架,类似python中常用celery框架,主要用于异步任务和定时任务。
158 0
|
10月前
|
NoSQL Go Redis
Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库
Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库
345 0
|
存储 分布式计算 安全
「分布式架构」最终一致性:暗示的切换队列
「分布式架构」最终一致性:暗示的切换队列
|
缓存 运维 负载均衡
Redis连环炮:内存淘汰?事务?分布式锁?分步式限流?异步队列?延时队列?高可用?如何部署?哈希槽?数据库和缓存的数据一致性?
Redis连环炮:内存淘汰?事务?分布式锁?分步式限流?异步队列?延时队列?高可用?如何部署?哈希槽?数据库和缓存的数据一致性?
195 0
Redis连环炮:内存淘汰?事务?分布式锁?分步式限流?异步队列?延时队列?高可用?如何部署?哈希槽?数据库和缓存的数据一致性?
|
消息中间件 存储 NoSQL
Kratos微服务框架下实现分布式任务队列
提起分布式任务队列(Distributed Task Queue),就不得不提Python的Celery。而Asynq和Machinery就是GO当中类似于Celery的分布式任务队列。
2276 0
Kratos微服务框架下实现分布式任务队列
一个高性能、轻量级的分布式内存队列系统--beanstalk
 Beanstalk是一个高性能、轻量级的、分布式的、内存型的消息队列系统。最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟。其实Beanstalkd是典型的类Memcached设计,协议和使用方式都是同样的风格。其基本设计思想很简单:高性能离不开异步,异步离不开队列,而内部都是生产者-消费者模式的。
一个高性能、轻量级的分布式内存队列系统--beanstalk

热门文章

最新文章