Java AQS 实现——数据组织

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: 从本篇文章开始,我们将介绍 Java AQS 的实现方式,本文先介绍 AQS 的内部数据是如何组织的,后面的文章中再分别介绍 AQS 的各个部门实现。

引言

从本篇文章开始,我们将介绍 Java AQS 的实现方式,本文先介绍 AQS 的内部数据是如何组织的,后面的文章中再分别介绍 AQS 的各个部门实现。所有关于 Java 并发的文章均收录于<Java并发系列文章>

AQS

通过前面的介绍,大家一定看出来了,上述的各种类型的锁和一些线程控制接口(CountDownLatch 等),最终都是通过 AQS 来实现的,不同之处只在于 tryAcquire 等抽象函数如何实现。从这个角度来看,AQS(AbstractQueuedSynchronizer) 这个基类设计的真的很不错,能够包容各种同步控制方案,并提供了必须的下层依赖:比如阻塞,队列等。接下来我们就来揭开它神秘的面纱。

内部数据

AQS 顾名思义,就是通过队列来组织修改互斥资源的请求。当这个资源空闲时间,那么修改请求可以直接进行,而当这个资源处于锁定状态时,就需要等待,AQS 会将所有等待的请求维护在一个类似于 CLH 的队列中。CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。主要原理图如下:

clh-v-queue

图中的 state 是一个用 volatile 修饰的 int 变量,它的使用都是通过 CAS 来进行的,而 FIFO 队列完成请求排队的工作,队列的操作也是通过 CAS 来进行的,正因如此该队列的操作才能达到理想的性能要求。

通过 CAS 修改 state 比较容易,大家应该都能理解,但是如果要通过 CAS 维护一个双向队列要怎么做呢?这里我们看一下 AQS 中 CLH 队列的实现。在 AQS 中有两个指针一个指针指向了队列头,一个指向了队列尾。它们都是懒初始化的,也就是说最初都为null。

/**
 * Head of the wait queue, lazily initialized.  Except for
 * initialization, it is modified only via method setHead.  Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
private transient volatile Node head;

/**
 * Tail of the wait queue, lazily initialized.  Modified only via
 * method enq to add new wait node.
 */
private transient volatile Node tail;

队列中的每个节点,都是一个 Node 实例,该实例的第一个关键字段是 waitState,它表述了当前节点所处的状态,通过 CAS 进行修改:

  • SIGNAL:表示当前节点承担唤醒后继节点的责任
  • CANCELLED:表示当前节点已经超时或者被打断
  • CONDITION:表示当前节点正在 Condition 上等待(通过锁可以创建 Condition 对象)
  • PROPAGATE:只会设置在 head 节点上,用于表明释放共享锁时,需要将这个行为传播到其他节点上,这个我们稍后详细介绍。
static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    /**
     * Status field, taking on only the values:
     *   SIGNAL:     The successor of this node is (or will soon be)
     *               blocked (via park), so the current node must
     *               unpark its successor when it releases or
     *               cancels. To avoid races, acquire methods must
     *               first indicate they need a signal,
     *               then retry the atomic acquire, and then,
     *               on failure, block.
     *   CANCELLED:  This node is cancelled due to timeout or interrupt.
     *               Nodes never leave this state. In particular,
     *               a thread with cancelled node never again blocks.
     *   CONDITION:  This node is currently on a condition queue.
     *               It will not be used as a sync queue node
     *               until transferred, at which time the status
     *               will be set to 0. (Use of this value here has
     *               nothing to do with the other uses of the
     *               field, but simplifies mechanics.)
     *   PROPAGATE:  A releaseShared should be propagated to other
     *               nodes. This is set (for head node only) in
     *               doReleaseShared to ensure propagation
     *               continues, even if other operations have
     *               since intervened.
     *   0:          None of the above
     *
     * The values are arranged numerically to simplify use.
     * Non-negative values mean that a node doesn't need to
     * signal. So, most code doesn't need to check for particular
     * values, just for sign.
     *
     * The field is initialized to 0 for normal sync nodes, and
     * CONDITION for condition nodes.  It is modified using CAS
     * (or when possible, unconditional volatile writes).
     */
    volatile int waitStatus;

    /**
     * Link to predecessor node that current node/thread relies on
     * for checking waitStatus. Assigned during enqueuing, and nulled
     * out (for sake of GC) only upon dequeuing.  Also, upon
     * cancellation of a predecessor, we short-circuit while
     * finding a non-cancelled one, which will always exist
     * because the head node is never cancelled: A node becomes
     * head only as a result of successful acquire. A
     * cancelled thread never succeeds in acquiring, and a thread only
     * cancels itself, not any other node.
     */
    volatile Node prev;

    /**
     * Link to the successor node that the current node/thread
     * unparks upon release. Assigned during enqueuing, adjusted
     * when bypassing cancelled predecessors, and nulled out (for
     * sake of GC) when dequeued.  The enq operation does not
     * assign next field of a predecessor until after attachment,
     * so seeing a null next field does not necessarily mean that
     * node is at end of queue. However, if a next field appears
     * to be null, we can scan prev's from the tail to
     * double-check.  The next field of cancelled nodes is set to
     * point to the node itself instead of null, to make life
     * easier for isOnSyncQueue.
     */
    volatile Node next;

    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED.  Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    //...
}

