网络异常,图片无法展示
|
前言
JUC 下面的相关源码继续往下阅读,这就看到了非阻塞的无界线程安全队列 —— ConcurrentLinkedQueue,来一起看看吧。
介绍
基于链接节点的无界线程安全队列,对元素FIFO(先进先出)进行排序。 队列的头部是队列中最长时间的元素,队列的尾部是队列中最短时间的元素。 在队列的尾部插入新元素,队列检索操作获取队列头部的元素。
当许多线程共享对公共集合的访问 ConcurrentLinkedQueue 是一个合适的选择。 与大多数其他并发集合实现一样,此类不允许使用null元素。
基本使用
public class ConcurrentLinkedQueueTest { public static void main(String[] args) { ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>(); // 将指定元素插入此队列的尾部。 queue.add("liuzhihang"); // 将指定元素插入此队列的尾部。 queue.offer("liuzhihang"); // 获取但不移除此队列的头,队列为空返回 null。 queue.peek(); // 获取并移除此队列的头,此队列为空返回 null。 queue.poll(); } }
源码分析
基本结构
网络异常,图片无法展示
|
参数介绍
private static class Node<E> { // 节点中的元素 volatile E item; // 下一个节点 volatile Node<E> next; Node(E item) { UNSAFE.putObject(this, itemOffset, item); } // CAS 的方式设置节点元素 boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } // 设置下一个节点 void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } // CAS 的方式设置下一个节点 boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // 省略 …… }
在 ConcurrentLinkedQueue 内部含有一个内部类 Node,如上所示,这个内部类用来标
识链表中的一个节点,通过代码可以看出,在 ConcurrentLinkedQueue 中的链表为单向链表
。
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable { // 其他省略 // 头结点 private transient volatile Node<E> head; // 尾节点 private transient volatile Node<E> tail; }
头尾节点使用 volatile
修饰,保证内存可见性。
构造函数
public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
当创建对象时,头尾节点都是指向一个空节点。
网络异常,图片无法展示
|
添加元素
public boolean add(E e) { return offer(e); } public boolean offer(E e) { // 验证是否为空 checkNotNull(e); // 创建节点 final Node<E> newNode = new Node<E>(e); // 循环入队列 // t 是当前尾节点,p 初始为 t for (Node<E> t = tail, p = t;;) { // q 为尾节点的下一个节点 Node<E> q = p.next; if (q == null) { // 为空,说明后面没有节点,则 CAS 设置尾节点 if (p.casNext(null, newNode)) { // 此时 p.next 是 newNode // 如果 p != t 说明有并发 if (p != t) // 其他线程已经更新了 tail // q = p.next 所以 q == null 不正确了 // q 取到了 t.next // 此时将 tail 更新为 新节点 casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } // 多线程情况下, poll ,操作移除元素,可能会导致 p == q // 此时要重新查找 else if (p == q) // p = (t != (t = tail)) ? t : head; else // 检查 tail 并更新 p = (p != t && t != (t = tail)) ? t : q; } }
画图说明:
- 单线程情况下:
- 当执行到
Node q = p.next;
时,当前情况如图所示:
网络异常,图片无法展示
|
- 判断
q == null
,满足条件,此时便会执行p.casNext(null, newNode)
使用 CAS 设置 p.next。 - 设置成功之后,
p == t
没有变动,所以程序退出。
- 多线程情况下:
- 当执行到
Node q = p.next;
时,当前情况如图所示:
网络异常,图片无法展示
|
- 多个线程执行
p.casNext(null, newNode)
使用 CAS 设置 p.next。 - A 线程 CAS 设置成功:
网络异常,图片无法展示
|
- B 线程 CAS 执行失败, 重新循环,会执行到
p = (p != t && t != (t = tail)) ? t : q
。
网络异常,图片无法展示
|
- 再次循环就可以成功设置上了。
获取元素
public E poll() { restartFromHead: // 无限循环 for (;;) { for (Node<E> h = head, p = h, q;;) { // 头结点的 iterm E item = p.item; // 当前节点如果不为 null CAS 设置为 null if (item != null && p.casItem(item, null)) { // CAS 成功 则标记移除 if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } // 当前队列未空 返回 null else if ((q = p.next) == null) { updateHead(h, p); return null; } // 自引用了, 重新进行循环 else if (p == q) continue restartFromHead; else p = q; } } }
画图过程如下:
- 在执行内层循环时,如果队列为空:
E item = p.item;
此时,iterm 为 null,会updateHead(h, p)
并返回 null。 - 假设同时有并发插入操作,添加了一个元素,此时如图所示:
网络异常,图片无法展示
|
这时会执行最后的 else 将 p = q
网络异常,图片无法展示
|
- 继续循环获取 item,并执行
p.casItem(item, null)
, 然后判断p != h
,更新 head 并返回 item。
网络异常,图片无法展示
|
这里的情况比较复杂,这里只是列举一种,如果需要可以自己多列举几种。
而查看元素的代码和获取元素代码类似就不多介绍了。
size 操作
public int size() { int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) // Collection.size() spec says to max out if (++count == Integer.MAX_VALUE) break; return count; }
CAS 没有加锁,所以 size 是不准确的。并且 size 会遍历一遍列表,比较耗费性能。
总结
ConcurrentLinkedQueue 在工作中使用的相对较少,所以阅读相关源码的时候也只是大概看了一下,了解常用 API,以及底层原理。
简单总结就是使用单向链表来保存队列元素,内部使用非阻塞的 CAS 算法,没有加锁。所以计算 size 时可能不准确,同样 size 会遍历链表,所以并不建议使用。