Java JUC ConcurrentLinkedQueue解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析DNS,个人版 1个月
简介: ConcurrentLinkedQueue 原理探究

ConcurrentLinkedQueue 原理探究


介绍

ConcurrentLinkedQueue 是线程安全的无界非阻塞队列,底层使用单向链表实现,对于入队和出队操作使用 CAS 实现线程安全。

1654826948121.png

ConcurrentLinkedQueue 内部的队列使用单向链表方式实现,其中有两个volatile类型的Node节点分别用来存放队列的头、尾节点。


下面无参构造函数中可以知道,默认头尾节点都是指向 item 为 null 的哨兵节点,新元素插入队尾,获取元素从头部出队。

private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

在 Node 节点中维护一个使用 volatile 修饰的变量 item,用来存放节点的值;next 存放链表的下一个节点;其内部则使用 UNSafe 工具类提供的 CAS 算法来保证出入队时操作链表的原子性。


offer 操作

offer 操作是在队尾增加一个元素,如果传递的参数是 null 则抛出异常,否则由于 ConcurrentLinkedQueue 是无界队列,该方法会一直返回 true,另外,由于使用的是 CAS 算法,所以该方法不会被阻塞挂起调用方线程。下面我们看源码:

public boolean offer(E e) {
        //(1)为null则抛出异常
        checkNotNull(e);
       //(2)构造Node节点,内部调用unsafe.putObject
        final Node<E> newNode = new Node<E>(e);
        //(3)从尾节点插入
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            //(4)如果q == null则说明p是尾节点,执行插入
            if (q == null) {
                //(5)使用CAS设置p节点的next节点
                if (p.casNext(null, newNode)) {
                    //(6)如果p != t,则将入队节点设置成tail节点,更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
            }
            else if (p == q)
                //(7)多线程操作时,由于poll操作移除元素后可能会把head变成自引用,也就是head的next变成了head,这里需要重新找新的head
                p = (t != (t = tail)) ? t : head;
            else
                //(8)寻找尾节点
                p = (p != t && t != (t = tail)) ? t : q;
        }
}

我们解析一下这个方法的执行流程,首先当调用该方法时,代码(1)对传参进行检查,为 null 则抛出异常,否则执行代码(2)并使用 item 作为构造函数参数创建一个新的节点,然后代码(3)从队列尾部节点开始循环,打算从尾部添加元素,到代码(4)时队列状态如下图。

image.png

这时候节点 p、t、head、tail 都指向 item 为 null 的哨兵节点,由于哨兵节点 next 节点为 null,所以 q 也是 null。


代码(4)发现 q == null 则执行代码(5),使用 CAS 判断 p 节点的 next 是否为 null,为 null 则使用 newNode 替换 p 的 next 节点,然后执行代码(6),但这时候 p == t 所以没有设置尾部节点,然后退出 offer 方法。目前队列状态如下图:

image.png

刚才我们讲解的是一个线程调用 offer 方法的情况,但是如果多个线程同时调用,就会存在多个线程同时执行到代码(5)的情况。


假设线程 1 调用了 offer(item1),线程 2 调用了 offer(item2),都同时执行到了代码(5),p.casNext(null,newNode)。由于 CAS 是原子性的,我们假设线程 1 先执行 CAS 操作,发现 p.next 为 null,则更新为 item1,这时候线程 2 也会判断 p.next 是否为 null,发现不为 null,则跳转到代码(3),然后执行代码(4)。这时队列分布如下图:

image.png

随后线程 2 发现不满足代码(4),则跳转执行代码(8),然后把 q 赋值给了 p。队列状态如下:

image.png

然后线程 2 再次跳转到代码(3)执行,当执行到代码(4)时队列状态如下图:

image.png

这时候q == null,所以线程 2 会执行代码(5),通过 CAS 操作判断 p.next 节点是否为 null,是 null 则使用 item2 替换,不是则继续循环尝试。假设 CAS 成功,那么执行代码(6),由于p != t,所以设置 tail 节点为 item2,然后退出 offer 方法。这时候队列分布如下:

image.png

到现在我们就差代码(7)没有讲过,其实在这一步主要是在执行 poll 操作后才会执行。在执行 poll 可能会发生如下情况:

image.png

我们分析一下如果是这种状态时调用 offer 添加元素,在执行到代码(4)时状态图如下:

image.png

这里由于 q 节点不为空并且p == q所以执行到代码(7)由于t == tail所以 p 被赋值为 head,然后重新循环,循环后执行到代码(4),队列状态如下:

image.png

这时候由于q == null,所以执行代码(5)进行 CAS 操作,如果当前没有其他线程执行 offer 操作,则 CAS 操作会成功,p.next 节点被设置为新增节点。然后执行代码(6),由于p != t所以设置新节点为队列的尾部节点,现在队列状态如下图:

image.png

📢 需要注意,这里自引用节点会被垃圾回收掉


总结:offer 操作的关键步骤是代码(5),通过 CAS 操作来控制同一时刻只有一个线程可以追加元素到队列末尾,而失败的线程会进行循环尝试 CAS 操作,直到 CAS 成功。


add 操作

add 操作是在链表末尾添加一个元素,内部还是调用的 offer 方法。

public boolean add(E e) {
    return offer(e);
}

poll 操作


poll 操作是在队列头部获取并移除一个元素,如果队列为空则返回 null。下面看看该方法的实现原理。

public E poll() {
    //(1)goto标记
    restartFromHead:
    //(2)无限循环
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            //(3)保存当前节点
            E item = p.item;
                         //(4)当前节点有值切CAS变为null
            if (item != null && p.casItem(item, null)) {
                //(5)CAS成功后标记当前节点并移除
                if (p != h) // hop two nodes at a time
                    //更新头节点
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
           //(6)当前队列为空则返回null
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            //(7)如果当前节点被自引用,则重新寻找头节点
            else if (p == q)
                continue restartFromHead;
            else  //(8)如果下一个元素不为空,则将头节点的下一个节点设置成头节点
                p = q;
        }
    }
}
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

由于 poll 方法是从头部获取一个元素并移除,所以代码(2)内层循环是从 head 节点开始,代码(3)获取当前队列头节点,队列一开始为空时队列状态如下:

image.png

由于 head 节点指向的 item 为 null 的哨兵节点,所以会执行到代码(6),假设这个过程中没有线程调用 offer 方法,则此时 q 等于 null,这时队列状态如下:

image.png

随后执行 updateHead 方法,由于h == p,所以没有设置头节点,然后返回 null。


假设执行到代码(6)时,已经有其它线程调用了 offer 方法并成功添加了一个元素到队列,这时候 q 指向的是新增加的元素节点,如下图:

image.png

然后不满足代码(6)(q = p.next) == null,执行代码(7)的时候 p != q则执行代码(8),然后将 p 指向了节点 q,队列状态如下:

image.png

然后程序又去执行代码(3),p 现在指向的不为 null,则执行p.casItem(item, null)通过 CAS 操作尝试设置 p 的 item 值为 null,如果此时 CAS 成功,执行代码(5),此时p != h则设置头节点为 p,并且设置 h 的 next 节点为自己,poll 然后返回被移除的节点 item。队列图如下:

image.png

目前的队列状态就是我们刚才讲 offer 操作时,执行到代码(7)的状态。


现在我们还有代码(7)没执行过,我们看一下什么时候执行。假设线程 1 执行 poll 操作时,队列状态如下:

image.png

那么执行p.casItem(item, null)通过 CAS 操作尝试设置 p 的 item 值为 null,假设 CAS 设置成功则标记该节点并从队列中将其移除。队列状态如下:

image.png

然后,由于p != h,所以会执行 updateHead 方法,假如线程 1 执行 updateHead 前另外一个线程 2 开始 poll 操作,这时候线程 2 的 p 指向 head 节点,但是还没有执行到代码(6),这时候队列状态如下:

image.png

然后线程 1 执行 updateHead 操作,执行完毕后线程 1 退出,这时候队列状态如下:

image.png

然后线程 2 继续执行代码(6), q = p.next,由于该节点是自引用节点,所以p == q,所以会执行代码(7)跳到外层循环 restartFromHead,获取当前队列头 head,现在的状态如下:

image.png

总结:poll 方法在移除一个元素时,只是简单地使用 CAS 操作把当前节点的 item 值设置为 null,然后通过重新设置头节点将该元素从队列里面移除,被移除的节点就成了孤立节点,这个节点会在垃圾回收时被回收掉。另外,如果在执行分支中发现头节点被修改了,要跳到外层循环重新获取新的头节点。


peek 操作

peek 操作是获取队列头部一个元素(只获取不移除),如果队列为空则返回 null。下面看下其实现原理。

public E peek() {
   //1.
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            //2.
            E item = p.item;
            //3.
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            //4.
            else if (p == q)
                continue restartFromHead;
            //5.
            else
                p = q;
        }
    }
}

peek 操作的代码结构与 poll 操作类似,不同之处在于代码(3)中少了 castItem 操作。其实这很正常,因为 peek 只是获取队列头元素值,并不清空其值。根据前面的介绍我们知道第一次执行 offer 后 head 指向的是哨兵节点(也就是 item 为 null 的节点),那么第一次执行 peek 时在代码(3)中会发现item == null,然后执行q = p.next,这时候 q 节点指向的才是队列里面第一个真正的元素,或者如果队列为 null 则 q 指向 null。


当队列为空时,队列如下:

