Java AQS 实现——共享模式

简介: 本文着重介绍 AQS 的共享模式的实现方式。

title:
date: 2020-06-09 18:46:10
tags:

  • Java
  • Concurrent
  • AQS

categories:

  • Java
  • Concurrent

引言

本文着重介绍 AQS 的共享模式的实现方式。所有关于 Java 并发的文章均收录于<Java并发系列文章>

共享模式

通过 AQS 不仅能够实现排他锁,而且也能够实现共享锁,就像前面介绍过的 ReentrantReadWriteLock,其中读锁就是共享锁,这部分是通过 AQS 的共享模式实现的。注意:这里我们只是以 ReentrantReadWriteLock 作为一个例子,这并不是说要想使用 AQS 的共享模式,就一定要同时使用它的排他模式,AQS 是很灵活的,你可以只使用它的共享模式,比如 CountDownLatch,也可以只使用它的排他模式,比如 ReentrantLock。

获取共享资源

以共享模式访问资源的入口函数是 acquireShared, 它内部会调用抽象函数 tryAcquireShared(需要子类覆写),如果返回0或正数说明成功获取资源,否则,当前线程要入队等待。

/**
 * Acquires in shared mode, ignoring interrupts.  Implemented by
 * first invoking at least once {@link #tryAcquireShared},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquireShared} until success.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquireShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 */
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

关于 tryAcquireShared 的实现方式,我们这里就不再举例了,这里说一说入队等待部分,虽然这部分和互斥模式的入队等待大致相同,但是还是有一些要注意的地方。

  1. 首先共享模式的同步队列入队时,入队操作和排他模式一样,只不过这里 node 的 nextWaiter 指向了 Node.SHARED,表明该 Node 是一个共享模式节点
  2. 然后同样进入一个循环,先检查自己是不是排在队列头,是的话就尝试获取锁,如果成功了则通过 setHeadAndPropagate 修改head节点,这和排他模式一样,但是除此之外,setHeadAndPropagate 还要在资源充足的时候唤醒其他线程,而排他模式只会在 release 的时候执行唤醒逻辑。细心的同学会发现在获取共享锁的时候,虽然中断不会停止获取锁的过程,但是在最后获取锁之后将设置中断标志位,这是和排他模式的一个区别,之所以这么做是因为 acquireShared 没有返回值,我们需要通过中断标志位来描述该状态。
  3. 如果获取共享资源失败,则进入队列等待,在进入等待状态前,需要将前序节点的 waitStatus 改为 SIGNAL
  4. 最后如果获取锁失败并退出循环时,说明当前请求被取消,将其从队列中移除,并唤醒后续节点
