写在前面
在看无锁队列之前,我们先来看看看队列的操作。队列是一种非常重要的数据结构,其特性是先进先出(FIFO),符合流水线业务流程。在进程间通信、网络通信间经常采用队列做缓存,缓解数据处理压力。根据队列操作的场景可分为以下4种:
无锁队列应用场景:
如果你的业务,数据量不大,一秒只需要处理几百上千的数据,就没必要用无锁队列了。当需要处理的数据非常多,比如,每秒需要处理几十万条数据时,可以考虑用无锁队列。
一、有锁和无锁
实际上有锁和无锁,就是我们平时所说的乐观锁和悲观锁:
- 悲观锁:一种悲观的加锁策略,它认为每次访问共享资源的时候,总会发生冲突,所以宁愿牺牲性能(时间)来保证数据安全;
- 乐观锁:是一种乐观的策略,它假设线程访问共享资源不会发生冲突,所以不需要加锁,因此线程将不断执行,不需要停止。一旦碰到冲突,就重试当前操作直到没有冲突为止。
无锁通过(CAS Compare And Swap)技术来实现,CAS是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作,从⽽避免多线程同时改写某⼀数据时由于执⾏顺序不确定性以及中断的不可预知性产⽣的数据不⼀致问题。
bool CAS( int * pAddr, int nExpected, int nNew ) { if ( *pAddr == nExpected ) { *pAddr = nNew ; return true ; } else{ return false ; } }
工作原理:将pAddr地址中的元素与nExpected比较,如果相等,则更新pAddr的值为nNew,并返回true;否则返回false。
二、无锁队列的优势
上面我们提到,在数据量小的时候直接采用加锁的方式,实现资源的排他性访问即可。但是加锁的缺点也很明显:
- CPU会将大量的时间用在锁的维护上,而不是数据处理;
- 在线程之间切换的时候,导致Cache中的数据失效。CPU访问Cache的速度是远大于内存的,所以需要尽量减少线程频繁切换 。
三、无锁队列实现
3.1 zmq实现
zmq实现无锁队列,只要在ypipe.hpp 和 yqueue.hpp 两个文件,适用于一读一写的场景。
3.1.1 yqueue.hpp
template <typename T, int N> class yqueue_t { ...... private: // Individual memory chunk to hold N elements. // 链表结点称之为chunk_t struct chunk_t { T values[N]; //每个chunk_t可以容纳N个T类型的元素,以后就以一个chunk_t为单位申请内存 chunk_t *prev; chunk_t *next; }; chunk_t *begin_chunk; // 链表头结点 int begin_pos; // 起始点 chunk_t *back_chunk; // 队列中最后一个元素所在的链表结点 int back_pos; // 尾部 chunk_t *end_chunk; // 拿来扩容的,总是指向链表的最后一个结点 int end_pos; atomic_ptr_t<chunk_t> spare_chunk; //空闲块(把所有元素都已经出队的chunk,称为空闲块),读写线程的共享变量 };
yqueue_t 结构,内部由⼀个⼀个chunk组成,每个chunk保存N个元素。chunk之间通过指针连接。
当队列空间不⾜时每次分配⼀个chunk_t,每个chunk_t能存储N个元素。
在数据出队列后,队列有多余空间的时候,回收的chunk也不是⻢上释放,⽽是根据局部性原理先回收到spare_chunk⾥⾯,当再次需要分配chunk_t的时候从spare_chunk中获取。
yqueue_t内部有三个chunk_t类型指针以及对应的索引位置:
- begin_chunk/begin_pos:begin_chunk⽤于指向队列头的chunk,begin_pos⽤于指向队列第⼀个元素在当前chunk中的位置。
- back_chunk/back_pos:back_chunk⽤于指向队列尾的chunk,back_po⽤于指向队列最后⼀个元素在当前chunk的位置。
- end_chunk/end_pos:由于chunk是批量分配的,所以end_chunk⽤于指向分配的最后⼀个chunk位置
这⾥特别需要注意区分back_chunk/back_pos和end_chunk/end_pos的作⽤:
- back_chunk/back_pos:对应的是元素存储位置;
- end_chunk/end_pos:决定是否要分配chunk或者回收chunk。
yqueue_t 构造函数
inline yqueue_t() { begin_chunk = (chunk_t *)malloc(sizeof(chunk_t)); alloc_assert(begin_chunk); begin_pos = 0; back_chunk = NULL; //back_chunk总是指向队列中最后一个元素所在的chunk,现在还没有元素,所以初始为空 back_pos = 0; end_chunk = begin_chunk; //end_chunk总是指向链表的最后一个chunk end_pos = 0; }
说明:
采用chunk的机制,减少内存分配次数,采用spark_chunk的机制回收读完的chunk,充分利用局部性原理,提升性能。
end_chunk 总是指向最后分配的chunk,刚分配出来的chunk,end_pos也是0
back_chunk 在插入元素时才会指向对应的chunk,初始化的是是指向NULL的。
ront、back函数
inline T &front() // 返回的是引⽤,是个左值,调⽤者可以通过其修改容器的值 { return begin_chunk->values[begin_pos]; // 队列⾸个chunk对应的的begin_pos } inline T &back() // 返回的是引⽤,是个左值,调⽤者可以通过其修改容器的值 { return back_chunk->values[back_pos]; }
说明:
我们可以通过 begin_chunk->values[begin_pos] 获取到队列的头部,以及back_chunk->values[back_pos]获取到队列的尾部
Push函数
每次调用push,会更新 back_chunk 和 back_pos ,以及根据end_pos的值,决定是否需要重新分配chunk。
inline void push() { back_chunk = end_chunk; back_pos = end_pos; // if (++end_pos != N) //end_pos!=N表明这个chunk节点还没有满 return; chunk_t *sc = spare_chunk.xchg(NULL); // 为什么设置为NULL? 因为如果把之前值取出来了则没有spare chunk了,所以设置为NULL if (sc) // 如果有spare chunk则继续复用它 { end_chunk->next = sc; sc->prev = end_chunk; } else // 没有则重新分配 { // static int s_cout = 0; // printf("s_cout:%d\n", ++s_cout); end_chunk->next = (chunk_t *)malloc(sizeof(chunk_t)); // 分配一个chunk alloc_assert(end_chunk->next); end_chunk->next->prev = end_chunk; } end_chunk = end_chunk->next; end_pos = 0; }
重新分配规则如下:
- 如果 ++end_pos != N 说明当前chunk还有空间,直接返回
- 如果++end_pos == N 说明,当前chunk只有N-1的位置可用,需要再按分配一个chunk。 这个chunk 会先尝试从spare_chunk获取,如果spare_chunk为NULL,则需要重新分配。
pop函数
pop的时候begin_pos就会++,也就是会更新front的位置。当chunk 中的所有元素都被取出才会触发chunk回收机制。spare_chunk的操作要求是原子操作,因为读写线程都会访问spare_chunk。
inline void pop() { if (++begin_pos == N) // 删除满一个chunk才回收chunk { chunk_t *o = begin_chunk; begin_chunk = begin_chunk->next; begin_chunk->prev = NULL; begin_pos = 0; // 'o' has been more recently used than spare_chunk, // so for cache reasons we'll get rid of the spare and // use 'o' as the spare. chunk_t *cs = spare_chunk.xchg(o); //由于局部性原理,总是保存最新的空闲块而释放先前的空闲快 free(cs); } }
3.2 ypipe_t
inline ypipe_t() { // Insert terminator element into the queue. queue.push(); //yqueue_t的尾指针加1,开始back_chunk为空,现在back_chunk指向第一个chunk_t块的第一个位置 // Let all the pointers to point to the terminator. // (unless pipe is dead, in which case c is set to NULL). r = w = f = &queue.back(); //就是让r、w、f、c四个指针都指向这个end迭代器 c.set(&queue.back()); }
在ypipe对象创建的时候,更新yqueue的back_chunk的值,以及初始化 r w f c四个指针。
write函数
write的incomplete_标志位决定要不要更新f的位置
inline void write(const T &value_, bool incomplete_) { // Place the value to the queue, add new terminator element. queue.back() = value_; queue.push(); // Move the "flush up to here" poiter. if (!incomplete_) { f = &queue.back(); // 记录要刷新的位置 // printf("1 f:%p, w:%p\n", f, w); } else { // printf("0 f:%p, w:%p\n", f, w); } }
flush函数
flush的核心是更新c指针和w指针的位置
inline bool flush() { // If there are no un-flushed items, do nothing. if (w == f) // 不需要刷新,即是还没有新元素加入 return true; // Try to set 'c' to 'f'. // read时如果没有数据可以读取则c的值会被置为NULL,如果c==null 说明read线程在 休眠,可以安全的设置c的值 if (c.cas(w, f) != w) // 尝试将c设置为f,即是准备更新w的位置 { // Compare-and-swap was unseccessful because 'c' is NULL. // This means that the reader is asleep. Therefore we don't // care about thread-safeness and update c in non-atomic // manner. We'll return false to let the caller know // that reader is sleeping. c.set(f); // 更新w的位置 w = f; return false; //线程看到flush返回false之后会发送一个消息给读线程,这需要写业务去做处理 } else // 读端还有数据可读取 { // Reader is alive. Nothing special to do now. Just move // the 'first un-flushed item' pointer to 'f'. w = f; // 只需要更新w的位置 return true; } }
read函数
read会先采取预读的机制,判断有无数据可读,可读就填充到value_。核心就是 check_read函数。主要通过比较c和队头的位置判断是否有数据可读,如果可读返回的就是flush后的f的位置。
inline bool check_read() { // Was the value prefetched already? If so, return. if (&queue.front() != r && r) //判断是否在前几次调用read函数时已经预取数据了return true; return true; // There's no prefetched value, so let us prefetch more values. // Prefetching is to simply retrieve the // pointer from c in atomic fashion. If there are no // items to prefetch, set c to NULL (using compare-and-swap). // 两种情况 // 1. 如果c值和queue.front()相等,返回旧c值,将c值置为NULL,说明此时没有数据可读 // 当c==&queue.front()时,代表数据被取完了,这时把c指向NULL,接着读线程会睡眠,这也是给写线程检查读线程是否睡眠的标志。 // 2. 如果c值和queue.front()不相等, 直接返回c值,此时可能有数据度的去 r = c.cas(&queue.front(), NULL); //尝试预取数据 // If there are no elements prefetched, exit. // During pipe's lifetime r should never be NULL, however, // it can happen during pipe shutdown when items are being deallocated. if (&queue.front() == r || !r) //判断是否成功预取数据 return false; // There was at least one value prefetched. return true; } inline bool read(T *value_) { // Try to prefetch a value. if (!check_read()) return false; // There was at least one value prefetched. // Return it to the caller. *value_ = queue.front(); queue.pop(); return true; }
小结
flush 会更新c 和 w 指针的位置为当前已经写入成功的位置。 read操作会通过比较c 和 front的关系,如果有就预读一部分元素,直至全部读完。在没有数据可读的时候,读会休眠。 注意c指针,读写线程都会访问到,所以需要CAS操作保证线程安全。
3.2 循环数组完整代码
ArrayLockFreeQueue.h
#ifndef _ARRAYLOCKFREEQUEUE_H___ #define _ARRAYLOCKFREEQUEUE_H___ #include <stdint.h> #ifdef _WIN64 #define QUEUE_INT int64_t #else #define QUEUE_INT unsigned long #endif #define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // 2^16 template <typename ELEM_T, QUEUE_INT Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE> class ArrayLockFreeQueue { public: ArrayLockFreeQueue(); virtual ~ArrayLockFreeQueue(); QUEUE_INT size(); // 队列大小 bool enqueue(const ELEM_T &a_data); // 入队列 bool dequeue(ELEM_T &a_data); // 出队列 bool try_dequeue(ELEM_T &a_data); // 尝试入队列 private: ELEM_T m_thequeue[Q_SIZE]; volatile QUEUE_INT m_count; volatile QUEUE_INT m_writeIndex; volatile QUEUE_INT m_readIndex; volatile QUEUE_INT m_maximumReadIndex; inline QUEUE_INT countToIndex(QUEUE_INT a_count); }; #include "ArrayLockFreeQueueImp.h" #endif
三种不同下标的含义:
- m_writeIndex;//新元素⼊列时存放位置在数组中的下标
- m_readIndex;/ 下⼀个出列的元素在数组中的下标
- m_maximumReadIndex; //最后⼀个已经完成⼊列操作的元素在数组中的下标。如果它的值跟m_writeIndex不⼀致,表明有写请求尚未完成。这意味着,有写请求成功申请了空间但数据还没完全写进队列。所以如果有线程要读取,必须要等到写线程将数据完全写⼊到队列之后。
以上3种下标都是必须的,因为队列允许任意数量的⽣产者和消费者围绕着它⼯作。已经存在⼀种基于循环数组的⽆锁队列,使得唯⼀的⽣产者和唯⼀的消费者可以良好的⼯作[11]。它的实现相当简洁⾮常值得阅读。
该程序使⽤gcc内置的__sync_bool_compare_and_swap,但重新做了宏定义封装。
#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
ArrayLockFreeQueueImp.h
#ifndef _ARRAYLOCKFREEQUEUEIMP_H___ #define _ARRAYLOCKFREEQUEUEIMP_H___ #include "ArrayLockFreeQueue.h" #include <assert.h> #include "atom_opt.h" template <typename ELEM_T, QUEUE_INT Q_SIZE> ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() : m_writeIndex(0), m_readIndex(0), m_maximumReadIndex(0) { m_count = 0; } template <typename ELEM_T, QUEUE_INT Q_SIZE> ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue() { } template <typename ELEM_T, QUEUE_INT Q_SIZE> inline QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(QUEUE_INT a_count) { return (a_count % Q_SIZE); // 取余的时候 } template <typename ELEM_T, QUEUE_INT Q_SIZE> QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size() { QUEUE_INT currentWriteIndex = m_writeIndex; QUEUE_INT currentReadIndex = m_readIndex; if(currentWriteIndex>=currentReadIndex) return currentWriteIndex - currentReadIndex; else return Q_SIZE + currentWriteIndex - currentReadIndex; } template <typename ELEM_T, QUEUE_INT Q_SIZE> bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data) { QUEUE_INT currentWriteIndex; // 获取写指针的位置 QUEUE_INT currentReadIndex; // 1. 获取可写入的位置 do { currentWriteIndex = m_writeIndex; currentReadIndex = m_readIndex; if(countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) { return false; // 队列已经满了 } // 目的是为了获取一个能写入的位置 } while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1))); // 获取写入位置后 currentWriteIndex 是一个临时变量,保存我们写入的位置 // We know now that this index is reserved for us. Use it to save the data m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把数据更新到对应的位置 // 2. 更新可读的位置,按着m_maximumReadIndex+1的操作 // update the maximum read index after saving the data. It wouldn't fail if there is only one thread // inserting in the queue. It might fail if there are more than 1 producer threads because this // operation has to be done in the same order as the previous CAS while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) { // this is a good place to yield the thread in case there are more // software threads than hardware processors and you have more // than 1 producer thread // have a look at sched_yield (POSIX.1b) sched_yield(); // 当线程超过cpu核数的时候如果不让出cpu导致一直循环在此。 } AtomicAdd(&m_count, 1); return true; } template <typename ELEM_T, QUEUE_INT Q_SIZE> bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::try_dequeue(ELEM_T &a_data) { return dequeue(a_data); } template <typename ELEM_T, QUEUE_INT Q_SIZE> bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data) { QUEUE_INT currentMaximumReadIndex; QUEUE_INT currentReadIndex; do { // to ensure thread-safety when there is more than 1 producer thread // a second index is defined (m_maximumReadIndex) currentReadIndex = m_readIndex; currentMaximumReadIndex = m_maximumReadIndex; if(countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) // 如果不为空,获取到读索引的位置 { // the queue is empty or // a producer thread has allocate space in the queue but is // waiting to commit the data into it return false; } // retrieve the data from the queue a_data = m_thequeue[countToIndex(currentReadIndex)]; // 从临时位置读取的 // try to perfrom now the CAS operation on the read index. If we succeed // a_data already contains what m_readIndex pointed to before we // increased it if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { AtomicSub(&m_count, 1); // 真正读取到了数据,元素-1 return true; } } while(true); assert(0); // Add this return statement to avoid compiler warnings return false; } #endif
运行原理:
一读一写:
- 写数据分为两步(1)分配空间;(2)拷贝数据。当生产者向队列中插入数据时,会先通过WriteIdex来申请空间。MaximumReadIndex指向最后⼀个存放有效数据的位置(也就是实际的队列尾)。一旦空间申请完成,生产者就可以拷贝数据到该位置,然后MaximumReadIndex+1与WriteIdex保持一致。
- 读数据:消费者线程从ReadIndex的位置取数据,然后尝试用CAS将ReadIndex加一,把该元素会被pop出队列
多读多写
- 写数据:多线程与单线程写的区别主要在,可能统一时刻有多个线程往队列中写数据,假设当前有两个线程A、B:A线程申请空间成功WriteIdex+1,还未拷贝数据,线程B又申请了空间WriteIdex+1,此时,WriteIdex已经往后移动了两格。对MaximumReadIndex的递增操作必须严格遵循⼀个顺序:第⼀个⽣产者线程⾸先递增MaximumReadIndex,接着才轮到第⼆个⽣产者。这个顺序必须被严格遵守的原因是,我们必须保证数据被完全拷⻉到队列之后才允许消费者线程将其出列。
- 读数据:消费者线程从ReadIndex的位置取数据,然后尝试用CAS将ReadIndex加一。如果操作成功,元素出队。因为ReadIndex的更新是原子的,所有同一时刻只有一个线程能读到数据,并更新ReadIndex的值。
特殊情况:
现在有⼀个⽣产者正在向队列中添加元素。它已经成功的申请了空间,但尚未完成数据拷⻉。任何其它企图从队列中移除元素的消费者都会发现队列⾮空(因为writeIndex不等于readIndex)。但它不能读取readIndex所指向位置中的数据,因为readIndex与MaximumReadIndex相等。消费者将会在do循环中不断的反复尝试,直到⽣产者完成数据拷⻉增加MaximumReadIndex的值,或者队列变成空(这在多个消费者的场景下会发⽣)