深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介:
本文首发在 infoQ    作者:刘锟洋

前言

经过本系列的上半部分JDK1.8 AbstractQueuedSynchronizer的实现分析(上)的解读,相信很多读者已经对AbstractQueuedSynchronizer(下文简称AQS)的独占功能了然于胸,那么,这次我们再借助另一个工具类:CoutDownLatch,换个角度看看AQS的另外一个重要功能——共享功能的实现。

 AQS共享功能的实现

     在开始解读AQS的共享功能前,我们再重温一下CountDownLatch,CountDownLatch为java.util.concurrent包下的计数器工具类,常被用在多线程环境下,它在初始时需要指定一个计数器的大小,然后可被多个线程并发的实现减1操作,并在计数器为0后调用await方法的线程被唤醒,从而实现多线程间的协作。它在多线程环境下的基本使用方式为:


//main thread
      // 新建一个CountDownLatch,并制定一个初始大小
      CountDownLatch countDownLatch = new CountDownLatch(3);
      // 调用await方法后,main线程将阻塞在这里,直到countDownLatch 中的计数为0
      countDownLatch.await();
      System.out.println("over");

     //thread1
     // do something
     //...........
     //调用countDown方法,将计数减1
      countDownLatch.countDown();

     //thread2
     // do something
     //...........
     //调用countDown方法,将计数减1
      countDownLatch.countDown();

       //thread3
     // do something
     //...........
     //调用countDown方法,将计数减1
      countDownLatch.countDown();


     注意,线程thread 1,2,3各自调用 countDown后,countDownLatch 的计数为0,await方法返回,控制台输入“over”,在此之前main thread 会一直沉睡。
      可以看到CountDownLatch的作用类似于一个“栏栅”,在CountDownLatch的计数为0前,调用await方法的线程将一直阻塞,直到CountDownLatch计数为0,await方法才会返回,
     而CountDownLatch的countDown()方法则一般由各个线程调用,实现CountDownLatch计数的减1。
      知道了CountDownLatch的基本使用方式,我们就从上述DEMO的第一行new CountDownLatch(3)开始,看看CountDownLatch是怎么实现的。     
     首先,看下CountDownLatch的构造方法:
      Image
     和ReentrantLock类似,CountDownLatch内部也有一个叫做Sync的内部类,同样也是用它继承了AQS。
     再看下Sync:
     Image [1]
     如果你看过本系列的上半部分,你对setState方法一定不会陌生,它是AQS的一个“状态位”,在不同的场景下,代表不同的含义,比如在ReentrantLock中,表示加锁的次数,在CountDownLatch中,
    则表示CountDownLatch的计数器的初始大小。
     Image [2]
    设置完计数器大小后CountDownLatch的构造方法返回,下面我们再看下CountDownLatch的await()方法:
    Image [3]
    调用了Sync的acquireSharedInterruptibly方法,因为Sync是AQS子类的原因,这里其实是直接调用了AQS的acquireSharedInterruptibly方法:
    Image [4]        
    从方法名上看,这个方法的调用是响应线程的打断的,所以在前两行会检查下线程是否被打断。接着,尝试着获取共享锁,小于0,表示获取失败,通过本系列的上半部分的解读,
   我们知道AQS在获取锁的思路是,先尝试直接获取锁,如果失败会将当前线程放在队列中,按照FIFO的原则等待锁。
    而对于共享锁也是这个思路,如果和独占锁一致,这里的tryAcquireShared应该是个空方法,留给子类去判断:
    Image [5] 
    再看看CountDownLatch:
    Image [6] 
     如果state变成0了,则返回1,表示获取成功,否则返回-1则表示获取失败。
     看到这里,读者可能会发现, await方法的获取方式更像是在获取一个独占锁,那为什么这里还会用tryAcquireShared呢?
     回想下CountDownLatch的await方法是不是只能在主线程中调用?答案是否定的,CountDownLatch的await方法可以在多个线程中调用,当CountDownLatch的计数器为0后,调用await的方法都会依次返回。
     也就是说可以多个线程同时在等待await方法返回,所以它被设计成了实现tryAcquireShared方法,获取的是一个共享锁,锁在所有调用await方法的线程间共享,所以叫共享锁。
 
    回到acquireSharedInterruptibly方法:
    Image [7]
   如果获取共享锁失败(返回了-1,说明state不为0,也就是CountDownLatch的计数器还不为0),进入调用doAcquireSharedInterruptibly方法中,按照我们上述的猜想,应该是要将当前线程放入到队列中去。
  在这之前,我们再回顾一下AQS队列的数据结构:AQS是一个双向链表,通过节点中的next,pre变量分别指向当前节点后一个节点和前一个节点。其中,每个节点中都包含了一个线程和一个类型变量:表示当前节点是独占节点还是共享节点,头节点中的线程为正在占有锁的线程,而后的所有节点的线程表示为正在等待获取锁的线程。如下图所示:
   Image [10] [1]
