万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)(上)

简介: 万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)

| 好看请赞,养成习惯


  • 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
  • If you can NOT explain it simply, you do NOT understand it well enough


现陆续将Demo代码和技术文章整理在一起 Github实践精选 ,方便大家阅读查看,本文同样收录在此,觉得不错,还请Star


微信图片_20220510180821.png


写在前面


进入源码阶段了,写了十几篇的 并发系列 知识铺垫终于要派上用场了。相信很多人已经忘了其中的一些理论知识,别担心,我会在源码环节带入相应的理论知识点帮助大家回忆,做到理论与实践相结合,另外这是超长图文,建议收藏,如果对你有用还请点赞让更多人看到


微信图片_20220510180846.png


Java SDK 为什么要设计 Lock

曾几何时幻想过,如果 Java 并发控制只有 synchronized 多好,只有下面三种使用方式,简单方便


public class ThreeSync {
    private static final Object object = new Object();
    public synchronized void normalSyncMethod(){
        //临界区
    }
    public static synchronized void staticSyncMethod(){
        //临界区
    }
    public void syncBlockMethod(){
        synchronized (object){
            //临界区
        }
    }
}


如果在 Java 1.5之前,确实是这样,自从 1.5 版本 Doug Lea 大师就重新造了一个轮子 Lock


微信图片_20220510180929.png


我们常说:“避免重复造轮子”,如果有了轮子还是要坚持再造个轮子,那么肯定传统的轮子在某些应用场景中不能很好的解决问题


微信图片_20220510180956.gif


不知你是否还记得 Coffman 总结的四个可以发生死锁的情形 ,其中【不可剥夺条件】是指:


线程已经获得资源,在未使用完之前,不能被剥夺,只能在使用完时自己释放


要想破坏这个条件,就需要具有申请不到进一步资源就释放已有资源的能力


很显然,这个能力是 synchronized 不具备的,使用 synchronized ,如果线程申请不到资源就会进入阻塞状态,我们做什么也改变不了它的状态,这是 synchronized 轮子的致命弱点,这就强有力的给了重造轮子 Lock 的理由


显式锁 Lock



旧轮子有弱点,新轮子就要解决这些问题,所以要具备不会阻塞的功能,下面的三个方案都是解决这个问题的好办法(看下面表格描述你就明白三个方案的含义了)


特性 描述 API
能响应中断 如果不能自己释放,那可以响应中断也是很好的。Java多线程中断机制 专门描述了中断过程,目的是通过中断信号来跳出某种状态,比如阻塞 lockInterruptbly()
非阻塞式的获取锁 尝试获取,获取不到不会阻塞,直接返回 tryLock()
支持超时 给定一个时间限制,如果一段时间内没获取到,不是进入阻塞状态,同样直接返回 tryLock(long time, timeUnit)


好的方案有了,但鱼和熊掌不可兼得,Lock 多了 synchronized 不具备的特性,自然不会像 synchronized 那样一个关键字三个玩法走遍全天下,在使用上也相对复杂了一丢丢


微信图片_20220510181023.png


Lock 使用范式


synchronized 有标准用法,这样的优良传统咱 Lock 也得有,相信很多人都知道使用 Lock 的一个范式


Lock lock = new ReentrantLock();
lock.lock();
try{
    ...
}finally{
    lock.unlock();
}


既然是范式(没事不要挑战更改写法的那种),肯定有其理由,我们来看一下


标准1—finally 中释放锁


这个大家应该都会明白,在 finally 中释放锁,目的是保证在获取到锁之后,最终能被释放


标准2—在 try{} 外面获取锁


不知道你有没有想过,为什么会有标准 2 的存在,我们通常是“喜欢” try 住所有内容,生怕发生异常不能捕获的

try{} 外获取锁主要考虑两个方面:


  1. 如果没有获取到锁就抛出异常,最终释放锁肯定是有问题的,因为还未曾拥有锁谈何释放锁呢


  1. 如果在获取锁时抛出了异常,也就是当前线程并未获取到锁,但执行到 finally 代码时,如果恰巧别的线程获取到了锁,则会被释放掉(无故释放)


不同锁的实现方式略有不同,范式的存在就是要避免一切问题的出现,所以大家尽量遵守范式


