Java JUC ConcurrentLinkedQueue解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: ConcurrentLinkedQueue 原理探究

ConcurrentLinkedQueue 原理探究


介绍

ConcurrentLinkedQueue 是线程安全的无界非阻塞队列,底层使用单向链表实现,对于入队和出队操作使用 CAS 实现线程安全。

1654826948121.png

ConcurrentLinkedQueue 内部的队列使用单向链表方式实现,其中有两个volatile类型的Node节点分别用来存放队列的头、尾节点。


下面无参构造函数中可以知道,默认头尾节点都是指向 item 为 null 的哨兵节点,新元素插入队尾,获取元素从头部出队。

private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

在 Node 节点中维护一个使用 volatile 修饰的变量 item,用来存放节点的值;next 存放链表的下一个节点;其内部则使用 UNSafe 工具类提供的 CAS 算法来保证出入队时操作链表的原子性。


offer 操作

offer 操作是在队尾增加一个元素,如果传递的参数是 null 则抛出异常,否则由于 ConcurrentLinkedQueue 是无界队列,该方法会一直返回 true,另外,由于使用的是 CAS 算法,所以该方法不会被阻塞挂起调用方线程。下面我们看源码:

public boolean offer(E e) {
        //(1)为null则抛出异常
        checkNotNull(e);
       //(2)构造Node节点,内部调用unsafe.putObject
        final Node<E> newNode = new Node<E>(e);
        //(3)从尾节点插入
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            //(4)如果q == null则说明p是尾节点,执行插入
            if (q == null) {
                //(5)使用CAS设置p节点的next节点
                if (p.casNext(null, newNode)) {
                    //(6)如果p != t,则将入队节点设置成tail节点,更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
            }
            else if (p == q)
                //(7)多线程操作时,由于poll操作移除元素后可能会把head变成自引用,也就是head的next变成了head,这里需要重新找新的head
                p = (t != (t = tail)) ? t : head;
            else
                //(8)寻找尾节点
                p = (p != t && t != (t = tail)) ? t : q;
        }
}

我们解析一下这个方法的执行流程,首先当调用该方法时,代码(1)对传参进行检查,为 null 则抛出异常,否则执行代码(2)并使用 item 作为构造函数参数创建一个新的节点,然后代码(3)从队列尾部节点开始循环,打算从尾部添加元素,到代码(4)时队列状态如下图。

image.png

这时候节点 p、t、head、tail 都指向 item 为 null 的哨兵节点,由于哨兵节点 next 节点为 null,所以 q 也是 null。


代码(4)发现 q == null 则执行代码(5),使用 CAS 判断 p 节点的 next 是否为 null,为 null 则使用 newNode 替换 p 的 next 节点,然后执行代码(6),但这时候 p == t 所以没有设置尾部节点,然后退出 offer 方法。目前队列状态如下图:

image.png

刚才我们讲解的是一个线程调用 offer 方法的情况,但是如果多个线程同时调用,就会存在多个线程同时执行到代码(5)的情况。


假设线程 1 调用了 offer(item1),线程 2 调用了 offer(item2),都同时执行到了代码(5),p.casNext(null,newNode)。由于 CAS 是原子性的,我们假设线程 1 先执行 CAS 操作,发现 p.next 为 null,则更新为 item1,这时候线程 2 也会判断 p.next 是否为 null,发现不为 null,则跳转到代码(3),然后执行代码(4)。这时队列分布如下图:

image.png

随后线程 2 发现不满足代码(4),则跳转执行代码(8),然后把 q 赋值给了 p。队列状态如下:

image.png

然后线程 2 再次跳转到代码(3)执行,当执行到代码(4)时队列状态如下图:

image.png

这时候q == null,所以线程 2 会执行代码(5),通过 CAS 操作判断 p.next 节点是否为 null,是 null 则使用 item2 替换,不是则继续循环尝试。假设 CAS 成功,那么执行代码(6),由于p != t,所以设置 tail 节点为 item2,然后退出 offer 方法。这时候队列分布如下:

image.png

到现在我们就差代码(7)没有讲过,其实在这一步主要是在执行 poll 操作后才会执行。在执行 poll 可能会发生如下情况:

image.png

我们分析一下如果是这种状态时调用 offer 添加元素,在执行到代码(4)时状态图如下:

image.png

这里由于 q 节点不为空并且p == q所以执行到代码(7)由于t == tail所以 p 被赋值为 head,然后重新循环,循环后执行到代码(4),队列状态如下:

image.png

这时候由于q == null,所以执行代码(5)进行 CAS 操作,如果当前没有其他线程执行 offer 操作,则 CAS 操作会成功,p.next 节点被设置为新增节点。然后执行代码(6),由于p != t所以设置新节点为队列的尾部节点,现在队列状态如下图:

image.png