黄色节点,表示正在获取锁的节点,剩下的蓝色节点(Node1、Node2、Node3)为正在等待锁的节点,他们通过各自的next,pre变量分别指向前后节点,形成了AQS中的双向链表。 
    再看看doAcquireSharedInterruptibly方法:

   private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); //将当前线程包装为类型为Node.SHARED的节点,标示这是一个共享节点。
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {//如果新建节点的前一个节点,就是Head,说明当前节点是AQS队列中等待获取锁的第一个节点,按照FIFO的原则,可以直接尝试获取锁。
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); //获取成功,需要将当前节点设置为AQS队列中的第一个节点,这是AQS的规则,队列的头节点表示正在获取锁的节点
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && //检查下是否需要将当前节点挂起
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


这里有几点需要说明的:
 1. setHeadAndPropagate方法:
   Image [8]

    首先,使用了CAS更换了头节点,然后,将当前节点的下一个节点取出来,如果同样是“shared”类型的,再做一个”releaseShared”操作。看下doReleaseShared方法:


       for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //如果当前节点是SIGNAL意味着,它正在等待一个信号,
                                                                                              //或者说,它在等待被唤醒,因此做两件事,
                                                                                              //1是重置waitStatus标志位,2是重置成功后,唤醒下一个节点。
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  //如果本身头结点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。意味着需要将状态向后一个节点传播。
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }


 
  为什么要这么做呢?这就是共享功能和独占功能最不一样的地方,对于独占功能来说,有且只有一个线程(通常只对应一个节点,拿ReentantLock举例,如果当前持有锁的线程重复调用lock()方法,
那根据本系列上半部分我们的介绍,我们知道,会被包装成多个节点在AQS的队列中,所以用一个线程来描述更准确),能够获取锁,但是对于共享功能来说。
共享的状态是可以被共享的,也就是意味着其他AQS队列中的其他节点也应能第一时间知道状态的变化。因此,一个节点获取到共享状态流程图是这样的:
      Image [9]
     比如现在有如下队列:
Image [10] [1]
     当Node1调用tryAcquireShared成功后,更换了头节点:
    Image [11]

     Node1变成了头节点然后调用unparkSuccessor()方法唤醒了Node2,Node2中持有的线程A出于上面流程图的park node的位置,

     线程A被唤醒后,重复黄色线条的流程,重新检查调用tryAcquireShared方法,看能否成功,如果成功,则又更改头结点,重复以上步骤,以实现节点自身获取共享锁成功后,唤醒下一个共享类型结点的操作,实现共享状态的向后传递。

 2.其实对于doAcquireShared方法,AQS还提供了集中类似的实现:

   Image [12]

 分别对应了:

 1. 带参数请求共享锁。 (忽略中断)

 2. 带参数请求共享锁,且响应中断。(每次循环时,会检查当前线程的中断状态,以实现对线程中断的响应)

 3. 带参数请求共享锁但是限制等待时间。(第二个参数设置超时时间,超出时间后,方法返回。)

比较特别的为最后一个doAcquireSharedNanos方法,我们一起看下它怎么实现超时时间的控制的。

Image [13]

因为该方法和其余获取共享锁的方法逻辑是类似的,我用红色框圈出了它所不一样的地方,也就是实现超时时间控制的地方。

可以看到,其实就是在进入方法时,计算出了一个“deadline”,每次循环的时候用当前时间和“deadline”比较,大于“dealine”说明超时时间已到,直接返回方法。

