add
add操作是在链表末尾添加一个元素,其实在内部调用的还是offer操作’\
/** * Inserts the specified element at the tail of this queue. * As the queue is unbounded, this method will never throw * {@link IllegalStateException} or return {@code false}. * * @return {@code true} (as specified by {@link Collection#add}) * @throws NullPointerException if the specified element is null */ public boolean add(E e) { return offer(e); }
poll
poll操作是在队列头部获取并移除一个元素,如果队列为空则返回null。
public E poll() { // 1. goto标记 restartFromHead: // 2 无限循环 for (;;) { for (Node<E> h = head, p = h, q;;) { // 3 保存当前节点 E item = p.item; // 4 当前item有值,则CAS变为null if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. // 5 cas成功则标记当前节点并从链表中移除 if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } // 6 当前队列为空则返回null else if ((q = p.next) == null) { updateHead(h, p); return null; } // 7 如果当前节点被自己引用,则重新查找新的队列头节点 else if (p == q) continue restartFromHead; else // 8 p = q; } } }
poll方法在移除一个元素时,只是简单地使用CAS操作把当前节点的item值设置为null,然后通过重新设置头节点将该元素从队列里面移除,被移除的节点就成了孤立节点,这个节点会在垃圾回收时被回收掉。另外,如果在执行分支中发现头节点被修改了,要跳到外层循环重新获取新的头节点。
peek
peek操作是获取队列头部一个元素(只获取不移除),如果队列为空则返回null
public E peek() { // 1 restartFromHead: for (;;) { // 2 for (Node<E> h = head, p = h, q;;) { E item = p.item; // 3 if (item != null || (q = p.next) == null) { updateHead(h, p); return item; } // 4 else if (p == q) continue restartFromHead; else // 5 p = q; } } }
Peek操作的代码结构与poll操作类似,不同之处在于代码(3)中少了castItem操作。
其实这很正常,因为peek只是获取队列头元素值,并不清空其值。根据前面的介绍我们知道第一次执行offer后head指向的是哨兵节点(也就是item为null的节点),那么第一次执行peek时在代码(3)中会发现item==null,然后执行q=p.next,这时候q节点指向的才是队列里面第一个真正的元素,或者如果队列为null则q指向null。
总结:peek操作的代码与poll操作类似,只是前者只获取队列头元素但是并不从队列里将它删除,而后者获取后需要从队列里面将它删除。
另外,在第一次调用peek操作时,会删除哨兵节点,并让队列的head节点指向队列里面第一个元素或者null。
size
计算当前队列元素个数,在并发环境下不是很有用,因为CAS没有加锁,所以从调用size函数到返回结果期间有可能增删元素,导致统计的元素个数不精确
/** * Returns the number of elements in this queue. If this queue * contains more than {@code Integer.MAX_VALUE} elements, returns * {@code Integer.MAX_VALUE}. * * <p>Beware that, unlike in most collections, this method is * <em>NOT</em> a constant-time operation. Because of the * asynchronous nature of these queues, determining the current * number of elements requires an O(n) traversal. * Additionally, if elements are added or removed during execution * of this method, the returned result may be inaccurate. Thus, * this method is typically not very useful in concurrent * applications. * * @return the number of elements in this queue */ public int size() { int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) // Collection.size() spec says to max out if (++count == Integer.MAX_VALUE) break; return count; }
// 获取第一个元素,哨兵元素不算,没有则为null Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; } } }
remove
如果队列里面存在该元素则删除该元素,如果存在多个则删除第一个,并返回true,否则返回false。
/** * Removes a single instance of the specified element from this queue, * if it is present. More formally, removes an element {@code e} such * that {@code o.equals(e)}, if this queue contains one or more such * elements. * Returns {@code true} if this queue contained the specified element * (or equivalently, if this queue changed as a result of the call). * * @param o element to be removed from this queue, if present * @return {@code true} if this queue changed as a result of the call */ public boolean remove(Object o) { //查找元素为空,直接返回false if (o == null) return false; Node<E> pred = null; for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; //相等则使用cas值null,同时一个线程成功,失败的线程循环查找队列中其他元素是否有匹配的。 if (item != null && o.equals(item) && p.casItem(item, null)) { //获取next元素 Node<E> next = succ(p); //如果有前驱节点,并且next不为空则链接前驱节点到next, if (pred != null && next != null) pred.casNext(p, next); return true; } pred = p; } return false; }
contains
判断队列里面是否含有指定对象,由于是遍历整个队列,所以像size 操作一样结果也不是那么精确,有可能调用该方法时元素还在队列里面,但是遍历过程中其他线程才把该元素删除了,那么就会返回false。
/** * Returns {@code true} if this queue contains the specified element. * More formally, returns {@code true} if and only if this queue contains * at least one element {@code e} such that {@code o.equals(e)}. * * @param o object to be checked for containment in this queue * @return {@code true} if this queue contains the specified element */ public boolean contains(Object o) { if (o == null) return false; for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; if (item != null && o.equals(item)) return true; } return false; }
总结
ConcurrentLinkedQueue的底层使用单向链表数据结构来保存队列元素,每个元素被包装成一个Node节点。队列是靠头、尾节点来维护的,创建队列时头、尾节点指向一个item为null的哨兵节点。
第一次执行peek或者first操作时会把head指向第一个真正的队列元素。由于使用非阻塞CAS算法,没有加锁,所以在计算size时有可能进行了offer、poll或者remove操作,导致计算的元素个数不精确,所以在并发情况下size函数不是很有用。
如下图所示,入队、出队都是操作使用volatile修饰的tail、head节点,要保证在多线程下出入队线程安全,只需要保证这两个Node操作的可见性和原子性即可。由于volatile本身可以保证可见性,所以只需要保证对两个变量操作的原子性即可。
offer操作是在tail后面添加元素,也就是调用tail.casNext方法,而这个方法使用的是CAS操作,只有一个线程会成功,然后失败的线程会循环,重新获取tail,再执行casNext方法。poll操作也通过类似CAS的算法保证出队时移除节点操作的原子性