万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)(下)

简介: 万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)(下)

到这里关于获取同步状态我们还遗漏了一条线,acquireQueued 的 finally 代码块如果你仔细看你也许马上就会有疑惑:


到底什么情况才会执行 if(failed) 里面的代码 ?


if (failed)
  cancelAcquire(node);


这段代码被执行的条件是 failed 为 true,正常情况下,如果跳出循环,failed 的值为false,如果不能跳出循环貌似怎么也不能执行到这里,所以只有不正常的情况才会执行

到这里,也就是会发生异常,才会执行到此处


查看 try 代码块,只有两个方法会抛出异常:


  • node.processor() 方法


  • 自己重写的 tryAcquire() 方法


先看前者:


微信图片_20220511095357.png


很显然,这里抛出的异常不是重点,那就以 ReentrantLock 重写的 tryAcquire() 方法为例


微信图片_20220511095449.png


另外,上面分析 shouldParkAfterFailedAcquire 方法还对 CANCELLED 的状态进行了判断,那么


什么时候会生成取消状态的节点呢?


答案就在 cancelAcquire 方法中, 我们来看看 cancelAcquire到底怎么设置/处理 CANNELLED 的


    private void cancelAcquire(Node node) {
        // 忽略无效节点
        if (node == null)
            return;
                // 将关联的线程信息清空
        node.thread = null;
        // 跳过同样是取消状态的前驱节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        // 跳出上面循环后找到前驱有效节点,并获取该有效节点的后继节点
        Node predNext = pred.next;
        // 将当前节点的状态置为 CANCELLED
        node.waitStatus = Node.CANCELLED;
        // 如果当前节点处在尾节点,直接从队列中删除自己就好
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
              // 1. 如果当前节点的有效前驱节点不是头节点,也就是说当前节点不是头节点的后继节点
            if (pred != head &&
                // 2. 判断当前节点有效前驱节点的状态是否为 SIGNAL
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 // 3. 如果不是,尝试将前驱节点的状态置为 SIGNAL
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                // 判断当前节点有效前驱节点的线程信息是否为空
                pred.thread != null) {
                  // 上述条件满足
                Node next = node.next;
                  // 将当前节点有效前驱节点的后继节点指针指向当前节点的后继节点
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                  // 如果当前节点的前驱节点是头节点,或者上述其他条件不满足,就唤醒当前节点的后继节点
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }


看到这个注释你可能有些乱了,其核心目的就是从等待队列中移除 CANCELLED 的节点,并重新拼接整个队列,总结来看,其实设置 CANCELLED 状态节点只是有三种情况,我们通过画图来分析一下:


微信图片_20220511095629.png


微信图片_20220511095646.png


微信图片_20220511095706.png


至此,获取同步状态的过程就结束了,我们简单的用流程图说明一下整个过程


微信图片_20220511095741.png


获取锁的过程就这样的结束了,先暂停几分钟整理一下自己的思路。我们上面还没有说明 SIGNAL 的作用, SIGNAL 状态信号到底是干什么用的?这就涉及到锁的释放了,我们来继续了解,整体思路和锁的获取是一样的, 但是释放过程就相对简单很多了


独占式释放同步状态


故事要从 unlock() 方法说起


    public void unlock() {
        // 释放锁
        sync.release(1);
    }


调用 AQS 模版方法 release,进入该方法


    public final boolean release(int arg) {
          // 调用自定义同步器重写的 tryRelease 方法尝试释放同步状态
        if (tryRelease(arg)) {
              // 释放成功,获取头节点
            Node h = head;
              // 存在头节点,并且waitStatus不是初始状态
              // 通过获取的过程我们已经分析了,在获取的过程中会将 waitStatus的值从初始状态更新成 SIGNAL 状态
            if (h != null && h.waitStatus != 0)
                  // 解除线程挂起状态
                unparkSuccessor(h);
            return true;
        }
        return false;
    }


查看 unparkSuccessor 方法,实际是要唤醒头节点的后继节点


    private void unparkSuccessor(Node node) {      
          // 获取头节点的waitStatus
        int ws = node.waitStatus;
        if (ws < 0)
              // 清空头节点的waitStatus值,即置为0
            compareAndSetWaitStatus(node, ws, 0);
          // 获取头节点的后继节点
        Node s = node.next;
          // 判断当前节点的后继节点是否是取消状态,如果是,需要移除,重新连接队列
        if (s == null || s.waitStatus > 0) {
            s = null;
              // 从尾节点向前查找,找到队列第一个waitStatus状态小于0的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                  // 如果是独占式,这里小于0,其实就是 SIGNAL
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
              // 解除线程挂起状态
            LockSupport.unpark(s.thread);
    }


有同学可能有疑问:


为什么这个地方是从队列尾部向前查找不是 CANCELLED 的节点?


原因有两个:


第一,先回看节点加入队列的情景:


    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }


节点入队并不是原子操作,代码第6、7行


node.prev = pred; 
compareAndSetTail(pred, node) 


这两个地方可以看作是尾节点入队的原子操作,如果此时代码还没执行到 pred.next = node; 这时又恰巧执行了unparkSuccessor方法,就没办法从前往后找了,因为后继指针还没有连接起来,所以需要从后往前找


第二点原因,在上面图解产生 CANCELLED 状态节点的时候,先断开的是 Next 指针,Prev指针并未断开,因此这也是必须要从后往前遍历才能够遍历完全部的Node

同步状态至此就已经成功释放了,之前获取同步状态被挂起的线程就会被唤醒,继续从下面代码第 3 行返回执行:


    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }


继续返回上层调用栈, 从下面代码15行开始执行,重新执行循环,再次尝试获取同步状态


    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


到这里,关于独占式获取/释放锁的流程已经闭环了,但是关于 AQS 的另外两个模版方法还没有介绍


  • 响应中断


  • 超时限制


微信图片_20220511100131.png


独占式响应中断获取同步状态


故事要从lock.lockInterruptibly() 方法说起

    public void lockInterruptibly() throws InterruptedException {
        // 调用同步器模版方法可中断式获取同步状态
        sync.acquireInterruptibly(1);
    }


有了前面的理解,理解独占式可响应中断的获取同步状态方式,真是一眼就能明白了:


    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
          // 尝试非阻塞式获取同步状态失败,如果没有获取到同步状态,执行代码7行
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }


继续查看 doAcquireInterruptibly 方法:


    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                      // 获取中断信号后,不再返回 interrupted = true 的值,而是直接抛出 InterruptedException 
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


没想到 JDK 内部也有如此相近的代码,可响应中断获取锁没什么深奥的,就是被中断抛出 InterruptedException 异常(代码第17行),这样就逐层返回上层调用栈捕获该异常进行下一步操作了


趁热打铁,来看看另外一个模版方法:


独占式超时限制获取同步状态


这个很好理解,就是给定一个时限,在该时间段内获取到同步状态,就返回 true, 否则,返回 false。好比线程给自己定了一个闹钟,闹铃一响,线程就自己返回了,这就不会使自己是阻塞状态了


既然涉及到超时限制,其核心逻辑肯定是计算时间间隔,因为在超时时间内,肯定是多次尝试获取锁的,每次获取锁肯定有时间消耗,所以计算时间间隔的逻辑就像我们在程序打印程序耗时 log 那么简单


nanosTimeout = deadline - System.nanoTime()


故事要从 lock.tryLock(time, unit) 方法说起


    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // 调用同步器模版方法,可响应中断和超时时间限制
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }


来看 tryAcquireNanos 方法


    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }


是不是和上面 acquireInterruptibly 方法长相很详细了,继续查看来 doAcquireNanos 方法,看程序, 该方法也是 throws InterruptedException,我们在中断文章中说过,方法标记上有 throws InterruptedException 说明该方法也是可以响应中断的,所以你可以理解超时限制


