Java Review - 并发编程_ConcurrentLinkedQueue原理&源码剖析

简介: Java Review - 并发编程_ConcurrentLinkedQueue原理&源码剖析


概述

JDK中提供了一系列场景的并发安全队列。总的来说,按照实现方式的不同可分为阻塞队列和非阻塞队列,

  • 阻塞队列使用锁实现
  • 非阻塞队列则使用CAS非阻塞算法实现



ConcurrentLinkedQueue

ConcurrentLinkedQueue是线程安全的无界非阻塞队列,其底层数据结构使用单向链表实现,对于入队和出队操作使用CAS来实现线程安全。

【类图】

ConcurrentLinkedQueue内部的队列使用单向链表方式实现,

其中有两个volatile类型的Node节点分别用来存放队列的首、尾节点。

从下面的无参构造函数可知,默认头、尾节点都是指向item为null的哨兵节点。 新元素会被插入队列末尾,出队时从队列头部获取一个元素。

在Node节点内部则维护一个使用volatile修饰的变量item,用来存放节点的值;next用来存放链表的下一个节点,从而链接为一个单向无界链表。其内部则使用UNSafe工具类提供的CAS算法来保证出入队时操作链表的原子性。


核心方法&源码解读

下面我们介绍ConcurrentLinkedQueue的几个主要方法的实现原理。


offer

在链表末尾添加一个元素