微信图片_20220510181105.png


Lock 是怎样起到锁的作用呢?


如果你熟悉 synchronized,你知道程序编译成 CPU 指令后,在临界区会有 moniterentermoniterexit 指令的出现,可以理解成进出临界区的标识

从范式上来看:


  • lock.lock() 获取锁,“等同于” synchronized 的 moniterenter指令


  • lock.unlock() 释放锁,“等同于” synchronized 的 moniterexit 指令


那 Lock 是怎么做到的呢?


这里先简单说明一下,这样一会到源码分析时,你可以远观设计轮廓,近观实现细节,会变得越发轻松


微信图片_20220510181134.png


其实很简单,比如在 ReentrantLock 内部维护了一个 volatile 修饰的变量 state,通过 CAS 来进行读写(最底层还是交给硬件来保证原子性和可见性),如果CAS更改成功,即获取到锁,线程进入到 try 代码块继续执行;如果没有更改成功,线程会被【挂起】,不会向下执行


但 Lock 是一个接口,里面根本没有 state 这个变量的存在:


微信图片_20220510181158.png


它怎么处理这个 state 呢?很显然需要一点设计的加成了,接口定义行为,具体都是需要实现类的


Lock 接口的实现类基本都是通过【聚合】了一个【队列同步器】的子类完成线程访问控制的


那什么是队列同步器呢? (这应该是你见过的最强标题党,聊了半个世纪才入正题,评论区留言骂我)


队列同步器 AQS


队列同步器 (AbstractQueuedSynchronizer),简称同步器或AQS,就是我们今天的主人公


问:为什么你分析 JUC 源码,要从 AQS 说起呢?

答:看下图


微信图片_20220510181224.png


相信看到这个截图你就明白一二了,你听过的,面试常被问起的,工作中常用的


  • ReentrantLock


  • ReentrantReadWriteLock


  • Semaphore(信号量)


  • CountDownLatch


  • 公平锁


  • 非公平锁



都和 AQS 有直接关系,所以了解 AQS 的抽象实现,在此基础上再稍稍查看上述各类的实现细节,很快就可以全部搞定,不至于查看源码时一头雾水,丢失主线


上面提到,在锁的实现类中会聚合同步器,然后利同步器实现锁的语义,那么问题来了:


为什么要用聚合模式,怎么进一步理解锁和同步器的关系呢?

微信图片_20220510181336.png


我们绝大多数都是在使用锁,实现锁之后,其核心就是要使用方便


微信图片_20220510181507.png


从 AQS 的类名称和修饰上来看,这是一个抽象类,所以从设计模式的角度来看同步器一定是基于【模版模式】来设计的,使用者需要继承同步器,实现自定义同步器,并重写指定方法,随后将同步器组合在自定义的同步组件中,并调用同步器的模版方法,而这些模版方法又回调用使用者重写的方法


我不想将上面的解释说的这么抽象,其实想理解上面这句话,我们只需要知道下面两个问题就好了


  1. 哪些是自定义同步器可重写的方法?


  1. 哪些是抽象同步器提供的模版方法?


同步器可重写的方法


同步器提供的可重写方法只有5个,这大大方便了锁的使用者:


微信图片_20220510181536.png


按理说,需要重写的方法也应该有 abstract 来修饰的,为什么这里没有?原因其实很简单,上面的方法我已经用颜色区分成了两类:


  • 独占式


  • 共享式


自定义的同步组件或者锁不可能既是独占式又是共享式,为了避免强制重写不相干方法,所以就没有 abstract 来修饰了,但要抛出异常告知不能直接使用该方法:


    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }


暖暖的很贴心(如果你有类似的需求也可以仿照这样的设计)


表格方法描述中所说的同步状态就是上文提到的有 volatile 修饰的 state,所以我们在重写上面几个方法时,还要通过同步器提供的下面三个方法(AQS 提供的)来获取或修改同步状态:


微信图片_20220510181627.png


而独占式和共享式操作 state 变量的区别也就很简单了


微信图片_20220510181654.png


所以你看到的 ReentrantLockReentrantReadWriteLockSemaphore(信号量)CountDownLatch 这几个类其实仅仅是在实现以上几个方法上略有差别,其他的实现都是通过同步器的模版方法来实现的,到这里是不是心情放松了许多呢?我们来看一看模版方法:


同步器提供的模版方法


上面我们将同步器的实现方法分为独占式和共享式两类,模版方法其实除了提供以上两类模版方法之外,只是多了响应中断超时限制 的模版方法供 Lock 使用,来看一下


微信图片_20220510181719.png


先不用记上述方法的功能,目前你只需要了解个大概功能就好。另外,相信你也注意到了:


上面的方法都有 final 关键字修饰,说明子类不能重写这个方法

看到这你也许有点乱了,我们稍微归纳一下:


微信图片_20220510181835.png


程序员还是看代码心里踏实一点,我们再来用代码说明一下上面的关系(注意代码中的注释,以下的代码并不是很严谨,只是为了简单说明上图的代码实现):


package top.dayarch.myjuc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
 * 自定义互斥锁
 *
 * @author tanrgyb
 * @date 2020/5/23 9:33 PM
 */
public class MyMutex implements Lock {
    // 静态内部类-自定义同步器
    private static class MySync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            // 调用AQS提供的方法,通过CAS保证原子性
            if (compareAndSetState(0, arg)){
                // 我们实现的是互斥锁,所以标记获取到同步状态(更新state成功)的线程,
                // 主要为了判断是否可重入(一会儿会说明)
                setExclusiveOwnerThread(Thread.currentThread());
                //获取同步状态成功,返回 true
                return true;
            }
            // 获取同步状态失败,返回 false
            return false;
        }
        @Override
        protected boolean tryRelease(int arg) {
            // 未拥有锁却让释放,会抛出IMSE
            if (getState() == 0){
                throw new IllegalMonitorStateException();
            }
            // 可以释放,清空排它线程标记
            setExclusiveOwnerThread(null);
            // 设置同步状态为0,表示释放锁
            setState(0);
            return true;
        }
        // 是否独占式持有
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        // 后续会用到,主要用于等待/通知机制,每个condition都有一个与之对应的条件等待队列,在锁模型中说明过
        Condition newCondition() {
            return new ConditionObject();
        }
    }
  // 聚合自定义同步器
    private final MySync sync = new MySync();
    @Override
    public void lock() {
        // 阻塞式的获取锁,调用同步器模版方法独占式,获取同步状态
        sync.acquire(1);
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        // 调用同步器模版方法可中断式获取同步状态
        sync.acquireInterruptibly(1);
    }
    @Override
    public boolean tryLock() {
        // 调用自己重写的方法,非阻塞式的获取同步状态
        return sync.tryAcquire(1);
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // 调用同步器模版方法,可响应中断和超时时间限制
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
    @Override
    public void unlock() {
        // 释放锁
        sync.release(1);
    }
    @Override
    public Condition newCondition() {
        // 使用自定义的条件
        return sync.newCondition();
    }
}


如果你现在打开 IDE, 你会发现上文提到的 ReentrantLockReentrantReadWriteLockSemaphore(信号量)CountDownLatch 都是按照这个结构实现,所以我们就来看一看 AQS 的模版方法到底是怎么实现锁


AQS实现分析


从上面的代码中,你应该理解了lock.tryLock() 非阻塞式获取锁就是调用自定义同步器重写的 tryAcquire() 方法,通过 CAS 设置state 状态,不管成功与否都会马上返回;那么 lock.lock() 这种阻塞式的锁是如何实现的呢?


有阻塞就需要排队,实现排队必然需要队列


CLH:Craig、Landin and Hagersten 队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO)——概念了解就好,不要记


队列中每个排队的个体就是一个 Node,所以我们来看一下 Node 的结构


Node 节点


AQS 内部维护了一个同步队列,用于管理同步状态。


  • 当线程获取同步状态失败时,就会将当前线程以及等待状态等信息构造成一个 Node 节点,将其加入到同步队列中尾部,阻塞该线程


  • 当同步状态被释放时,会唤醒同步队列中“首节点”的线程获取同步状态


为了将上述步骤弄清楚,我们需要来看一看 Node 结构 (如果你能打开 IDE 一起看那是极好的)


微信图片_20220510181954.png


