无锁消息队列的实现

简介: 无锁消息队列的实现

1、无锁队列的场景

为什么需要无锁队列,有锁会产生哪些问题?

  • Cache损坏:线程频繁切换引发 Cache trashing
  • 在同步机制上的争抢队列:任务将大量的时间(睡眠、等待、唤醒)浪费在获得保护队列数据的互斥锁,而不是处理队列中的数据上
  • 动态内存分配:当一个任务从堆中分配内存时,会阻塞所有与这个任务共享地址空间的其它任务(进程中的所有线程)

两种实现方式

  • 基于链表:不断分配结点,zeroMQ
  • 基于循环数组:消息队列容量被固定。

2、 zeroMQ 队列的实现

zeroMQ 主要由两部分组成

  • yqueue_t:消息队列, 存储结点元素
  • ypipe_t:控制读写指针,底层控制

ypipe_t 在 yqueue_t 的基础上构建⼀个单写单读的无锁队列。

2.1、zeroMQ 特点

  • 适用于一写一读的应用场景。1个epoll + 线程池里每个线程绑定一个唯一的队列
  • 通过 chunk 机制批量分配结点,减少因为动态分配内存导致的线程间的互斥。
  • 通过 spare_ chunk 机制来降低 chunk 的频繁分配和释放。消息队列水位的局部性原理。
  • 通过预写机制,批量更新写入位置,减少 cas 的调用(同时读写消息队列竞争 cas )。
  • 唤醒机制:读端没有数据可读时可以进入 wait 状态。写端在写入数据时可以根据返回值获知写入数据前消息队列是否为空,如果写入之前为空则可以唤醒读端

2.2、原子操作

template<typename T>
 class atomic_ptr_t {
 public:
     void set(T *ptr_);      // ⾮原⼦操作,设置该指针的值,使用者保证安全
     T *xchg(T *val_);       // 原⼦操作,设置⼀个新的值,然后返回旧的值
     T *cas(T *cmp_, T *val_);// 原⼦操作
 private:
     volatile T *ptr;
 };

cas函数,原子操作,线程安全,比较后交换

//  Perform atomic 'compare and swap' operation on the pointer.
 //  The pointer is compared to 'cmp' argument and if they are equal, 
 //  its value is set to 'val'. Old value of the pointeris returned.
 inline T *cas (T *cmp_, T *val_) {
     T *old;
     __asm__ volatile (
         "lock; cmpxchg %2, %3"
         : "=a" (old), "=m" (ptr)
         : "r" (val_), "m" (ptr), "0" (cmp_)
         : "cc");
     return old;
 }

2.3、yqueue_t

2.3.1、数据结构

template<typename T, int N> // T 队列中元素类型,N 粒度
 class yqueue_t {
 public:
     inline yqueue_t();
     inline ~yqueue_t();
     inline T &front();   // Returns reference to the front element of the queue.
     inline T &back();    // Returns reference to the back element of the queue.
     inline void push();  // Adds an element to the back end of the queue.
     inline void pop();   // Removes an element from the front of the queue.
     inline void unpush() // Removes element from the back end of the queue。
 private:
     // Individual memory chunk to hold N elements.
     struct chunk_t {
         T values[N];
         chunk_t *prev;
         chunk_t *next;
     };
     chunk_t *begin_chunk;
     int begin_pos;
     chunk_t *back_chunk;
     int back_pos;
     chunk_t *end_chunk;
     int end_pos;
     atomic_ptr_t<chunk_t> spare_chunk; //空闲块,读写线程的共享变量
 };

chunk 机制

每次批量分配一批元素,减少内存的分配和释放(解决不断动态内存分配的问题)。yqueue_t 内部由一个个的 chunk 组成,每个chunk 能储存 N 个元素。

每次以 chunk 为单位批量分配一批元素,减少内存的分配和释放,从而解决不断动态内存分配的问题。当队列空间不足时分配一个chunk。

1704879107937.jpg

chunk 结构体

struct chunk_t {
     T values[N];    // 每个 chunk_t 可以容纳 N 个 T 类型的元素
     chunk_t *prev;  // 指向前一个 chunk_t
     chunk_t *next;  // 指向后一个 chunk_t
 };

spare_chunk 机制