/**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
public boolean offer(E e) {
    //1  e为null则抛出空指针异常
    checkNotNull(e);
   //2 构造Node节点构造函数内部调用unsafe.putObject,后面统一讲
    final Node<E> newNode = new Node<E>(e);
    //3  从尾节点插入
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        // 4  如果q=null说明p是尾节点则插入
        if (q == null) {
            //  5 使用cas设置p节点的next节点  
            if (p.casNext(null, newNode)) {
                // 6 cas成功说明新增节点已经被放入链表,然后设置当前尾节点(包含head,1,3,5.。。个节点为尾节点)
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)// 7 
            //多线程操作时候,由于poll时候会把老的head变为自引用,然后head的next变为新head,所以这里需要
            //重新找新的head,因为新的head后面的节点才是激活的节点
            p = (t != (t = tail)) ? t : head;
        else
            // 8 寻找尾节点 
            p = (p != t && t != (t = tail)) ? t : q;
    }
}
  • 首先看当一个线程调用offer(item)时的情况。首先代码(1)对传参进行空检查,如果为null则抛出NPE异常,否则执行代码(2)并使用item作为构造函数参数创建一个新的节点,然后代码(3)从队列尾部节点开始循环,打算从队列尾部添加元素,当执行到代码(4)时队列状态如下所示。

这时候节点p、t、head、tail同时指向了item为null的哨兵节点,由于哨兵节点的next节点为null,所以这里q也指向null。

  • q==null则执行代码(5),通过CAS原子操作判断p节点的next节点是否为null,如果为null则使用节点newNode替换p的next节点,然后执行代码(6),这里由于p==t所以没有设置尾部节点,然后退出offer方法,这时候队列的状态如下图所示

(2)上面是一个线程调用offer方法的情况,如果多个线程同时调用,就会存在多个线程同时执行到代码(5)的情况。假设线程A调用offer(item1),线程B调用offer(item2),同时执行到代码(5)p.casNext(null, newNode)。

由于CAS的比较设置操作是原子性的,所以这里假设线程A先执行了比较设置操作,发现当前p的next节点确实是null,则会原子性地更新next节点为 item1,这时候线程B也会判断p的next节点是否为null,结果发现不是null(因为线程A已经设置了p的next节点为 item1),则会跳到代码(3),然后执行到代码(4),这时候的队列分布如下图所示。

根据上面的状态图可知线程B接下来会执行代码(8),然后把q赋给了p,这时候队列状态如下图所示。

然后线程B再次跳转到代码(3)执行,当执行到代码(4)时队列状态如下图所示

由于这时候q==null,所以线程B会执行代码(5),通过CAS操作判断当前p的next节点是否是null,不是则再次循环尝试,是则使用item2替换。假设CAS成功了,那么执行代码(6),由于p!=t,所以设置tail节点为item2,然后退出offer方法。这时候队列分布如下图所示。

分析到现在,就差代码(7)还没走过,其实这一步要在执行poll操作后才会执行。这里先来看一下执行poll操作后可能会存在的一种情况,如下图所示。

下面分析当队列处于这种状态时调用offer添加元素,执行到代码(4)时的状态图,如下

这里由于q节点不为空并且pq所以执行代码(7),由于ttail所以p被赋值为head,然后重新循环,循环后执行到代码(4),这时候队列状态如下图所示。

这时候由于q==null,所以执行代码(5)进行CAS操作,如果当前没有其他线程执行offer操作,则CAS操作会成功,p的next节点被设置为新增节点。然后执行代码(6),由于p!=t所以设置新节点为队列的尾部节点,现在队列状态如图

需要注意的是,这里自引用的节点会被垃圾回收掉。

可见,offer操作中的关键步骤是代码(5),通过原子CAS操作来控制某时只有一个线程可以追加元素到队列末尾。进行CAS竞争失败的线程会通过循环一次次尝试进行CAS操作,直到CAS成功才会返回,也就是通过使用无限循环不断进行CAS尝试方式来替代阻塞算法挂起调用线程。相比阻塞算法,这是使用CPU资源换取阻塞所带来的开销。



add

add操作是在链表末尾添加一个元素,其实在内部调用的还是offer操作’\

/**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never throw
     * {@link IllegalStateException} or return {@code false}.
     *
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }



poll

poll操作是在队列头部获取并移除一个元素,如果队列为空则返回null。

public E poll() {
      // 1. goto标记
        restartFromHead:
        // 2 无限循环
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
              // 3 保存当前节点
                E item = p.item;
        // 4 当前item有值,则CAS变为null
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    // 5 cas成功则标记当前节点并从链表中移除
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                // 6 当前队列为空则返回null
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                // 7 如果当前节点被自己引用,则重新查找新的队列头节点
                else if (p == q)
                    continue restartFromHead;
                else // 8 
                    p = q;
            }
        }
    }

poll方法在移除一个元素时,只是简单地使用CAS操作把当前节点的item值设置为null,然后通过重新设置头节点将该元素从队列里面移除,被移除的节点就成了孤立节点,这个节点会在垃圾回收时被回收掉。另外,如果在执行分支中发现头节点被修改了,要跳到外层循环重新获取新的头节点。



peek

peek操作是获取队列头部一个元素(只获取不移除),如果队列为空则返回null

public E peek() {
      // 1 
        restartFromHead:
        for (;;) {
          // 2 
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                // 3 
                if (item != null || (q = p.next) == null) {
                    updateHead(h, p);
                    return item;
                }
                // 4 
                else if (p == q)
                    continue restartFromHead;
                else
                  // 5 
                    p = q;
            }
        }
    }

Peek操作的代码结构与poll操作类似,不同之处在于代码(3)中少了castItem操作。

其实这很正常,因为peek只是获取队列头元素值,并不清空其值。根据前面的介绍我们知道第一次执行offer后head指向的是哨兵节点(也就是item为null的节点),那么第一次执行peek时在代码(3)中会发现item==null,然后执行q=p.next,这时候q节点指向的才是队列里面第一个真正的元素,或者如果队列为null则q指向null。


总结:peek操作的代码与poll操作类似,只是前者只获取队列头元素但是并不从队列里将它删除,而后者获取后需要从队列里面将它删除。

另外,在第一次调用peek操作时,会删除哨兵节点,并让队列的head节点指向队列里面第一个元素或者null。


size

计算当前队列元素个数,在并发环境下不是很有用,因为CAS没有加锁,所以从调用size函数到返回结果期间有可能增删元素,导致统计的元素个数不精确

/**
     * Returns the number of elements in this queue.  If this queue
     * contains more than {@code Integer.MAX_VALUE} elements, returns
     * {@code Integer.MAX_VALUE}.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     * Additionally, if elements are added or removed during execution
     * of this method, the returned result may be inaccurate.  Thus,
     * this method is typically not very useful in concurrent
     * applications.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }
// 获取第一个元素,哨兵元素不算,没有则为null
    Node<E> first() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                boolean hasItem = (p.item != null);
                if (hasItem || (q = p.next) == null) {
                    updateHead(h, p);
                    return hasItem ? p : null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }



remove

如果队列里面存在该元素则删除该元素,如果存在多个则删除第一个,并返回true,否则返回false。

/**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        //查找元素为空,直接返回false
    if (o == null) return false;
    Node<E> pred = null;
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;
        //相等则使用cas值null,同时一个线程成功,失败的线程循环查找队列中其他元素是否有匹配的。
        if (item != null &&
            o.equals(item) &&
            p.casItem(item, null)) {
            //获取next元素
            Node<E> next = succ(p);
            //如果有前驱节点,并且next不为空则链接前驱节点到next,
            if (pred != null && next != null)
                pred.casNext(p, next);
            return true;
        }
        pred = p;
    }
    return false;
}



contains

判断队列里面是否含有指定对象,由于是遍历整个队列,所以像size 操作一样结果也不是那么精确,有可能调用该方法时元素还在队列里面,但是遍历过程中其他线程才把该元素删除了,那么就会返回false。

/**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null && o.equals(item))
                return true;
        }
        return false;
    }

总结

ConcurrentLinkedQueue的底层使用单向链表数据结构来保存队列元素,每个元素被包装成一个Node节点。队列是靠头、尾节点来维护的,创建队列时头、尾节点指向一个item为null的哨兵节点。

第一次执行peek或者first操作时会把head指向第一个真正的队列元素。由于使用非阻塞CAS算法,没有加锁,所以在计算size时有可能进行了offer、poll或者remove操作,导致计算的元素个数不精确,所以在并发情况下size函数不是很有用。

如下图所示,入队、出队都是操作使用volatile修饰的tail、head节点,要保证在多线程下出入队线程安全,只需要保证这两个Node操作的可见性和原子性即可。由于volatile本身可以保证可见性,所以只需要保证对两个变量操作的原子性即可。

offer操作是在tail后面添加元素,也就是调用tail.casNext方法,而这个方法使用的是CAS操作,只有一个线程会成功,然后失败的线程会循环,重新获取tail,再执行casNext方法。poll操作也通过类似CAS的算法保证出队时移除节点操作的原子性


相关文章
|
2月前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
54 0
|
18天前
|
安全 Java 开发者
【JAVA】封装多线程原理
Java 中的多线程封装旨在简化使用、提高安全性和增强可维护性。通过抽象和隐藏底层细节,提供简洁接口。常见封装方式包括基于 Runnable 和 Callable 接口的任务封装,以及线程池的封装。Runnable 适用于无返回值任务,Callable 支持有返回值任务。线程池(如 ExecutorService)则用于管理和复用线程,减少性能开销。示例代码展示了如何实现这些封装,使多线程编程更加高效和安全。
|
18天前
|
存储 算法 Java
【JAVA】生成accessToken原理
在Java中,生成accessToken用于身份验证和授权,确保合法用户访问受保护资源。流程包括:1. 身份验证(如用户名密码、OAuth 2.0);2. 生成唯一且安全的令牌;3. 设置令牌有效期并存储;4. 客户端传递令牌,服务器验证其有效性。常见场景为OAuth 2.0协议,涉及客户端注册、用户授权、获取授权码和换取accessToken。示例代码展示了使用Apache HttpClient库模拟OAuth 2.0获取accessToken的过程。
|
2月前
|
监控 Java API
探索Java NIO:究竟在哪些领域能大显身手?揭秘原理、应用场景与官方示例代码
Java NIO(New IO)自Java SE 1.4引入,提供比传统IO更高效、灵活的操作,支持非阻塞IO和选择器特性,适用于高并发、高吞吐量场景。NIO的核心概念包括通道(Channel)、缓冲区(Buffer)和选择器(Selector),能实现多路复用和异步操作。其应用场景涵盖网络通信、文件操作、进程间通信及数据库操作等。NIO的优势在于提高并发性和性能,简化编程;但学习成本较高,且与传统IO存在不兼容性。尽管如此,NIO在构建高性能框架如Netty、Mina和Jetty中仍广泛应用。
60 3
|
2月前
|
安全 算法 Java
Java CAS原理和应用场景大揭秘:你掌握了吗?
CAS(Compare and Swap)是一种乐观锁机制,通过硬件指令实现原子操作,确保多线程环境下对共享变量的安全访问。它避免了传统互斥锁的性能开销和线程阻塞问题。CAS操作包含三个步骤:获取期望值、比较当前值与期望值是否相等、若相等则更新为新值。CAS广泛应用于高并发场景,如数据库事务、分布式锁、无锁数据结构等,但需注意ABA问题。Java中常用`java.util.concurrent.atomic`包下的类支持CAS操作。
93 2
|
安全 Java
Java并发编程笔记之CopyOnWriteArrayList源码分析
并发包中并发List只有CopyOnWriteArrayList这一个,CopyOnWriteArrayList是一个线程安全的ArrayList,对其进行修改操作和元素迭代操作都是在底层创建一个拷贝数组(快照)上进行的,也就是写时拷贝策略。
19570 0
|
Java 安全
Java并发编程笔记之读写锁 ReentrantReadWriteLock 源码分析
我们知道在解决线程安全问题上使用 ReentrantLock 就可以,但是 ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而实际情况下会有写少读多的场景,显然 ReentrantLock 满足不了需求,所以 ReentrantReadWriteLock 应运而生,ReentrantReadWriteLock 采用读写分离,多个线程可以同时获取读锁。
3158 0
|
Java
Java并发编程笔记之FutureTask源码分析
FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
4310 0
|
Java 调度 API
Java并发编程笔记之Timer源码分析
timer在JDK里面,是很早的一个API了。具有延时的,并具有周期性的任务,在newScheduledThreadPool出来之前我们一般会用Timer和TimerTask来做,但是Timer存在一些缺陷,为什么这么说呢?   Timer只创建唯一的线程来执行所有Timer任务。
3026 0
|
Java
Java并发编程笔记之Semaphore信号量源码分析
JUC 中 Semaphore 的使用与原理分析,Semaphore 也是 Java 中的一个同步器,与 CountDownLatch 和 CycleBarrier 不同在于它内部的计数器是递增的,那么,Semaphore 的内部实现是怎样的呢?   Semaphore 信号量也是Java 中一个同步容器,与CountDownLatch 和 CyclicBarrier 不同之处在于它内部的计数器是递增的。
4308 0

热门文章

最新文章