注意,最后一个红框中的这行代码:

    nanosTimeout > spinForTimeoutThreshold

从变量的字面意思可知,这是拿超时时间和超时自旋的最小阀值作比较,在这里Doug Lea把超时自旋的阀值设置成了1000ns,即只有超时时间大于1000ns才会去挂起线程,否则,再次循环,以实现“自旋”操作。这是“自旋”在AQS中的应用之处。

 

看完await方法,我们再来看下countDown()方法:

Image [14]
调用了AQS的releaseShared方法,并传入了参数1:
Image [15]
同样先尝试去释放锁,tryReleaseShared同样为空方法,留给子类自己去实现,以下是CountDownLatch的内部类Sync的实现:
Image [16]

死循环更新state的值,实现state的减1操作,之所以用死循环是为了确保state值的更新成功。

从上文的分析中可知,如果state的值为0,在CountDownLatch中意味:所有的子线程已经执行完毕,这个时候可以唤醒调用await()方法的线程了,而这些线程正在AQS的队列中,并被挂起的,

所以下一步应该去唤醒AQS队列中的头结点了(AQS的队列为FIFO队列),然后由头节点去依次唤醒AQS队列中的其他共享节点。如果tryReleaseShared返回true,进入doReleaseShared()方法:


  private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //如果当前节点是SIGNAL意味着,它正在等待一个信号,
                                                                                              //或者说,它在等待被唤醒,因此做两件事,
                                                                                              //1是重置waitStatus标志位,2是重置成功后,唤醒下一个节点。
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  //如果本身头结点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。意味着需要将状态向后一个节点传播。
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
  }


 
 
当线程被唤醒后,会重新尝试获取共享锁,而对于CountDownLatch线程获取共享锁判断依据是state是否为0,而这个时候显然state已经变成了0,因此可以顺利获取共享锁并且依次唤醒AQS队里中后面的节点及对应的线程。
 

总结

     本文从CountDownLatch入手,深入分析了AQS关于共享锁方面的实现方式:

     如果获取共享锁失败后,将请求共享锁的线程封装成Node对象放入AQS的队列中,并挂起Node对象对应的线程,实现请求锁线程的等待操作。待共享锁可以被获取后,从头节点开始,依次唤醒头节点及其以后的所有共享类型的节点。实现共享状态的传播。这里有几点值得注意:
1.     与AQS的独占功能一样,共享锁是否可以被获取的判断为空方法,交由子类去实现。
2.     与AQS的独占功能不同,当锁被头节点获取后,独占功能是只有头节点获取锁,其余节点的线程继续沉睡,等待锁被释放后,才会唤醒下一个节点的线程,而共享功能是只要头节点获取锁成功,就在唤醒自身节点对应的线程的同时,继续唤醒AQS队列中的下一个节点的线程,每个节点在唤醒自身的同时还会唤醒下一个节点对应的线程,以实现共享状态的“向后传播”,从而实现共享功能。

