概述
JDK中提供了一系列场景的并发安全队列。总的来说,按照实现方式的不同可分为阻塞队列和非阻塞队列,
- 阻塞队列使用锁实现
- 非阻塞队列则使用CAS非阻塞算法实现
ConcurrentLinkedQueue
ConcurrentLinkedQueue是线程安全的无界非阻塞队列,其底层数据结构使用单向链表实现,对于入队和出队操作使用CAS来实现线程安全。
【类图】
ConcurrentLinkedQueue内部的队列使用单向链表方式实现,
其中有两个volatile类型的Node节点分别用来存放队列的首、尾节点。
从下面的无参构造函数可知,默认头、尾节点都是指向item为null的哨兵节点。 新元素会被插入队列末尾,出队时从队列头部获取一个元素。
在Node节点内部则维护一个使用volatile修饰的变量item,用来存放节点的值;next用来存放链表的下一个节点,从而链接为一个单向无界链表。其内部则使用UNSafe工具类提供的CAS算法来保证出入队时操作链表的原子性。
核心方法&源码解读
下面我们介绍ConcurrentLinkedQueue的几个主要方法的实现原理。
offer
在链表末尾添加一个元素
/** * Inserts the specified element at the tail of this queue. * As the queue is unbounded, this method will never return {@code false}. * * @return {@code true} (as specified by {@link Queue#offer}) * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { //1 e为null则抛出空指针异常 checkNotNull(e); //2 构造Node节点构造函数内部调用unsafe.putObject,后面统一讲 final Node<E> newNode = new Node<E>(e); //3 从尾节点插入 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // 4 如果q=null说明p是尾节点则插入 if (q == null) { // 5 使用cas设置p节点的next节点 if (p.casNext(null, newNode)) { // 6 cas成功说明新增节点已经被放入链表,然后设置当前尾节点(包含head,1,3,5.。。个节点为尾节点) if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q)// 7 //多线程操作时候,由于poll时候会把老的head变为自引用,然后head的next变为新head,所以这里需要 //重新找新的head,因为新的head后面的节点才是激活的节点 p = (t != (t = tail)) ? t : head; else // 8 寻找尾节点 p = (p != t && t != (t = tail)) ? t : q; } }
首先看当一个线程调用offer(item)时的情况。首先代码(1)对传参进行空检查,如果为null则抛出NPE异常,否则执行代码(2)并使用item作为构造函数参数创建一个新的节点,然后代码(3)从队列尾部节点开始循环,打算从队列尾部添加元素,当执行到代码(4)时队列状态如下所示。
这时候节点p、t、head、tail同时指向了item为null的哨兵节点,由于哨兵节点的next节点为null,所以这里q也指向null。
q==null则执行代码(5),通过CAS原子操作判断p节点的next节点是否为null,如果为null则使用节点newNode替换p的next节点,然后执行代码(6),这里由于p==t所以没有设置尾部节点,然后退出offer方法,这时候队列的状态如下图所示
(2)上面是一个线程调用offer方法的情况,如果多个线程同时调用,就会存在多个线程同时执行到代码(5)的情况。假设线程A调用offer(item1),线程B调用offer(item2),同时执行到代码(5)p.casNext(null, newNode)。
由于CAS的比较设置操作是原子性的,所以这里假设线程A先执行了比较设置操作,发现当前p的next节点确实是null,则会原子性地更新next节点为 item1,这时候线程B也会判断p的next节点是否为null,结果发现不是null(因为线程A已经设置了p的next节点为 item1),则会跳到代码(3),然后执行到代码(4),这时候的队列分布如下图所示。
根据上面的状态图可知线程B接下来会执行代码(8),然后把q赋给了p,这时候队列状态如下图所示。
然后线程B再次跳转到代码(3)执行,当执行到代码(4)时队列状态如下图所示
由于这时候q==null,所以线程B会执行代码(5),通过CAS操作判断当前p的next节点是否是null,不是则再次循环尝试,是则使用item2替换。假设CAS成功了,那么执行代码(6),由于p!=t,所以设置tail节点为item2,然后退出offer方法。这时候队列分布如下图所示。
分析到现在,就差代码(7)还没走过,其实这一步要在执行poll操作后才会执行。这里先来看一下执行poll操作后可能会存在的一种情况,如下图所示。
下面分析当队列处于这种状态时调用offer添加元素,执行到代码(4)时的状态图,如下
这里由于q节点不为空并且pq所以执行代码(7),由于ttail所以p被赋值为head,然后重新循环,循环后执行到代码(4),这时候队列状态如下图所示。
这时候由于q==null,所以执行代码(5)进行CAS操作,如果当前没有其他线程执行offer操作,则CAS操作会成功,p的next节点被设置为新增节点。然后执行代码(6),由于p!=t所以设置新节点为队列的尾部节点,现在队列状态如图
需要注意的是,这里自引用的节点会被垃圾回收掉。
可见,offer操作中的关键步骤是代码(5),通过原子CAS操作来控制某时只有一个线程可以追加元素到队列末尾。进行CAS竞争失败的线程会通过循环一次次尝试进行CAS操作,直到CAS成功才会返回,也就是通过使用无限循环不断进行CAS尝试方式来替代阻塞算法挂起调用线程。相比阻塞算法,这是使用CPU资源换取阻塞所带来的开销。