乍一看有点杂乱,我们还是将其归类说明一下:


微信图片_20220510182016.png


上面这几个状态说明有个印象就好,有了Node 的结构说明铺垫,你也就能想象同步队列的接本结构了:


微信图片_20220510182038.png


前置知识基本铺垫完毕,我们来看一看独占式获取同步状态的整个过程


独占式获取同步状态


故事要从范式lock.lock() 开始


public void lock() {
    // 阻塞式的获取锁,调用同步器模版方法,获取同步状态
    sync.acquire(1);
}


进入AQS的模版方法 acquire()


public final void acquire(int arg) {
  // 调用自定义同步器重写的 tryAcquire 方法
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}


首先,也会尝试非阻塞的获取同步状态,如果获取失败(tryAcquire返回false),则会调用 addWaiter 方法构造 Node 节点(Node.EXCLUSIVE 独占式)并安全的(CAS)加入到同步队列【尾部】


    private Node addWaiter(Node mode) {
          // 构造Node节点,包含当前线程信息以及节点模式【独占/共享】
        Node node = new Node(Thread.currentThread(), mode);
          // 新建变量 pred 将指针指向tail指向的节点
        Node pred = tail;
          // 如果尾节点不为空
        if (pred != null) {
              // 新加入的节点前驱节点指向尾节点
            node.prev = pred;
              // 因为如果多个线程同时获取同步状态失败都会执行这段代码
            // 所以,通过 CAS 方式确保安全的设置当前节点为最新的尾节点
            if (compareAndSetTail(pred, node)) {
                  // 曾经的尾节点的后继节点指向当前节点
                pred.next = node;
                  // 返回新构建的节点
                return node;
            }
        }
          // 尾节点为空,说明当前节点是第一个被加入到同步队列中的节点
          // 需要一个入队操作
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
          // 通过“死循环”确保节点被正确添加,最终将其设置为尾节点之后才会返回,这里使用 CAS 的理由和上面一样
        for (;;) {
            Node t = tail;
              // 第一次循环,如果尾节点为 null
            if (t == null) { // Must initialize
                  // 构建一个哨兵节点,并将头部指针指向它
                if (compareAndSetHead(new Node()))
                      // 尾部指针同样指向哨兵节点
                    tail = head;
            } else {
                  // 第二次循环,将新节点的前驱节点指向t
                node.prev = t;
                  // 将新节点加入到队列尾节点
                if (compareAndSetTail(t, node)) {
                      // 前驱节点的后继节点指向当前新节点,完成双向队列
                    t.next = node;
                    return t;
                }
            }
        }
    }


你可能比较迷惑 enq() 的处理方式,进入该方法就是一个“死循环”,我们就用图来描述它是怎样跳出循环的


微信图片_20220511094411.png


有些同学可能会有疑问,为什么会有哨兵节点?