📢 需要注意,这里自引用节点会被垃圾回收掉


总结:offer 操作的关键步骤是代码(5),通过 CAS 操作来控制同一时刻只有一个线程可以追加元素到队列末尾,而失败的线程会进行循环尝试 CAS 操作,直到 CAS 成功。


add 操作

add 操作是在链表末尾添加一个元素,内部还是调用的 offer 方法。

public boolean add(E e) {
    return offer(e);
}

poll 操作


poll 操作是在队列头部获取并移除一个元素,如果队列为空则返回 null。下面看看该方法的实现原理。

public E poll() {
    //(1)goto标记
    restartFromHead:
    //(2)无限循环
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            //(3)保存当前节点
            E item = p.item;
                         //(4)当前节点有值切CAS变为null
            if (item != null && p.casItem(item, null)) {
                //(5)CAS成功后标记当前节点并移除
                if (p != h) // hop two nodes at a time
                    //更新头节点
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
           //(6)当前队列为空则返回null
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            //(7)如果当前节点被自引用,则重新寻找头节点
            else if (p == q)
                continue restartFromHead;
            else  //(8)如果下一个元素不为空,则将头节点的下一个节点设置成头节点
                p = q;
        }
    }
}
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

由于 poll 方法是从头部获取一个元素并移除,所以代码(2)内层循环是从 head 节点开始,代码(3)获取当前队列头节点,队列一开始为空时队列状态如下:

image.png

由于 head 节点指向的 item 为 null 的哨兵节点,所以会执行到代码(6),假设这个过程中没有线程调用 offer 方法,则此时 q 等于 null,这时队列状态如下:

image.png

随后执行 updateHead 方法,由于h == p,所以没有设置头节点,然后返回 null。


假设执行到代码(6)时,已经有其它线程调用了 offer 方法并成功添加了一个元素到队列,这时候 q 指向的是新增加的元素节点,如下图:

image.png

然后不满足代码(6)(q = p.next) == null,执行代码(7)的时候 p != q则执行代码(8),然后将 p 指向了节点 q,队列状态如下:

image.png

然后程序又去执行代码(3),p 现在指向的不为 null,则执行p.casItem(item, null)通过 CAS 操作尝试设置 p 的 item 值为 null,如果此时 CAS 成功,执行代码(5),此时p != h则设置头节点为 p,并且设置 h 的 next 节点为自己,poll 然后返回被移除的节点 item。队列图如下:

image.png

目前的队列状态就是我们刚才讲 offer 操作时,执行到代码(7)的状态。


现在我们还有代码(7)没执行过,我们看一下什么时候执行。假设线程 1 执行 poll 操作时,队列状态如下:

image.png

那么执行p.casItem(item, null)通过 CAS 操作尝试设置 p 的 item 值为 null,假设 CAS 设置成功则标记该节点并从队列中将其移除。队列状态如下:

image.png

然后,由于p != h,所以会执行 updateHead 方法,假如线程 1 执行 updateHead 前另外一个线程 2 开始 poll 操作,这时候线程 2 的 p 指向 head 节点,但是还没有执行到代码(6),这时候队列状态如下:

image.png

然后线程 1 执行 updateHead 操作,执行完毕后线程 1 退出,这时候队列状态如下:

image.png

然后线程 2 继续执行代码(6), q = p.next,由于该节点是自引用节点,所以p == q,所以会执行代码(7)跳到外层循环 restartFromHead,获取当前队列头 head,现在的状态如下:

image.png

总结:poll 方法在移除一个元素时,只是简单地使用 CAS 操作把当前节点的 item 值设置为 null,然后通过重新设置头节点将该元素从队列里面移除,被移除的节点就成了孤立节点,这个节点会在垃圾回收时被回收掉。另外,如果在执行分支中发现头节点被修改了,要跳到外层循环重新获取新的头节点。


peek 操作

peek 操作是获取队列头部一个元素(只获取不移除),如果队列为空则返回 null。下面看下其实现原理。

public E peek() {
   //1.
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            //2.
            E item = p.item;
            //3.
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            //4.
            else if (p == q)
                continue restartFromHead;
            //5.
            else
                p = q;
        }
    }
}

peek 操作的代码结构与 poll 操作类似,不同之处在于代码(3)中少了 castItem 操作。其实这很正常,因为 peek 只是获取队列头元素值,并不清空其值。根据前面的介绍我们知道第一次执行 offer 后 head 指向的是哨兵节点(也就是 item 为 null 的节点),那么第一次执行 peek 时在代码(3)中会发现item == null,然后执行q = p.next,这时候 q 节点指向的才是队列里面第一个真正的元素,或者如果队列为 null 则 q 指向 null。


当队列为空时,队列如下:

image.png

这时候执行 updateHead,由于 h 节点等于 p 节点,所以不进行任何操作,然后 peek 操作会返回 null。