已经读取完元素的 chunk 不会立即释放,而是根据局部性原理先回收到 spare_chunk 里面,当再次需要分配 chunk 的时候从 spare_chunk 中获取。spare_chunk 只保存一个最近回收的 chunk,当有新的空闲块时,保存该空闲块释放之前的空闲块。

atomic_ptr_t<chunk_t> spare_chunk;  // 空闲块

1704879118198.jpg

spare_chunk 结构体

队列控制

yqueue_t 内部有三个 chunk_t 类型指针以及对应的索引位置

chunk_t *begin_chunk;
 int begin_pos;       
 chunk_t *back_chunk; 
 int back_pos;         
 chunk_t *end_chunk;   
 int end_pos;
  • begin_chunk/begin_posbegin_chunk总是指向队头的 chunk,begin_pos指向第一个元素在当前 chunk 的位置,可读取的位置
  • back_chunk/back_posback_chunk 总是指向队尾的 chunk,back_pos 指向最后一个元素在当前 chunk 的位置,可写入的位置
  • end_chunk/end_posend_chunk 总是指向最后一个分配的 chunk,决定是否要分配 chunk 或者回收 chunk。注意区分 back_chunk/back_posend_chunk/end_pos 的作用。
  • 若最后一个 chunk 未满, back_chunkend_chunk 均指向最后一个 chunk,back_pos 当前可写入位置,end_pos指向下次可写入的位置,即 (back_pos +1) % N == end_pos
  • 若最后一个 chunk 满了,end_chunk指向新分配的 chunk,back_pos 指向 back_chunk 最后一个元素,end_pos指向 end_chunk 第一个元素,如下图所示

1704879129855.jpg

chunk_t 类型指针

2.3.2、类接口

构造函数

1704879139449.jpg

yqueue_t 构造函数

inline yqueue_t() {
     begin_chunk = (chunk_t *)malloc(sizeof(chunk_t)); // 预先分配 chunk
     alloc_assert(begin_chunk);
     begin_pos = 0;
     back_chunk = NULL; // back_chunk指向队尾元素所在的chunk,初始无元素,为空
     back_pos = 0;
     end_chunk = begin_chunk; // end_chunk指向最后分配的chunk,刚分配出来的chunk
     end_pos = 0;             // end_pos总是为0
 }

front/back 函数

函数返回的是左值引用,可以修改其值

  • begin_chunk->values[begin_pos]:队首可读元素
  • back_chunk->values[back_pos]:队尾可写元素
// 返回队列头部元素的引用,可以通过该引用更新元素,结合 pop 实现出队列操作
 inline T &front() {
     return begin_chunk->values[begin_pos];
 }
 // 返回队列尾部元素的引用,可以通过该引用更新元素,结合 push 实现插入操作。
 inline T &back() {
     return back_chunk->values[back_pos];
 }

push 函数

更新下一次元素写入的位置。

push 操作前,要判断若执行 push 操作后该 chunk 是否还有空间

  • ++end_pos != N,执行 push 操作后还有空间,该 chunk 还有空闲位置写入元素。此时, back_posend_pos 相差一个位置,(back_pos +1) % N == end_pos

无需扩容

  • ++end_pos == N,执行 push 操作后没有空间了。此时,该 chunk 只有 n-1 位置可以写入元素了,需要再分配新的 chunk。
  • 先尝试获取 spare_chunk,若有则直接使用;若没有,则重新分配 chunk
  • 更新 end_chunkend_pos,指向新的 chunk

1704879149116.jpg

需要扩容

//  Adds an element to the back end of the queue.
 inline void push() {
     back_chunk = end_chunk;
     back_pos = end_pos; 
     // 若执行 push 操作后该结点是否还有空间
     // 1、若该 chunk 结点仍有空间,无需扩容
     if (++end_pos != N) return;
     // 2、若该 chunk 结点没有空间了,需要扩容
     // 为什么设置为NULL? 因为如果把 spare chunk 取出则没有了,所以设置为NULL
     chunk_t *sc = spare_chunk.xchg(NULL);
     // 判断是否有 spare_chunk
     // 2.1、如果有spare_chunk,则继续复用它
     if (sc) {
         end_chunk->next = sc;
         sc->prev = end_chunk;
     }
     // 2.2、如果没有spare_chunk,则重新分配
     else {
         end_chunk->next = (chunk_t *)malloc(sizeof(chunk_t)); // 分配一个chunk
         alloc_assert(end_chunk->next);
         end_chunk->next->prev = end_chunk;  
     }
     // 更新 end_chunk 和 end_pos 
     end_chunk = end_chunk->next;
     end_pos = 0;
 }

