开发者社区> 杨粼波> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

无锁(lock-free)数据结构

简介:
+关注继续查看

 提到并行计算通常都会想到加锁,事实却并非如此,大多数并发是不需要加锁的。比如在不同电脑上运行的代码编辑器,两者并发运行不需要加锁。在一台电脑上同时运行的媒体播放放器和代码编辑器,两者并发运行不需要加锁(当然系统调用和进程调度是要加锁的)。在同一个进程中运行多个线程,如果各自处理独立的事情也不需要加锁(当然系统调用、进程调度和内存分配是要加锁的)。在以上这些情况里,各个并发实体之间没有共享数据,所以虽然并发运行但不需要加锁。

多线程并发运行时,虽然有共享数据,如果所有线程只是读取共享数据而不修改它,也是不用加锁的,比如代码段就是共享的“数据”,每个线程都会读取,但是不用加锁。排除所有这些情况,多线程之间有共享数据,有的线程要修改这些共享数据,有的线程要读取这些共享数据,这才是程序员需要关注的情况,也是本节我们讨论的范围。

在并发的环境里,加锁可以保护共享的数据,但是加锁也会存在一些问题:

  • 由于临界区无法并发运行,进入临界区就需要等待,加锁带来效率的降低。
  • 在复杂的情况下,很容易造成死锁,并发实体之间无止境的互相等待。
  • 在中断/信号处理函数中不能加锁,给并发处理带来困难。
  • 优先级倒置造成实时系统不能正常工作。低级优先进程拿到高优先级进程需要的锁,结果是高/低优先级的进程都无法运行,中等优先级的进程可能在狂跑。

由于并发与加锁(互斥)的矛盾关系,无锁数据结构自然成为程序员关注的焦点,这也是本节要介绍的:



      CPU提供的原子操作

大约在七八年前,我们用apache的xerces来解析XML文件,奇怪的是多线程反而比单线程慢。他们找了很久也没有找出原因,只是证实使用多进程代替多线程会快一个数量级,在Windows上他们就使用了多进程的方式。后来移植到linux时候,我发现xerces每创建一个结点都会去更新一些全局的统计信息,比如把结点的总数加一,它使用的pthread_mutex实现互斥。这就是问题所在:一个XML文档有数以万计的结点,以50个线程并发为例,每个线程解析一个XML文档,总共要进行上百万次的加锁/解锁,几乎所有线程都在等待,你说能快得了吗?

当时我知道Windows下有InterlockedIncrement之类的函数,它们利用CPU一些特殊指令,保证对整数的基本操作是原子的。查找了一些资源发现Linux下也有类似的函数,后来我把所有加锁去掉,换成这些原子操作,速度比多进程运行还快了几倍。下面我们看++和—的原子操作在IA架构上的实现:

None.gif#define ATOMIC_SMP_LOCK "lock ; "
ExpandedBlockStart.giftypedef struct volatile int counter; } atomic_t;
None.gif
None.gifstatic __inline__ void atomic_inc(atomic_t *v)
ExpandedBlockStart.gif{
InBlock.gif    __asm__ __volatile__(
InBlock.gif        ATOMIC_SMP_LOCK "incl %0"
InBlock.gif        :"=m" (v->counter)
InBlock.gif        :"m" (v->counter));
ExpandedBlockEnd.gif}

None.gif
None.gifstatic __inline__ void atomic_dec(atomic_t *v)
ExpandedBlockStart.gif{
InBlock.gif    __asm__ __volatile__(
InBlock.gif        ATOMIC_SMP_LOCK "decl %0"
InBlock.gif        :"=m" (v->counter)
InBlock.gif        :"m" (v->counter));
ExpandedBlockEnd.gif}


单入单出的循环队列。单入单出的循环队列是一种特殊情况,虽然特殊但是很实用,重要的是它不需要加锁。这里的单入是指只有一个线程向队列里追加数据(push),单出只是指只有一个线程从队列里取数据(pop),循环队列与普通队列相比,不同之处在于它的最大数据储存量是事先固定好的,不能动态增长。尽管有这些限制它的应用还是相当广泛的。这我们介绍一下它的实现:

数据下定义如下:

None.giftypedef struct _FifoRing
ExpandedBlockStart.gif{
InBlock.gif    int r_cursor;
InBlock.gif    int w_cursor;
InBlock.gif    size_t length;
InBlock.gif    void* data[0];
InBlock.gif
ExpandedBlockEnd.gif}
FifoRing;

r_cursor指向队列头,用于取数据(pop)。w_cursor指向队列尾,用于追加数据(push)。length表示队列的最大数据储存量,data表示存放的数据,[0]在这里表示变长的缓冲区(前面我们已经讲过)。