因为是双向队列,所以 Node 实例中势必有 prev 和 next 指针,此外 Node 中还会保存与其对应的线程。最后是 nextWaiter,当一个节点对应了共享请求时,nextWaiter 指向了 Node. SHARED 而当一个节点是排他请求时,nextWaiter 默认指向了 Node. EXCLUSIVE 也就是 null。我们知道 AQS 也提供了 Condition 功能,该功能就是通过 nextWaiter 来维护在 Condition 上等待的线程。也就是说这里的 nextWaiter 在锁的实现部分中,扮演者共享锁和排它锁的标志位,而在条件等待队列中,充当链表的 next 指针。

同步队列

接下来,我们由最常见的入队操作出发,介绍 AQS 框架的实现与使用。从下面的代码中可以看到入队操作支持两种模式,一种是排他模式,一种是共享模式,分别对应了排它锁场景和共享锁场景。

  1. 当任意一种请求,要入队时,先会构建一个 Node 实例,然后获取当前 AQS 队列的尾结点,如果尾结点为空,就是说队列还没初始化,初始化过程在后面 enq 函数中实现
  2. 这里我们先看初始化之后的情况,即 tail != null,先将当前 Node 的前向指针 prev 更新,然后通过 CAS 将尾结点修改为当前 Node,修改成功时,再更新前一个节点的后向指针 next,因为只有修改尾指针过程是原子的,所以这里会出现新插入一个节点时,之前的尾节点 previousTail 的 next 指针为null的情况,也就是说会存在短暂的正向指针和反向指针不同步的情况,不过在后面的介绍中,你会发现 AQS 很完备地避开了这种不同步带来的风险(通过从后往前遍历)
  3. 如果上述操作成功,则当前线程已经进入同步队列,否则,可能存在多个线程的竞争,其他线程设置尾结点成功了,而当前线程失败了,这时候会和尾结点未初始化一样进入 enq 函数中。
/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        // 已经进行了初始化
        node.prev = pred;
        // CAS 修改尾节点
        if (compareAndSetTail(pred, node)) {
            // 成功之后再修改后向指针
            pred.next = node;
            return node;
        }
    }
    // 循环 CAS 过程和初始化过程
    enq(node);
    return node;
}

正常通过 CAS 修改数据都会在一个循环中进行,而这里的 addWaiter 只是在一个 if 中进行,这是为什么呢?实际上,大家看到的 addWaiter 的这部分 CAS 过程是一个快速执行线,在没有竞争时,这种方式能省略不少判断过程。当发生竞争时,会进入 enq 函数中,那里才是循环 CAS 的地方。

  1. 整个 enq 的工作在一个循环中进行
  2. 先会检查是否未进行初始化,是的话,就设置一个虚拟节点 Node 作为 head 和 tail,也就是说同步队列的第一个节点并不保存实际数据,只是一个保存指针的地方
  3. 初始化完成后,通过 CAS 修改尾节点,直到修改成功为止,最后修复后向指针
/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {// 在一个循环中进行 CAS 操作
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // CAS 修改尾节点
            if (compareAndSetTail(t, node)) {
                // 成功之后再修改后向指针
                t.next = node;
                return t;
            }
        }

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析%E7%9B%B8%E5%85%B3%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/)
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理

