并发编程(三): 使用C++11实现无锁stack(lock-free stack)-阿里云开发者社区

开发者社区> anzhsoft> 正文

并发编程(三): 使用C++11实现无锁stack(lock-free stack)

简介:
+关注继续查看

前几篇文章,我们讨论了如何使用mutex保护数据及使用使用condition variable在多线程中进行同步。然而,使用mutex将会导致一下问题:

  • 等待互斥锁会消耗宝贵的时间 — 有时候是很多时间。这种延迟会损害系统的scalability。尤其是在现在可用的core越多越多的情况下。
  • 低优先级的线程可以获得互斥锁,因此阻碍需要同一互斥锁的高优先级线程。这个问题称为优先级倒置(priority inversion )
  • 可能因为分配的时间片结束,持有互斥锁的线程被取消调度。这对于等待同一互斥锁的其他线程有不利影响,因为等待时间现在会更长。这个问题称为锁护送(lock convoying)

互斥锁的问题还不只这些。早在1994年10月,John D. Valois 在拉斯维加斯的并行和分布系统系统国际大会上的一篇论文—《Implementing Lock-Free Queues》已经研究了无锁队列的实现,有兴趣的可以拜读一下。

实现无锁数据结构的基础是CAS:Compare & Set,或是 Compare & Swap。CAS用C语言描述的代码(来自Wikipedia Compare And Swap)

int compare_and_swap (int* reg, int oldval, int newval) 
{
  ATOMIC();
  int old_reg_val = *reg;
  if (old_reg_val == oldval) 
     *reg = newval;
  END_ATOMIC();
  return old_reg_val;
}

CAS是个原子操作,保证了如果需要更新的地址没有被他人改动多,那么它可以安全的写入。而这也是我们对于某个数据或者数据结构加锁要保护的内容,保证读写的一致性,不出现dirty data。现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令现在,我们将使用CAS来实现无锁的stack,然后你就能够理解CAS的用法了。

C++11中CAS实现:

template< class T>
struct atomic
{
public:
bool compare_exchange_weak( T& expected, T desired,
                            std::memory_order success,
                            std::memory_order failure );
bool compare_exchange_weak( T& expected, T desired,
                            std::memory_order success,
                            std::memory_order failure ) volatile;
bool compare_exchange_weak( T& expected, T desired,
                            std::memory_order order =
                                std::memory_order_seq_cst );
bool compare_exchange_weak( T& expected, T desired,
                            std::memory_order order =
                                std::memory_order_seq_cst ) volatile;
bool compare_exchange_strong( T& expected, T desired,
                              std::memory_order success,
                              std::memory_order failure );
bool compare_exchange_strong( T& expected, T desired,
                              std::memory_order success,
                              std::memory_order failure ) volatile;    
bool compare_exchange_strong( T& expected, T desired,
                              std::memory_order order =
                                  std::memory_order_seq_cst );
bool compare_exchange_strong( T& expected, T desired,
                              std::memory_order order =
                                  std::memory_order_seq_cst ) volatile;
...
};
Please refer to http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange to more information.

对上面的版本进行一下说明。翻译自上述url:

Atomically compares the value stored in *this with the value of expected, and if those are equal, replaces the former with desired (performs read-modify-write operation). Otherwise, loads the actual value stored in*this intoexpected (performs load operation).

自动的比较*this的值和expect的值,如果相等,那么将*this的值替换为desired的值(进行读-修改-写操作)。否则如果不相等,那么将*this的值存到expected处。

伪码就是:

if *this == expected:

    *this = desired;

else:

    expected = *this;
The memory models for the read-modify-write and load operations aresuccess andfailure respectively. In the (2) and (4) versionsorder is used for both read-modify-write and load operations, except thatstd::memory_order_release andstd::memory_order_relaxed are used for the load operation iforder==std::memory_order_acq_rel, ororder==std::memory_order_release respectively.

