Java AQS 实现——共享模式

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 本文着重介绍 AQS 的共享模式的实现方式。

title:
date: 2020-06-09 18:46:10
tags:

  • Java
  • Concurrent
  • AQS

categories:

  • Java
  • Concurrent

引言

本文着重介绍 AQS 的共享模式的实现方式。所有关于 Java 并发的文章均收录于<Java并发系列文章>

共享模式

通过 AQS 不仅能够实现排他锁,而且也能够实现共享锁,就像前面介绍过的 ReentrantReadWriteLock,其中读锁就是共享锁,这部分是通过 AQS 的共享模式实现的。注意:这里我们只是以 ReentrantReadWriteLock 作为一个例子,这并不是说要想使用 AQS 的共享模式,就一定要同时使用它的排他模式,AQS 是很灵活的,你可以只使用它的共享模式,比如 CountDownLatch,也可以只使用它的排他模式,比如 ReentrantLock。

获取共享资源

以共享模式访问资源的入口函数是 acquireShared, 它内部会调用抽象函数 tryAcquireShared(需要子类覆写),如果返回0或正数说明成功获取资源,否则,当前线程要入队等待。

/**
 * Acquires in shared mode, ignoring interrupts.  Implemented by
 * first invoking at least once {@link #tryAcquireShared},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquireShared} until success.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquireShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 */
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

关于 tryAcquireShared 的实现方式,我们这里就不再举例了,这里说一说入队等待部分,虽然这部分和互斥模式的入队等待大致相同,但是还是有一些要注意的地方。

  1. 首先共享模式的同步队列入队时,入队操作和排他模式一样,只不过这里 node 的 nextWaiter 指向了 Node.SHARED,表明该 Node 是一个共享模式节点
  2. 然后同样进入一个循环,先检查自己是不是排在队列头,是的话就尝试获取锁,如果成功了则通过 setHeadAndPropagate 修改head节点,这和排他模式一样,但是除此之外,setHeadAndPropagate 还要在资源充足的时候唤醒其他线程,而排他模式只会在 release 的时候执行唤醒逻辑。细心的同学会发现在获取共享锁的时候,虽然中断不会停止获取锁的过程,但是在最后获取锁之后将设置中断标志位,这是和排他模式的一个区别,之所以这么做是因为 acquireShared 没有返回值,我们需要通过中断标志位来描述该状态。
  3. 如果获取共享资源失败,则进入队列等待,在进入等待状态前,需要将前序节点的 waitStatus 改为 SIGNAL
  4. 最后如果获取锁失败并退出循环时,说明当前请求被取消,将其从队列中移除,并唤醒后续节点