相关文章
|
17天前
|
前端开发 JavaScript Java
java常用数据判空、比较和类型转换
本文介绍了Java开发中常见的数据处理技巧,包括数据判空、数据比较和类型转换。详细讲解了字符串、Integer、对象、List、Map、Set及数组的判空方法,推荐使用工具类如StringUtils、Objects等。同时,讨论了基本数据类型与引用数据类型的比较方法,以及自动类型转换和强制类型转换的规则。最后,提供了数值类型与字符串互相转换的具体示例。
|
2月前
|
算法 Java 数据处理
从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其“不重复性”要求,彻底改变了处理唯一性数据的方式。
从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其“不重复性”要求,彻底改变了处理唯一性数据的方式。HashSet基于哈希表实现,提供高效的元素操作;TreeSet则通过红黑树实现元素的自然排序,适合需要有序访问的场景。本文通过示例代码详细介绍了两者的特性和应用场景。
51 6
|
2月前
|
存储 Java API
深入剖析Java Map:不只是存储数据,更是设计艺术的体现!
【10月更文挑战第17天】在Java编程中,Map是一种重要的数据结构,用于存储键值对,并展现了设计艺术的精髓。本文深入剖析了Map的设计原理和使用技巧,包括基本概念、设计艺术(如哈希表与红黑树的空间时间权衡)、以及使用技巧(如选择合适的实现类、避免空指针异常等),帮助读者更好地理解和应用Map。
110 3
|
24天前
|
JSON Java 程序员
Java|如何用一个统一结构接收成员名称不固定的数据
本文介绍了一种 Java 中如何用一个统一结构接收成员名称不固定的数据的方法。
25 3
|
1月前
|
Java 程序员 容器
Java中的变量和常量:数据的‘小盒子’和‘铁盒子’有啥不一样?
在Java中,变量是一个可以随时改变的数据容器,类似于一个可以反复打开的小盒子。定义变量时需指定数据类型和名称。例如:`int age = 25;` 表示定义一个整数类型的变量 `age`,初始值为25。 常量则是不可改变的数据容器,类似于一个锁死的铁盒子,定义时使用 `final` 关键字。例如:`final int MAX_SPEED = 120;` 表示定义一个名为 `MAX_SPEED` 的常量,值为120,且不能修改。 变量和常量的主要区别在于变量的数据可以随时修改,而常量的数据一旦确定就不能改变。常量主要用于防止意外修改、提高代码可读性和便于维护。
|
1月前
|
存储 缓存 安全
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见。本文介绍了使用 `File.createTempFile` 方法和自定义创建临时文件的两种方式,详细探讨了它们的使用场景和注意事项,包括数据缓存、文件上传下载和日志记录等。强调了清理临时文件、确保文件名唯一性和合理设置文件权限的重要性。
74 2
|
1月前
|
Java
Java 8 引入的 Streams 功能强大,提供了一种简洁高效的处理数据集合的方式
Java 8 引入的 Streams 功能强大,提供了一种简洁高效的处理数据集合的方式。本文介绍了 Streams 的基本概念和使用方法,包括创建 Streams、中间操作和终端操作,并通过多个案例详细解析了过滤、映射、归并、排序、分组和并行处理等操作,帮助读者更好地理解和掌握这一重要特性。
31 2
|
2月前
|
存储 SQL 小程序
JVM知识体系学习五:Java Runtime Data Area and JVM Instruction (java运行时数据区域和java指令(大约200多条,这里就将一些简单的指令和学习))
这篇文章详细介绍了Java虚拟机(JVM)的运行时数据区域和JVM指令集,包括程序计数器、虚拟机栈、本地方法栈、直接内存、方法区和堆,以及栈帧的组成部分和执行流程。
41 2
JVM知识体系学习五:Java Runtime Data Area and JVM Instruction (java运行时数据区域和java指令(大约200多条,这里就将一些简单的指令和学习))
|
1月前
|
存储 分布式计算 Java
存算分离与计算向数据移动:深度解析与Java实现
【11月更文挑战第10天】随着大数据时代的到来,数据量的激增给传统的数据处理架构带来了巨大的挑战。传统的“存算一体”架构,即计算资源与存储资源紧密耦合,在处理海量数据时逐渐显露出其局限性。为了应对这些挑战,存算分离(Disaggregated Storage and Compute Architecture)和计算向数据移动(Compute Moves to Data)两种架构应运而生,成为大数据处理领域的热门技术。
63 2
|
1月前
|
SQL Java OLAP
java实现“数据平滑升级”
java实现“数据平滑升级”
44 2