pop 函数

更新下一次读取的位置

pop 操作前,需要判断执行 pop 操作后是否需要释放该 chunk(其中的所有元素是否已经取出)

  • ++begin_pos != N,还有元素没有取出,还要继续被使⽤;
  • ++begin_pos == N,所有元素都已经取出,需要回收该 chunk。先把最后回收的 chunk 保存到 spare_chunk,然后释放上一次保存的chunk。
//  Removes an element from the front end of the queue.
 inline void pop() {
     // 判断是否需要释放该 chunk,删除满一个chunk才会回收chunk
     if (++begin_pos == N) {
         chunk_t *o = begin_chunk;
         // 重新设置当前 chunk
         begin_chunk = begin_chunk->next;
         begin_chunk->prev = NULL;
         begin_pos = 0;
         // spare_chunk 只保留一个 chunk
         // 由于局部性原理,总是保存最新的空闲块而释放先前的空闲快
         chunk_t *cs = spare_chunk.xchg(o); 
         free(cs);
     }
 }

注意:

  • pop 掉的元素,由调用者负责销毁
  • 空闲块 spare_chunk 的保存,要求是原子操作。这是因为空闲块是读写线程的共享变量

2.4、ypipe_t

template<typename T, int N>
 class ypipe_t {
 public:
     inline ypipe_t();
     inline virtual ~ypipe_t();
     inline void write(const T &value_, bool incomplete_);
     inline bool unwrite(T *value_);
     inline bool flush();
     inline bool check_read();
     inline bool read(T *value_);
     inline bool probe(bool (*fn)(T &));
 protected:
     yqueue_t<T, N> queue;
     T *w;
     T *r;
     T *f;
     atomic_ptr_t<T> c;
     ypipe_t(const ypipe_t &);
     const ypipe_t &operator=(const ypipe_t &);
 };

2.4.1、数据结构

重点在于控制读写的指针,个人感觉英文注释有助于理解,就贴在这儿了。

// Allocation-efficient queue to store pipe items.
 // Front of the queue points to the first prefetched item, back of
 // the pipe points to last un-flushed item. Front is used only by
 // reader thread, while back is used only by writer thread.
 yqueue_t<T, N> queue;
 // Points to the first un-flushed item. This variable is used
 // exclusively by writer thread.
 T *w;
 // Points to the first un-prefetched item. This variable is used
 // exclusively by reader thread.
 T *r;
 // Points to the first item to be flushed in the future.
 T *f;
 // The single point of contention between writer and reader thread.
 // Points past the last flushed item. If it is NULL,
 // reader is asleep. This pointer should be always accessed using
 // atomic operations.
 atomic_ptr_t<T> c;
  • r: 用来控制可读位置,指向第一个未预取的元素,读线程使用。注意区分,r-1位置可读取,r 位置位未预取,没能取出来则不能读取,读取到 r 位置就不能再读了
  • w: 用来控制是否需要唤醒读端,指向第一个未刷新的元素,写线程使用。当读端没有数据可以读取的时候,将c变量设置为NULL,w 由写端控制,只受 f 修改
  • f: 用来控制写入位置,指向下一轮要被刷新的一批元素中的第一个。当该 f 被更新到 c 的时候,读端才能看到写入的数据
  • c:读写线程共享的指针,指向每⼀轮刷新的起点。当 c == w 时,指向该轮刷新的起点;当 c == NULL 时,无数据可读,表示读线程睡眠(只会在读线程中被设置为空)。

2.4.2、类接口

构造函数

1704879200037.jpg

ypipe_t 构造函数

//  Initialises the pipe.
 inline ypipe_t() {
     //  Insert terminator element into the queue.
     queue.push(); //yqueue_t的尾指针加1,开始back_chunk为空,现在back_chunk指向第一个chunk_t块的第一个位置
     //  Let all the pointers to point to the terminator.
     //  (unless pipe is dead, in which case c is set to NULL).
     r = w = f = &queue.back(); //就是让r、w、f、c四个指针都指向这个end迭代器
     c.set(&queue.back());
 }

write 函数

向管道批量写入数据,此时还未刷新。参数 incomplete 若为 true,没有写完,希望继续写入。若为 false,则刷新,刷新后才能被读线程看到。

