Java AQS 实现——Condition

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: 本文着重介绍 AQS 的 Condition 实现方式。

引言

本文着重介绍 AQS 的 Condition 实现方式。所有关于 Java 并发的文章均收录于<Java并发系列文章>

Condition

介绍完 AQS 的共享模式和互斥模式后,我们来看一看 AQS 是如何实现条件等待的,即 Condition。在 AQS 中通过 ConditionObject 实现 Condition。从 ConditionObject 的核心数据中,我们会发现它内部也会维护一个 Node 的队列。

public class ConditionObject implements Condition, java.io.Serializable {
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}

那么这个队列和同步队列有什么联系吗?带着这样的疑问我们走读一下在 Condition 上 await 入队的流程。这里我们以 ReentrantLock 的 Condition 实现作为例子。 ReentrantLock 的 newCondition 函数能够创建一个 Condition 对象。内部实际上就是创建了一个 AQS 的 ConditionObject 类。

// ReentrantLock 的 newCondition 函数能够创建一个 Condition 对象
public Condition newCondition() {
    return sync.newCondition();
}
// 内部实际上就是创建了一个 AQS 的 ConditionObject 类
final ConditionObject newCondition() {
    return new ConditionObject();
}

await

我们知道,在使用 Condition 对象时,要先持有对应的锁,然后再执行 Condition 的 await 方法。因为 Condition 内部不提供同步保证,我们需要通过 lock 来保护 Condition 的正确性。

  1. 在 Condition 的 await 函数中,先会将当前线程添加到 Condition 的条件队列中,因为此时持有锁,所以 addConditionWaiter 就是拿到尾指针然后新建一个节点并加入(使用Node的nextWaiter字段保存下一个Node),注意新建节点的状态是 CONDITION
  2. 添加到条件队列后,它才将锁释放,并保存了之前的锁状态,因为在唤醒时,必须要恢复锁状态
  3. 紧接着是一个循环,只要当前节点不处于同步队列中,就通过 park 等待,进入到同步队列中,说明当前节点已经从条件队列移除,并开始了尝试获取锁的过程
  4. 如果从等待状态中恢复,首先要检查一下是不是被中断了,如果是要根据被打断的时间点做出不同的处理

    1. 如果是被 Signal 唤醒前被打断了(当前节点状态是 CONDITION),就将当前节点状态改为 0(表示取消等待),并添加到同步队列中,然后返回 THROW_IE,这意味着最终它会抛出一个异常
    2. 如果是被 Signal 唤醒之后被打断了(当前节点状态不是 CONDITION,说明已经进入了或即将进入同步队列),这里就通过自旋等待进入同步队列即可,CONDITION 代表当前处于条件队列,!CONDITION 代表当前处于同步队列,最后返回了 REINTERRUPT,意味着只会设置线程的中断标志位,不会抛出异常
  5. 当前节点处于等待队列中时(有可能被Signal唤醒也有可能被打断),开始调用 acquireQueued 等待获取锁
  6. 成功获取锁之后,进行简单地清理工作,然后如果当前线程被打断,要根据情况做出不动处理

    1. Signal 唤醒前被打断了:抛出 InterruptedException
    2. Signal 唤醒之后被打断: 记录中断标志位
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 先会将当前线程添加到 Condition 的**条件队列**中
    Node node = addConditionWaiter();
    // 添加到**条件队列**后,它才将锁释放,并保存了之前的锁状态,因为在唤醒时,必须要恢复锁状态
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 只要不处于同步队列中,就通过 park 等待
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 如果被打断了,就将当前线程加入到同步队列中,因为从 await 返回时,无论如何我们也要获取到锁,被打断和被 Signal 都一样
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 走到这说明当前节点处于等待队列中时(无论是被Signal唤醒还是被打断),开始调用 acquireQueued 等待获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

/**
 * Adds a new waiter to wait queue.
 * @return its new wait node
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out. 清除无用的节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 因为此时持有锁,所以 addConditionWaiter 就是拿到尾指针然后新建一个节点并加入(使用Node的nextWaiter字段保存下一个Node)
    // 注意新建节点的状态是 CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
/**
 * Unlinks cancelled waiter nodes from condition queue.
 * Called only while holding lock. This is called when
 * cancellation occurred during condition wait, and upon
 * insertion of a new waiter when lastWaiter is seen to have
 * been cancelled. This method is needed to avoid garbage
 * retention in the absence of signals. So even though it may
 * require a full traversal, it comes into play only when
 * timeouts or cancellations occur in the absence of
 * signals. It traverses all nodes rather than stopping at a
 * particular target to unlink all pointers to garbage nodes
 * without requiring many re-traversals during cancellation
 * storms.
 */
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        // 状态不等于 CONDITION 代表了已经被取消或者已经被Signal唤醒,将其从条件队列中清楚
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}
/**
 * Returns true if a node, always one that was initially placed on
 * a condition queue, is now waiting to reacquire on sync queue.
 * @param node the node
 * @return true if is reacquiring
 */
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    return findNodeFromTail(node);
}
/**
 * Checks for interrupt, returning THROW_IE if interrupted
 * before signalled, REINTERRUPT if after signalled, or
 * 0 if not interrupted.
 */
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ? // 检查是否被打断
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
/**
 * Transfers node, if necessary, to sync queue after a cancelled wait.
 * Returns true if thread was cancelled before being signalled.
 *
 * @param node the node
 * @return true if cancelled before the node was signalled
 */