acquireInterruptibly 方法的加强版,具有超时和非阻塞控制的双保险


    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
          // 超时时间内,为获取到同步状态,直接返回false
        if (nanosTimeout <= 0L)
            return false;
          // 计算超时截止时间
        final long deadline = System.nanoTime() + nanosTimeout;
          // 以独占方式加入到同步队列中
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                  // 计算新的超时时间
                nanosTimeout = deadline - System.nanoTime();
                  // 如果超时,直接返回 false
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                        // 判断是最新超时时间是否大于阈值 1000    
                    nanosTimeout > spinForTimeoutThreshold)
                      // 挂起线程 nanosTimeout 长时间,时间到,自动返回
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


上面的方法应该不是很难懂,但是又同学可能在第 27 行上有所困惑


为什么 nanosTimeout 和 自旋超时阈值1000进行比较?


    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices
     * to improve responsiveness with very short timeouts.
     */
    static final long spinForTimeoutThreshold = 1000L;


其实 doc 说的很清楚,说白了,1000 nanoseconds 时间已经非常非常短暂了,没必要再执行挂起和唤醒操作了,不如直接当前线程直接进入下一次循环


到这里,我们自定义的 MyMutex 只差 Condition 没有说明了,不知道你累了吗?我还在坚持


微信图片_20220511100529.png


Condition


如果你看过之前写的 并发编程之等待通知机制 ,你应该对下面这个图是有印象的:


微信图片_20220511100617.png


如果当时你理解了这个模型,再看 Condition 的实现,根本就不是问题了,首先 Condition 还是一个接口,肯定也是需要有实现类的


微信图片_20220511100705.png


那故事就从 lock.newnewCondition 说起吧


    public Condition newCondition() {
        // 使用自定义的条件
        return sync.newCondition();
    }


自定义同步器重封装了该方法:


        Condition newCondition() {
            return new ConditionObject();
        }


ConditionObject 就是 Condition 的实现类,该类就定义在了 AQS 中,只有两个成员变量:


/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;


所以,我们只需要来看一下 ConditionObject 实现的 await / signal 方法来使用这两个成员变量就可以了


        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
              // 同样构建 Node 节点,并加入到等待队列中
            Node node = addConditionWaiter();
              // 释放同步状态
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                  // 挂起当前线程
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }


这里注意用词,在介绍获取同步状态时,addWaiter 是加入到【同步队列】,就是上图说的入口等待队列,这里说的是【等待队列】,所以 addConditionWaiter 肯定是构建了一个自己的队列:


        private Node addConditionWaiter() {
            Node t = lastWaiter;
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
              // 新构建的节点的 waitStatus 是 CONDITION,注意不是 0 或 SIGNAL 了
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
              // 构建单向同步队列
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }


这里有朋友可能会有疑问:


为什么这里是单向队列,也没有使用CAS 来保证加入队列的安全性呢?


因为 await 是 Lock 范式 try 中使用的,说明已经获取到锁了,所以就没必要使用 CAS 了,至于是单向,因为这里还不涉及到竞争锁,只是做一个条件等待队列

在 Lock 中可以定义多个条件,每个条件都会对应一个 条件等待队列,所以将上图丰富说明一下就变成了这个样子:


微信图片_20220511100915.png



线程已经按相应的条件加入到了条件等待队列中,那如何再尝试获取锁呢?signal / signalAll 方法就已经排上用场了


        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }


Signal 方法通过调用 doSignal 方法,只唤醒条件等待队列中的第一个节点


        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
                  // 调用该方法,将条件等待队列的线程节点移动到同步队列中
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }


继续看 transferForSignal 方法


    final boolean transferForSignal(Node node) {       
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
           // 重新进行入队操作
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
              // 唤醒同步队列中该线程
            LockSupport.unpark(node.thread);
        return true;
    }