/**
 * Acquires in shared uninterruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 自己是队列中的第1个节点,head 是虚节点,我们把它排除
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 不是第一个节点,则需要将前序节点的状态改为 SIGNAL 然后重试 cas,如果当前已经是 SIGNAL 状态,则进入等待状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 如果因为中断而退出等待,就记录在 flag 中,这都和前面排他模式一样
                interrupted = true;
        }
    } finally {
        if (failed)
            // 执行出队并看情况是不是要唤醒后续节点
            cancelAcquire(node);
    }
}

在 setHeadAndPropagate 中,首先是修改了头结点,让当前节点接替之前头结点的角色,这和排他模式一样。除此之外,共享模式还要唤醒其他线程,为什么会出现这种情况呢?因为共享模式使用的是共享资源,这就意味着可能同一时刻多个线程可以同时获取该资源,那么什么场景下会出现在获取资源时需要唤醒其他等待中的共享资源请求呢?假设我们需要用 AQS 实现一个读写锁,首先我们先加一个写锁,之后多个线程获取读锁,因为读锁和写锁时互斥的,所以这些获取读锁的线程都会入队,当写锁被释放时,它只会唤醒队列中的第一个线程,而后续的读锁获取请求仍然处于等待状态,这就出现了读-读互斥的效果。这时候,第一个读锁请求成功拿到了读锁,并且发现还有剩余资源,这就需要唤醒下一个尝试读锁的线程。

总结一下,当出现如下情况时,我们需要队列中唤醒下一个线程:

  1. propagate > 0: 有剩余资源
  2. 无论重置头结点之前,还是重置之后,只要头结点的 waitStatus 小于0(可能的状态 SIGNAL CONDITION PROPAGATE),我们就得看看是不是要唤醒其他节点了, 这个比较复杂,对应的是多个线程一起释放的情况,等下我们结合 PROPAGATE 状态一起说。
  3. 虽然满足了上述2个条件,但是我们这里要排除一个例外的情况,即同步队列的内部状态为 Shared->Exclusive->Shared 的情况。这里我们需要判断下一个节点是不是排他模式的节点,如果不是再进行唤醒。这里大家可能会发现 next == null时也会执行唤醒,这是因为正向指针可能还没同步过来,我们也不确定下一个是不是共享模式的节点,不妨试一试唤醒,反正也不影响正确性。
/**
 * Sets head of queue, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate > 0 or
 * PROPAGATE status was set.
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    // propagate > 0: 有剩余资源
    // 无论重置头结点之前,还是重置之后,只要头结点的 waitStatus 小于0,我们就得看看是不是要唤醒其他节点了
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

那么 doReleaseShared 都做了什么呢?

  1. 首先 doReleaseShared 是工作在一个循环中的,这是为了防止 CAS 失败,要进行重试,并不是用作循环唤醒
  2. 如果当前头结点的状态是 SIGNAL 就唤醒下一个节点并将 waitStatus 改为0, 如果期间 head 没有发生变化就返回
  3. 如果当前节点状态是 0,就将其修改为 PROPAGATE,来确保多个获取共享资源和释放共享资源一同发生时,可能会出现的后续节点得不到唤醒的问题。
/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {// 如果当前头结点的状态是 SIGNAL 就唤醒下一个节点并将 waitStatus 改为0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 && // 如果当前节点状态是 0,就将其修改为 PROPAGATE,来确保多个获取共享资源和释放共享资源一同发生时,可能会出现的后续节点得不到唤醒的问题
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

PROPAGATE 存在意义听着有点抽象,我们来举个例子:

  1. 假如我们的共享资源数为 2,这时候 AB 线程持有这两份资源,CD线程因为没有获取到资源而入队。
  2. 但是这时候,只有C线程将前序节点的状态修改为 SIGNAL,D线程还没来得及做这一步,我们称现在是时间点1,当前剩余资源 state = 0,如下图时间点1。
  3. 紧接着线程 A 进行释放,释放时因为剩余资源大于 0 而唤醒了 C,并将 head 的状态修改为0,这时候对应了下图的时间点2,剩余资源 state = 1。
  4. C 线程苏醒,并执行了获取共享资源的操作,state 变为 0,C 走到了 setHeadAndPropagate 函数中,但是还没有执行 setHead,对应了下图的时间点3。
  5. 这时候 B 线程进行释放,剩余资源 state = 1,但是发现 head 的状态是 0,所以不进行唤醒(这里假设没有使用 PROPAGATE),对应了如下的时间点4。
  6. 最后线程 C,继续往下执行,发现 propagate = 0,同时 head 的状态也是 0,所以不会进入到 doReleaseShared 中,这就会导致线程 B 本来要进行的唤醒操作被跳过了。
  7. C 出队后,线程 D 终于把前序节点的状态改为 SIGNAL,但是因为自己已经是队首节点了,所以别的线程不会唤醒它,对应了如下时间点5。

why-need-propagate-1

而当我们使用 PROPAGATE 时,上述的时间点4,就会变成下图中的状态。因为在 doReleaseShared 是通过循环 CAS 的方式将状态 0 变为 PROPAGATE 的,而且期间如果发现头结点的变动,会重新将新的头结点状态置为 PROPAGATE。也就是说线程 B 执行完时,head 结点的状态必是 PROPAGATE。

why-need-propagate-2

但是这还不够:

  1. 如果线程 C 碰巧在这之后将头结点换掉了,也就是线程 C 的节点变成了新的头结点时,该节点的状态仍然是 0,如果线程 C 仅通过检查新的头结点是否小于 0,就会把之前的 PROPAGATE 遗漏,就如上图的情况。
  2. 而如果线程 C 只检查之前头结点的状态的话,也会出现类似的情况,就是线程 C 换掉头结点之前,头结点的状态是 0,换掉头结点之后,线程 B 将新头结点的状态改为 PROPAGATE,线程 C 也会遗漏 PROPAGATE 状态, 就如下图所示。

why-need-propagate-3

为了 Cover 上述的两种情况,在线程 C 执行 setHeadAndPropagate 时,最后判断是否要唤醒后续节点的条件中,既会检查之前的头结点的状态是否是 PROPAGATE,也会检查之后的头结点(原来的 ThreadC节点)的状态是否是 PROPAGATE。当任意一种情况满足时都会唤醒 ThreadD。

大家再想一下另一种情况,如果线程 B 的释放过程在线程 C 出队之后才开始进行情况,对应了之前的时间点 5。这时候,又要分线程 D 将head 状态改为 SIGNAL 或者 head 的状态是 0,因为 ThreadD 可能还没有来得及修改前序节点的状态。当线程 B 释放时,如果头结点的状态是 SIGNAL,那么它就会唤醒线程 D,因为线程 D 使用park进行等待,所以提前唤醒也不用担心线程 D 会死等,本文的最后会介绍为什么使用 park 不用考虑这样的问题。

而如果头结点状态仍然是 0 呢?虽然线程 B 不会执行唤醒操作了,但是因为线程 D 已经是队首结点了,所以它会尝试获取资源,而不会进行等待,这就不会有问题。

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 自己是队列中的第1个节点,head 是虚节点,我们把它排除
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
             // 不是第一个节点,则需要将前序节点的状态改为 SIGNAL 然后重试 cas,如果当前已经是 SIGNAL 状态,则进入等待状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 如果因为中断而退出等待,就记录在 flag 中,这都和前面排他模式一样
                interrupted = true;
        //...
}

释放共享资源

共享模式的完整释放逻辑实际上很简单,它的核心内容已经刚介绍了就是 doReleaseShared 函数,完整的释放过程不过是先通过 tryReleaseShared 修改 state,然后释放成功再执行 doReleaseShared 罢了。

/**
 * Releases in shared mode.  Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析%E7%9B%B8%E5%85%B3%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/)
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理

相关文章
|
11月前
|
设计模式 人工智能 安全
AQS:Java 中悲观锁的底层实现机制
AQS(AbstractQueuedSynchronizer)是Java并发包中实现同步组件的基础工具,支持锁(如ReentrantLock、ReadWriteLock)和线程同步工具类(如CountDownLatch、Semaphore)等。Doug Lea设计AQS旨在抽象基础同步操作,简化同步组件构建。 使用AQS需实现`tryAcquire(int arg)`和`tryRelease(int arg)`方法以获取和释放资源,共享模式还需实现`tryAcquireShared(int arg)`和`tryReleaseShared(int arg)`。
522 32
AQS:Java 中悲观锁的底层实现机制
|
9月前
|
Java 应用服务中间件 Docker
java-web部署模式概述
本文总结了现代 Web 开发中 Spring Boot HTTP 接口服务的常见部署模式,包括 Servlet 与 Reactive 模型、内置与外置容器、物理机 / 容器 / 云环境部署及单体与微服务架构,帮助开发者根据实际场景选择合适的方案。
491 25
|
9月前
|
存储 Java 大数据
Java 大视界 -- Java 大数据在智能家居能源消耗模式分析与节能策略制定中的应用(198)
简介:本文探讨Java大数据技术在智能家居能源消耗分析与节能策略中的应用。通过数据采集、存储与智能分析,构建能耗模型,挖掘用电模式,制定设备调度策略,实现节能目标。结合实际案例,展示Java大数据在智能家居节能中的关键作用。
|
11月前
|
供应链 JavaScript 前端开发
Java基于SaaS模式多租户ERP系统源码
ERP,全称 Enterprise Resource Planning 即企业资源计划。是一种集成化的管理软件系统,它通过信息技术手段,将企业的各个业务流程和资源管理进行整合,以提高企业的运营效率和管理水平,它是一种先进的企业管理理念和信息化管理系统。 适用于小微企业的 SaaS模式多租户ERP管理系统, 采用最新的技术栈开发, 让企业简单上云。专注于小微企业的应用需求,如企业基本的进销存、询价,报价, 采购、销售、MRP生产制造、品质管理、仓库库存管理、财务应收付款, OA办公单据、CRM等。
747 23
|
Java
【源码】【Java并发】【AQS】从ReentrantLock、Semaphore、CutDownLunch、CyclicBarrier看AQS源码
前言 主播觉得,AQS的原理,就是通过这2个队列的协助,实现核心功能,同步队列(CLH队列)和条件队列(Condition队列)。 同步队列(CLH队列) 作用:管理需要获...
228 18
【源码】【Java并发】【AQS】从ReentrantLock、Semaphore、CutDownLunch、CyclicBarrier看AQS源码
|
设计模式 存储 安全
【Java并发】【AQS】适合初学者体质的AQS入门
AQS这是灰常重要的哈,很多JUC下的框架的核心,那都是我们的AQS,所以这里,我们直接开始先研究AQS。 那说到研究AQS,那我们应该,使用开始说起🤓 入门 什么是AQS? AQS(Abst
261 8
【Java并发】【AQS】适合初学者体质的AQS入门
|
11月前
|
缓存 安全 Java
【高薪程序员必看】万字长文拆解Java并发编程!(3-1):并发共享问题的解决与分析
活锁:多个线程相互影响对方退出同步代码块的条件而导致线程一直运行的情况。例如,线程1的退出条件是count=5,而线程2和线程3在其代码块中不断地是count进行自增自减的操作,导致线程1永远运行。内存一致性问题:由于JIT即时编译器对缓存的优化和指令重排等造成的内存可见性和有序性问题,可以通过synchronized,volatile,并发集合类等机制来解决。这里的线程安全是指,多个线程调用它们同一个实例的方法时,是线程安全的,但仅仅能保证当前调用的方法是线程安全的,不同方法之间是线程不安全的。
198 0
|
11月前
|
Java 程序员
【高薪程序员必看】万字长文拆解Java并发编程!(3-2):并发共享问题的解决与分析
wait方法和notify方法都是Object类的方法:让当前获取锁的线程进入waiting状态,并进入waitlist队列:让当前获取锁的线程进入waiting状态,并进入waitlist队列,等待n秒后自动唤醒:在waitlist队列中挑一个线程唤醒:唤醒所有在waitlist队列中的线程它们都是之间协作的手段,只有拥有对象锁的线程才能调用这些方法,否则会出现IllegalMonitorStateException异常park方法和unpark方法是LockSupport类中的方法。
199 0
|
设计模式 XML 存储
【二】设计模式~~~创建型模式~~~工厂方法模式(Java)
文章详细介绍了工厂方法模式(Factory Method Pattern),这是一种创建型设计模式,用于将对象的创建过程委托给多个工厂子类中的某一个,以实现对象创建的封装和扩展性。文章通过日志记录器的实例,展示了工厂方法模式的结构、角色、时序图、代码实现、优点、缺点以及适用环境,并探讨了如何通过配置文件和Java反射机制实现工厂的动态创建。
【二】设计模式~~~创建型模式~~~工厂方法模式(Java)
|
设计模式 XML Java
【一】设计模式~~~创建型模式~~~简单工厂模式(Java)
文章详细介绍了简单工厂模式(Simple Factory Pattern),这是一种创建型设计模式,用于根据输入参数的不同返回不同类的实例,而客户端不需要知道具体类名。文章通过图表类的实例,展示了简单工厂模式的结构、时序图、代码实现、优缺点以及适用环境,并提供了Java代码示例和扩展应用,如通过配置文件读取参数来实现对象的创建。
【一】设计模式~~~创建型模式~~~简单工厂模式(Java)
下一篇
开通oss服务