ConcurrentLinkedQueue 原理探究
介绍
ConcurrentLinkedQueue 是线程安全的无界非阻塞队列,底层使用单向链表实现,对于入队和出队操作使用 CAS 实现线程安全。
ConcurrentLinkedQueue 内部的队列使用单向链表方式实现,其中有两个volatile类型的Node节点
分别用来存放队列的头、尾节点。
下面无参构造函数中可以知道,默认头尾节点都是指向 item 为 null 的哨兵节点,新元素插入队尾,获取元素从头部出队。
private transient volatile Node<E> head; private transient volatile Node<E> tail; public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
在 Node 节点中维护一个使用 volatile 修饰的变量 item,用来存放节点的值;next 存放链表的下一个节点;其内部则使用 UNSafe 工具类提供的 CAS 算法来保证出入队时操作链表的原子性。
offer 操作
offer 操作是在队尾增加一个元素,如果传递的参数是 null 则抛出异常,否则由于 ConcurrentLinkedQueue 是无界队列,该方法会一直返回 true,另外,由于使用的是 CAS 算法,所以该方法不会被阻塞挂起调用方线程。下面我们看源码:
public boolean offer(E e) { //(1)为null则抛出异常 checkNotNull(e); //(2)构造Node节点,内部调用unsafe.putObject final Node<E> newNode = new Node<E>(e); //(3)从尾节点插入 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; //(4)如果q == null则说明p是尾节点,执行插入 if (q == null) { //(5)使用CAS设置p节点的next节点 if (p.casNext(null, newNode)) { //(6)如果p != t,则将入队节点设置成tail节点,更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点 if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } } else if (p == q) //(7)多线程操作时,由于poll操作移除元素后可能会把head变成自引用,也就是head的next变成了head,这里需要重新找新的head p = (t != (t = tail)) ? t : head; else //(8)寻找尾节点 p = (p != t && t != (t = tail)) ? t : q; } }
我们解析一下这个方法的执行流程,首先当调用该方法时,代码(1)对传参进行检查,为 null 则抛出异常,否则执行代码(2)并使用 item 作为构造函数参数创建一个新的节点,然后代码(3)从队列尾部节点开始循环,打算从尾部添加元素,到代码(4)时队列状态如下图。
这时候节点 p、t、head、tail 都指向 item 为 null 的哨兵节点,由于哨兵节点 next 节点为 null,所以 q 也是 null。
代码(4)发现 q == null 则执行代码(5),使用 CAS 判断 p 节点的 next 是否为 null,为 null 则使用 newNode 替换 p 的 next 节点,然后执行代码(6),但这时候 p == t 所以没有设置尾部节点,然后退出 offer 方法。目前队列状态如下图:
刚才我们讲解的是一个线程调用 offer 方法的情况,但是如果多个线程同时调用,就会存在多个线程同时执行到代码(5)的情况。
假设线程 1 调用了 offer(item1),线程 2 调用了 offer(item2),都同时执行到了代码(5),p.casNext(null,newNode)。由于 CAS 是原子性的,我们假设线程 1 先执行 CAS 操作,发现 p.next 为 null,则更新为 item1,这时候线程 2 也会判断 p.next 是否为 null,发现不为 null,则跳转到代码(3),然后执行代码(4)。这时队列分布如下图:
随后线程 2 发现不满足代码(4),则跳转执行代码(8),然后把 q 赋值给了 p。队列状态如下:
然后线程 2 再次跳转到代码(3)执行,当执行到代码(4)时队列状态如下图:
这时候q == null
,所以线程 2 会执行代码(5),通过 CAS 操作判断 p.next 节点是否为 null,是 null 则使用 item2 替换,不是则继续循环尝试。假设 CAS 成功,那么执行代码(6),由于p != t
,所以设置 tail 节点为 item2,然后退出 offer 方法。这时候队列分布如下:
到现在我们就差代码(7)没有讲过,其实在这一步主要是在执行 poll 操作后才会执行。在执行 poll 可能会发生如下情况:
我们分析一下如果是这种状态时调用 offer 添加元素,在执行到代码(4)时状态图如下:
这里由于 q 节点不为空并且p == q
所以执行到代码(7)由于t == tail
所以 p 被赋值为 head,然后重新循环,循环后执行到代码(4),队列状态如下:
这时候由于q == null
,所以执行代码(5)进行 CAS 操作,如果当前没有其他线程执行 offer 操作,则 CAS 操作会成功,p.next 节点被设置为新增节点。然后执行代码(6),由于p != t
所以设置新节点为队列的尾部节点,现在队列状态如下图:
📢 需要注意,这里自引用节点会被垃圾回收掉
总结:offer 操作的关键步骤是代码(5),通过 CAS 操作来控制同一时刻只有一个线程可以追加元素到队列末尾,而失败的线程会进行循环尝试 CAS 操作,直到 CAS 成功。
add 操作
add 操作是在链表末尾添加一个元素,内部还是调用的 offer 方法。
public boolean add(E e) { return offer(e); }
poll 操作
poll 操作是在队列头部获取并移除一个元素,如果队列为空则返回 null。下面看看该方法的实现原理。
public E poll() { //(1)goto标记 restartFromHead: //(2)无限循环 for (;;) { for (Node<E> h = head, p = h, q;;) { //(3)保存当前节点 E item = p.item; //(4)当前节点有值切CAS变为null if (item != null && p.casItem(item, null)) { //(5)CAS成功后标记当前节点并移除 if (p != h) // hop two nodes at a time //更新头节点 updateHead(h, ((q = p.next) != null) ? q : p); return item; } //(6)当前队列为空则返回null else if ((q = p.next) == null) { updateHead(h, p); return null; } //(7)如果当前节点被自引用,则重新寻找头节点 else if (p == q) continue restartFromHead; else //(8)如果下一个元素不为空,则将头节点的下一个节点设置成头节点 p = q; } } }
final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
由于 poll 方法是从头部获取一个元素并移除,所以代码(2)内层循环是从 head 节点开始,代码(3)获取当前队列头节点,队列一开始为空时队列状态如下:
由于 head 节点指向的 item 为 null 的哨兵节点,所以会执行到代码(6),假设这个过程中没有线程调用 offer 方法,则此时 q 等于 null,这时队列状态如下:
随后执行 updateHead 方法,由于h == p
,所以没有设置头节点,然后返回 null。
假设执行到代码(6)时,已经有其它线程调用了 offer 方法并成功添加了一个元素到队列,这时候 q 指向的是新增加的元素节点,如下图:
然后不满足代码(6)(q = p.next) == null
,执行代码(7)的时候 p != q
则执行代码(8),然后将 p 指向了节点 q,队列状态如下:
然后程序又去执行代码(3),p 现在指向的不为 null,则执行p.casItem(item, null)
通过 CAS 操作尝试设置 p 的 item 值为 null,如果此时 CAS 成功,执行代码(5),此时p != h
则设置头节点为 p,并且设置 h 的 next 节点为自己,poll 然后返回被移除的节点 item。队列图如下:
目前的队列状态就是我们刚才讲 offer 操作时,执行到代码(7)的状态。
现在我们还有代码(7)没执行过,我们看一下什么时候执行。假设线程 1 执行 poll 操作时,队列状态如下:
那么执行p.casItem(item, null)
通过 CAS 操作尝试设置 p 的 item 值为 null,假设 CAS 设置成功则标记该节点并从队列中将其移除。队列状态如下:
然后,由于p != h
,所以会执行 updateHead 方法,假如线程 1 执行 updateHead 前另外一个线程 2 开始 poll 操作,这时候线程 2 的 p 指向 head 节点,但是还没有执行到代码(6),这时候队列状态如下:
然后线程 1 执行 updateHead 操作,执行完毕后线程 1 退出,这时候队列状态如下:
然后线程 2 继续执行代码(6), q = p.next
,由于该节点是自引用节点,所以p == q
,所以会执行代码(7)跳到外层循环 restartFromHead,获取当前队列头 head,现在的状态如下:
总结:poll 方法在移除一个元素时,只是简单地使用 CAS 操作把当前节点的 item 值设置为 null,然后通过重新设置头节点将该元素从队列里面移除,被移除的节点就成了孤立节点,这个节点会在垃圾回收时被回收掉。另外,如果在执行分支中发现头节点被修改了,要跳到外层循环重新获取新的头节点。
peek 操作
peek 操作是获取队列头部一个元素(只获取不移除),如果队列为空则返回 null。下面看下其实现原理。
public E peek() { //1. restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { //2. E item = p.item; //3. if (item != null || (q = p.next) == null) { updateHead(h, p); return item; } //4. else if (p == q) continue restartFromHead; //5. else p = q; } } }
peek 操作的代码结构与 poll 操作类似,不同之处在于代码(3)中少了 castItem 操作。其实这很正常,因为 peek 只是获取队列头元素值,并不清空其值。根据前面的介绍我们知道第一次执行 offer 后 head 指向的是哨兵节点(也就是 item 为 null 的节点),那么第一次执行 peek 时在代码(3)中会发现item == null
,然后执行q = p.next
,这时候 q 节点指向的才是队列里面第一个真正的元素,或者如果队列为 null 则 q 指向 null。
当队列为空时,队列如下:
这时候执行 updateHead,由于 h 节点等于 p 节点,所以不进行任何操作,然后 peek 操作会返回 null。
当队列中至少有一个元素时(假设只有一个),队列状态如下:
这时候执行代码(5), p 指向了 q 节点,然后执行代码(3),此时队列状态如下:
执行代码(3)时发现 item 不为 null,所以执行 updateHead 方法,由于h != p
,所以设置头节点,设置后队列状态如下:
最后剔除哨兵节点。
总结:peek 操作的代码与 poll 操作类似,只是前者只获取队列头元素但是并不从队列里将它删除,而后者获取后需要从队列里面将它删除。另外,在第一次调用 peek 操作时,会删除哨兵节点,并让队列的 head 节点指向队列里面第一个元素或者为 null。
size 操作
计算当前队列元素个数,在并发环境下不是很有用,因为 CAS 没有加锁,所以从调用 size 方法到返回结果期间有可能增删元素,导致统计的元素个数不精确。
public int size() { int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) // Collection.size() spec says to max out if (++count == Integer.MAX_VALUE) break; return count; }
//获取第一个队列元素,(剔除哨兵节点),没有则为null Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; } } }
remove 操作
如果队列里面存在该元素则删除该元素,如果存在多个则删除第一个,并返回 true,否则返回 false。
public boolean remove(Object o) { if (o != null) { Node<E> next, pred = null; for (Node<E> p = first(); p != null; pred = p, p = next) { boolean removed = false; E item = p.item; if (item != null) { // 若不匹配,则获取next节点继续匹配 if (!o.equals(item)) { next = succ(p); continue; } //相等则使用CAS设置为null removed = p.casItem(item, null); } // 获取删除节点的后继节点 next = succ(p); //如果有前驱节点,并且next节点不为null,则链接前驱节点到next节点 if (pred != null && next != null) // unlink // 将被删除的节点通过CAS移除队列 pred.casNext(p, next); if (removed) return true; } } return false; }
contains 操作
判断队列里面是否含有指定对象,由于是遍历整个队列,所以像 size 操作一样结果也不是那么精确,有可能调用该方法时元素还在队列里面,但是遍历过程中其他线程才把该元素删除了,那么就会返回 false。
public boolean contains(Object o) { if (o == null) return false; for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; if (item != null && o.equals(item)) return true; } return false; }
总结
ConcurrentLinkedQueue 的底层使用单向链表数据结构来保存队列元素,每个元素被包装成一个 Node 节点。队列是靠头、尾节点来维护的,创建队列时头、尾节点指向一个 item 为 null 的哨兵节点。第一次执行 peek 或者 first 操作时会把 head 指向第一个真正的队列元素。由于使用非阻塞 CAS 算法,没有加锁,所以在计算 size 时有可能进行了 offer、poll 或者 remove 操作,导致计算的元素个数不精确,所以在并发情况下 size 方法不是很有用。