当队列中至少有一个元素时(假设只有一个),队列状态如下:

image.png

这时候执行代码(5), p 指向了 q 节点,然后执行代码(3),此时队列状态如下:

image.png

执行代码(3)时发现 item 不为 null,所以执行 updateHead 方法,由于h != p,所以设置头节点,设置后队列状态如下:

image.png

最后剔除哨兵节点。


总结:peek 操作的代码与 poll 操作类似,只是前者只获取队列头元素但是并不从队列里将它删除,而后者获取后需要从队列里面将它删除。另外,在第一次调用 peek 操作时,会删除哨兵节点,并让队列的 head 节点指向队列里面第一个元素或者为 null


size 操作

计算当前队列元素个数,在并发环境下不是很有用,因为 CAS 没有加锁,所以从调用 size 方法到返回结果期间有可能增删元素,导致统计的元素个数不精确。

public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}
//获取第一个队列元素,(剔除哨兵节点),没有则为null
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

remove 操作

如果队列里面存在该元素则删除该元素,如果存在多个则删除第一个,并返回 true,否则返回 false。

public boolean remove(Object o) {
    if (o != null) {
        Node<E> next, pred = null;
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;
            if (item != null) {
              // 若不匹配,则获取next节点继续匹配
                if (!o.equals(item)) {
                    next = succ(p);
                    continue;
                }
              //相等则使用CAS设置为null
                removed = p.casItem(item, null);
            }
            // 获取删除节点的后继节点
            next = succ(p);
            //如果有前驱节点,并且next节点不为null,则链接前驱节点到next节点
            if (pred != null && next != null) // unlink
              // 将被删除的节点通过CAS移除队列
                pred.casNext(p, next);
            if (removed)
                return true;
        }
    }
    return false;
}

contains 操作

判断队列里面是否含有指定对象,由于是遍历整个队列,所以像 size 操作一样结果也不是那么精确,有可能调用该方法时元素还在队列里面,但是遍历过程中其他线程才把该元素删除了,那么就会返回 false。

public boolean contains(Object o) {
    if (o == null) return false;
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;
        if (item != null && o.equals(item))
            return true;
    }
    return false;
}

总结

ConcurrentLinkedQueue 的底层使用单向链表数据结构来保存队列元素,每个元素被包装成一个 Node 节点。队列是靠头、尾节点来维护的,创建队列时头、尾节点指向一个 item 为 null 的哨兵节点。第一次执行 peek 或者 first 操作时会把 head 指向第一个真正的队列元素。由于使用非阻塞 CAS 算法,没有加锁,所以在计算 size 时有可能进行了 offer、poll 或者 remove 操作,导致计算的元素个数不精确,所以在并发情况下 size 方法不是很有用。

相关文章
|
7天前
|
人工智能 自然语言处理 Java
FastExcel:开源的 JAVA 解析 Excel 工具,集成 AI 通过自然语言处理 Excel 文件,完全兼容 EasyExcel
FastExcel 是一款基于 Java 的高性能 Excel 处理工具,专注于优化大规模数据处理,提供简洁易用的 API 和流式操作能力,支持从 EasyExcel 无缝迁移。
55 9
FastExcel:开源的 JAVA 解析 Excel 工具,集成 AI 通过自然语言处理 Excel 文件,完全兼容 EasyExcel
|
14天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
12天前
|
Java 数据库连接 Spring
反射-----浅解析(Java)
在java中,我们可以通过反射机制,知道任何一个类的成员变量(成员属性)和成员方法,也可以堆任何一个对象,调用这个对象的任何属性和方法,更进一步我们还可以修改部分信息和。
|
1月前
|
存储 算法 Java
Java内存管理深度解析####
本文深入探讨了Java虚拟机(JVM)中的内存分配与垃圾回收机制,揭示了其高效管理内存的奥秘。文章首先概述了JVM内存模型,随后详细阐述了堆、栈、方法区等关键区域的作用及管理策略。在垃圾回收部分,重点介绍了标记-清除、复制算法、标记-整理等多种回收算法的工作原理及其适用场景,并通过实际案例分析了不同GC策略对应用性能的影响。对于开发者而言,理解这些原理有助于编写出更加高效、稳定的Java应用程序。 ####
|
1月前
|
存储 监控 算法
Java虚拟机(JVM)垃圾回收机制深度解析与优化策略####
本文旨在深入探讨Java虚拟机(JVM)的垃圾回收机制,揭示其工作原理、常见算法及参数调优方法。通过剖析垃圾回收的生命周期、内存区域划分以及GC日志分析,为开发者提供一套实用的JVM垃圾回收优化指南,助力提升Java应用的性能与稳定性。 ####
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
88 2
|
3月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
89 0
|
3月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
69 0
|
13天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
13天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析