无锁队列
为什么需要⽆锁队列
加锁会引起的问题:
- Cache损坏(Cache trashing)
- 在同步机制上的争抢队列
为了解决或者尽量避免这些问题,可以使用无锁队列
Cache损坏(Cache trashing)
锁互斥会导致阻塞,会导致线程切换。
CPU切换线程时在保存和恢复上下⽂的过程中还隐藏了额外的开销:
Cache中的数据会失效,因为它缓存的是将被换出任务的数据,这些数据对于新换进的任务是没⽤的。
处理器的运⾏速度⽐主存快N倍,所以⼤量的处理器时间被浪费在处理器与主存的数据传输上。
这就是在处理器和主存之间引⼊Cache的原因。
Cache是⼀种速度更快但容量更⼩的内存(也更加昂贵),当处理器要访问主存中的数据时,这些数据⾸先被拷⻉到Cache中,因为这些数据在不久的将来可能⼜会被处理器访问。Cache misses对性能有⾮常⼤的影响,因为处理器访问Cache中的数据将⽐直接访问主存快得多。线程被频繁抢占产⽣的Cache损坏将导致应⽤程序性能下降。
在同步机制上的争抢队列
阻塞不是微不⾜道的操作。它导致操作系统暂停当前的任务或使其进⼊睡眠状态(等待,不占⽤任何的处理器)。直到资源(例如互斥锁)可⽤,被阻塞的任务才可以解除阻塞状态(唤醒)。
在⼀个负载较重的应⽤程序中使⽤这样的阻塞队列来在线程之间传递消息会导致严重的争⽤问题。
任务将⼤量的时间(睡眠,等待,唤醒)浪费在获得保护队列数据的互斥锁,⽽不是处理队列中的数据上。
任务队列此时可以采用⾮阻塞机制来避免这种频繁的加锁场景导致的性能低下。无锁队列也非常适合这种生产者消费者模式下的任务队列场景。
链表实现无锁队列
链表初始化一个队列
InitQueue(Q) { node = new node() node->next = NULL; Q->head = Q->tail = node; }
push队列
- 第一步,把tail指针的next指向要加入的结点。
tail->next = p;
- 第二步,把tail指针移到队尾。
tail = p;
EnQueue(Q, data) //进队列 { //准备新加入的结点数据 n = new node(); n->value = data; n->next = NULL; do { p = Q->tail; //取链表尾指针的快照 } while( CAS(p->next, NULL, n) != TRUE); //while条件注释:如果没有把结点链在尾指针上,再试 CAS(Q->tail, p, n); //置尾结点 tail = n; }
使用CAS实现无锁队列。CAS参考原子操作CAS
有一个潜在的问题——如果T1线程在用CAS更新tail指针的之前,线程停掉或是挂掉了,那么其它线程就进入死循环了。下面是改良版的EnQueue()
EnQueue(Q, data) //进队列改良版 v1 { n = new node(); n->value = data; n->next = NULL; p = Q->tail; oldp = p do { while (p->next != NULL) p = p->next; } while( CAS(p.next, NULL, n) != TRUE); //如果没有把结点链在尾上,再试 CAS(Q->tail, oldp, n); //置尾结点 }
但是改良版也有个问题:让每个线程,自己fetch 指针 p
到链表尾。但是这样的fetch会很影响性能。而且,如果一个线程不断的EnQueue,会导致所有的其它线程都去 fetch 他们的 p
指针到队尾。
能不能不要所有的线程都干同一个事?这样可以节省整体的时间?
比如:直接 fetch Q->tail
到队尾?因为,所有的线程都共享着 Q->tail,所以,一旦有人动了它后,相当于其它的线程也跟着动了,于是,我们的代码可以改进成如下的实现:
EnQueue(Q, data) //进队列改良版 v2 { n = new node(); n->value = data; n->next = NULL; while(TRUE) { //先取一下尾指针和尾指针的next tail = Q->tail; next = tail->next; //如果尾指针已经被移动了,则重新开始 if ( tail != Q->tail ) continue; //如果尾指针的 next 不为NULL,则 fetch 全局尾指针到next if ( next != NULL ) { CAS(Q->tail, tail, next); continue; } //如果加入结点成功,则退出 if ( CAS(tail->next, next, n) == TRUE ) break; } CAS(Q->tail, tail, n); //置尾结点 }
这也是 Java 中的 ConcurrentLinkedQueue
的实现逻辑
pop队列
出列比较简单,如下所示:
DeQueue(Q) //出队列 { do{ p = Q->head; if (p->next == NULL){ return ERR_EMPTY_QUEUE; } while( CAS(Q->head, p, p->next) != TRUE ); return p->next->value; }
如果 p->next和 tail
都指向同一个结点,这意味着队列为空,应该返回 ERR_EMPTY_QUEUE
,但是,在判断 p->next == NULL
时,另外一个EnQueue操作做了一半,此时的 p->next 不为 NULL了,但是 tail 指针还差最后一步,没有更新到新加的结点,这个时候就会出现,在 EnQueue 并没有完成的时候, DeQueue 已经把新增加的结点给取走了,此时,队列为空,但是,p->next与 tail 并没有指向同一个结点。
改进
DeQueue(Q) //出队列,改进版 { while(TRUE) { //取出头指针,尾指针,和第一个元素的指针 head = Q->head; tail = Q->tail; next = head->next; // Q->head 指针已移动,重新取 head指针 if ( head != Q->head ) continue; // 如果是空队列 if ( head == tail && next == NULL ) { return ERR_EMPTY_QUEUE; } //如果 tail 指针落后了 if ( head == tail && next != NULL ) { CAS(Q->tail, tail, next); continue; } //移动 head 指针成功后,取出数据 if ( CAS( Q->head, head, next) == TRUE){ value = next->value; break; } } free(head); //释放老的dummy结点 return value; }
这也是Java 的 ConcurrentLinkedQueue
的 poll
方法的实现逻辑。
用数组实现无锁队列
使用数组来实现队列是很常见的方法,因为没有内存的分部和释放,一切都会变得简单,实现的思路如下:
1)数组队列应该是一个ring buffer形式的数组(环形数组)
2)数组的元素应该有三个可能的值:HEAD,TAIL,EMPTY(当然,还有实际的数据)
3)数组一开始全部初始化成EMPTY,有两个相邻的元素要初始化成HEAD和TAIL,这代表空队列。
4)EnQueue操作。假设数据x要入队列,定位TAIL的位置,使用double-CAS方法把(TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到(TAIL, EMPTY),则说明队列满了。
5)DeQueue操作。定位HEAD的位置,把(HEAD, x)更新成(EMPTY, HEAD),并把x返回。同样需要注意,如果x是TAIL,则说明队列为空。
算法的一个关键是——如何定位HEAD或TAIL?
1)我们可以声明两个计数器,一个用来计数EnQueue的次数,一个用来计数DeQueue的次数。
2)这两个计算器使用使用Fetch&ADD来进行原子累加,在EnQueue或DeQueue完成的时候累加就好了。
3)累加后求个模什么的就可以知道TAIL和HEAD的位置了。