1704879207733.jpg

write

// Write an item to the pipe. Don't flush it yet. If incomplete is
 // set to true the item is assumed to be continued by items
 // subsequently written to the pipe. Incomplete items are never flushed down the stream.
 inline void write(const T &value_, bool incomplete_) {
     //  Place the value to the queue, add new terminator element.
     // 写入数据:通过 back 获取可写位置并写入数据;通过 push 更新下一个可写位置
     queue.back() = value_;
     queue.push();
     //  Move the "flush up to here" poiter.
     // 判断写操作是否已经完成
     // 1、若写入完成,则刷新 flush 指针
     if (!incomplete_) {    
         f = &queue.back(); // 记录要刷新的位置
     }
     // 2、若写入还未完成,不修改 flush 指针,read 没有数据
 }

flush 函数

批量刷新已经写入的数据到管道,并将指针 c, w 更新到指针 f 的位置。

对于写线程独享指针 w 来说,直接更新 w = f 。

对于读写线程共享的指针 c 来说,通过 cas 原子操作来更新 c 的值,做到线程安全,关于 c 值的总结见后文的 read 函数。c.cas(w, f) 比较 c 和 w 的值

  • 若 c == w,读端有数据可读,说明读线程活跃,则原子操作更新 c = f
  • 若 c != w,读端无数据可读,说明读线程休眠,读写线程无竞争,则非原子操作更新 c = f

刷新后,c, w, f 三者的关系:c = w = f

1704879216134.jpg

flush

// Flush all the completed items into the pipe. Returns false if
 // the reader thread is sleeping. In that case, caller is obliged to
 // wake the reader up before using the pipe again.
 // 刷新所有已经完成的数据到管道,返回false说明读线程在休眠,此时需要调用者唤醒读线程。
 // 批量刷新的机制, 写入批量后唤醒读线程;
 inline bool flush() {
     // If there are no un-flushed items, do nothing.
     // 1、没有新元素加入,不需要更新
     if (w == f) {
         return true;
     }
     // Try to set 'c' to 'f'.
     // 尝试将 c 设置为 f
     // 注意:参考 read 函数,读端没有读取到数据,则 c = NULL,
     // cas 操作:比较 c 和 w 是否相等。相等则 c = f,否则不做任何操作。返回旧的 c 值
     // 1、若 c != w,说明读端没有读取到数据(c = NULL), 不做任何操作,返回NULL
     if (c.cas(w, f) != w) {
         //  Compare-and-swap was unseccessful because 'c' is NULL.
         //  This means that the reader is asleep. Therefore we don't
         //  care about thread-safeness and update c in non-atomic
         //  manner. We'll return false to let the caller know
         //  that reader is sleeping.
         c.set(f); // 更新 c,非原子操作(因为读线程睡眠,不存在竞争)
         w = f;    // 更新 w
         return false; // 返回 false 需要唤醒读线程,这需要写业务去做处理
     }
     // 2、若 c == w,读端还有数据可读取,则自动更新 c = f(原子操作),返回c(w)
     else {
         //  Reader is alive. Nothing special to do now. Just move
         //  the 'first un-flushed item' pointer to 'f'.
         w = f;    // 更新 w
         return true;
     }
 }

read 函数

检测是否有数据可读,若有则读取队首元素并 pop 掉

// Reads an item from the pipe. Returns false if there is no value.
 // available.
 inline bool read(T *value_) {
     // Try to prefetch a value.
     // 检测是否有数据可以读取
     if (!check_read())
         return false;
     // There was at least one value prefetched.
     // Return it to the caller.
     // 读取操作:front + pop
     *value_ = queue.front();
     queue.pop();
     return true;
 }

从上述可以看出,关键在于检测是否有数据可以读取 check_read()

check_read 函数

check_read 检测是否有数据可读,若队列为空则尝试去预取数据。

// 尝试预取数据,更新 r = 旧的 c (w=f),分为两种情况
 // 1. 如果此时还没有数据写入,c == &queue.front(), 将 c 置为 NULL
 // 2. 如果此时已经有数据写入,c != &queue.front(),返回 c
 r = c.cas(&queue.front(), NULL); // 尝试预取数据,r 指向可以读取到的位置