创建函数
None.gifFifoRing* fifo_ring_create(size_t length)
ExpandedBlockStart.gif{
InBlock.gif    FifoRing* thiz = NULL;
InBlock.gif
InBlock.gif    return_val_if_fail(length > 1, NULL);
InBlock.gif
InBlock.gif    thiz = (FifoRing*)malloc(sizeof(FifoRing) + length * sizeof(void*));
InBlock.gif
InBlock.gif    if(thiz != NULL)
ExpandedSubBlockStart.gif    {
InBlock.gif        thiz->r_cursor = 0;
InBlock.gif        thiz->w_cursor = 0;
InBlock.gif        thiz->length   = length;
ExpandedSubBlockEnd.gif    }

InBlock.gif
InBlock.gif    return thiz;
ExpandedBlockEnd.gif}

None.gif

这里我们要求队列的长度大于1而不是大于0,为什么呢?排除长度为1的队列没有什么意义的原因外,更重要的原因是队列头与队列尾重叠 (r_cursor= =w_cursor) 时,到底表示是满队列还是空队列?这个要搞清楚才行,上次一个同事犯了这个错误,让我们查了很久。这里我们认为队列头与队列尾重叠时表示队列为空,这与队列初始状态一致,后面在写的时候始终保留一个空位,避免队列头与队列尾重叠,这样可以消除歧义了。

追加数据(push)

None.gifRet fifo_ring_push(FifoRing* thiz, void* data)
ExpandedBlockStart.gif{
InBlock.gif    int w_cursor = 0;
InBlock.gif    Ret ret = RET_FAIL;
InBlock.gif    return_val_if_fail(thiz != NULL, RET_FAIL);
InBlock.gif
InBlock.gif    w_cursor = (thiz->w_cursor + 1) % thiz->length;
InBlock.gif
InBlock.gif    if(w_cursor != thiz->r_cursor)
ExpandedSubBlockStart.gif    {
InBlock.gif        thiz->data[thiz->w_cursor] = data;
InBlock.gif        thiz->w_cursor = w_cursor;
InBlock.gif
InBlock.gif        ret = RET_OK;
ExpandedSubBlockEnd.gif    }

InBlock.gif
InBlock.gif    return ret;
ExpandedBlockEnd.gif}

None.gif

队列头和队列尾之间还有一个以上的空位时就追加数据,否则返回失败。

取数据(pop)

None.gifRet fifo_ring_pop(FifoRing* thiz, void** data)
ExpandedBlockStart.gif{
InBlock.gif    Ret ret = RET_FAIL;
InBlock.gif    return_val_if_fail(thiz != NULL && data != NULL, RET_FAIL);
InBlock.gif
InBlock.gif    if(thiz->r_cursor != thiz->w_cursor)
ExpandedSubBlockStart.gif    {
InBlock.gif        *data = thiz->data[thiz->r_cursor];
InBlock.gif        thiz->r_cursor = (thiz->r_cursor + 1)%thiz->length;
InBlock.gif
InBlock.gif        ret = RET_OK;
ExpandedSubBlockEnd.gif    }

InBlock.gif
InBlock.gif    return ret;
ExpandedBlockEnd.gif}

None.gif

队列头和队列尾不重叠表示队列不为空,取数据并移动队列头。



      单写多读的无锁数据结构
      
单写表示只有一个线程去修改共享数据结构,多读表示有多个线程去读取共享数据结构。前面介绍的读写锁可以有效的解决这个问题,但更高效的办法是使用无锁数据结构。思路如下:

就像为了避免显示闪烁而使用的双缓冲一样,我们使用两份数据结构,一份数据结构用于读取,所有线程都可以在不加锁的情况下读取这个数据结构。另外一份数据结构用于修改,由于只有一个线程会修改它,所以也不用加锁。

在修改之后,我们再交换读/写的两个函数结构,把另外一份也修改过来,这样两个数据结构就一致了。在交换时要保证没有线程在读取,所以我们还需要一个读线程的引用计数。现在我们看看怎么把前面写的双向链表改为单写多读的无锁数据结构。

为了保证交换是原子的,我们需要一个新的原子操作CAS(compare and swap)。

None.gif#define CAS(_a, _o, _n)                                    \
ExpandedBlockStart.gif({ __typeof__(_o) __o = _o;                                \
InBlock.gif   __asm__ __volatile__(                                   \
InBlock.gif       "lock cmpxchg %3,%1"                                \
InBlock.gif       : "=a" (__o), "=m" (*(volatile unsigned int *)(_a)) \
InBlock.gif       :  "0" (__o), "r" (_n) );                           \
InBlock.gif   __o;                                                    \
ExpandedBlockEnd.gif}
)

数据结构

None.giftypedef struct _SwmrDList
ExpandedBlockStart.gif{
InBlock.gif    atomic_t rd_index_and_ref;
InBlock.gif    DList* dlists[2];
ExpandedBlockEnd.gif}
SwmrDList;

两个链表,一个用于读一个用于写。rd_index_and_ref的最高字节记录用于读取的双向链表的索引,低24位用于记录读取线程的引用记数,最大支持16777216个线程同时读取,应该是足够了,所以后面不考虑它的溢出。

