无锁CAS_无所队列

简介: 无锁CAS_无所队列

什么是CAS?

⽐较并交换(compare and swap, CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据 交换操作,从⽽避免多线程同时改写某⼀数据时由于执⾏顺序不确定性以及中断的不可预知性产⽣的数据 不⼀致问题。 该操作通过将内存中的值与指定数据进⾏⽐较,当数值⼀样时将内存中的数据替换为新的 值。

上⾯的CAS返回bool告知原⼦性交换是否成功。

操作的原⼦性

从上图能看到⼀个i++对应的操作是:

(1)把变量i从内存(RAM)加载到寄存器;

(2)把寄存器的值加1;

(3)把寄存器的值写回内存(RAM)

那如果有多个线程去做i++操作的时候,也就可能导致这样⼀种情况:

总结下上⾯的问题,也就是说我们整个存储的结构如下图:

我们所有的变量⾸先是存储在主存(RAM)上,CPU要去操作的时候⾸先会加载到寄存器,然后才操作, 操作好了才写会主存。 关于CPU和cache的更详细介绍可以参考:https://www.cnblogs.com/jokerjason/p/10711022.html

原⼦操作

对于gcc、g++编译器来讲,它们提供了⼀组API来做原⼦操作:

详细⽂档⻅:https://gcc.gnu.org/onlinedocs/gcc-4.1.1/gcc/Atomic-Builtins.html#Atomic

Builtins

为什么需要⽆锁队列

锁引起的问题

Cache损坏(Cache trashing)

保存和恢复上下⽂的过程中还隐藏了额外的开销:Cache中的数据会失效,因为它缓存的是将被换出任务 的数据,这些数据对于新换进的任务是没⽤的。处理器的运⾏速度⽐主存快N倍,所以⼤量的处理器时间被浪 费在处理器与主存的数据传输上。这就是在处理器和主存之间引⼊Cache的原因。Cache是⼀种速度更快 但容量更⼩的内存(也更加昂贵),当处理器要访问主存中的数据时,这些数据⾸先被拷⻉到Cache中,因为这些数据在不久的将来可能⼜会被处理器访问。Cache misses对性能有⾮常⼤的影响,因为处理器访问 Cache中的数据将⽐直接访问主存快得多。 线程被频繁抢占产⽣的Cache损坏将导致应⽤程序性能下降。

在同步机制上的争抢队列

阻塞不是微不⾜道的操作。它导致操作系统暂停当前的任务或使其进⼊睡眠状态(等待,不占⽤任何的处理器)。直到资源(例如互斥锁)可⽤,被阻塞的任务才可以解除阻塞状态(唤醒)。在⼀个负载较重的应⽤程序中使⽤这样的阻塞队列来在线程之间传递消息会导致严重的争⽤问题。也就是说,任务将⼤量的时间(睡眠,等待,唤醒)浪费在获得保护队列数据的互斥锁,⽽不是处理队列中的数据上。 ⾮阻塞机制⼤展伸⼿的机会到了。任务之间不争抢任何资源,在队列中预定⼀个位置,然后在这个位置上 插⼊或提取数据。这种机制使⽤了⼀种被称之为CAS(⽐较和交换)的特殊操作,这个特殊操作是⼀种特殊的指令,它可以原⼦的完成以下操作:它需要3个操作数m,A,B,其中m是⼀个内存地址,操作将m指向的内存中的内容与A⽐较,如果相等则将B写⼊到m指向的内存中并返回true,如果不相等则什么也不做返回 false。

1 volatile int a;
2 a = 1;
3
4 // this will loop while 'a' is not equal to 1.
5 // If it is equal to 1 the operation will atomically set a to
2 and return true
6 while (!CAS(&a, 1, 2))
7 {
8 ;
9 }

动态内存分配

在多线程系统中,需要仔细的考虑动态内存分配。当⼀个任务从堆中分配内存时,标准的内存分配机制会阻塞所有与这个任务共享地址空间的其它任务(进程中的所有线程)。这样做的原因是让处理更简单,且它⼯作得很好。两个线程不会被分配到⼀块相同的地址的内存,因为它们没办法同时执⾏分配请求。显然线程频繁分配内存会导致应⽤程序性能下降(必须注意,向标准队列或map插⼊数据的时候都会导致堆上的动态内存分配)

⽆锁队列的实现

1  、 适用于1写1读的场景

2  、 通过chunk模式批量分配节点,减少因动态内存分配线程之间的互斥,写线程申请内存,读线程释放内存也会导致动态内存的互斥

3  、通过spare_chunk的作用(消息队列水位局部性原理,一般消息数量在一个位置上下波动) 来降低chunk的频繁分配和释放

4  、通过预写机制,批量更新写入位置,减少cas的调用(同时读写消息队列对于cas是有竞争的)

5  、 巧妙的唤醒机制

       (1) 读端没有数据可读时可以进行wait状态

       (2) 写端在写入数据时可以根据返回值获知写入数据前消息队列是否为空,如果写入之前为空则可以唤醒读端。

参考zmq ⻅源码的ypipe.hpp、yqueue.hpp,这些源码可以在⼯程项⽬使⽤,但要注意,这⾥只⽀持单写单读取的场景。

用法

ypipe_t<int, 100> yqueue; // 设置元素类型为int , 每个chunk为100个元素
// 写入元素为count , false代表这次已经写完数据
yqueue.write(count, false); // enqueue的顺序是无法保证的,我们只能计算enqueue的个数
// flush后读端能看到更新后的数据,返回false(刷新之前队列为空),可以notify唤醒读端
// 返回true(说明队列本身有数据)
// flush才会真正触发cas的调用
yqueue.flush()
//读取数据   返回true:读到元素
//返回false: 消息队列为空,可以让出cup , 或者wait写端唤醒
yqueue.read(&value)
-----------------------------------------------------------
yqueue.write(count1, true); // 写入元素为count1
yqueue.write(count2, true); // true代表这次没写完
yqueue.write(count3, false);// false代表写完了
yqueue.flush(); // flush后读端能看到更新后的数据

重点:

yqueue_t和ypipe_t的数据结构

难点:

ypipe_t r指针的预读机制不好理解,r可以理解为read_end,r并不是读数据的位置索引,⽽是我们可以最多读到哪个位置的索引。读数据的索引位置还是begin_pos。

原⼦操作函数

// This class encapsulates several atomic operations on pointer s.

template<typename T>   class atomic_ptr_t {

public:  inline void set (T *ptr_); //⾮原⼦操作

           inline T *xchg (T *val_); //原⼦操作,设置⼀个新的值,然 后返回旧的值

           inline T *cas (T *cmp_, T *val_);//原⼦操作

private:

           volatile T *ptr;

}

set函数,把私有成员ptr指针设置成参数ptr_的值,不是⼀个原⼦操作,需要使⽤者确保执⾏set过程没有其他线程使⽤ptr的值。

xchg函数,把私有成员ptr指针设置成参数val_的值,并返回ptr设置之前的值。原⼦操作,线程安全。

cas函数,原⼦操作,线程安全,把私有成员ptr指针与参数cmp_指针⽐较: 如果相等,就把ptr设置为参数val_的值,返回ptr设置之前的值; 如果不相等直接返回ptr值。

yqueue_t   (消息队列,节点元素的存储)

类接⼝和变量

// T is the type of the object in the queue.队列中元素的类型
// N is granularity(粒度) of the queue,简单来说就是yqueue_t一个结点可以装载N个T类
//型的元素
template <typename T, int N> class yqueue_t
{
    public:
    inline yqueue_t ();// Create the queue.
    inline ~yqueue_t ();// Destroy the queue.
    inline T &front ();// Returns reference to the front element of the
    queue. If the queue is empty, behaviour is undefined.
    inline T &back ();// Returns reference to the back element of the
    queue.If the queue is empty, behaviour is undefined.
    inline void push ();// Adds an element to the back end of the queue.
    inline void pop ();// Removes an element from the front of the queue.
    inline void unpush ()// Removes element from the back end of the queue。
    //回滚时使用
    private:
                    // Individual memory chunk to hold N elements.
    struct chunk_t
    {
        T values [N];
        chunk_t *prev;
        chunk_t *next;
    };
    chunk_t *begin_chunk;
    int begin_pos;
    chunk_t *back_chunk;
    int back_pos;
    chunk_t *end_chunk;
    int end_pos;
    atomic_ptr_t<chunk_t> spare_chunk; //空闲块(我把所有元素都已经出队的块称为空闲
    //块),读写线程的共享变量
};

push:要先back写入数据,push的时候更新写位置索引。

pop: 先通过front取出来,pop的时候更新读位置索引。

不管是push还是pop都是先找到数据。

数据结构逻辑

yqueue_t的实现,每次批量分配⼀批元素,减少内存的分配和释放(解决不断动态内存分配的问题)。 yqueue_t内部由⼀个⼀个chunk组成,每个chunk保存N个元素。

当队列空间不⾜时每次分配⼀个chunk_t,每个chunk_t能存储N个元素。 在数据出队列后,队列有多余空间的时候,回收的chunk也不是⻢上释放,⽽是根据局部性原理先回收到 spare_chunk⾥⾯,当再次需要分配chunk_t的时候从spare_chunk中获取。

yqueue_t内部有三个chunk_t类型指针以及对应的索引位置:

begin_chunk/begin_pos:begin_chunk⽤于指向队列头的chunk,begin_pos⽤于指向队列第⼀个 元素在当前chunk中的位置。               (维护队列头部)

back_chunk/back_pos:back_chunk⽤于指向队列尾的chunk,back_pos⽤于指向队列最后⼀个元素在当前chunk的位置。也就是当前可写入的位置  (维护队列尾部)

end_chunk/end_pos:由于chunk是批量分配的,所以end_chunk⽤于指向分配的最后⼀个chunk位置。                                                (负责chunk分配和回收)

这⾥特别需要注意区分back_chunk/back_pos和end_chunk/end_pos的作⽤:

back_chunk/back_pos:对应的是元素存储位置;

end_chunk/end_pos:决定是否要分配chunk或者回收chunk。

上图中:

有三块chunk,分别由begin_chunk、back_chunk、end_chunk组成。

begin_pos指向begin_chunk中的第n个元素。

back_pos指向back_chunk的最后⼀个元素。

由于back_pos已经指向了back_chunk的最后⼀个元素,所以end_pos就指向了end_chunk的第⼀个元素。

另外还有⼀个spare_chunk指针,⽤于保存释放的chunk指针,当需要再次分配chunk的时候,会⾸先查看 这⾥,从这⾥分配chunk。这⾥使⽤了原⼦的cas操作来完成,利⽤了操作系统的局部性原理。

yqueue_t构造函数

front、back函数

这⾥的front()或者back()函数,需要注意的返回的是左值引⽤,我们可以修改其值。 对于先进后出队列⽽⾔:

begin_chunk->values[begin_pos]代表队列头可读元素, 读取队列头元素即是读取begin_pos位置的元素;

back_chunk->values[back_pos]代表队列尾可写元素,写⼊元素时则是更新back_pos位置的元素, 要确保元素真正⽣效,还需要调⽤push函数更新back_pos的位置,避免下次更新的时候⼜是更新当前 back_pos位置对应的元素。

push函数

更新下一个元素写入位置,如果end_pos超过chunk的索引位置(==N)则申请一个chunk(先尝试从 spare_chunk获取,如果为空再申请分配全新的chunk

//  Adds an element to the back end of the queue.
    inline void push()
    {
        back_chunk = end_chunk;
        back_pos = end_pos; //
        if (++end_pos != N) //end_pos==N表明这个链表结点已经满了
            return;
        chunk_t *sc = spare_chunk.xchg(NULL); // 为什么设置为NULL? 因为如果把之前值取出来了则没有spare chunk了,所以设置为NULL
        if (sc)                               // 如果有spare chunk则继续复用它
        {
            end_chunk->next = sc;
            sc->prev = end_chunk;
        }
        else // 没有则重新分配
        {
            static int s_cout = 0;
            end_chunk->next = (chunk_t *)malloc(sizeof(chunk_t)); // 分配一个chunk
            alloc_assert(end_chunk->next);
            end_chunk->next->prev = end_chunk;
            // printf("s_cout:%d\n", ++s_cout);
        }
        end_chunk = end_chunk->next;
        end_pos = 0;
    }

这⾥分为两种情况:

第⼀种情况:++end_pos != N,说明当前chunk还有空余的位置可以继续插⼊新元素;

第⼆种情况:++end_pos == N,说明该chunk只有N-1的位置可以写⼊元素了,需要再分配⼀个 chunk空间。

需要新分配chunk时,先尝试从spare_chunk获取,如果获取到则直接使⽤,如果spare_chunk为 NULL则需要重新分配chunk。

最终都是要更新end_chunk和end_pos。

pop函数

这里主要更新下一次读取的位置,并检测是否需要释放chunk(先保存到spare_chunk,然后检测

spare_chunk返回值是否为空,如果返回值不为空说明之前有保存chunk,但我们只能保存一个chunk, 所以把之前的chunk释放掉)

//  Removes an element from the front end of the queue.
    inline void pop()
    {
        if (++begin_pos == N) // 删除满一个chunk才回收chunk
        {
            chunk_t *o = begin_chunk;
            begin_chunk = begin_chunk->next; // 更新begin_chunk位置
            begin_chunk->prev = NULL;
            begin_pos = 0;
            //  'o' has been more recently used than spare_chunk,
            //  so for cache reasons we'll get rid of the spare and
            //  use 'o' as the spare.
            chunk_t *cs = spare_chunk.xchg(o); //由于局部性原理,总是保存最新的空闲块而释放先前的空闲快                  
             //(原子操作)保存新的释放旧的
            free(cs);              
        }
    }

这⾥有两个点需要注意:

1. pop掉的元素,其销毁⼯作交给调⽤者完成,即是pop前调⽤者需要通过front()接⼝读取并进⾏销毁 (⽐如动态分配的对象)。

2. 空闲块的保存,要求是原⼦操作。因为闲块是读写线程的共享变量,因为在push中也使⽤了 spare_chunk。

ypipe_t

ypipe_t在yqueue_t的基础上构建⼀个单写单读的⽆锁队列

最核心的点:

w: 用来控制是否需要唤醒读端,当读端没有数据可以读取的时候,将c变量设置为NULL

f: 用来控制写入位置,当该f被更新到c的时候读端才能看到写入的数据

r: 用来控制可读位置,特别重点注意,这个r不是读位置的索引,而是读位置==r的时候说明已经队

列为空了。

类接口和变量

template <typename T, int N>
 class ypipe_t
{
public:
     inline ypipe_t();
     inline virtual ~ypipe_t();
// 写⼊数据,incomplete参数表示写⼊是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。
    inline void write(const T &value_, bool incomplete_);
    inline bool unwrite(T *value_);
// 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下
//调⽤者需要唤醒读线程。
    inline bool flush();
// 这⾥⾯有两个点,⼀个是检查是否有数据可读,⼀个是预取
    inline bool check_read();
    inline bool read(T *value_);
    inline bool probe(bool (*fn)(T &));
protected:
    yqueue_t<T, N> queue;
    T *w;//指向第⼀个未刷新的元素,只被写线程使⽤
    T *r;//指向第⼀个还没预提取的元素,只被读线程使⽤
    T *f;//指向下⼀轮要被刷新的⼀批元素中的第⼀个
    atomic_ptr_t<T> c;//读写线程共享的指针,指向每⼀轮刷新的起点(看代码的
//时候会详细说)。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)
    ypipe_t(const ypipe_t &);
    const ypipe_t &operator=(const ypipe_t &);
 };

主要变量:

// Points to the first un-flushed item. This variable is used exclusively by writer thread.

T *w;//指向第⼀个未刷新的元素,只被写线程使⽤

// Points to the first un-prefetched item. This variable is used exclusively by reader thread.

T *r;//指向第⼀个还没预提取的元素,只被读线程使⽤

// Points to the first item to be flushed in the future.

T *f;//指向下⼀轮要被刷新的⼀批元素中的第⼀个

// The single point of contention between writer and reader thread.

// Points past the last flushed item. If it is NULL,reader is asleep.

// This pointer should be always accessed using atomic operations.

atomic_ptr_t<T> c;//读写线程共享的指针,指向每⼀轮刷新的起点。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)

主要接⼝:

void write (const T &value, bool incomplete):写⼊数据,incomplete参数表示写⼊是否还没完 成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。

bool flush ():刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调⽤ 者需要唤醒读线程。

bool read (T *value_):读数据,将读出的数据写⼊value指针中,返回false意味着没有数据可读。

ypipe_t ()初始化

//  Initialises the pipe.
    inline ypipe_t()
    {
        //  Insert terminator element into the queue.
        queue.push();//yqueue_t的尾指针加1,开始back_chunk为空,现在back_chunk指向第一个chunk_t块的第一个位置
        //  Let all the pointers to point to the terminator.
        //  (unless pipe is dead, in which case c is set to NULL).
        r = w = f = &queue.back();//就是让r、w、f、c四个指针都指向这个end迭代器
        c.set(&queue.back());
    }

write函数

// 写入数据,incomplete参数表示写入是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。
 inline void write(const T &value_, bool incomplete_)
    {
        //  Place the value to the queue, add new terminator element.
        queue.back() = value_;
        queue.push();
        //  Move the "flush up to here" poiter.
        if (!incomplete_)
            f = &queue.back();      // 记录要刷新的位置
    }

flush函数

主要是将w更新到f的位置,说明已经写到的位置。

//  Flush all the completed items into the pipe. Returns false if
    //  the reader thread is sleeping. In that case, caller is obliged to
    //  wake the reader up before using the pipe again.
    // 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒读线程。
    // 批量刷新的机制, 写入批量后唤醒读线程;
    // 反悔机制 unwrite
    inline bool flush()
    {
        //  If there are no un-flushed items, do nothing.
        //  w由写端去控制,只受f修改
        if (w == f)     // 不需要刷新,即是还没有新元素加入
            return true;
        //  Try to set 'c' to 'f'.
        // read时如果没有数据可以读取则c的值会被置为NULL,这点非常重要
        // 如果c为NULL,则此时w和null不相等,返回null,null和w不相等 走 return false ,
        // c被设置为f
        if (c.cas(w, f) != w)   // 尝试将c设置为f,即是准备更新w的位置
        {
            //  Compare-and-swap was unseccessful because 'c' is NULL.
            //  This means that the reader is asleep. Therefore we don't
            //  care about thread-safeness and update c in non-atomic
            //  manner. We'll return false to let the caller know
            //  that reader is sleeping.
            c.set(f);
            w = f;
            return false;       // 线程看到flush返回false之后会发送一个消息给读线程,这个是需要写业务去做处理
        }
        //  Reader is alive. Nothing special to do now. Just move
        //  the 'first un-flushed item' pointer to 'f'.
        w = f;
        return true;
    }

read函数

r实际上是用来控制可以读取到的位置(注意不是读到r,而是r的前一位置可以读取,r位置是不可以读 取的),当frontr重叠的时候说明没有数据可以读取。

//  Check whether item is available for reading.
    // 这里面有两个点,一个是检查是否有数据可读,一个是预取
    inline bool check_read()
    {
        //  Was the value prefetched already? If so, return.
        if (&queue.front() != r && r) //判断是否在前几次调用read函数时已经预取数据了return true;
            return true;
        //  There's no prefetched value, so let us prefetch more values.
        //  Prefetching is to simply retrieve the
        //  pointer from c in atomic fashion. If there are no
        //  items to prefetch, set c to NULL (using compare-and-swap).
        // 两种情况
        // 如果此时还没有写入数据,则c和&queue.front()相等,则c被设置为NULL
        // 如果此时已经写入数据,则c和&queue.front()不相等,返回w的位置
        r = c.cas(&queue.front(), NULL);//尝试预取数据,r其实是指向可以读取到的位置
        //,并不是用来表示读索引位置
        //  If there are no elements prefetched, exit.
        //  During pipe's lifetime r should never be NULL, however,
        //  it can happen during pipe shutdown when items
        //  are being deallocated.
        if (&queue.front() == r || !r)//判断是否成功预取数据
            return false;
        //  There was at least one value prefetched.
        return true;
    }
    //  Reads an item from the pipe. Returns false if there is no value.
    //  available.
    inline bool read(T *value_)
    {
        //  Try to prefetch a value.
        if (!check_read()) // 检测有没有数据可以读取
            return false;
        //  There was at least one value prefetched.
        //  Return it to the caller.
        *value_ = queue.front();
        queue.pop();
        return true;
    }

如果指针r指向的是队头元素(r==&queue.front())最核心的一点r不是我们read的位置索引,而是 用来识别我们可以读取到哪个位置就不能再读

或者r没有指向任何元素(NULL)

则说明队列中并没有可读的数据,这个时候check_read尝试去预取数据。所谓的预取就是令 r=c (cas函数就是返回c本身的值,看上⾯关于cas的实现), ⽽c在write中被指向f(⻅上图),这时从queue.front()到f这个位置的数据都被预取出来了,然后每次调⽤read都能取出⼀段。值得注意的是,当c==&queue.front()时,代表数据被取完了,这时把c指向NULL,接着读线程会睡眠,这也是给写线程检查读线程是否睡眠的标志。 继续上⾯写⼊AB数据的场景,第⼀次调⽤read时,会先check_read,把指针r指向指针c的位置(所谓的预取),这时r,c,w,f的关系如下:

如果此时在r = c.cas(&queue.front(), NULL); 执⾏时没有flush的操作。则说明没有数据可以读取, 最终返回false;

如果在r = c.cas(&queue.front(), NULL); 之前写⼊⽅write新数据后并调⽤了flush,则r被更新,最终返回true。

⽽_c指针,则是读写线程都可以操作,因此需要使⽤原⼦的CAS操作来修改,它的可能值有以下⼏种:

NULL:读线程设置,此时意味着已经没有数据可读,读线程在休眠。

⾮零:写线程设置,这⾥⼜区分两种情况:

旧值为_w的情况下,cas(_w,_f)操作修改为_f,意味着如果原先的值为_w,则原⼦性的修改为 _f,表示有更多已被刷新的数据可读。

在旧值为NULL的情况下,此时读线程休眠,因此可以安全的设置为当前_f指针的位置。

基于循环数组的⽆锁队列

重点

ArrayLockFreeQueue数据结构,可以理解为⼀个环形数组;

多线程写⼊时候,m_maximumReadIndex、m_writeIndex索引如何更新

在更新m_maximumReadIndex的时候为什么要让出cpu;

多线程读取的时候,m_readIndex如何更新。

可读位置是由m_maximumReadIndex控制,⽽不是m_writeIndex去控制的。 m_maximumReadIndex的更新由m_writeIndex。

类接⼝和变量

1 template <typename ELEM_T, QUEUE_INT Q_SIZE = ARRAY_LOCK_FREE_Q_D
EFAULT_SIZE>
2 class ArrayLockFreeQueue
3 {
4 public:
5
6 ArrayLockFreeQueue();
7 virtual ~ArrayLockFreeQueue();
8
9 QUEUE_INT size();
10
11 bool enqueue(const ELEM_T &a_data); // ⼊队列
12
13 bool dequeue(ELEM_T &a_data); // 出队列
14
15 bool try_dequeue(ELEM_T &a_data); // 尝试⼊队列
16
17 private:
18
19 ELEM_T m_thequeue[Q_SIZE];
20
21 volatile QUEUE_INT m_count; // 队列的元素个数
22 volatile QUEUE_INT m_writeIndex;//新元素⼊列时存放位置在数组中的下标
23
24 volatile QUEUE_INT m_readIndex;// 下⼀个出列元素在数组中的下标
25
26 volatile QUEUE_INT m_maximumReadIndex; //最后⼀个已经完成⼊列操作
                                          //的元素在数组中的下标
27
28 inline QUEUE_INT countToIndex(QUEUE_INT a_count);
29 };

三种不同下标:

m_count; // 队列的元素个数

m_writeIndex;//新元素⼊列时存放位置在数组中的下标

m_readIndex;/ 下⼀个出列的元素在数组中的下标

m_maximumReadIndex; //最后⼀个已经完成⼊列操作的元素在数组中的下标。如果它的值跟 m_writeIndex不⼀致,表明有写请求尚未完成。这意味着,有写请求成功申请了空间但数据还没完全 写进队列。所以如果有线程要读取,必须要等到写线程将数据完全写⼊到队列之后。

必须指明的是使⽤3种不同的下标都是必须的,因为队列允许任意数量的⽣产者和消费者围绕着它⼯作。已 经存在⼀种基于循环数组的⽆锁队列,使得唯⼀的⽣产者和唯⼀的消费者可以良好的⼯作[11]。它的实现相 当简洁⾮常值得阅读。

该程序使⽤gcc内置的__sync_bool_compare_and_swap,但重新做了宏定义封装。 #define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)

enqueue⼊队列

template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data)
{
  QUEUE_INT currentWriteIndex;    // 获取写指针的位置
  QUEUE_INT currentReadIndex;
  do
  {
    currentWriteIndex = m_writeIndex;
    currentReadIndex = m_readIndex;
    if(countToIndex(currentWriteIndex + 1) ==
      countToIndex(currentReadIndex))
    {
      return false; //队列未空    
    }
  } while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
  // We know now that this index is reserved for us. Use it to save the data
    // 第一次cas竞争写入位置
  m_thequeue[countToIndex(currentWriteIndex)] = a_data;
  // update the maximum read index after saving the data. It wouldn't fail if there is only one thread 
  // inserting in the queue. It might fail if there are more than 1 producer threads because this
  // operation has to be done in the same order as the previous CAS
  while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
  {
     // this is a good place to yield the thread in case there are more
    // software threads than hardware processors and you have more
    // than 1 producer thread
    // have a look at sched_yield (POSIX.1b)
    sched_yield();    // 当线程超过cpu核数的时候如果不让出cpu导致一直循环在此。
  }
  AtomicAdd(&m_count, 1);
  return true;
}

下⾯的图示特别重要,需要耐⼼分析。

以下插图展示了对队列执⾏操作时各下标是如何变化的。如果⼀个位置被标记为X,标识这个位置⾥存放了 数据。空⽩表示位置是空的。对于下图的情况,队列中存放了两个元素。WriteIndex指示的位置是新元素将 会被插⼊的位置。ReadIndex指向的位置中的元素将会在下⼀次pop操作中被弹出。

当⽣产者准备将数据插⼊到队列中,它⾸先通过增加WriteIndex的值来申请空间。MaximumReadIndex指 向最后⼀个存放有效数据的位置(也就是实际的队列尾)。

⼀旦空间的申请完成,⽣产者就可以将数据拷⻉到刚刚申请到的位置中。完成之后增加 MaximumReadIndex使得它与WriteIndex的⼀致。

现在队列中有3个元素,接着⼜有⼀个⽣产者尝试向队列中插⼊元素。

在第⼀个⽣产者完成数据拷⻉之前,⼜有另外⼀个⽣产者申请了⼀个新的空间准备拷⻉数据。现在有两个 ⽣产者同时向队列插⼊数据。

现在⽣产者开始拷⻉数据,在完成拷⻉之后,对MaximumReadIndex的递增操作必须严格遵循⼀个顺序: 第⼀个⽣产者线程⾸先递增MaximumReadIndex,接着才轮到第⼆个⽣产者。这个顺序必须被严格遵守的原因是,我们必须保证数据被完全拷⻉到队列之后才允许消费者线程将其出列。

(while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {sched_yield(); } 让出cpu的⽬的也是为了让排在最前⾯的⽣产者完成m_maximumReadIndex的更 新)

第⼀个⽣产者完成了数据拷⻉,并对MaximumReadIndex完成了递增,现在第⼆个⽣产者可以递增 MaximumReadIndex了。

第⼆个⽣产者完成了对MaximumReadIndex的递增,现在队列中有5个元素。

dequeue出队列

template <typename ELEM_T, QUEUE_INT Q_SIZE>
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
{
  QUEUE_INT currentMaximumReadIndex;
  QUEUE_INT currentReadIndex;
  do
  {
     // to ensure thread-safety when there is more than 1 producer thread
        // a second index is defined (m_maximumReadIndex)
    currentReadIndex = m_readIndex;
    currentMaximumReadIndex = m_maximumReadIndex;
    if(countToIndex(currentReadIndex) ==
      countToIndex(currentMaximumReadIndex))
    {
      // the queue is empty or
      // a producer thread has allocate space in the queue but is 
      // waiting to commit the data into it
      return false;
    }
    // retrieve the data from the queue
    a_data = m_thequeue[countToIndex(currentReadIndex)];
    // try to perfrom now the CAS operation on the read index. If we succeed
    // a_data already contains what m_readIndex pointed to before we 
    // increased it
    if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
    {
      AtomicSub(&m_count, 1); // 真正读取到了数据
      return true;
    }
  } while(true);
  assert(0);
   // Add this return statement to avoid compiler warnings
  return false;
}

以下插图展示了元素出列的时候各种下标是如何变化的,队列中初始有2个元素。WriteIndex指示的位置是 新元素将会被插⼊的位置。ReadIndex指向的位置中的元素将会在下⼀次pop操作中被弹出。

消费者线程拷⻉数组ReadIndex位置的元素,然后尝试⽤CAS操作将ReadIndex加1。如果操作成功消费者 成功的将数据出列。因为CAS操作是原⼦的,所以只有唯⼀的线程可以在同⼀时刻更新ReadIndex的值。 如果操作失败,读取新的ReadIndex值,以重复以上操作(copy数据,CAS)。

现在⼜有⼀个消费者将元素出列,队列变成空。

现在有⼀个⽣产者正在向队列中添加元素。它已经成功的申请了空间,但尚未完成数据拷⻉。任何其它企 图从队列中移除元素的消费者都会发现队列⾮空(因为writeIndex不等于readIndex)。但它不能读取 readIndex所指向位置中的数据,因为readIndex与MaximumReadIndex相等。消费者将会在do循环中不 断的反复尝试,直到⽣产者完成数据拷⻉增加MaximumReadIndex的值,或者队列变成空(这在多个消费 者的场景下会发⽣)。

当⽣产者完成数据拷⻉,队列的⼤⼩是1,消费者线程可以读取这个数据了.

在多于⼀个⽣产者线程的情况下yielding处理器的必要性

读者可能注意到了enqueue函数中使⽤了sched_yield()来主动的让出处理器,对于⼀个声称⽆锁的算法⽽ ⾔,这个调⽤看起来有点奇怪。正如⽂章开始的部分解释过的,多线程环境下影响性能的其中⼀个因素就 是Cache损坏。⽽产⽣Cache损坏的⼀种情况就是⼀个线程被抢占,操作系统需要保存被抢占线程的上下 ⽂,然后将被选中作为下⼀个调度线程的上下⽂载⼊。此时Cache中缓存的数据都会失效,因为它是被抢 占线程的数据⽽不是新线程的数据。所以,当此算法调⽤sched_yield()意味着告诉操作系统:"我要把处理器时间让给其它线程,因为我要等 待某件事情的发⽣"。⽆锁算法和通过阻塞机制同步的算法的⼀个主要区别在于⽆锁算法不会阻塞在线程同 步上,那么为什么在这⾥我们要主动请求操作系统抢占⾃⼰呢?这个问题的答案没那么简单。它与有多少个⽣产者线程在并发的往队列中存放数据有关:每个⽣产者线程所执⾏的CAS操作都必须严格遵循FIFO次 序,⼀个⽤于申请空间,另⼀个⽤于通知消费者数据已经写⼊完成可以被读取了。

如果我们的应⽤程序只有唯⼀的⽣产者操作这个队列,sche_yield()将永远没有机会被调⽤,第2个CAS操 作永远不会失败。因为在⼀个⽣产者的情况下没有⼈能破坏⽣产者执⾏这两个CAS操作的FIFO顺序。

⽽当多于⼀个⽣产者线程往队列中存放数据的时候,问题就出现了。概括来说,⼀个⽣产者通过第1个CAS 操作申请空间,然后将数据写⼊到申请到的空间中,然后执⾏第2个CAS操作通知消费者数据准备完毕可供 读取了。这第2个CAS操作必须遵循FIFO顺序,也就是说,如果A线程第⾸先执⾏完第⼀个CAS操作,那么 它也要第1个执⾏完第2个CAS操作,如果A线程在执⾏完第⼀个CAS操作之后停⽌,然后B线程执⾏完第1 个CAS操作,那么B线程将⽆法完成第2个CAS操作,因为它要等待A先完成第2个CAS操作。⽽这就是问题 产⽣的根源。让我们考虑如下场景,3个消费者线程和1个消费者线程:

线程1,2,3按顺序调⽤第1个CAS操作申请了空间。那么它们完成第2个CAS操作的顺序也应该与这个 顺序⼀致,1,2,3。

线程2⾸先尝试执⾏第2个CAS,但它会失败,因为线程1还没完成它的第2此CAS操作呢。同样对于线 程3也是⼀样的。

线程2和3将会不断的调⽤它们的第2个CAS操作,直到线程1完成它的第2个CAS操作为⽌。

线程1最终完成了它的第2个CAS,现在线程3必须等线程2先完成它的第2个CAS。

线程2也完成了,最终线程3也完成。

在上⾯的场景中,⽣产者可能会在第2个CAS操作上⾃旋⼀段时间,⽤于等待先于它执⾏第1个CAS操作的 线程完成它的第2次CAS操作。在⼀个物理处理器数量⼤于操作队列线程数量的系统上,这不会有太严重的 问题:因为每个线程都可以分配在⾃⼰的处理器上执⾏,它们最终都会很快完成各⾃的第2次CAS操作。虽 然算法导致线程处理忙等状态,但这正是我们所期望的,因为这使得操作更快的完成。也就是说在这种情 况下我们是不需要sche_yield()的,它完全可以从代码中删除。 但是,在⼀个物理处理器数量少于线程数量的系统上,sche_yield()就变得⾄关重要了。让我们再次考查上 ⾯3个线程的场景,当线程3准备向队列中插⼊数据:如果线程1在执⾏完第1个CAS操作,在执⾏第2个 CAS操作之前被抢占,那么线程2,3就会⼀直在它们的第2个CAS操作上忙等(它们忙等,不让出处理器, 线程1也就没机会执⾏,它们就只能继续忙等),直到线程1重新被唤醒,完成它的第2个CAS操作。这就是 需要sche_yield()的场合了,操作系统应该避免让线程2,3处于忙等状态。它们应该尽快的让出处理器让 线程1执⾏,使得线程1可以把它的第2个CAS操作完成。这样线程2和3才能继续完成它们的操作。

【基于数组的无锁队列(译)】https://zhuanlan.zhihu.com/p/3398573

目录
相关文章
|
8月前
|
应用服务中间件 Linux 调度
锁和原子操作CAS的底层实现
锁和原子操作CAS的底层实现
66 0
|
存储 编译器 API
锁与原子操作CAS
锁与原子操作CAS
157 0
|
7月前
|
Java Linux
如何实现无锁并发:深入理解CAS原理
如何实现无锁并发:深入理解CAS原理
257 2
|
8月前
|
算法 调度 数据安全/隐私保护
什么是CAS锁
什么是CAS锁
100 0
|
8月前
|
算法
原子操作CAS
原子操作CAS
48 0
|
8月前
|
存储 安全 中间件
锁与原子操作CAS的底层实现
锁与原子操作CAS的底层实现
|
8月前
|
缓存 Linux API
原子操作CAS与锁实现
原子操作CAS与锁实现
|
8月前
|
存储 缓存 算法
理解原子操作与CAS锁
理解原子操作与CAS锁
89 0
|
安全 Java C++
CAS自旋锁到底是什么?为什么能实现线程安全?
本文是博主对多线程学习总结记录,希望对大家有所帮助。
1257 0
CAS自旋锁到底是什么?为什么能实现线程安全?
|
安全 算法 架构师
带你了解什么是无锁并发 CAS
带你了解什么是无锁并发 CAS
268 0
带你了解什么是无锁并发 CAS

热门文章

最新文章