所以我们再用图解一下唤醒的整个过程


微信图片_20220511101045.png


到这里,理解 signalAll 就非常简单了,只不过循环判断是否还有 nextWaiter,如果有就像 signal 操作一样,将其从条件等待队列中移到同步队列中


        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }


不知你还是否记得,我在并发编程之等待通知机制 中还说过一句话


没有特殊原因尽量用 signalAll 方法


什么时候可以用 signal 方法也在其中做了说明,请大家自行查看吧


微信图片_20220511101128.png

这里我还要多说一个细节,从条件等待队列移到同步队列是有时间差的,所以使用 await() 方法也是范式的, 同样在该文章中做了解释


微信图片_20220511101147.png


有时间差,就会有公平和不公平的问题,想要全面了解这个问题,我们就要走近 ReentrantLock 中来看了,除了了解公平/不公平问题,查看 ReentrantLock 的应用还是要反过来验证它使用的AQS的,我们继续吧


ReentrantLock 是如何应用的AQS


独占式的典型应用就是 ReentrantLock 了,我们来看看它是如何重写这个方法的


微信图片_20220511101239.png


乍一看挺奇怪的,怎么里面自定义了三个同步器:其实 NonfairSync,FairSync 只是对 Sync 做了进一步划分:


微信图片_20220511101303.png


从名称上你应该也知道了,这就是你听到过的 公平锁/非公平锁


何为公平锁/非公平锁?


生活中,排队讲求先来后到视为公平。程序中的公平性也是符合请求锁的绝对时间的,其实就是 FIFO,否则视为不公平


我们来对比一下 ReentrantLock 是如何实现公平锁和非公平锁的


微信图片_20220511101331.png


其实没什么大不了,公平锁就是判断同步队列是否还有先驱节点的存在,只有没有先驱节点才能获取锁;而非公平锁是不管这个事的,能获取到同步状态就可以,就这么简单,那问题来了:


为什么会有公平锁/非公平锁的设计?


考虑这个问题,我们需重新回忆上面的锁获取实现图了,其实上面我已经透露了一点


微信图片_20220511101400.png


主要有两点原因:


原因一:


恢复挂起的线程到真正锁的获取还是有时间差的,从人类的角度来看这个时间微乎其微,但是从CPU的角度来看,这个时间差存在的还是很明显的。所以非公平锁能更充分的利用 CPU 的时间片,尽量减少 CPU 空闲状态时间


原因二:


不知你是否还记得我在 面试问,创建多少个线程合适? 文章中反复提到过,使用多线程很重要的考量点是线程切换的开销,想象一下,如果采用非公平锁,当一个线程请求锁获取同步状态,然后释放同步状态,因为不需要考虑是否还有前驱节点,所以刚释放锁的线程在此刻再次获取同步状态的几率就变得非常大,所以就减少了线程的开销


微信图片_20220511101434.png


相信到这里,你也就明白了,为什么 ReentrantLock 默认构造器用的是非公平锁同步器


    public ReentrantLock() {
        sync = new NonfairSync();
    }


看到这里,感觉非公平锁 perfect,非也,有得必有失


使用公平锁会有什么问题?


公平锁保证了排队的公平性,非公平锁霸气的忽视这个规则,所以就有可能导致排队的长时间在排队,也没有机会获取到锁,这就是传说中的 “饥饿”


如何选择公平锁/非公平锁?


相信到这里,答案已经在你心中了,如果为了更高的吞吐量,很显然非公平锁是比较合适的,因为节省很多线程切换时间,吞吐量自然就上去了,否则那就用公平锁还大家一个公平


我们还差最后一个环节,真的要挺住


可重入锁


到这里,我们还没分析 ReentrantLock 的名字,JDK 起名这么有讲究,肯定有其含义,直译过来【可重入锁】


为什么要支持锁的重入?


