| 好看请赞,养成习惯
- 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
- If you can NOT explain it simply, you do NOT understand it well enough
现陆续将Demo代码和技术文章整理在一起 Github实践精选 ,方便大家阅读查看,本文同样收录在此,觉得不错,还请Star
写在前面
进入源码阶段了,写了十几篇的 并发系列 知识铺垫终于要派上用场了。相信很多人已经忘了其中的一些理论知识,别担心,我会在源码环节带入相应的理论知识点帮助大家回忆,做到理论与实践相结合,另外这是超长图文,建议收藏,如果对你有用还请点赞让更多人看到
Java SDK 为什么要设计 Lock
曾几何时幻想过,如果 Java 并发控制只有 synchronized 多好,只有下面三种使用方式,简单方便
public class ThreeSync { private static final Object object = new Object(); public synchronized void normalSyncMethod(){ //临界区 } public static synchronized void staticSyncMethod(){ //临界区 } public void syncBlockMethod(){ synchronized (object){ //临界区 } } }
如果在 Java 1.5之前,确实是这样,自从 1.5 版本 Doug Lea 大师就重新造了一个轮子 Lock
我们常说:“避免重复造轮子”,如果有了轮子还是要坚持再造个轮子,那么肯定传统的轮子在某些应用场景中不能很好的解决问题
不知你是否还记得 Coffman 总结的四个可以发生死锁的情形 ,其中【不可剥夺条件】是指:
线程已经获得资源,在未使用完之前,不能被剥夺,只能在使用完时自己释放
要想破坏这个条件,就需要具有申请不到进一步资源就释放已有资源的能力
很显然,这个能力是 synchronized 不具备的,使用 synchronized ,如果线程申请不到资源就会进入阻塞状态,我们做什么也改变不了它的状态,这是 synchronized 轮子的致命弱点,这就强有力的给了重造轮子 Lock 的理由
显式锁 Lock
旧轮子有弱点,新轮子就要解决这些问题,所以要具备不会阻塞的功能,下面的三个方案都是解决这个问题的好办法(看下面表格描述你就明白三个方案的含义了)
特性 | 描述 | API |
能响应中断 | 如果不能自己释放,那可以响应中断也是很好的。Java多线程中断机制 专门描述了中断过程,目的是通过中断信号来跳出某种状态,比如阻塞 | lockInterruptbly() |
非阻塞式的获取锁 | 尝试获取,获取不到不会阻塞,直接返回 | tryLock() |
支持超时 | 给定一个时间限制,如果一段时间内没获取到,不是进入阻塞状态,同样直接返回 | tryLock(long time, timeUnit) |
好的方案有了,但鱼和熊掌不可兼得,Lock 多了 synchronized 不具备的特性,自然不会像 synchronized 那样一个关键字三个玩法走遍全天下,在使用上也相对复杂了一丢丢
Lock 使用范式
synchronized 有标准用法,这样的优良传统咱 Lock 也得有,相信很多人都知道使用 Lock 的一个范式
Lock lock = new ReentrantLock(); lock.lock(); try{ ... }finally{ lock.unlock(); }
既然是范式(没事不要挑战更改写法的那种),肯定有其理由,我们来看一下
标准1—finally 中释放锁
这个大家应该都会明白,在 finally 中释放锁,目的是保证在获取到锁之后,最终能被释放
标准2—在 try{} 外面获取锁
不知道你有没有想过,为什么会有标准 2 的存在,我们通常是“喜欢” try 住所有内容,生怕发生异常不能捕获的
在 try{}
外获取锁主要考虑两个方面:
- 如果没有获取到锁就抛出异常,最终释放锁肯定是有问题的,因为还未曾拥有锁谈何释放锁呢
- 如果在获取锁时抛出了异常,也就是当前线程并未获取到锁,但执行到 finally 代码时,如果恰巧别的线程获取到了锁,则会被释放掉(无故释放)
不同锁的实现方式略有不同,范式的存在就是要避免一切问题的出现,所以大家尽量遵守范式
Lock 是怎样起到锁的作用呢?
如果你熟悉 synchronized,你知道程序编译成 CPU 指令后,在临界区会有 moniterenter
和 moniterexit
指令的出现,可以理解成进出临界区的标识
从范式上来看:
lock.lock()
获取锁,“等同于” synchronized 的 moniterenter指令
lock.unlock()
释放锁,“等同于” synchronized 的 moniterexit 指令
那 Lock 是怎么做到的呢?
这里先简单说明一下,这样一会到源码分析时,你可以远观设计轮廓,近观实现细节,会变得越发轻松
其实很简单,比如在 ReentrantLock 内部维护了一个 volatile 修饰的变量 state,通过 CAS 来进行读写(最底层还是交给硬件来保证原子性和可见性),如果CAS更改成功,即获取到锁,线程进入到 try 代码块继续执行;如果没有更改成功,线程会被【挂起】,不会向下执行
但 Lock 是一个接口,里面根本没有 state 这个变量的存在:
它怎么处理这个 state 呢?很显然需要一点设计的加成了,接口定义行为,具体都是需要实现类的
Lock 接口的实现类基本都是通过【聚合】了一个【队列同步器】的子类完成线程访问控制的
那什么是队列同步器呢? (这应该是你见过的最强标题党,聊了半个世纪才入正题,评论区留言骂我)
队列同步器 AQS
队列同步器 (AbstractQueuedSynchronizer),简称同步器或AQS,就是我们今天的主人公
问:为什么你分析 JUC 源码,要从 AQS 说起呢?答:看下图
相信看到这个截图你就明白一二了,你听过的,面试常被问起的,工作中常用的
ReentrantLock
ReentrantReadWriteLock
Semaphore(信号量)
CountDownLatch
公平锁
非公平锁
ThreadPoolExecutor
(关于线程池的理解,可以查看 为什么要使用线程池? )
都和 AQS 有直接关系,所以了解 AQS 的抽象实现,在此基础上再稍稍查看上述各类的实现细节,很快就可以全部搞定,不至于查看源码时一头雾水,丢失主线
上面提到,在锁的实现类中会聚合同步器,然后利同步器实现锁的语义,那么问题来了:
为什么要用聚合模式,怎么进一步理解锁和同步器的关系呢?
我们绝大多数都是在使用锁,实现锁之后,其核心就是要使用方便
从 AQS 的类名称和修饰上来看,这是一个抽象类,所以从设计模式的角度来看同步器一定是基于【模版模式】来设计的,使用者需要继承同步器,实现自定义同步器,并重写指定方法,随后将同步器组合在自定义的同步组件中,并调用同步器的模版方法,而这些模版方法又回调用使用者重写的方法
我不想将上面的解释说的这么抽象,其实想理解上面这句话,我们只需要知道下面两个问题就好了
- 哪些是自定义同步器可重写的方法?
- 哪些是抽象同步器提供的模版方法?
同步器可重写的方法
同步器提供的可重写方法只有5个,这大大方便了锁的使用者:
按理说,需要重写的方法也应该有 abstract 来修饰的,为什么这里没有?原因其实很简单,上面的方法我已经用颜色区分成了两类:
独占式
共享式
自定义的同步组件或者锁不可能既是独占式又是共享式,为了避免强制重写不相干方法,所以就没有 abstract 来修饰了,但要抛出异常告知不能直接使用该方法:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
暖暖的很贴心(如果你有类似的需求也可以仿照这样的设计)
表格方法描述中所说的同步状态
就是上文提到的有 volatile 修饰的 state,所以我们在重写
上面几个方法时,还要通过同步器提供的下面三个方法(AQS 提供的)来获取或修改同步状态:
而独占式和共享式操作 state 变量的区别也就很简单了
所以你看到的 ReentrantLock
ReentrantReadWriteLock
Semaphore(信号量)
CountDownLatch
这几个类其实仅仅是在实现以上几个方法上略有差别,其他的实现都是通过同步器的模版方法来实现的,到这里是不是心情放松了许多呢?我们来看一看模版方法:
同步器提供的模版方法
上面我们将同步器的实现方法分为独占式和共享式两类,模版方法其实除了提供以上两类模版方法之外,只是多了响应中断
和超时限制
的模版方法供 Lock 使用,来看一下
先不用记上述方法的功能,目前你只需要了解个大概功能就好。另外,相信你也注意到了:
上面的方法都有 final 关键字修饰,说明子类不能重写这个方法
看到这你也许有点乱了,我们稍微归纳一下:
程序员还是看代码心里踏实一点,我们再来用代码说明一下上面的关系(注意代码中的注释,以下的代码并不是很严谨,只是为了简单说明上图的代码实现):
package top.dayarch.myjuc; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * 自定义互斥锁 * * @author tanrgyb * @date 2020/5/23 9:33 PM */ public class MyMutex implements Lock { // 静态内部类-自定义同步器 private static class MySync extends AbstractQueuedSynchronizer{ @Override protected boolean tryAcquire(int arg) { // 调用AQS提供的方法,通过CAS保证原子性 if (compareAndSetState(0, arg)){ // 我们实现的是互斥锁,所以标记获取到同步状态(更新state成功)的线程, // 主要为了判断是否可重入(一会儿会说明) setExclusiveOwnerThread(Thread.currentThread()); //获取同步状态成功,返回 true return true; } // 获取同步状态失败,返回 false return false; } @Override protected boolean tryRelease(int arg) { // 未拥有锁却让释放,会抛出IMSE if (getState() == 0){ throw new IllegalMonitorStateException(); } // 可以释放,清空排它线程标记 setExclusiveOwnerThread(null); // 设置同步状态为0,表示释放锁 setState(0); return true; } // 是否独占式持有 @Override protected boolean isHeldExclusively() { return getState() == 1; } // 后续会用到,主要用于等待/通知机制,每个condition都有一个与之对应的条件等待队列,在锁模型中说明过 Condition newCondition() { return new ConditionObject(); } } // 聚合自定义同步器 private final MySync sync = new MySync(); @Override public void lock() { // 阻塞式的获取锁,调用同步器模版方法独占式,获取同步状态 sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { // 调用同步器模版方法可中断式获取同步状态 sync.acquireInterruptibly(1); } @Override public boolean tryLock() { // 调用自己重写的方法,非阻塞式的获取同步状态 return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { // 调用同步器模版方法,可响应中断和超时时间限制 return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { // 释放锁 sync.release(1); } @Override public Condition newCondition() { // 使用自定义的条件 return sync.newCondition(); } }
如果你现在打开 IDE, 你会发现上文提到的 ReentrantLock
ReentrantReadWriteLock
Semaphore(信号量)
CountDownLatch
都是按照这个结构实现,所以我们就来看一看 AQS 的模版方法到底是怎么实现锁
AQS实现分析
从上面的代码中,你应该理解了lock.tryLock()
非阻塞式获取锁就是调用自定义同步器重写的 tryAcquire()
方法,通过 CAS 设置state 状态,不管成功与否都会马上返回;那么 lock.lock() 这种阻塞式的锁是如何实现的呢?
有阻塞就需要排队,实现排队必然需要队列
CLH:Craig、Landin and Hagersten 队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO)——概念了解就好,不要记
队列中每个排队的个体就是一个 Node,所以我们来看一下 Node 的结构
Node 节点
AQS 内部维护了一个同步队列,用于管理同步状态。
- 当线程获取同步状态失败时,就会将当前线程以及等待状态等信息构造成一个 Node 节点,将其加入到同步队列中尾部,阻塞该线程
- 当同步状态被释放时,会唤醒同步队列中“首节点”的线程获取同步状态
为了将上述步骤弄清楚,我们需要来看一看 Node 结构 (如果你能打开 IDE 一起看那是极好的)
乍一看有点杂乱,我们还是将其归类说明一下:
上面这几个状态说明有个印象就好,有了Node 的结构说明铺垫,你也就能想象同步队列的接本结构了:
前置知识基本铺垫完毕,我们来看一看独占式获取同步状态的整个过程
独占式获取同步状态
故事要从范式lock.lock() 开始
public void lock() { // 阻塞式的获取锁,调用同步器模版方法,获取同步状态 sync.acquire(1); }
进入AQS的模版方法 acquire()
public final void acquire(int arg) { // 调用自定义同步器重写的 tryAcquire 方法 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
首先,也会尝试非阻塞的获取同步状态,如果获取失败(tryAcquire返回false),则会调用 addWaiter
方法构造 Node 节点(Node.EXCLUSIVE 独占式)并安全的(CAS)加入到同步队列【尾部】
private Node addWaiter(Node mode) { // 构造Node节点,包含当前线程信息以及节点模式【独占/共享】 Node node = new Node(Thread.currentThread(), mode); // 新建变量 pred 将指针指向tail指向的节点 Node pred = tail; // 如果尾节点不为空 if (pred != null) { // 新加入的节点前驱节点指向尾节点 node.prev = pred; // 因为如果多个线程同时获取同步状态失败都会执行这段代码 // 所以,通过 CAS 方式确保安全的设置当前节点为最新的尾节点 if (compareAndSetTail(pred, node)) { // 曾经的尾节点的后继节点指向当前节点 pred.next = node; // 返回新构建的节点 return node; } } // 尾节点为空,说明当前节点是第一个被加入到同步队列中的节点 // 需要一个入队操作 enq(node); return node; } private Node enq(final Node node) { // 通过“死循环”确保节点被正确添加,最终将其设置为尾节点之后才会返回,这里使用 CAS 的理由和上面一样 for (;;) { Node t = tail; // 第一次循环,如果尾节点为 null if (t == null) { // Must initialize // 构建一个哨兵节点,并将头部指针指向它 if (compareAndSetHead(new Node())) // 尾部指针同样指向哨兵节点 tail = head; } else { // 第二次循环,将新节点的前驱节点指向t node.prev = t; // 将新节点加入到队列尾节点 if (compareAndSetTail(t, node)) { // 前驱节点的后继节点指向当前新节点,完成双向队列 t.next = node; return t; } } } }
你可能比较迷惑 enq() 的处理方式,进入该方法就是一个“死循环”,我们就用图来描述它是怎样跳出循环的
有些同学可能会有疑问,为什么会有哨兵节点?
哨兵,顾名思义,是用来解决国家之间边界问题的,不直接参与生产活动。同样,计算机科学中提到的哨兵,也用来解决边界问题,如果没有边界,指定环节,按照同样算法可能会在边界处发生异常,比如要继续向下分析的
acquireQueued()
方法
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // "死循环",尝试获取锁,或者挂起 for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 只有当前节点的前驱节点是头节点,才会尝试获取锁 // 看到这你应该理解添加哨兵节点的含义了吧 if (p == head && tryAcquire(arg)) { // 获取同步状态成功,将自己设置为头 setHead(node); // 将哨兵节点的后继节点置为空,方便GC p.next = null; // help GC failed = false; // 返回中断标识 return interrupted; } // 当前节点的前驱节点不是头节点 //【或者】当前节点的前驱节点是头节点但获取同步状态失败 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
获取同步状态成功会返回可以理解了,但是如果失败就会一直陷入到“死循环”中浪费资源吗?很显然不是,shouldParkAfterFailedAcquire(p, node)
和 parkAndCheckInterrupt()
就会将线程获取同步状态失败的线程挂起,我们继续向下看
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前驱节点的状态 int ws = pred.waitStatus; // 如果是 SIGNAL 状态,即等待被占用的资源释放,直接返回 true // 准备继续调用 parkAndCheckInterrupt 方法 if (ws == Node.SIGNAL) return true; // ws 大于0说明是CANCELLED状态, if (ws > 0) { // 循环判断前驱节点的前驱节点是否也为CANCELLED状态,忽略该状态的节点,重新连接队列 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 将当前节点的前驱节点设置为设置为 SIGNAL 状态,用于后续唤醒操作 // 程序第一次执行到这返回为false,还会进行外层第二次循环,最终从代码第7行返回 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
到这里你也许有个问题:
这个地方设置前驱节点为 SIGNAL 状态到底有什么作用?
保留这个问题,我们陆续揭晓
如果前驱节点的 waitStatus 是 SIGNAL状态,即 shouldParkAfterFailedAcquire 方法会返回 true ,程序会继续向下执行 parkAndCheckInterrupt
方法,用于将当前线程挂起
private final boolean parkAndCheckInterrupt() { // 线程挂起,程序不会继续向下执行 LockSupport.park(this); // 根据 park 方法 API描述,程序在下述三种情况会继续向下执行 // 1. 被 unpark // 2. 被中断(interrupt) // 3. 其他不合逻辑的返回才会继续向下执行 // 因上述三种情况程序执行至此,返回当前线程的中断状态,并清空中断状态 // 如果由于被中断,该方法会返回 true return Thread.interrupted(); }
被唤醒的程序会继续执行 acquireQueued
方法里的循环,如果获取同步状态成功,则会返回 interrupted = true
的结果
程序继续向调用栈上层返回,最终回到 AQS 的模版方法 acquire
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
你也许会有疑惑:
程序已经成功获取到同步状态并返回了,怎么会有个自我中断呢?
static void selfInterrupt() { Thread.currentThread().interrupt(); }
如果你不能理解中断,强烈建议你回看 Java多线程中断机制