预取数据指的是 r = c,而根据上文提到的 flush 函数,有 r = c = w = f,表示从 &queue.front() 到 f 这些位置的数据都被预取出来了。此后每次调用 read 都能取出一块数据,当 c== &queue.front()时,代表数据被取完了,这时将 c 置为 NULL,读线程睡眠,这也是写线程检查读线程是否睡眠的标志。是否真的取到了数据,还要再次判断。

// 判断是否成功预取数据
 // 若队列中无数据可读,r == front,当前队列为空;r == NULL,初始时队列为空
 if (&queue.front() == r || !r) 
     return false;
 return true;

若成功预取数据,则下次 read 操作 check_read 检测是否还有上次预取的数据可读,若有则直接返回 true。若没有数据可读,则再次尝试预取数据。

// 第一次 read 时,r == front,这里判断不成立
 // 判断是否前几次调用 read 函数时已经预取到数据了
 if (&queue.front() != r && r) 
     return true;    // 有数据可以读

下面来看具体例子

1704879225478.jpg

flush

如图所示,第一次调用 readcheck_read 检测到没有数据可读,尝试预取数据成功,更新 r = c = w = f,read 读出数据 A。第二次调用 readcheck_read 检测到仍有数据可读,则 read 读出数据 B。第三次调用 read, check_read 检测到没有数据可读,尝试预取数据失败,更新 r = NULL,读线程休眠,read 返回 false,本次读取失败。

这里总结一下,读写线程共享的指针 c 值的情况

  • NULL:读线程设置,此时意味着已经没有数据可读,读线程在休眠
  • 非零:写线程设置,分两种情况
  • 旧值为 w,cas(w,f) 操作修改为 f,意味着如果旧值为 w,则原子性的修改为 f,表示有更多已被刷新的数据可读。
  • 旧值为 NULL,读线程休眠,因此可以安全的设置为当前 f 指针的位置。

下面为 check_ read 函数的完整代码

// Check whether item is available for reading.
 // 这里面有两个点,一个是检查是否有数据可读,一个是预取
 inline bool check_read() {
     // Was the value prefetched already? If so, return.
     // 第一次 read 时,r == front,这里判断不成立
     // 判断是否在前几次调用 read 函数时已经预取到数据了,return true
     if (&queue.front() != r && r) 
         return true;    // 有数据可以读
     // There's no prefetched value, so let us prefetch more values.
     // Prefetching is to simply retrieve the
     // pointer from c in atomic fashion. If there are no
     // items to prefetch, set c to NULL (using compare-and-swap).
     // 尝试预取数据,更新 r = 旧的 c (w=f),分为两种情况
     // 1. 如果此时还没有数据写入,c == &queue.front(), 将 c 置为 NULL
     // 2. 如果此时已经有数据写入,c != &queue.front(),返回 c 的位置 
     r = c.cas(&queue.front(), NULL); // 尝试预取数据,r 指向可以读取到的位置
     //  If there are no elements prefetched, exit.
     //  During pipe's lifetime r should never be NULL, however,
     //  it can happen during pipe shutdown when items are being deallocated.
     // 判断是否成功预取数据
     // 若队列中无数据可读,r == front,当前队列为空;r == NULL,初始时队列为空
     if (&queue.front() == r || !r) 
         return false;
     //  There was at least one value prefetched.
     return true;
 }

3、基于数组的循环无锁队列

使用循环数组实现的队列,需要牺牲一个存储单元,front 指向队头元素所在位置,rear 指向新元素将要入队的位置(实际上不存储元素)

  • 判断队空:front = rear
  • 判断队满:(rear + 1) % Q_SIZE = front

3.1、数据结构

1704879237200.jpg

循环数组队列

队列的使用了一个循环数组和3个下标:

  • m_writeIndex:指向新元素的将要入队(可写入)的位置。只表示写请求成功并申请空间,并不代表数据已经写入,不能用于读取。
  • m_readIndex:指向待出队(可读出)元素的位置。
  • m_maximumReadIndex:指向存放最后一个有效数据(已经写入)的位置,即可读位置的边界,[readIndex, maximumReadIndex)这一范围内的数据都可以读取。

1704879246143.jpg

三个指针

3.2、入队操作

入队操作流程如下,包含两个 cas 操作

  • 获取可写的位置(cas 操作):元素申请到了空间(入队成功等待写入数据)
  • 写入数据
  • 更新可读的位置(cas 操作):通知消费者数据已经写入可以被读取,该操作保证了生产者线程的按序递增入队

