从队列容器选择,到算法一步步迭代,这是一篇从零开始逐步推演出“非阻塞线程安全队列”思想方法的文章。(有剧情反转~)
队列容器设计
若用数组作为队列的容器,就必须得加锁,因为数组是一块连续内存地址,多线程场景下,读写同一块内存地址不得不互斥地访问。
链式结构
链式结构就没有这个烦恼。链的每个结点都对应不同的内存地址,在多线程场景下,取头结点和插尾结点就不存在并发问题。(至少是降低了并发问题产生的概率)
通用的队列应该可存放任何类型的元素。
综上,就得声明一个带泛型的链结点:
// 结点 private static class Node<E> { E item; // 结点数据字段 Node<E> next; // 结点后续指针 }
易变的字段
会不会存在多线程同时操纵同一个 Node 结点的 item 和 next 字段的情况?
当然有可能!为了让所有线程都能看到最新的 item 和 next,就得保证其 “可见性”:
private static class Node<E> { volatile E item; volatile Node<E> next; }
volatile
的字面意思是“易变的”,它用来形容共享变量高速缓存中的副本:
为提高线程运行效率,每个线程在得到 CPU 运行时间片后,都会将主存中变量拷贝到 CPU 高速缓存中(图中 a1,a2 即是 a 的副本),之后对共享变量的运算都在缓存中进行。
若 a 初始值为0,启动线程1和2,a 就新增了两个副本 a1, a2。随后俩线程并发地对 a 执行自增。
线程1执行a1++
后 a1 = 1,线程2对线程1的操作一无所知,当它也执行a2++
后 a2 = 1。而共享变量正确的值应该是 2。
造成这个错误的原因在于:缓存是相互隔离的,导致线程无法感知对方对共享变量的操作,即 “当前线程对共享变量的操作对其他线程不可见。”
解决方案自然是让其变得 “可见”。
java 的volatile
关键词,用于告诉编译器该共享变量在缓存中的副本是“易变的”,线程不该相信它,于是乎:
- 线程写共享变量后,总是应该立刻将最新值同步到主存中。
- 线程读共享变量时,总是应该去主存拿最新值。
就这样通过读写主存,线程间建立了一种通信机制,让它们对共享变量的操作变得“可见”。
为了方便队头出队,队尾入队,得安排两个指针,分别指向一头一尾。
public class MyQueue<E> { volatile Node<E> head; // 当前链头 volatile Node<E> tail; // 当前链尾 }
同样的道理,链头尾指针在多线程场景下是共享变量,也得使用 volatile 保持它们的可见性。
阶段性总结:
- 每个线程都有自己独立的内存空间(本地内存)
- 多线程能同时访问的变量称为“共享变量”,它在主存中。出于效率的考量,线程会将主存中的共享变量拷贝到本地内存,后续对共享变量的操作都发生在本地内存。
- 线程本地内存相互独立,导致它们无法感知别人对共享变量的操作,即当前线程对共享变量的操作对其他线程不可见。
- volatile 关键词是 java 解决共享变量可见性问题的方案。
- 被声明为 volatile 的变量,就是在告诉编译器该共享变量在线程本地内存中的副本是“易变的”,线程不该相信它。线程写共享变量后,总是应该立刻将最新值同步到主存中。线程读共享变量时,总是应该去主存拿最新值。
- 多线程场景下,队列的头尾指针,即结点的数据域和指针域都需要保证可见性。
出队
基础版
单链表取头结点的算法很简单,特别是已知头结点 head 时:
// 出队版本一 public E poll() { Node<E> p = head; E item = p.item; // 暂存头结点数据 head = head.next; // 更新头结点指针 p.item = null; // 老头结点内容置空 p.next = null; // 老头结点后续指针置空 return item; }
在单线程情况下没什么毛病,但多线程就会出问题。
非阻塞线程安全版
某线程正在读头结点的 item 字段,另一个线程可能已经将其置空。
当然可以使用synchronize
将对 head 的操作包裹起来,但这样处理是“有锁的”、“阻塞的”,即同一时间只有一个线程能获取锁,其余竞争者都被挂起等待锁被释放,然后再唤醒。“挂起-唤醒”是耗时的。
性能更好的“非阻塞”方式是CAS
,全称为compare and swap
,即“比较再交换”。
当要更新变量值时,需要提供三个参数:1.当前值,2.期望值,3.新值。只有当前值和期望值相等时,才用新值替换当前值。
其中期望值就是线程被打断执行时暂存的值,当线程恢复执行后,用当前值和当时暂存值比较,如果相等,则表示被打断过程中其他线程没有修改过它,那此时更新它的值是安全的。
CAS 操作,被封装在sun.misc.Unsafe
中,结合当前场景再简单封装如下:
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); static <E> boolean casItem(Node<E> node, E cmp, E val) { // 将 node 结点 item 字段值通过和期望值 cmp 比较,安全地置为 val。 return U.compareAndSwapObject(node, ITEM, cmp, val); }
casItem()
在多线程下将 node.item 通过和预期值cmp
字段比较安全地赋予新值val
。
除此之外,还需要一个更新头结点的 CAS 操作:
// 头结点脱链并更新头指针 final void updateHead(Node<E> h, Node<E> p) { if (h != p) { Node<E> h1 = h; // 暂存当前头指针 casHead(h, p); // 更新头指针到 p 的 CAS 竞争 h1.next = null; // 老头指针 next 置空 } } private boolean casHead(Node<E> cmp, Node<E> val) { // 将头指针 head 通过和预期值 cmp 比较安全地赋予新值 val return U.compareAndSwapObject(this, HEAD, cmp, val); }
CAS 通常需配合轮询使用,因为多线程同时修改一个共享变量时,只有一个线程会成功(返回 true),其余失败(返回 false),失败线程通过轮询再次尝试 CAS,直到成功为止。
用这个思路,写一个线程安全的取链头结点算法:
// 出队版本二 public E poll() { // 轮询 for (;;) { E item = head.item;// 暂存 item Node<E> next = head.next; // 暂存次头结点 // 将头结点 item 字段置空的 CAS 竞争 if (casItem(head, item, null)) { for(;;) { // 将头结点指向次头结点的 CAS 竞争 if (updateHead(head, next)) return item; } } } }
死循环修正版
这下修改头结点的 item 字段算是线程安全了,但这个算法还差点东西。若某线程在竞争中失败,另一个线程成功地将头结点 item 字段置空了,因为轮询的存在,失败线程会不停重试,但此时 head.item 的当前值已经变为 null 而不是暂存的 item。所以 CAS 轮询会一直失败,程序陷入死循环。
为了解决死循环,修改逻辑如下:
// 出队版本三 public E poll() { // 轮询 for (Node<E> h = head, p = h, q;;) { E item = p.item;// 暂存 item // 若头结点 item 为空,则不进行 CAS 竞争 if (item != null && casItem(p, item, null)) { for(;;) { // 出队结点为 p,p 后续结点是新的头结点 // 但若 p 是尾结点,则头指针还是停留在 p Node<E> newHead = ((q = p.next) != null) ? q : p; // 更新头结点的 CAS 竞争 if (updateHead(head, newHead)) return item; } } // q 是 p 后续结点 else if ((q = p.next) == null) { // 链被掏空,更新头结点指针为最后一个非空结点,返回 null updateHead(h, p); return null; } // 还有后续结点 else { p = q; // 工作结点 p 向后寻找第一个 item 非空的结点 } } }
新增了三个工作指针:
- p:表示第一个 item 字段不空的结点。
- q:p 的后续结点,用于向后遍历。
- h:暂存进入循环时的链头指针 head。
当某线程在将 item 置空的竞争中失败时,另一个线程成功地将头结点 item 置空。因为轮询的存在,失败线程继续尝试,当下一轮 for 循环执行到条件item != null
时会停止竞争,因为当前 p 指向结点的 item 字段已经被其他线程置空。此时 p 借助于 q 的帮助往后遍历寻找第一个 item 不空的结点。若找到则再次发起竞争,若遍历到链尾仍未找到,则表示链已经被掏空,直接返回 null。
ps:空链的表达方式不是 head = null,而是 head 指向一个结点,head.next = null,head.item = null,如下图所示:
(之所以叫空链版本一,是因为这不是最终形态,这样设计会有问题,详解见下一节)
巧妙脱链哨兵版
offer() 是非阻塞的,它在执行任何一条语句都有被打断的可能,这就不太妙了。
考虑下面这个场景:“线程A正执行出队,刚进入 for 循环,即刚把当前头指针暂存在 h,p 中,此时它被中断了,另一个出队线程得到执行机会,并抢先把头结点出队了。终于又轮到线程A继续执行。。。”
上图左侧为线程A正准备执行出队操作的初始状态,但它被中断了,另一个线程抢先把头结点 Node1 出队脱链了,并将 next 置空。当线程A恢复执行时,老母鸡变鸭的场景如上图右侧所示。
按照版本三的算法,之前暂存的头结点 p 的 item 字段已被置空,所以会往后遍历以定位到第一个 item 非空的结点。但尴尬的事情来了,因为在将头结点脱链时会将其 next 也置空。当 p 指针往后移动一个结点后就发现是空指针,版本三的算法判定这种情况即是链表被掏空,所以返回了 null。但明明链上还有一个 Node2 结点。。。
所以 “脱链结点” 和 “链尾结点” 的判定得区分开来。修改一下脱链方法:
final void updateHead(Node<E> h, Node<E> p) { // 若修改头结点 CAS 竞争成功,则将头结点自旋(next域指向自己) if (h != p && casHead(h, p)) lazySetNext(h, h); } // 将 head.next 指向自己 static <E> void lazySetNext(Node<E> node, Node<E> val) { U.putOrderedObject(node, NEXT, val); }
头结点脱链操作分为两步:
- 将头指针安全地指向新结点。
- 将头结点自旋,即 next 指针指向自己。 这两步操作的效果如下图所示:
脱链结点 和 链尾结点 的判断条件区分开之后,脱链结点就有 “哨兵” 的效果:
- 脱链结点的判断条件为
p == p.next
- 链尾结点的判断条件为
p.next == null
相应的修改出队算法:
// 出队版本四 public E poll() { restartFromHead: // 第一层轮询 for(;;) { // 第二层轮询 for (Node<E> h = head, p = h, q;;) { E item = p.item; // 找到真正的非空链头 if (item != null && casItem(p, item, null)) { for(;;) { Node<E> newHead = ((q = p.next) != null) ? q : p; if (updateHead(head, newHead)) return item; } } // 空链 else if ((q = p.next) == null) { updateHead(h, p); return null; } // 哨兵:p 指向结点已经脱链,从新 head 开始重新遍历 else if (p == q) continue restartFromHead; // 当还有后续结点 else { p = q; } } } }
新增了一个逻辑分支else if (p == q)
用于表示当前遍历结点已脱链,脱链即表示头结点 head 已被更新到新位置。所以“从头遍历以寻找第一个 item 非空的结点”这项工作应该从新的 head 出发,即 p 应该被新 head 赋值,为了实现该效果,就不得不用上 goto 这个骚操作,在 java 中用 continue 实现。
性能优化最终版
版本四已经可以正确地在多线程场景下实现出队,但还有一个地方可以被优化。
虽然 CAS 是非阻塞的竞争,但竞争就要耗费 CPU 资源。所以能不竞争就不竞争,在当前算法中“更新链头指针”就是可以缓一缓的竞争。
// 出队版本五 public E poll() { restartFromHead: for(;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // 找到真正的非空头结点 if (item != null && casItem(p, item, null)) { // 有条件地更新头指针 if(p!=h) { Node<E> newHead = ((q = p.next) != null) ? q : p; if (updateHead(head, newHead)) return item; } return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else { p = q; } } } }
为更新头指针增加了条件p!=h
,即只有当 head 指针滞后时才更新它。
因为 head 滞后必然造成 p 和 h 指向不同结点。p 的使命是从 head 出发往后定位到第一个 item 不空的结点。若 head 没有滞后,则 p 无需往后遍历,此时 p 必然等于 h。
不及时更新链头指针会导致算法出错吗?不会。因为该算法其实并不相信 head,所以安排了工作指针 p 通过遍历来找到真正的头结点。但不及时更新让竞争的次数直接减半,提高了性能。
阶段性总结:
非阻塞线程安全队列的出队算法总结如下:
- 总是从头指针出发向后寻找真正的头结点(item 非空),若找到,则将头结点 item 域置 null(CAS保证线程安全),表示结点出队,若未找到则返回 null。
- 采用 “自旋” 的方式来实现脱链(结点指针域与链脱钩),即 next 域指向自己(CAS保证线程安全)。自旋形成了哨兵的效果,使得往后遍历寻找真正头结点的过程中可感知到结点脱链。
- 滞后的头指针:“出队”、“脱链”、“更新头指针”不是配对的。理论上每次出队都应该脱链并更新头指针,为了提高性能,两次出队对应一次脱链+更新头指针,造成头结点会间歇性地滞后。
- 因为是非阻塞的,出队操作可能被其他线程打断,若是被入队打断则基本无害,因为队头队尾是两块不同的内存地址,不会有线程安全问题。若是被另一个出队线程打断,就可能发生想出的结点被其他线程抢先出队的情况。通过检测自旋+从头开始来定位到新得头结点解决。
入队
用同样的思路,一步步徒手写一个入队算法。
基础版
入队在链表结构上,即是链尾插入,这个算法很简单,特别是在已知尾指针tail
的情况下:
// 入队版本一 public boolean offer(E e) { Node<E> newNode = createNode(e)// 构建新结点 tail.next = newNode;// 链尾链入新结点 tail = newNode;// 更新链尾指针 return true; }
非阻塞线程安全版
但在多线程情况下就变得有点复杂,比如在向尾结点插入新结点的同时,另一个线程正在删除这个尾结点。
尾结点tail
这个变量是多线程共享的,对它的操作需要考虑线程安全问题。和出队操作一样使用 CAS。
对于链尾插入的场景来说,尾结点 next 指针的期望值是null
,若不是 null 则表示尾结点被另一个线程修改了,比如被另一个写线程抢先一步在尾部插入了一个新结点。尾结点 next 指针的新值自然是 newNode。
CAS 的封装如下:
// java.util.concurrent.ConcurrentLinkedQueue private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); static <E> boolean casNext(Node<E> node, Node<E> cmp, Node<E> val) { return U.compareAndSwapObject(node, NEXT, cmp, val); }
castNext()
即是 CAS 机制的一个实现,它在多线程下将 Node 结点的 next 指针安全地赋予新值。
除此之外,还需要一个casTail()
:
private boolean casTail(Node<E> cmp, Node<E> val) { return U.compareAndSwapObject(this, TAIL, cmp, val); }
它在多线程下将链尾指针安全地赋予新值。
用这个思路,写一个线程安全的入队算法:
// 入队版本二 public boolean offer(E e) { Node<E> newNode = createNode(e) // 轮询 for (;;) { // 尾插入 CAS 竞争 if (casNext(tail, null, newNode)) { // 若竞争成功,则更新尾指针 CAS 竞争 for(;;){ if (casTail(tail, newNode)) return true; } } } }