读取操作

None.gifint      swmr_dlist_find(SwmrDList* thiz, DListDataCompareFunc cmp, void* ctx)
ExpandedBlockStart.gif{
InBlock.gif    int ret = 0;
InBlock.gif    return_val_if_fail(thiz != NULL && thiz->dlists != NULL, -1);
InBlock.gif
InBlock.gif    atomic_inc(&(thiz->rd_index_and_ref));
InBlock.gif    size_t rd_index = (thiz->rd_index_and_ref.counter>>24) & 0x1;
InBlock.gif    ret = dlist_find(thiz->dlists[rd_index], cmp, ctx);
InBlock.gif    atomic_dec(&(thiz->rd_index_and_ref));
InBlock.gif
InBlock.gif    return ret;
ExpandedBlockEnd.gif}

修改操作

None.gifRet swmr_dlist_insert(SwmrDList* thiz, size_t index, void* data)
ExpandedBlockStart.gif{
InBlock.gif    Ret ret = RET_FAIL;
InBlock.gif    DList* wr_dlist = NULL;
InBlock.gif    return_val_if_fail(thiz != NULL && thiz->dlists != NULL, ret);
InBlock.gif
InBlock.gif    size_t wr_index = !((thiz->rd_index_and_ref.counter>>24) & 0x1);
InBlock.gif    if((ret = dlist_insert(thiz->dlists[wr_index], index, data)) == RET_OK)
ExpandedSubBlockStart.gif    {
InBlock.gif        int rd_index_old = thiz->rd_index_and_ref.counter & 0xFF000000;
InBlock.gif        int rd_index_new = wr_index << 24;
InBlock.gif
InBlock.gif        do
ExpandedSubBlockStart.gif        {
InBlock.gif            usleep(100);
ExpandedSubBlockEnd.gif        }
while(CAS(&(thiz->rd_index_and_ref), rd_index_old, rd_index_new));
InBlock.gif
InBlock.gif        wr_index = rd_index_old>>24;
InBlock.gif        ret = dlist_insert(thiz->dlists[wr_index], index, data);
ExpandedSubBlockEnd.gif    }

InBlock.gif
InBlock.gif    return ret;
ExpandedBlockEnd.gif}

先修改用于修改的双向链表,修改完成之后等到没有线程读取时,交换读/写两个链表,再修改另一个链表,此时两个链表状态保持一致。

稍做改进,对修改的操作进行加锁,就可以支持多读多写的数据结构,读是无锁的,写是加锁的。


      真正的无锁数据结构
      Andrei Alexandrescu的《Lock-FreeDataStructures》估计是这方面最经典的论文了,对他的方法我开始感到惊奇后来感到失望,惊奇的是算法的巧妙,失望的是无锁的限制和代价。作者最后说这种数据结构只适用于WRRMBNTM(Write-Rarely-Read-Many -But-Not-Too-Many)的情况。而且每次修改都要拷贝整个数据结构(甚至多次),所以不要指望这种方法能带来多少性能上的提高,唯一的好处是能避免加锁带来的部分副作用。有兴趣的朋友可以看下这篇论文,这里我就不重复了。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
数据结构-B+tree
B+ 树是 B 树的扩展,它允许高效的插入、删除和搜索操作。 在 B 树中,键和记录都可以存储在内部节点和叶节点中。而在 B+ 树中,记录(数据)只能存储在叶子节点上,而内部节点只能存储键值。 B+树的叶子节点以单链表的形式链接在一起,使搜索查询更加高效。 B+树用于存储无法存储在主存储器中的大量数据。由于主存的大小总是有限的,B+树的内部节点(访问记录的键)存储在主存中,而叶节点存储在辅助内存中。
43 0
x3d
Free
Free无论是译作“自由”还是“免费”对于使用者来说似乎都有好处,但当自己处于开发者或开发厂商的角度,可能就是“双重标准”了,毕竟没有几个人能像Linus那样Just For Fun,也没有人能像Richard Matthew Stallman那样发明“Copyleft”;大家要吃饭。
732 0
Form_Form树形结构HTree的开发(案例)
2014-06-09 Created By BaoXinjian 一、摘要 Oracle Developer 6.0以上版本提供了hierarchy tree(层次树)的概念,htree控件非常方便,只需要少量的编程即可实现显示层次结构的目的。
690 0
DIOCP开源项目-Delphi高性能无锁队列(lock-free)
最近想在DIOCP中加入任务调度线程,DIOCP的工作线程作为生产者(producer)将接受到的数据对象,投递到任务调度线程中,然后统一进行分配。然而这一切都需要一个队列, 这几天都在关注无锁队列。   [队列] 首先是一个队列,简单的队列就是,生产者把数据压入队列(push), 消费者通过队列Pop出数据进行处理。
1253 0
+关注
杨粼波
网游的老兵
1151
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载