哨兵,顾名思义,是用来解决国家之间边界问题的,不直接参与生产活动。同样,计算机科学中提到的哨兵,也用来解决边界问题,如果没有边界,指定环节,按照同样算法可能会在边界处发生异常,比如要继续向下分析的 acquireQueued() 方法


    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
              // "死循环",尝试获取锁,或者挂起
            for (;;) {
                  // 获取当前节点的前驱节点
                final Node p = node.predecessor();
                  // 只有当前节点的前驱节点是头节点,才会尝试获取锁
                  // 看到这你应该理解添加哨兵节点的含义了吧
                if (p == head && tryAcquire(arg)) {
                      // 获取同步状态成功,将自己设置为头
                    setHead(node);
                      // 将哨兵节点的后继节点置为空,方便GC
                    p.next = null; // help GC
                    failed = false;
                      // 返回中断标识
                    return interrupted;
                }
                  // 当前节点的前驱节点不是头节点
                  //【或者】当前节点的前驱节点是头节点但获取同步状态失败
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


获取同步状态成功会返回可以理解了,但是如果失败就会一直陷入到“死循环”中浪费资源吗?很显然不是,shouldParkAfterFailedAcquire(p, node)parkAndCheckInterrupt() 就会将线程获取同步状态失败的线程挂起,我们继续向下看


    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
          // 获取前驱节点的状态
        int ws = pred.waitStatus;
          // 如果是 SIGNAL 状态,即等待被占用的资源释放,直接返回 true
          // 准备继续调用 parkAndCheckInterrupt 方法
        if (ws == Node.SIGNAL)
            return true;
          // ws 大于0说明是CANCELLED状态,
        if (ws > 0) {
            // 循环判断前驱节点的前驱节点是否也为CANCELLED状态,忽略该状态的节点,重新连接队列
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
              // 将当前节点的前驱节点设置为设置为 SIGNAL 状态,用于后续唤醒操作
              // 程序第一次执行到这返回为false,还会进行外层第二次循环,最终从代码第7行返回
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }


到这里你也许有个问题:


这个地方设置前驱节点为 SIGNAL 状态到底有什么作用?


保留这个问题,我们陆续揭晓


如果前驱节点的 waitStatus 是 SIGNAL状态,即 shouldParkAfterFailedAcquire 方法会返回 true ,程序会继续向下执行 parkAndCheckInterrupt 方法,用于将当前线程挂起


    private final boolean parkAndCheckInterrupt() {
          // 线程挂起,程序不会继续向下执行
        LockSupport.park(this);
          // 根据 park 方法 API描述,程序在下述三种情况会继续向下执行
          //     1. 被 unpark 
          //     2. 被中断(interrupt)
          //     3. 其他不合逻辑的返回才会继续向下执行
          // 因上述三种情况程序执行至此,返回当前线程的中断状态,并清空中断状态
          // 如果由于被中断,该方法会返回 true
        return Thread.interrupted();
    }


被唤醒的程序会继续执行 acquireQueued 方法里的循环,如果获取同步状态成功,则会返回 interrupted = true 的结果


程序继续向调用栈上层返回,最终回到 AQS 的模版方法 acquire


public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}


你也许会有疑惑:


程序已经成功获取到同步状态并返回了,怎么会有个自我中断呢?


static void selfInterrupt() {
    Thread.currentThread().interrupt();
}


微信图片_20220511094609.png


如果你不能理解中断,强烈建议你回看 Java多线程中断机制

相关文章
|
算法 Java 程序员
5千字详细讲解java并发编程的AQS
本文讲解AQS的组成,实现原理,应用,源码解析
5千字详细讲解java并发编程的AQS
|
安全 Java
AQS是什么?Java并发编程大师的源码不得不拜读呀
今天把ReentrantLock和AQS一起翻一翻,通过源码说一说我们的Java锁,先上一个目录: 一、初识ReentrantLock 二、什么是AQS ? 三、AQS中的同步状态state 四、CLH变体队列 五、独占模式 六、共享模式 七、公平锁&非公平锁 八、结语
AQS是什么?Java并发编程大师的源码不得不拜读呀
万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)(下)
万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)(下)
万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)(下)
别走!这里有个笔记:图文讲解 AQS ,一起看看 AQS 的源码……(图文较长)(一)
AbstractQueuedSynchronizer 抽象队列同步器,简称 AQS 。是在 JUC 包下面一个非常重要的基础组件,JUC 包下面的并发锁 ReentrantLock CountDownLatch 等都是基于 AQS 实现的。所以想进一步研究锁的底层原理,非常有必要先了解 AQS 的原理。
115 0
|
机器学习/深度学习 安全 Java
《提升能力,涨薪可待》-Java并发之AQS全面详解
在工作上必须保持学习的能力,这样才能在工作得到更好的晋升,涨薪指日可待,欢迎一起学习【提升能力,涨薪可待】系列
134 0
《提升能力,涨薪可待》-Java并发之AQS全面详解
|
存储 安全 Java
史上最全的Java并发系列之Java多线程(二)(上)
前言 文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820… 种一棵树最好的时间是十年前,其次是现在
124 0
|
Java
史上最全的Java并发系列之Java多线程(二)(下)
前言 文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820… 种一棵树最好的时间是十年前,其次是现在
133 0
|
Java 开发者
java并发编程的艺术(6)深入挖掘aqs独占锁源码
java并发编程的艺术(6)深入挖掘aqs独占锁源码
256 0
|
Java
java并发编程的艺术(5)CountDownLatch笔记
java并发编程的艺术(5)CountDownLatch笔记
109 0