到这里关于获取同步状态我们还遗漏了一条线,acquireQueued 的 finally 代码块如果你仔细看你也许马上就会有疑惑:
到底什么情况才会执行 if(failed) 里面的代码 ?
if (failed) cancelAcquire(node);
这段代码被执行的条件是 failed 为 true,正常情况下,如果跳出循环,failed 的值为false,如果不能跳出循环貌似怎么也不能执行到这里,所以只有不正常的情况才会执行
到这里,也就是会发生异常,才会执行到此处
查看 try 代码块,只有两个方法会抛出异常:
node.processor()
方法
- 自己重写的
tryAcquire()
方法
先看前者:
很显然,这里抛出的异常不是重点,那就以 ReentrantLock 重写的 tryAcquire() 方法为例
另外,上面分析 shouldParkAfterFailedAcquire
方法还对 CANCELLED 的状态进行了判断,那么
什么时候会生成取消状态的节点呢?
答案就在 cancelAcquire
方法中, 我们来看看 cancelAcquire到底怎么设置/处理 CANNELLED 的
private void cancelAcquire(Node node) { // 忽略无效节点 if (node == null) return; // 将关联的线程信息清空 node.thread = null; // 跳过同样是取消状态的前驱节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 跳出上面循环后找到前驱有效节点,并获取该有效节点的后继节点 Node predNext = pred.next; // 将当前节点的状态置为 CANCELLED node.waitStatus = Node.CANCELLED; // 如果当前节点处在尾节点,直接从队列中删除自己就好 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; // 1. 如果当前节点的有效前驱节点不是头节点,也就是说当前节点不是头节点的后继节点 if (pred != head && // 2. 判断当前节点有效前驱节点的状态是否为 SIGNAL ((ws = pred.waitStatus) == Node.SIGNAL || // 3. 如果不是,尝试将前驱节点的状态置为 SIGNAL (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 判断当前节点有效前驱节点的线程信息是否为空 pred.thread != null) { // 上述条件满足 Node next = node.next; // 将当前节点有效前驱节点的后继节点指针指向当前节点的后继节点 if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 如果当前节点的前驱节点是头节点,或者上述其他条件不满足,就唤醒当前节点的后继节点 unparkSuccessor(node); } node.next = node; // help GC }
看到这个注释你可能有些乱了,其核心目的就是从等待队列中移除 CANCELLED 的节点,并重新拼接整个队列,总结来看,其实设置 CANCELLED 状态节点只是有三种情况,我们通过画图来分析一下:
至此,获取同步状态的过程就结束了,我们简单的用流程图说明一下整个过程
获取锁的过程就这样的结束了,先暂停几分钟整理一下自己的思路。我们上面还没有说明 SIGNAL 的作用, SIGNAL 状态信号到底是干什么用的?这就涉及到锁的释放了,我们来继续了解,整体思路和锁的获取是一样的, 但是释放过程就相对简单很多了
独占式释放同步状态
故事要从 unlock() 方法说起
public void unlock() { // 释放锁 sync.release(1); }
调用 AQS 模版方法 release,进入该方法
public final boolean release(int arg) { // 调用自定义同步器重写的 tryRelease 方法尝试释放同步状态 if (tryRelease(arg)) { // 释放成功,获取头节点 Node h = head; // 存在头节点,并且waitStatus不是初始状态 // 通过获取的过程我们已经分析了,在获取的过程中会将 waitStatus的值从初始状态更新成 SIGNAL 状态 if (h != null && h.waitStatus != 0) // 解除线程挂起状态 unparkSuccessor(h); return true; } return false; }
查看 unparkSuccessor 方法,实际是要唤醒头节点的后继节点
private void unparkSuccessor(Node node) { // 获取头节点的waitStatus int ws = node.waitStatus; if (ws < 0) // 清空头节点的waitStatus值,即置为0 compareAndSetWaitStatus(node, ws, 0); // 获取头节点的后继节点 Node s = node.next; // 判断当前节点的后继节点是否是取消状态,如果是,需要移除,重新连接队列 if (s == null || s.waitStatus > 0) { s = null; // 从尾节点向前查找,找到队列第一个waitStatus状态小于0的节点 for (Node t = tail; t != null && t != node; t = t.prev) // 如果是独占式,这里小于0,其实就是 SIGNAL if (t.waitStatus <= 0) s = t; } if (s != null) // 解除线程挂起状态 LockSupport.unpark(s.thread); }
有同学可能有疑问:
为什么这个地方是从队列尾部向前查找不是 CANCELLED 的节点?
原因有两个:
第一,先回看节点加入队列的情景:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
节点入队并不是原子操作,代码第6、7行
node.prev = pred; compareAndSetTail(pred, node)
这两个地方可以看作是尾节点入队的原子操作,如果此时代码还没执行到 pred.next = node; 这时又恰巧执行了unparkSuccessor方法,就没办法从前往后找了,因为后继指针还没有连接起来,所以需要从后往前找
第二点原因,在上面图解产生 CANCELLED 状态节点的时候,先断开的是 Next 指针,Prev指针并未断开,因此这也是必须要从后往前遍历才能够遍历完全部的Node
同步状态至此就已经成功释放了,之前获取同步状态被挂起的线程就会被唤醒,继续从下面代码第 3 行返回执行:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
继续返回上层调用栈, 从下面代码15行开始执行,重新执行循环,再次尝试获取同步状态
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
到这里,关于独占式获取/释放锁的流程已经闭环了,但是关于 AQS 的另外两个模版方法还没有介绍
响应中断
超时限制
独占式响应中断获取同步状态
故事要从lock.lockInterruptibly() 方法说起
public void lockInterruptibly() throws InterruptedException { // 调用同步器模版方法可中断式获取同步状态 sync.acquireInterruptibly(1); }
有了前面的理解,理解独占式可响应中断的获取同步状态方式,真是一眼就能明白了:
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 尝试非阻塞式获取同步状态失败,如果没有获取到同步状态,执行代码7行 if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
继续查看 doAcquireInterruptibly
方法:
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 获取中断信号后,不再返回 interrupted = true 的值,而是直接抛出 InterruptedException throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
没想到 JDK 内部也有如此相近的代码,可响应中断获取锁没什么深奥的,就是被中断抛出 InterruptedException 异常(代码第17行),这样就逐层返回上层调用栈捕获该异常进行下一步操作了
趁热打铁,来看看另外一个模版方法:
独占式超时限制获取同步状态
这个很好理解,就是给定一个时限,在该时间段内获取到同步状态,就返回 true, 否则,返回 false。好比线程给自己定了一个闹钟,闹铃一响,线程就自己返回了,这就不会使自己是阻塞状态了
既然涉及到超时限制,其核心逻辑肯定是计算时间间隔,因为在超时时间内,肯定是多次尝试获取锁的,每次获取锁肯定有时间消耗,所以计算时间间隔的逻辑就像我们在程序打印程序耗时 log 那么简单
nanosTimeout = deadline - System.nanoTime()
故事要从 lock.tryLock(time, unit)
方法说起
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { // 调用同步器模版方法,可响应中断和超时时间限制 return sync.tryAcquireNanos(1, unit.toNanos(time)); }
来看 tryAcquireNanos 方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
是不是和上面 acquireInterruptibly
方法长相很详细了,继续查看来 doAcquireNanos 方法,看程序, 该方法也是 throws InterruptedException,我们在中断文章中说过,方法标记上有 throws InterruptedException
说明该方法也是可以响应中断的,所以你可以理解超时限制
是 acquireInterruptibly
方法的加强版,具有超时和非阻塞控制的双保险
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 超时时间内,为获取到同步状态,直接返回false if (nanosTimeout <= 0L) return false; // 计算超时截止时间 final long deadline = System.nanoTime() + nanosTimeout; // 以独占方式加入到同步队列中 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 计算新的超时时间 nanosTimeout = deadline - System.nanoTime(); // 如果超时,直接返回 false if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && // 判断是最新超时时间是否大于阈值 1000 nanosTimeout > spinForTimeoutThreshold) // 挂起线程 nanosTimeout 长时间,时间到,自动返回 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
上面的方法应该不是很难懂,但是又同学可能在第 27 行上有所困惑
为什么 nanosTimeout 和 自旋超时阈值1000进行比较?
/** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices * to improve responsiveness with very short timeouts. */ static final long spinForTimeoutThreshold = 1000L;
其实 doc 说的很清楚,说白了,1000 nanoseconds 时间已经非常非常短暂了,没必要再执行挂起和唤醒操作了,不如直接当前线程直接进入下一次循环
到这里,我们自定义的 MyMutex 只差 Condition 没有说明了,不知道你累了吗?我还在坚持
Condition
如果你看过之前写的 并发编程之等待通知机制 ,你应该对下面这个图是有印象的:
如果当时你理解了这个模型,再看 Condition 的实现,根本就不是问题了,首先 Condition 还是一个接口,肯定也是需要有实现类的
那故事就从 lock.newnewCondition
说起吧
public Condition newCondition() { // 使用自定义的条件 return sync.newCondition(); }
自定义同步器重封装了该方法:
Condition newCondition() { return new ConditionObject(); }
ConditionObject 就是 Condition 的实现类,该类就定义在了 AQS 中,只有两个成员变量:
/** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;
所以,我们只需要来看一下 ConditionObject 实现的 await / signal 方法来使用这两个成员变量就可以了
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 同样构建 Node 节点,并加入到等待队列中 Node node = addConditionWaiter(); // 释放同步状态 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 挂起当前线程 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
这里注意用词,在介绍获取同步状态时,addWaiter 是加入到【同步队列】,就是上图说的入口等待队列,这里说的是【等待队列】,所以 addConditionWaiter 肯定是构建了一个自己的队列:
private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 新构建的节点的 waitStatus 是 CONDITION,注意不是 0 或 SIGNAL 了 Node node = new Node(Thread.currentThread(), Node.CONDITION); // 构建单向同步队列 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
这里有朋友可能会有疑问:
为什么这里是单向队列,也没有使用CAS 来保证加入队列的安全性呢?
因为 await 是 Lock 范式 try 中使用的,说明已经获取到锁了,所以就没必要使用 CAS 了,至于是单向,因为这里还不涉及到竞争锁,只是做一个条件等待队列
在 Lock 中可以定义多个条件,每个条件都会对应一个 条件等待队列,所以将上图丰富说明一下就变成了这个样子:
线程已经按相应的条件加入到了条件等待队列中,那如何再尝试获取锁呢?signal / signalAll 方法就已经排上用场了
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
Signal 方法通过调用 doSignal 方法,只唤醒条件等待队列中的第一个节点
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 调用该方法,将条件等待队列的线程节点移动到同步队列中 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
继续看 transferForSignal
方法
final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 重新进行入队操作 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 唤醒同步队列中该线程 LockSupport.unpark(node.thread); return true; }
所以我们再用图解一下唤醒的整个过程
到这里,理解 signalAll 就非常简单了,只不过循环判断是否还有 nextWaiter,如果有就像 signal 操作一样,将其从条件等待队列中移到同步队列中
private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
不知你还是否记得,我在并发编程之等待通知机制 中还说过一句话
没有特殊原因尽量用 signalAll 方法
什么时候可以用 signal 方法也在其中做了说明,请大家自行查看吧
这里我还要多说一个细节,从条件等待队列移到同步队列是有时间差的,所以使用 await() 方法也是范式的, 同样在该文章中做了解释
有时间差,就会有公平和不公平的问题,想要全面了解这个问题,我们就要走近 ReentrantLock 中来看了,除了了解公平/不公平问题,查看 ReentrantLock 的应用还是要反过来验证它使用的AQS的,我们继续吧
ReentrantLock 是如何应用的AQS
独占式的典型应用就是 ReentrantLock 了,我们来看看它是如何重写这个方法的
乍一看挺奇怪的,怎么里面自定义了三个同步器:其实 NonfairSync,FairSync 只是对 Sync 做了进一步划分:
从名称上你应该也知道了,这就是你听到过的 公平锁/非公平锁
了
何为公平锁/非公平锁?
生活中,排队讲求先来后到视为公平。程序中的公平性也是符合请求锁的绝对时间的,其实就是 FIFO,否则视为不公平
我们来对比一下 ReentrantLock 是如何实现公平锁和非公平锁的
其实没什么大不了,公平锁就是判断同步队列是否还有先驱节点的存在,只有没有先驱节点才能获取锁;而非公平锁是不管这个事的,能获取到同步状态就可以,就这么简单,那问题来了:
为什么会有公平锁/非公平锁的设计?
考虑这个问题,我们需重新回忆上面的锁获取实现图了,其实上面我已经透露了一点
主要有两点原因:
原因一:
恢复挂起的线程到真正锁的获取还是有时间差的,从人类的角度来看这个时间微乎其微,但是从CPU的角度来看,这个时间差存在的还是很明显的。所以非公平锁能更充分的利用 CPU 的时间片,尽量减少 CPU 空闲状态时间
原因二:
不知你是否还记得我在 面试问,创建多少个线程合适? 文章中反复提到过,使用多线程很重要的考量点是线程切换的开销,想象一下,如果采用非公平锁,当一个线程请求锁获取同步状态,然后释放同步状态,因为不需要考虑是否还有前驱节点,所以刚释放锁的线程在此刻再次获取同步状态的几率就变得非常大,所以就减少了线程的开销
相信到这里,你也就明白了,为什么 ReentrantLock 默认构造器用的是非公平锁同步器
public ReentrantLock() { sync = new NonfairSync(); }
看到这里,感觉非公平锁 perfect,非也,有得必有失
使用公平锁会有什么问题?
公平锁保证了排队的公平性,非公平锁霸气的忽视这个规则,所以就有可能导致排队的长时间在排队,也没有机会获取到锁,这就是传说中的 “饥饿”
如何选择公平锁/非公平锁?
相信到这里,答案已经在你心中了,如果为了更高的吞吐量,很显然非公平锁是比较合适的,因为节省很多线程切换时间,吞吐量自然就上去了,否则那就用公平锁还大家一个公平
我们还差最后一个环节,真的要挺住
可重入锁
到这里,我们还没分析 ReentrantLock 的名字,JDK 起名这么有讲究,肯定有其含义,直译过来【可重入锁】
为什么要支持锁的重入?
试想,如果是一个有 synchronized 修饰的递归调用方法,程序第二次进入被自己阻塞了岂不是很大的笑话,所以 synchronized 是支持锁的重入的
Lock 是新轮子,自然也要支持这个功能,其实现也很简单,请查看公平锁和非公平锁对比图,其中有一段代码:
// 判断当前线程是否和已占用锁的线程是同一个 else if (current == getExclusiveOwnerThread())
仔细看代码, 你也许发现,我前面的一个说明是错误的,我要重新解释一下
重入的线程会一直将 state + 1, 释放锁会 state - 1直至等于0,上面这样写也是想帮助大家快速的区分
总结
本文是一个长文,说明了为什么要造 Lock 新轮子,如何标准的使用 Lock,AQS 是什么,是如何实现锁的,结合 ReentrantLock 反推 AQS 中的一些应用以及其独有的一些特性
独占式获取锁就这样介绍完了,我们还差 AQS 共享式 xxxShared
没有分析,结合共享式,接下来我们来阅读一下 Semaphore,ReentrantReadWriteLock 和 CountLatch 等
最后,也欢迎大家的留言,如有错误之处还请指出。我的手酸了,眼睛干了,我去准备撸下一篇.....