/**
 * Acquires in shared uninterruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 自己是队列中的第1个节点,head 是虚节点,我们把它排除
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 不是第一个节点,则需要将前序节点的状态改为 SIGNAL 然后重试 cas,如果当前已经是 SIGNAL 状态,则进入等待状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 如果因为中断而退出等待,就记录在 flag 中,这都和前面排他模式一样
                interrupted = true;
        }
    } finally {
        if (failed)
            // 执行出队并看情况是不是要唤醒后续节点
            cancelAcquire(node);
    }
}

在 setHeadAndPropagate 中,首先是修改了头结点,让当前节点接替之前头结点的角色,这和排他模式一样。除此之外,共享模式还要唤醒其他线程,为什么会出现这种情况呢?因为共享模式使用的是共享资源,这就意味着可能同一时刻多个线程可以同时获取该资源,那么什么场景下会出现在获取资源时需要唤醒其他等待中的共享资源请求呢?假设我们需要用 AQS 实现一个读写锁,首先我们先加一个写锁,之后多个线程获取读锁,因为读锁和写锁时互斥的,所以这些获取读锁的线程都会入队,当写锁被释放时,它只会唤醒队列中的第一个线程,而后续的读锁获取请求仍然处于等待状态,这就出现了读-读互斥的效果。这时候,第一个读锁请求成功拿到了读锁,并且发现还有剩余资源,这就需要唤醒下一个尝试读锁的线程。

总结一下,当出现如下情况时,我们需要队列中唤醒下一个线程:

  1. propagate > 0: 有剩余资源
  2. 无论重置头结点之前,还是重置之后,只要头结点的 waitStatus 小于0(可能的状态 SIGNAL CONDITION PROPAGATE),我们就得看看是不是要唤醒其他节点了, 这个比较复杂,对应的是多个线程一起释放的情况,等下我们结合 PROPAGATE 状态一起说。
  3. 虽然满足了上述2个条件,但是我们这里要排除一个例外的情况,即同步队列的内部状态为 Shared->Exclusive->Shared 的情况。这里我们需要判断下一个节点是不是排他模式的节点,如果不是再进行唤醒。这里大家可能会发现 next == null时也会执行唤醒,这是因为正向指针可能还没同步过来,我们也不确定下一个是不是共享模式的节点,不妨试一试唤醒,反正也不影响正确性。
/**
 * Sets head of queue, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate > 0 or
 * PROPAGATE status was set.
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    // propagate > 0: 有剩余资源
    // 无论重置头结点之前,还是重置之后,只要头结点的 waitStatus 小于0,我们就得看看是不是要唤醒其他节点了
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

那么 doReleaseShared 都做了什么呢?

  1. 首先 doReleaseShared 是工作在一个循环中的,这是为了防止 CAS 失败,要进行重试,并不是用作循环唤醒
  2. 如果当前头结点的状态是 SIGNAL 就唤醒下一个节点并将 waitStatus 改为0, 如果期间 head 没有发生变化就返回
  3. 如果当前节点状态是 0,就将其修改为 PROPAGATE,来确保多个获取共享资源和释放共享资源一同发生时,可能会出现的后续节点得不到唤醒的问题。
/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {// 如果当前头结点的状态是 SIGNAL 就唤醒下一个节点并将 waitStatus 改为0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 && // 如果当前节点状态是 0,就将其修改为 PROPAGATE,来确保多个获取共享资源和释放共享资源一同发生时,可能会出现的后续节点得不到唤醒的问题
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

PROPAGATE 存在意义听着有点抽象,我们来举个例子:

  1. 假如我们的共享资源数为 2,这时候 AB 线程持有这两份资源,CD线程因为没有获取到资源而入队。
  2. 但是这时候,只有C线程将前序节点的状态修改为 SIGNAL,D线程还没来得及做这一步,我们称现在是时间点1,当前剩余资源 state = 0,如下图时间点1。
  3. 紧接着线程 A 进行释放,释放时因为剩余资源大于 0 而唤醒了 C,并将 head 的状态修改为0,这时候对应了下图的时间点2,剩余资源 state = 1。
  4. C 线程苏醒,并执行了获取共享资源的操作,state 变为 0,C 走到了 setHeadAndPropagate 函数中,但是还没有执行 setHead,对应了下图的时间点3。
  5. 这时候 B 线程进行释放,剩余资源 state = 1,但是发现 head 的状态是 0,所以不进行唤醒(这里假设没有使用 PROPAGATE),对应了如下的时间点4。
  6. 最后线程 C,继续往下执行,发现 propagate = 0,同时 head 的状态也是 0,所以不会进入到 doReleaseShared 中,这就会导致线程 B 本来要进行的唤醒操作被跳过了。
  7. C 出队后,线程 D 终于把前序节点的状态改为 SIGNAL,但是因为自己已经是队首节点了,所以别的线程不会唤醒它,对应了如下时间点5。

why-need-propagate-1

而当我们使用 PROPAGATE 时,上述的时间点4,就会变成下图中的状态。因为在 doReleaseShared 是通过循环 CAS 的方式将状态 0 变为 PROPAGATE 的,而且期间如果发现头结点的变动,会重新将新的头结点状态置为 PROPAGATE。也就是说线程 B 执行完时,head 结点的状态必是 PROPAGATE。

why-need-propagate-2

但是这还不够:

  1. 如果线程 C 碰巧在这之后将头结点换掉了,也就是线程 C 的节点变成了新的头结点时,该节点的状态仍然是 0,如果线程 C 仅通过检查新的头结点是否小于 0,就会把之前的 PROPAGATE 遗漏,就如上图的情况。
  2. 而如果线程 C 只检查之前头结点的状态的话,也会出现类似的情况,就是线程 C 换掉头结点之前,头结点的状态是 0,换掉头结点之后,线程 B 将新头结点的状态改为 PROPAGATE,线程 C 也会遗漏 PROPAGATE 状态, 就如下图所示。

why-need-propagate-3

为了 Cover 上述的两种情况,在线程 C 执行 setHeadAndPropagate 时,最后判断是否要唤醒后续节点的条件中,既会检查之前的头结点的状态是否是 PROPAGATE,也会检查之后的头结点(原来的 ThreadC节点)的状态是否是 PROPAGATE。当任意一种情况满足时都会唤醒 ThreadD。

大家再想一下另一种情况,如果线程 B 的释放过程在线程 C 出队之后才开始进行情况,对应了之前的时间点 5。这时候,又要分线程 D 将head 状态改为 SIGNAL 或者 head 的状态是 0,因为 ThreadD 可能还没有来得及修改前序节点的状态。当线程 B 释放时,如果头结点的状态是 SIGNAL,那么它就会唤醒线程 D,因为线程 D 使用park进行等待,所以提前唤醒也不用担心线程 D 会死等,本文的最后会介绍为什么使用 park 不用考虑这样的问题。

而如果头结点状态仍然是 0 呢?虽然线程 B 不会执行唤醒操作了,但是因为线程 D 已经是队首结点了,所以它会尝试获取资源,而不会进行等待,这就不会有问题。

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 自己是队列中的第1个节点,head 是虚节点,我们把它排除
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
             // 不是第一个节点,则需要将前序节点的状态改为 SIGNAL 然后重试 cas,如果当前已经是 SIGNAL 状态,则进入等待状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 如果因为中断而退出等待,就记录在 flag 中,这都和前面排他模式一样
                interrupted = true;
        //...
}

释放共享资源

共享模式的完整释放逻辑实际上很简单,它的核心内容已经刚介绍了就是 doReleaseShared 函数,完整的释放过程不过是先通过 tryReleaseShared 修改 state,然后释放成功再执行 doReleaseShared 罢了。

/**
 * Releases in shared mode.  Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析%E7%9B%B8%E5%85%B3%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/)
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理

相关文章
|
2月前
|
存储 Java
JAVA并发编程AQS原理剖析
很多小朋友面试时候,面试官考察并发编程部分,都会被问:说一下AQS原理。面对并发编程基础和面试经验,专栏采用通俗简洁无废话无八股文方式,已陆续梳理分享了《一文看懂全部锁机制》、《JUC包之CAS原理》、《volatile核心原理》、《synchronized全能王的原理》,希望可以帮到大家巩固相关核心技术原理。今天我们聊聊AQS....
|
2月前
|
存储 Java 开发者
【Java新纪元启航】JDK 22:解锁未命名变量与模式,让代码更简洁,思维更自由!
【9月更文挑战第7天】JDK 22带来的未命名变量与模式匹配的结合,是Java编程语言发展历程中的一个重要里程碑。它不仅简化了代码,提高了开发效率,更重要的是,它激发了我们对Java编程的新思考,让我们有机会以更加自由、更加创造性的方式解决问题。随着Java生态系统的不断演进,我们有理由相信,未来的Java将更加灵活、更加强大,为开发者们提供更加广阔的舞台。让我们携手并进,共同迎接Java新纪元的到来!
61 11
|
2月前
|
设计模式 Java
Java设计模式-工厂方法模式(4)
Java设计模式-工厂方法模式(4)
|
3月前
|
消息中间件 Java
【实战揭秘】如何运用Java发布-订阅模式,打造高效响应式天气预报App?
【8月更文挑战第30天】发布-订阅模式是一种消息通信模型,发送者将消息发布到公共队列,接收者自行订阅并处理。此模式降低了对象间的耦合度,使系统更灵活、可扩展。例如,在天气预报应用中,`WeatherEventPublisher` 类作为发布者收集天气数据并通知订阅者(如 `TemperatureDisplay` 和 `HumidityDisplay`),实现组件间的解耦和动态更新。这种方式适用于事件驱动的应用,提高了系统的扩展性和可维护性。
65 2
|
3月前
|
Java
"揭秘Java IO三大模式:BIO、NIO、AIO背后的秘密!为何AIO成为高并发时代的宠儿,你的选择对了吗?"
【8月更文挑战第19天】在Java的IO编程中,BIO、NIO与AIO代表了三种不同的IO处理机制。BIO采用同步阻塞模型,每个连接需单独线程处理,适用于连接少且稳定的场景。NIO引入了非阻塞性质,利用Channel、Buffer与Selector实现多路复用,提升了效率与吞吐量。AIO则是真正的异步IO,在JDK 7中引入,通过回调或Future机制在IO操作完成后通知应用,适合高并发场景。选择合适的模型对构建高效网络应用至关重要。
77 2
|
3月前
|
设计模式 XML 存储
【二】设计模式~~~创建型模式~~~工厂方法模式(Java)
文章详细介绍了工厂方法模式(Factory Method Pattern),这是一种创建型设计模式,用于将对象的创建过程委托给多个工厂子类中的某一个,以实现对象创建的封装和扩展性。文章通过日志记录器的实例,展示了工厂方法模式的结构、角色、时序图、代码实现、优点、缺点以及适用环境,并探讨了如何通过配置文件和Java反射机制实现工厂的动态创建。
【二】设计模式~~~创建型模式~~~工厂方法模式(Java)
|
3月前
|
设计模式 XML Java
【一】设计模式~~~创建型模式~~~简单工厂模式(Java)
文章详细介绍了简单工厂模式(Simple Factory Pattern),这是一种创建型设计模式,用于根据输入参数的不同返回不同类的实例,而客户端不需要知道具体类名。文章通过图表类的实例,展示了简单工厂模式的结构、时序图、代码实现、优缺点以及适用环境,并提供了Java代码示例和扩展应用,如通过配置文件读取参数来实现对象的创建。
【一】设计模式~~~创建型模式~~~简单工厂模式(Java)
|
2月前
|
JSON Java UED
uniapp:使用DCloud的uni-push推送消息通知(在线模式)java实现
以上展示了使用Java结合DCloud的uni-push进行在线消息推送的基本步骤和实现方法。实际部署时,可能需要依据实际项目的规模,业务场景及用户基数进行必要的调整和优化,确保消息推送机制在保证用户体验的同时也满足业务需求。
160 0
|
3月前
|
开发者 C# 存储
WPF开发者必读:资源字典应用秘籍,轻松实现样式与模板共享,让你的WPF应用更上一层楼!
【8月更文挑战第31天】在WPF开发中,资源字典是一种强大的工具,用于共享样式、模板、图像等资源,提高了应用的可维护性和可扩展性。本文介绍了资源字典的基础知识、创建方法及最佳实践,并通过示例展示了如何在项目中有效利用资源字典,实现资源的重用和动态绑定。
72 0
|
3月前
|
Java 开发者
解锁Java并发编程的秘密武器!揭秘AQS,让你的代码从此告别‘锁’事烦恼,多线程同步不再是梦!
【8月更文挑战第25天】AbstractQueuedSynchronizer(AQS)是Java并发包中的核心组件,作为多种同步工具类(如ReentrantLock和CountDownLatch等)的基础。AQS通过维护一个表示同步状态的`state`变量和一个FIFO线程等待队列,提供了一种高效灵活的同步机制。它支持独占式和共享式两种资源访问模式。内部使用CLH锁队列管理等待线程,当线程尝试获取已持有的锁时,会被放入队列并阻塞,直至锁被释放。AQS的巧妙设计极大地丰富了Java并发编程的能力。
41 0