image.png

这时候执行 updateHead,由于 h 节点等于 p 节点,所以不进行任何操作,然后 peek 操作会返回 null。


当队列中至少有一个元素时(假设只有一个),队列状态如下:

image.png

这时候执行代码(5), p 指向了 q 节点,然后执行代码(3),此时队列状态如下:

image.png

执行代码(3)时发现 item 不为 null,所以执行 updateHead 方法,由于h != p,所以设置头节点,设置后队列状态如下:

image.png

最后剔除哨兵节点。


总结:peek 操作的代码与 poll 操作类似,只是前者只获取队列头元素但是并不从队列里将它删除,而后者获取后需要从队列里面将它删除。另外,在第一次调用 peek 操作时,会删除哨兵节点,并让队列的 head 节点指向队列里面第一个元素或者为 null


size 操作

计算当前队列元素个数,在并发环境下不是很有用,因为 CAS 没有加锁,所以从调用 size 方法到返回结果期间有可能增删元素,导致统计的元素个数不精确。

public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}
//获取第一个队列元素,(剔除哨兵节点),没有则为null
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

remove 操作

如果队列里面存在该元素则删除该元素,如果存在多个则删除第一个,并返回 true,否则返回 false。

public boolean remove(Object o) {
    if (o != null) {
        Node<E> next, pred = null;
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;
            if (item != null) {
              // 若不匹配,则获取next节点继续匹配
                if (!o.equals(item)) {
                    next = succ(p);
                    continue;
                }
              //相等则使用CAS设置为null
                removed = p.casItem(item, null);
            }
            // 获取删除节点的后继节点
            next = succ(p);
            //如果有前驱节点,并且next节点不为null,则链接前驱节点到next节点
            if (pred != null && next != null) // unlink
              // 将被删除的节点通过CAS移除队列
                pred.casNext(p, next);
            if (removed)
                return true;
        }
    }
    return false;
}

contains 操作

判断队列里面是否含有指定对象,由于是遍历整个队列,所以像 size 操作一样结果也不是那么精确,有可能调用该方法时元素还在队列里面,但是遍历过程中其他线程才把该元素删除了,那么就会返回 false。

public boolean contains(Object o) {
    if (o == null) return false;
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;
        if (item != null && o.equals(item))
            return true;
    }
    return false;
}

总结

ConcurrentLinkedQueue 的底层使用单向链表数据结构来保存队列元素,每个元素被包装成一个 Node 节点。队列是靠头、尾节点来维护的,创建队列时头、尾节点指向一个 item 为 null 的哨兵节点。第一次执行 peek 或者 first 操作时会把 head 指向第一个真正的队列元素。由于使用非阻塞 CAS 算法,没有加锁,所以在计算 size 时有可能进行了 offer、poll 或者 remove 操作,导致计算的元素个数不精确,所以在并发情况下 size 方法不是很有用。

相关文章
|
3天前
|
自然语言处理 Java 应用服务中间件
Java 18 新特性解析
Java 18 新特性解析
|
3天前
|
存储 设计模式 Java
Java中的if-else语句:深入解析与应用实践
Java中的if-else语句:深入解析与应用实践
|
3天前
|
Java 索引
Java中的for循环:深度解析
Java中的for循环:深度解析
|
3天前
|
设计模式 存储 Java
掌握Java设计模式的23种武器(全):深入解析与实战示例
掌握Java设计模式的23种武器(全):深入解析与实战示例
|
3天前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践
|
8天前
|
安全 Java 数据处理
Java并发编程:解锁多线程的潜力
在数字化时代的浪潮中,Java作为一门广泛使用的编程语言,其并发编程能力是提升应用性能和响应速度的关键。本文将带你深入理解Java并发编程的核心概念,探索如何通过多线程技术有效利用计算资源,并实现高效的数据处理。我们将从基础出发,逐步揭开高效并发编程的面纱,让你的程序运行得更快、更稳、更强。
|
7天前
|
Java 开发者
奇迹时刻!探索 Java 多线程的奇幻之旅:Thread 类和 Runnable 接口的惊人对决
【8月更文挑战第13天】Java的多线程特性能显著提升程序性能与响应性。本文通过示例代码详细解析了两种核心实现方式:Thread类与Runnable接口。Thread类适用于简单场景,直接定义线程行为;Runnable接口则更适合复杂的项目结构,尤其在需要继承其他类时,能保持代码的清晰与模块化。理解两者差异有助于开发者在实际应用中做出合理选择,构建高效稳定的多线程程序。
28 7
|
6天前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
6天前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
3天前
|
存储 缓存 安全
深度剖析Java HashMap:源码分析、线程安全与最佳实践
深度剖析Java HashMap:源码分析、线程安全与最佳实践

热门文章

最新文章

推荐镜像

更多