final boolean transferAfterCancelledWait(Node node) {
    // 如果是被 Signal 唤醒前被打断了(当前节点状态是 CONDITION),就将当前节点状态改为 0(表示取消等待),并添加到**同步队列**中,然后返回 true,这意味着最终 await 会抛出一个 InterruptedException
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    /*
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */
    // 如果是被 Signal 唤醒之后被打断了(当前节点状态不是 CONDITION,说明已经进入了或即将进入同步队列),这里就通过自旋等待进入同步队列即可
    // 最后返回了 false,意味着 await 只会设置线程的中断标志位,不会抛出 InterruptedException
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
/**
 * Throws InterruptedException, reinterrupts current thread, or
 * does nothing, depending on mode.
 */
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

signal

接下来,我们看一下通知signal的实现逻辑。

  1. 我们知道调用 signal 时,也是要求持有锁 lock 的,所以在 signal 中首先会判断是不是当前线程持有锁
  2. 然后检查等待队列是不是空,如果不为空,则唤醒第一个线程,唤醒线程的逻辑也很简单

    1. 首先通过修改 firstWaiter 指针,将条件队列中的队首结点排除
    2. 然后将刚才移除的节点移动到同步队列中,因为在这个过程中可能该节点对应的线程被打断,所以当发生这种情况时,我们需要唤醒队列中的下一个节点,直到成功将一个节点移动到同步队列中或者条件队列为空

      1. 怎么判断一个线程的 await 被打断了呢?就像前面所说的只要节点的状态不是 CONDITION 那它就已经被取消
      2. 将节点添加到同步队列中还不够,我们需要修改前序节点的状态,使其状态为 SIGNAL 这样才能放心,所以在 transferForSignal 的最后检查了前序节点的状态如果发现该节点已经被取消,或者修改 SIGNAL 失败,就主动唤醒该节点,因为唤醒之后该节点的线程会进入到 acquireQueued 函数中,那里有更完备的删除取消节点,加锁以及等待的处理流程
/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    // 我们知道调用 signal 时,也是要求持有锁 lock 的,所以在 signal 中首先会判断是不是当前线程持有锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    do {
        // 首先通过修改 firstWaiter 指针,将条件队列中的队首结点排除
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // 然后将刚才移除的节点移动到同步队列中,因为在这个过程中可能该节点对应的线程被打断,所以当发生这种情况时,我们需要唤醒队列中的下一个节点
        // 直到成功将一个节点移动到同步队列中或者条件队列为空为止
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

/**
 * Transfers a node from a condition queue onto sync queue.
 * Returns true if successful.
 * @param node the node
 * @return true if successfully transferred (else the node was
 * cancelled before signal)
 */
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     * 只要节点的状态不是 CONDITION 那它就已经被取消
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

最后,我们简单地说一下 signalAll 的实现,它和 signal 的实现基本相同,只不过它会对条件队列中的每一个节点执行 transferForSignal 函数。

/**
 * Moves all threads from the wait queue for this condition to
 * the wait queue for the owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}
/**
 * Removes and transfers all nodes.
 * @param first (non-null) the first node on condition queue
 */
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析%E7%9B%B8%E5%85%B3%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/)
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理

相关文章
|
3月前
|
Java API 容器
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
|
3月前
|
存储 Java
JAVA并发编程AQS原理剖析
很多小朋友面试时候,面试官考察并发编程部分,都会被问:说一下AQS原理。面对并发编程基础和面试经验,专栏采用通俗简洁无废话无八股文方式,已陆续梳理分享了《一文看懂全部锁机制》、《JUC包之CAS原理》、《volatile核心原理》、《synchronized全能王的原理》,希望可以帮到大家巩固相关核心技术原理。今天我们聊聊AQS....
|
4月前
|
开发者 C# 存储
WPF开发者必读:资源字典应用秘籍,轻松实现样式与模板共享,让你的WPF应用更上一层楼!
【8月更文挑战第31天】在WPF开发中,资源字典是一种强大的工具,用于共享样式、模板、图像等资源,提高了应用的可维护性和可扩展性。本文介绍了资源字典的基础知识、创建方法及最佳实践,并通过示例展示了如何在项目中有效利用资源字典,实现资源的重用和动态绑定。
116 0
|
4月前
|
Java 开发者
解锁Java并发编程的秘密武器!揭秘AQS,让你的代码从此告别‘锁’事烦恼,多线程同步不再是梦!
【8月更文挑战第25天】AbstractQueuedSynchronizer(AQS)是Java并发包中的核心组件,作为多种同步工具类(如ReentrantLock和CountDownLatch等)的基础。AQS通过维护一个表示同步状态的`state`变量和一个FIFO线程等待队列,提供了一种高效灵活的同步机制。它支持独占式和共享式两种资源访问模式。内部使用CLH锁队列管理等待线程,当线程尝试获取已持有的锁时,会被放入队列并阻塞,直至锁被释放。AQS的巧妙设计极大地丰富了Java并发编程的能力。
50 0
|
5月前
|
Java
Java中的线程通信:wait、notify与Condition详解
Java中的线程通信:wait、notify与Condition详解
|
6月前
|
Java 调度 开发者
揭秘Java并发包(JUC)的基石:AQS原理和应用
揭秘Java并发包(JUC)的基石:AQS原理和应用
|
6月前
|
安全 Java
Java 并发编程之AQS
Java 并发编程之AQS
179 0
|
6月前
|
监控 安全 Java
Java中的锁(Lock、重入锁、读写锁、队列同步器、Condition)
Java中的锁(Lock、重入锁、读写锁、队列同步器、Condition)
33 0
|
7月前
|
存储 安全 算法
掌握Java并发编程:Lock、Condition与并发集合
掌握Java并发编程:Lock、Condition与并发集合
55 0
|
7月前
|
搜索推荐 Java
[Java探索者之路] Java中的AbstractQueuedSynchronizer(AQS)简介
[Java探索者之路] Java中的AbstractQueuedSynchronizer(AQS)简介