多线程环境下,生产者线程要按序递增完成元素入队操作,原因是必须保证数据被完全拷贝到队列之后才允许消费者线程将其出队。

例如:初始时,队列为空,m_writeIndex = 0, m_readIndex = 0,m_maximumReadIndex = 0,有两个生产者线程。

  • 获取可写位置的 cas 操作中,每次只能有一个线程修改指针。若线程1先执行 cas 操作,成功并更新后 m_writeIndex = 1,线程1 currentWriteIndex = 0,准备写入数据。线程2后执行 cas 操作失败,继续循环。下一轮循环, m_writeIndex = 1,线程2 currentWriteIndex = 1,cas 操作成功更新。这时 m_writeIndex = 2,线程1写入 m_thequeue[0] 的位置,线程2写入 m_thequeue[c1] 的位置,保证每次只有一个生产者线程更新 m_writeIndex + 1。
  • 更新可读位置的 cas 操作中,每次只能有一个线程修改指针。当前 m_maximumReadIndex = 0,线程1 currentWriteIndex = 0,线程2 currentWriteIndex = 1。若线程2先执行 cas 操作,失败并让出 cpu。若线程1执行 cas 操作,成功并更新 m_maximumReadIndex = 1,保证了生产者线程按序更新 m_maximumReadIndex + 1。
template <typename ELEM_T, QUEUE_INT Q_SIZE>
 bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data) 
 {
     QUEUE_INT currentWriteIndex; 
     QUEUE_INT currentReadIndex;  
     // 1、获取可写入的位置(CAS)
     do 
     {
          // 获取当前读写指针的位置
         currentWriteIndex = m_writeIndex;
         currentReadIndex = m_readIndex;
          // 判断队列是否满了,(write + 1) % Q_SIZE == read   
         if(countToIndex(currentWriteIndex + 1) ==
             countToIndex(currentReadIndex))
         {
             return false;   
         }
     // 获取可写入的位置(CAS),cas 更新失败则继续循环
     } while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
     // 2、写入数据
     m_thequeue[countToIndex(currentWriteIndex)] = a_data;  
     // 3、更新可读的位置(CAS),多线程环境下生产者线程按序更新可读位置
     while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
     {
         sched_yield(); // cas 更新失败后,让出 cpu
     }
     // 原子操作;队列中元素数量+1
     AtomicAdd(&m_count, 1);
     return true;
 }

sched_yield() 操作将 cpu 让给其他线程是很有必要的。

无锁算法和通过阻塞机制同步的算法的一个主要区别在于无锁算法不会阻塞在线程同步上。多线程环境下,多生产者线程向并发的往队列中存放数据,每个生产者线程所执行的 cas 操作都必须严格遵循 FIFO 次序,一个用于申请空间,另一个用于通知消费者数据已经写入完成可以被读取了。例如第一个 cas 操作的执行顺序是线程1,2,3,第二个 cas 操作的执行顺序也必须是线程1,2,3。若线程1执行第二个 cas 操作的时候被抢占,那么线程2,3只能在 cpu 上忙等,所以应当尽快让出 cpu,让线程1先执行。

3.3、出队操作

出队操作流程如下

  • 读取数据
  • 更新可读的位置(cas 操作):元素出队,保证每次只有一个生产者线程更新指针m_readIndex
emplate <typename ELEM_T, QUEUE_INT Q_SIZE>
 bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
 {
     QUEUE_INT currentMaximumReadIndex;
     QUEUE_INT currentReadIndex;
     do
     {
          // 获取当前可读位置[m_readIndex,  m_maximumReadIndex)
         currentReadIndex = m_readIndex;
         currentMaximumReadIndex = m_maximumReadIndex;
         // 若队列为空,返回 false
         if(countToIndex(currentReadIndex) ==
             countToIndex(currentMaximumReadIndex))      
         {
             return false;
         }
         // 读取数据
         a_data = m_thequeue[countToIndex(currentReadIndex)]; 
          // 更新可读的位置(CAS)
         if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
         {
             AtomicSub(&m_count, 1); // 原子操作,元素-1
             return true;
         }
     } while(true);
     assert(0);
     return false;
 }

3.4、代码实现