以上的分析都是从AQS子类的角度去看待AQS的部分功能的,而如果直接看待AQS,或许可以这么去解读:
首先,AQS并不关心“是什么锁”,对于AQS来说它只是实现了一系列的用于判断“资源”是否可以访问的API,并且封装了在“访问资源”受限时将请求访问的线程的加入队列、挂起、唤醒等操作, AQS只关心“资源不可以访问时,怎么处理?”、“资源是可以被同时访问,还是在同一时间只能被一个线程访问?”、“如果有线程等不及资源了,怎么从AQS的队列中退出?”等一系列围绕资源访问的问题,而至于“资源是否可以被访问?”这个问题则交给AQS的子类去实现。
当AQS的子类是实现独占功能时,例如ReentrantLock,“资源是否可以被访问”被定义为只要AQS的state变量不为0,并且持有锁的线程不是当前线程,则代表资源不能访问。
当AQS的子类是实现共享功能时,例如:CountDownLatch,“资源是否可以被访问”被定义为只要AQS的state变量不为0,说明资源不能访问。这是典型的将规则和操作分开的设计思路:规则子类定义,操作逻辑因为具有公用性,放在父类中去封装。当然,正式因为AQS只是关心“资源在什么条件下可被访问”,所以子类还可以同时使用AQS的共享功能和独占功能的API以实现更为复杂的功能。
比如:ReentrantReadWriteLock,我们知道ReentrantReadWriteLock的中也有一个叫Sync的内部类继承了AQS,而AQS的队列可以同时存放共享锁和独占锁,对于ReentrantReadWriteLock来说分别代表读锁和写锁,当队列中的头节点为读锁时,代表读操作可以执行,而写操作不能执行,因此请求写操作的线程会被挂起,当读操作依次推出后,写锁成为头节点,请求写操作的线程被唤醒,可以执行写操作,而此时的读请求将被封装成Node放入AQS的队列中。如此往复,实现读写锁的读写交替进行。
而本系列文章上半部分提到的FutureTask,其实思路也是:封装一个存放线程执行结果的变量A,使用AQS的独占API实现线程对变量A的独占访问,判断规则是,线程没有执行完毕:call()方法没有返回前,不能访问变量A,或者是超时时间没到前不能访问变量A(这就是FutureTask的get方法可以实现获取线程执行结果时,设置超时时间的原因)。
综上所述,本系列文章从AQS独占锁和共享锁两个方面深入分析了AQS的实现方式和独特的设计思路,希望对读者有启发,下一篇文章,我们将继续JDK 1.8下 J.U.C (java.util.concurrent)包中的其他工具类,敬请期待。

目录
相关文章
|
3天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
3天前
|
缓存 算法 搜索推荐
Java中的算法优化与复杂度分析
在Java开发中,理解和优化算法的时间复杂度和空间复杂度是提升程序性能的关键。通过合理选择数据结构、避免重复计算、应用分治法等策略,可以显著提高算法效率。在实际开发中,应该根据具体需求和场景,选择合适的优化方法,从而编写出高效、可靠的代码。
15 6
|
1天前
|
Java 数据库连接 Spring
反射-----浅解析(Java)
在java中,我们可以通过反射机制,知道任何一个类的成员变量(成员属性)和成员方法,也可以堆任何一个对象,调用这个对象的任何属性和方法,更进一步我们还可以修改部分信息和。
|
26天前
|
Java 编译器
Java 泛型详细解析
本文将带你详细解析 Java 泛型,了解泛型的原理、常见的使用方法以及泛型的局限性,让你对泛型有更深入的了解。
39 2
Java 泛型详细解析
|
24天前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
45 4
|
23天前
|
存储 算法 Java
Java内存管理深度解析####
本文深入探讨了Java虚拟机(JVM)中的内存分配与垃圾回收机制,揭示了其高效管理内存的奥秘。文章首先概述了JVM内存模型,随后详细阐述了堆、栈、方法区等关键区域的作用及管理策略。在垃圾回收部分,重点介绍了标记-清除、复制算法、标记-整理等多种回收算法的工作原理及其适用场景,并通过实际案例分析了不同GC策略对应用性能的影响。对于开发者而言,理解这些原理有助于编写出更加高效、稳定的Java应用程序。 ####
|
23天前
|
存储 监控 算法
Java虚拟机(JVM)垃圾回收机制深度解析与优化策略####
本文旨在深入探讨Java虚拟机(JVM)的垃圾回收机制,揭示其工作原理、常见算法及参数调优方法。通过剖析垃圾回收的生命周期、内存区域划分以及GC日志分析,为开发者提供一套实用的JVM垃圾回收优化指南,助力提升Java应用的性能与稳定性。 ####
|
25天前
|
Java 数据库连接 开发者
Java中的异常处理机制:深入解析与最佳实践####
本文旨在为Java开发者提供一份关于异常处理机制的全面指南,从基础概念到高级技巧,涵盖try-catch结构、自定义异常、异常链分析以及最佳实践策略。不同于传统的摘要概述,本文将以一个实际项目案例为线索,逐步揭示如何高效地管理运行时错误,提升代码的健壮性和可维护性。通过对比常见误区与优化方案,读者将获得编写更加健壮Java应用程序的实用知识。 --- ####
|
1月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
76 2
|
2天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析

推荐镜像

更多