success对应于read-modify-write的内存模型;failure则对应于失败时的load。对于order = std::memory_order_seq_cst的函数,那么该memory order适用于read-modify-write and load,除非是如果order==std::memory_order_acq_rel,那么load将使用std::memory_order_release;如果order==std::memory_order_release,那么load将使用std::memory_order_relaxed

更多信息memory order请阅读:http://en.cppreference.com/w/cpp/atomic/memory_order

The weak forms (1-2) of the functions are allowed to fail spuriously, that is, act as if*this!= expected even if they are equal. When a compare-and-exchange is in a loop, the weak version will yield better performance on some platforms. When a weak compare-and-exchange would require a loop and a strong one would not, the strong one is preferable.

weak形式允许假失败,该函数直接比较原子对象所封装的值与参数 expected 的物理内容,所以某些情况下,对象的比较操作在使用 operator==() 判断时相等,但 compare_exchange_weak 判断时却可能失败,因为对象底层的物理内容中可能存在位对齐或其他逻辑表示相同但是物理表示不同的值(比如 true 和 2 或 3,它们在逻辑上都表示"真",但在物理上两者的表示并不相同)。可以虚假的返回false(和expected相同)。若本atomic的T值和expected相同则用val值替换本atomic的T值,返回true;若不同则用本atomic的T值替换expected,返回false。  
与compare_exchange_weak 不同, strong版本的 compare-and-exchange 操作不允许(spuriously 地)返回 false,即原子对象所封装的值与参数 expected 的物理内容相同,比较操作一定会为 true。不过在某些平台下,如果算法本身需要循环操作来做检查, compare_exchange_weak 的性能会更好。因此对于某些不需要采用循环操作的算法而言, 通常采用compare_exchange_strong 更好

下面代码部分来自http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange。

#include <atomic>
#include <string>
#include <iostream>
using namespace std;
template<typename T>
struct node
{
    T data;
    node* next;
    node(const T& data) : data(data), next(nullptr) {}
};

template<typename T>
class stack
{
    std::atomic<node<T>*> head;

 public:
    stack():head(nullptr){}
    void push(const T& data);
    T pop();
}; 
注意在这里添加了stack的构造函数,把head初始化为nullptr。如果不初始化它为nullptr,那么使用链表存储的stack将无法确定终点在哪儿。。。

首先看一下push的实现:

    void push(const T& data)
    {
        node<T>* new_node = new node<T>(data);

        // put the current value of head into new_node->next
        new_node->next = head.load(std::memory_order_relaxed);

        // now make new_node the new head, but if the head
        // is no longer what's stored in new_node->next
        // (some other thread must have inserted a node just now)
        // then put that new head into new_node->next and try again
        while(!head.compare_exchange_weak(new_node->next,
                                          new_node,
                                          std::memory_order_release,
                                          std::memory_order_relaxed))
                ; // the body of the loop is empty
    }
主要是理解这两句:

head.compare_exchange_weak(new_node->next,
                           new_node,
可以简单用一下代码来概括该调用的效果:
if ( head == new_node->new){
     head = new_node;
     return true;
}
else{
    new_node->next = head;
    return false;
}
因此,如果没有其他的线程push,那么head将指向当前的new_node,push完成。否则,说明其他线程push过新数据,那么将当前push的新节点重新放到顶端,此时的head是最新的head。这样,通过CAS,我们可以实现了thread-safe stack。

接下来看一下pop:

    T pop()
    {
        while(1){
            auto result = head.load(std::memory_order_relaxed);
            if (result == nullptr)
              throw std::string("Cannot pop from empty stack");
            if(head.compare_exchange_weak(result,result->next,
                                          std::memory_order_release,
                                          std::memory_order_relaxed))
               return result->data;
        }
    }
我们为什么要限制result != nullptr?因为有可能当前stack仅有一个元素,线程B在pop时被调度,线程A pop成功,那么线程B再pop就会出问题。

其实,上述的pop可以简化,因为result其实在failed时候已经更新为head了。因此简化代码可以是:

  T pop()
    {
      auto result = head.load(std::memory_order_relaxed);
      while( result != nullptr && !head.compare_exchange_weak(result,result->next,
                                          std::memory_order_release,
                                          std::memory_order_relaxed));
      if( result != nullptr)
        return result->data;
      else
        throw std::string("Cannot pop from empty stack");
    }

尊重原创,转载请注明出处: anzhsoft http://blog.csdn.net/anzhsoft/article/details/19125619


参考资料:

1. http://en.wikipedia.org/wiki/Compare-and-swap

2. http://en.wikipedia.org/wiki/Fetch-and-add

3. http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange

4. http://technet.microsoft.com/zh-cn/hh874698


更多学习:

1. GCC实现 http://www.oschina.net/translate/a-fast-lock-free-queue-for-cpp?cmp

2. GCC实现 http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/index.html

陈皓同学的精彩博文: http://coolshell.cn/articles/8239.html

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
【高并发】ThreadLocal学会了这些,你也能和面试官扯皮了!
作者个人研发的在高并发场景下,提供的简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。自开源半年多以来,已成功为十几家中小型企业提供了精准定时调度方案,经受住了生产环境的考验。为使更多童鞋受益,现给出开源框架地址: https://github.com/sunshinelyz/mykit-delay PS: 欢迎各位Star源码,也可以pr你牛逼哄哄的代码。
26 0
Linux下高并发socket最大连接数所受的各种限制
http://blog.csdn.net/guowake/article/details/6615728 1、修改用户进程可打开文件数限制 在Linux平台上,无论编写客户端程序还是服务端程序,在进行高并发TCP连接处理时,最高的并发数量都要受到系统对用户单一进程同时可打开文件数量的限制(这是因为系统为每个TCP连接都要创建一个socket句柄,每个socket句柄同时也是一个文件句柄)。
1787 0
使用loadrunner进行压力测试之----post请求
1. 发送post请求时使用web_submit_data  如: 1 web_submit_data("create",//事务名 2 "Action=http://bizhi.
819 0
解决sitemesh3装饰页面不能使用freemarker标签问题
如题,这个问题其实在sitemesh2中已经很好的解决了,不过在sitemesh3中可能没有解决,所以要自己写代码解决了,下面我先讲下sitemesh2是如何解决的: sitemesh-freemarker com.
883 0
Chromium中多线程及并发技术要点(C/C++)
类别 类 说明 示例 线程机制 Thread (参考:线程模型及应用指南)   MessagePump   MessageQueue   SequencedWorkerPool 它是一个线程池,用于执行需要串行执行的任务请求,这些请求依据不同的Token分组,只在相同组内保证执行顺序。
1292 0
编程实现文件拷贝
面试题 - 编程实现文件拷贝。(这个题目在笔试的时候经常出现,下面的代码给出了两种实现方案) import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.Out
1774 0
C#多线程编程系列(四)- 使用线程池
原文:C#多线程编程系列(四)- 使用线程池 目录 1.1 简介 1.2 在线程池中调用委托 1.3 向线程池中放入异步操作 1.4 线程池与并行度 1.5 实现一个取消选项 1.6 在线程池中使用等待事件处理器及超时 1.7 使用计时器 1.8 使用BackgroundWorker组件 参考书籍 笔者水平有限,如果错误欢迎各位批评指正! 1.1 简介 在本章中,主要介绍线程池(ThreadPool)的使用;在C#中它叫System.Threading.ThreadPool,在使用线程池之前首先我们得明白一个问题,那就是为什么要使用线程池。
846 0
+关注
101
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《Nacos架构&原理》
立即下载
《看见新力量:二》电子书
立即下载
云上自动化运维(CloudOps)白皮书
立即下载