试想,如果是一个有 synchronized 修饰的递归调用方法,程序第二次进入被自己阻塞了岂不是很大的笑话,所以 synchronized 是支持锁的重入的


Lock 是新轮子,自然也要支持这个功能,其实现也很简单,请查看公平锁和非公平锁对比图,其中有一段代码:


// 判断当前线程是否和已占用锁的线程是同一个
else if (current == getExclusiveOwnerThread())


仔细看代码, 你也许发现,我前面的一个说明是错误的,我要重新解释一下


微信图片_20220511101556.png

重入的线程会一直将 state + 1, 释放锁会 state - 1直至等于0,上面这样写也是想帮助大家快速的区分


总结


本文是一个长文,说明了为什么要造 Lock 新轮子,如何标准的使用 Lock,AQS 是什么,是如何实现锁的,结合 ReentrantLock 反推 AQS 中的一些应用以及其独有的一些特性


独占式获取锁就这样介绍完了,我们还差 AQS 共享式 xxxShared 没有分析,结合共享式,接下来我们来阅读一下 Semaphore,ReentrantReadWriteLock 和 CountLatch 等

最后,也欢迎大家的留言,如有错误之处还请指出。我的手酸了,眼睛干了,我去准备撸下一篇.....






目录
打赏
0
0
0
0
1
分享
相关文章
万字详解并发编程!!!
本文介绍了并发编程的基本概念和技术,涵盖了操作系统的发展历程、进程与线程的原理和使用方法。主要内容包括: 操作系统发展史:从手工操作到多道程序系统、分时系统、实时系统,再到通用操作系统,逐步介绍了操作系统的演变过程。 并发编程技术:强调并发编程的目标是充分利用CPU资源,提高系统性能 进程:详细讲解了进程的概念、组成、状态、调度算法、进程间通信(IPC)以及守护进程和僵尸进程等问题。 线:介绍了线程的基本概念、与进程的区别、线程的创建、多线程共享资源、线程同步与互斥锁、递归锁和死锁问题 5. **队列**:讲解了队列的基本概念,包括先进先出队列、后进先出队列和优先级队列,并提供了具体的实现示例
150 38
5千字详细讲解java并发编程的AQS
本文讲解AQS的组成,实现原理,应用,源码解析
5千字详细讲解java并发编程的AQS
AQS是什么?Java并发编程大师的源码不得不拜读呀
今天把ReentrantLock和AQS一起翻一翻,通过源码说一说我们的Java锁,先上一个目录: 一、初识ReentrantLock 二、什么是AQS ? 三、AQS中的同步状态state 四、CLH变体队列 五、独占模式 六、共享模式 七、公平锁&非公平锁 八、结语
AQS是什么?Java并发编程大师的源码不得不拜读呀
别走!这里有个笔记:图文讲解 AQS ,一起看看 AQS 的源码……(图文较长)(一)
AbstractQueuedSynchronizer 抽象队列同步器,简称 AQS 。是在 JUC 包下面一个非常重要的基础组件,JUC 包下面的并发锁 ReentrantLock CountDownLatch 等都是基于 AQS 实现的。所以想进一步研究锁的底层原理,非常有必要先了解 AQS 的原理。
118 0
史上最全的Java并发系列之Java多线程(二)(下)
前言 文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820… 种一棵树最好的时间是十年前,其次是现在
139 0
史上最全的Java并发系列之Java多线程(二)(上)
前言 文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820… 种一棵树最好的时间是十年前,其次是现在
127 0
《提升能力,涨薪可待》-Java并发之AQS全面详解
在工作上必须保持学习的能力,这样才能在工作得到更好的晋升,涨薪指日可待,欢迎一起学习【提升能力,涨薪可待】系列
138 0
《提升能力,涨薪可待》-Java并发之AQS全面详解
史上最全的Java并发系列之并发编程的挑战
前言 文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820… 种一棵树最好的时间是十年前,其次是现在
173 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等