// ArrayLockFreeQueue.h
 #ifndef _ARRAYLOCKFREEQUEUE_H___
 #define _ARRAYLOCKFREEQUEUE_H___
 #include <stdint.h>
 #ifdef _WIN64
 #define QUEUE_INT int64_t
 #else
 #define QUEUE_INT unsigned long
 #endif
 #define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535
 template <typename ELEM_T, QUEUE_INT Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
 class ArrayLockFreeQueue
 {
 public:
     ArrayLockFreeQueue();
     virtual ~ArrayLockFreeQueue();
     QUEUE_INT size();
     bool enqueue(const ELEM_T &a_data); 
     bool dequeue(ELEM_T &a_data);
     bool try_dequeue(ELEM_T &a_data);
 private:
     ELEM_T m_thequeue[Q_SIZE];                  
     volatile QUEUE_INT m_count;         
     volatile QUEUE_INT m_writeIndex;    
     volatile QUEUE_INT m_readIndex;     
     volatile QUEUE_INT m_maximumReadIndex;
     inline QUEUE_INT countToIndex(QUEUE_INT a_count);
 };
 #include "ArrayLockFreeQueueImp.h"
 #endif
 // ArrayLockFreeQueueImp.h
 #ifndef _ARRAYLOCKFREEQUEUEIMP_H___
 #define _ARRAYLOCKFREEQUEUEIMP_H___
 #include "ArrayLockFreeQueue.h"
 #include <assert.h>
 #include "atom_opt.h"
 template <typename ELEM_T, QUEUE_INT Q_SIZE>
 ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() :
     m_writeIndex(0),
     m_readIndex(0),
     m_maximumReadIndex(0)
 {
     m_count = 0;
 }
 template <typename ELEM_T, QUEUE_INT Q_SIZE>
 ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue()
 {}
 template <typename ELEM_T, QUEUE_INT Q_SIZE>
 inline QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(QUEUE_INT a_count)
 {
     return (a_count % Q_SIZE);      
 }
 template <typename ELEM_T, QUEUE_INT Q_SIZE>
 QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size()
 {
     QUEUE_INT currentWriteIndex = m_writeIndex;
     QUEUE_INT currentReadIndex = m_readIndex;
     if(currentWriteIndex>=currentReadIndex)
         return currentWriteIndex - currentReadIndex;
     else
         return Q_SIZE + currentWriteIndex - currentReadIndex;
 }
 template <typename ELEM_T, QUEUE_INT Q_SIZE>
 bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data) 
 {
     QUEUE_INT currentWriteIndex; 
     QUEUE_INT currentReadIndex;
     do 
     {
         currentWriteIndex = m_writeIndex;
         currentReadIndex = m_readIndex;
         if(countToIndex(currentWriteIndex + 1) ==
             countToIndex(currentReadIndex))
         {
             return false;   
         }
     } while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
     m_thequeue[countToIndex(currentWriteIndex)] = a_data; 
     while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
     {
         sched_yield();      
     }
     AtomicAdd(&m_count, 1);
     return true;
 }
 template <typename ELEM_T, QUEUE_INT Q_SIZE>
 bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::try_dequeue(ELEM_T &a_data)
 {
     return dequeue(a_data);
 }
 template <typename ELEM_T, QUEUE_INT Q_SIZE>
 bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
 {
     QUEUE_INT currentMaximumReadIndex;
     QUEUE_INT currentReadIndex;
     do
     {
         currentReadIndex = m_readIndex;
         currentMaximumReadIndex = m_maximumReadIndex;
         if(countToIndex(currentReadIndex) ==
             countToIndex(currentMaximumReadIndex))      
         {
             return false;
         }
         a_data = m_thequeue[countToIndex(currentReadIndex)]; 
         if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
         {
             AtomicSub(&m_count, 1); // 真正读取到了数据,元素-1
         }
     } while(true);
     assert(0);
     return false;
 }
 #endif

4、参考

相关文章
|
6月前
|
消息中间件
无锁消息队列
无锁消息队列
36 0
|
5月前
|
消息中间件 缓存 网络协议
无锁消息队列的设计实现
无锁消息队列的设计实现
53 2
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
18天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
2月前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
37 0
手撸MQ消息队列——循环数组
|
3月前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
158 1
|
4月前
|
消息中间件 开发工具 RocketMQ
消息队列 MQ使用问题之一直连接master失败,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Prometheus 监控
消息队列 MQ使用问题之如何将旧集群的store目录迁移到新集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 安全 PHP
消息队列 MQ使用问题之如何获取PHP客户端代码
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。