前言
AbstractQueuedSynchronizer 抽象队列同步器,简称 AQS 。是在 JUC 包下面一个非常重要的基础组件,JUC 包下面的并发锁 ReentrantLock CountDownLatch 等都是基于 AQS 实现的。所以想进一步研究锁的底层原理,非常有必要先了解 AQS 的原理。
介绍
先看下 AQS 的类图结构,以及源码注释,有一定的大概了解之后再从源码入手,一步一步研究它的底层原理。
" 源码注释
提供了实现阻塞锁和相关同步器依靠先入先出(FIFO)等待队列(信号量,事件等)的框架。 此类中设计了一个对大多数基于 AQS 的同步器有用的原子变量来表示状态(state)。 子类必须定义 protected 方法来修改这个 state,并且定义 state 值在对象中的具体含义是 acquired 或 released。 考虑到这些,在这个类中的其他方法可以实现所有排队和阻塞机制。 子类可以保持其他状态字段,但只能使用方法 getState 、setState 和 compareAndSetState 以原子方式更新 state 。
子类应被定义为用于实现其封闭类的同步性能的非公共内部辅助类。 类AbstractQueuedSynchronizer没有实现任何同步接口。 相反,它定义了一些方法,如 acquireInterruptibly 可以通过具体的锁和相关同步器来调用适当履行其公共方法。
此类支持独占模式和共享模式。 在独占模式下,其他线程不能获取成功,共享模式下可以(但不一定)获取成功。 此类不“理解”,在机械意义上这些不同的是,当共享模式获取成功,则下一个等待的线程(如果存在)也必须确定它是否能够获取。 线程在不同模式下的等待共享相同的FIFO队列。 通常情况下,实现子类只支持其中一种模式,但同时使用两种模式也可以,例如ReadWriteLock 。 仅共享模式不需要定义支持未使用的模式的方法的子类。
这个类中定义了嵌套类 AbstractQueuedSynchronizer.ConditionObject ,可用于作为一个 Condition 由子类实现,并使用 isHeldExclusively 方法说明当前线程是否以独占方式进行,release()、 getState() acquire() 方法用于操作 state 原子变量。
此类提供检查和监视内部队列的方法,以及类似方法的条件对象。 根据需要进使用以用于它们的同步机制。
要使用这个类用作同步的基础上,需要重新定义以下方法,如使用,通过检查和或修改 getState 、setState 或 compareAndSetState 方法:
tryAcquire tryRelease tryAcquireShared tryReleaseShared isHeldExclusively
"
通过上面的注释可以得出大概的印象:
- 内部依靠先入先出(FIFO) 等待队列。
- 存在 state 表示状态信息。state 值只能用 getState 、setState 和 compareAndSetState 方法以原子方式更新。
- 支持独占模式和共享模式,但具体需要子类实现具体支持哪种模式。
- 嵌套 AbstractQueuedSynchronizer.ConditionObject 可以作为 Condition 由子类实现。
- 子类需要重新定义 tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared、isHeldExclusively 方法。
队列节点 Node
Node节点,包含以下元素:
元素 | 含义 |
prev | 上一个节点 |
next | 下一个节点 |
thread | 持有线程 |
waitStatus | 节点状态 |
nextWaiter | 下一个处于 CONDITION 状态的节点 |
组合成等待队列则如下:
下面是等待队列节点的 Node 属性:
static final class Node { // 节点正在共享模式下等待的标记 static final Node SHARED = new Node(); // 指示节点正在以独占模式等待的标记 static final Node EXCLUSIVE = null; // 指示线程已取消 static final int CANCELLED = 1; // 指示后续线程需要唤醒 static final int SIGNAL = -1; // 指示线程正在等待条件 static final int CONDITION = -2; // 指示下一次acquireShared应该无条件传播 static final int PROPAGATE = -3; /** * 状态字段,仅使用以下值 * SIGNAL -1 :当前节点释放或者取消时,必须 unpark 他的后续节点。 * CANCELLED 1 :由于超时(timeout)或中断(interrupt),该节点被取消。节点永远不会离开此状态。特别是,具有取消节点的线程永远不会再次阻塞。 * CONDITION -2 :该节点目前在条件队列。 但它不会被用作同步队列节点,直到转移,转移时的状态将被设置为 0 。 * PROPAGATE -3 :releaseShared 应该被传播到其他节点。 * 0:都不是 * 值以数字表示以简化使用,大多数时候可以检查符号(是否大于0)以简化使用 */ volatile int waitStatus; // 上一个节点 volatile Node prev; // 下一个节点 volatile Node next; // 节点持有线程 volatile Thread thread; // 链接下一个等待条件节点,或特殊值共享 Node nextWaiter; // 节点是否处于 共享状态 是, 返回 true final boolean isShared() { return nextWaiter == SHARED; } // 返回前一个节点, 使用时 前一个节点不能为空 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
在 Node 节点中需要重点关注 waitStatus
- 默认状态为 0;
- waitStatus > 0 (CANCELLED 1) 说明该节点超时或者中断了,需要从队列中移除;
- waitStatus = -1 SIGNAL 当前线程的前一个节点的状态为 SIGNAL,则当前线程需要阻塞(unpark);
- waitStatus = -2 CONDITION -2 :该节点目前在条件队列;
- waitStatus = -3 PROPAGATE -3 :releaseShared 应该被传播到其他节点,在共享锁模式下使用。
了解完 Node 的结构之后,再了解下 AQS 结构,并从常用方法入手,逐步了解具体实现逻辑。
AbstractQueuedSynchronizer
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // 等待队列的头,延迟初始化。 除了初始化,它是仅经由方法setHead修改。 注意:如果头存在,其waitStatus保证不会是 CANCELLED 状态 private transient volatile Node head; // 等待队列的尾部,延迟初始化。 仅在修改通过方法ENQ添加新节点等待。 private transient volatile Node tail; // 同步状态 private volatile int state; // 获取状态 protected final int getState() { return state; } // 设置状态值 protected final void setState(int newState) { state = newState; } // 原子更新状态值 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } }
在 AQS 中主要参数为:
参数 | 含义 |
head | 等待队列头 |
tail | 等待队列尾 |
state | 同步状态 |
通过注释了解到,在 AQS 里主要分为两种操作模式,分别是:独占模式、共享模式,下面分别从两个不同的角度去分析源码。
操作 | 含义 |
acquire | 以独占模式获取,忽略中断。 通过调用至少一次实施tryAcquire ,在成功时返回。 否则,线程排队,可能重复查封和解封,调用tryAcquire直到成功为止。 这种方法可以用来实现方法Lock.lock 。 |
release | 以独占模式释放。 通过疏通一个或多个线程,如果实现tryRelease返回true。 这种方法可以用来实现方法Lock.unlock 。 |
acquireShared | 获取在共享模式下,忽略中断。 通过至少一次第一调用实现tryAcquireShared ,在成功时返回。 否则,线程排队,可能重复查封和解封,调用tryAcquireShared直到成功为止。 |
releaseShared | 以共享模式释放。 通过疏通一个或多个线程,如果实现tryReleaseShared返回true。 |
无论是共享模式还是独占模式在这里面都会用到 addWaiter 方法,将当前线程及模式创建排队节点。
独占模式
获取独占资源 acquire
public final void acquire(int arg) { // tryAcquire 尝试获取 state,获取失败则会加入到队列 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
在独占模式下会尝试获取 state,当获取失败时会调用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。
- tryAcquire(arg),尝试获取 state 这块由子类自己实现,不同的子类逻辑不同,这块在介绍子类代码时会说明。
- 获取 state 失败后,会进行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),这块代码可以拆分为两块:addWaiter(Node.EXCLUSIVE),acquireQueued(node, arg)。
- addWaiter(Node.EXCLUSIVE) 返回的是当前新创建的节点。
- acquireQueued(node, arg) 线程获取锁失败,使用 addWaiter(Node.EXCLUSIVE) 放入等待队列,而 acquireQueued(node, arg) 使用循环,不断的为队列中的节点去尝试获取资源,直到获取成功或者被中断。
总结获取资源主要分为三步:
- 尝试获取资源
- 入队列
- 出队列
尝试获取资源 tryAcquire(arg)
,由子类实现,那下面则着手分别分析 入队列
、出队列
。
入队列:addWaiter(Node.EXCLUSIVE)
使用 addWaiter(Node.EXCLUSIVE)
方法将节点插入到队列中,步骤如下:
- 根据传入的模式创建节点
- 判断尾节点是否存在,不存在则需要使用
enq(node)
方法初始化节点,存在则直接尝试
插入尾部。 尝试
插入尾部时使用 CAS 插入,防止并发情况,如果插入失败,会调用enq(node)
自旋直到插入。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 定位到队列末尾的 node Node pred = tail; if (pred != null) { // 新节点的上一个节点 指向尾节点 node.prev = pred; // 使用 CAS 设置尾节点,tail 如果等于 pred 则更新为 node if (compareAndSetTail(pred, node)) { // 更新成功则将 pred 的下一个节点指向 node pred.next = node; return node; } } // 尾节点没有初始化,或竞争失败,自旋 enq(node); return node; } /** * tailOffset 也就是成员变量 tail 的值 * 此处相当于:比较 tail 的值和 expect 的值是否相等, 相等则更新为 update */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } private Node enq(final Node node) { for (;;) { Node t = tail; // 尾节点为空 需要初始化头节点,此时头尾节点是一个 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { // 不为空 循环赋值 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
看完代码和注释肯定还是有点模糊,现在用图一步一步进行说明。
因为根据初始尾节点是否为空
分为两种情况,这里使用两幅图:
- 第一幅为第一次添加节点,此时 head 会延迟初始化;
- 第二幅图为已经存在队列,进行插入节点;
- 注意看代码,enq 方法返回的是
之前的尾节点
; - addWaiter 方法 返回的是
当前插入的新创建的节点
。
节点添加到队列之后,返回当前节点,而下一步则需要调用方法 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
不断的去获取资源。
出队列:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法会通过循环不断尝试获取拿到资源,直到成功。代码如下:
final boolean acquireQueued(final Node node, int arg) { // 是否拿到资源 boolean failed = true; try { // 中断状态 boolean interrupted = false; // 无限循环 for (;;) { // 当前节点之前的节点 final Node p = node.predecessor(); // 前一个节点是头节点, 说明当前节点是 头节点的 next 即真实的第一个数据节点 (因为 head 是虚拟节点) // 然后再尝试获取资源 if (p == head && tryAcquire(arg)) { // 获取成功之后 将头指针指向当前节点 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // p 不是头节点, 或者 头节点未能获取到资源 (非公平情况下被别的节点抢占) // 判断 node 是否要被阻塞, if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
- 不断获取本节点的上一个节点是否为 head,因为 head 是虚拟节点,如果当前节点的上一个节点是 head 节点,则当前节点为
第一个数据节点
; - 第一个数据节点不断的去获取资源,获取成功,则将 head 指向当前节点;
- 当前节点不是头节点,或者
tryAcquire(arg)
失败(失败可能是非公平锁)。这时候需要判断前一个节点状态决定当前节点是否要被阻塞
(前一个节点状态是否为 SIGNAL)。
/** * 根据上一个节点的状态,判断当前线程是否应该被阻塞 * SIGNAL -1 :当前节点释放或者取消时,必须 unpark 他的后续节点。 * CANCELLED 1 :由于超时(timeout)或中断(interrupt),该节点被取消。节点永远不会离开此状态。特别是,具有取消节点的线程永远不会再次阻塞。 * CONDITION -2 :该节点目前在条件队列。 但它不会被用作同步队列节点,直到转移,转移时的状态将被设置为 0 。 * PROPAGATE -3 :releaseShared 应该被传播到其他节点。 * 0:都不是 * */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前一个节点的等待状态 int ws = pred.waitStatus; // 前一个节点需要 unpark 后续节点 if (ws == Node.SIGNAL) return true; // 当前节点处于取消状态 if (ws > 0) { do { // 将取消的节点从队列中移除 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 设置前一个节点为 SIGNAL 状态 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
在 shouldParkAfterFailedAcquire
方法中,会判断前一个节点的状态,同时取消在队列中当前节点前面无效的节点。
再继续阅读 出队列 acquireQueued 方法,发现有一个 finally 会判断状态后执行 cancelAcquire(node);
,也就是上面流程图中下面的红色方块。
cancelAcquire(Node node)
final boolean acquireQueued(final Node node, int arg) { // 是否拿到资源 boolean failed = true; try { // 省略 // 在 finally 会将当前节点置为取消状态 } finally { if (failed) cancelAcquire(node); } } private void cancelAcquire(Node node) { // 节点不存在 直接返回 if (node == null) return; // 取消节点关联线程 node.thread = null; //跳过已经取消的节点,获取当前节点之前的有效节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 获取当前节点之前的有效节点的下一个节点 Node predNext = pred.next; // 当前节点设置为取消 node.waitStatus = Node.CANCELLED; // 当前节点如果是尾节点,则将最后一个有效节点设置为尾节点,并将 predNext 设置为空 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; // pred 不是头节点(node 的上一个有效节点 不是 head) && ( pred的状态是 SIGNAL || pred 的状态设置为 SIGNAL 成功 ) && pred 的绑定线程不为空 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // 当前节点的后继节点 Node next = node.next; // 后继节点不为空 且 状态有效 将 pred 的 后继节点设置为 当前节点的后继节点 if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // node 的上一个有效节点 是 head, 或者其他情况 唤醒当前节点的下一个有效节点 unparkSuccessor(node); } node.next = node; // help GC } } private void unparkSuccessor(Node node) { // 判断当前节点状态 int ws = node.waitStatus; if (ws < 0) // 将节点状态更新为 0 compareAndSetWaitStatus(node, ws, 0); // 下一个节点, 一般是下一个节点应该就是需要唤醒的节点,即颁发证书。 Node s = node.next; // 大于 0 CANCELLED : 线程已取消 // 但是有可能 后继节点 为空或者被取消了。 if (s == null || s.waitStatus > 0) { s = null; // 从尾节点开始遍历,直到定位到 t.waitStatus <= 0 的节点 // 定位到后并不会停止,会继续执行,相当于找到最开始的那个需要唤醒的节点 // t.waitStatus <= 0 : SIGNAL( -1 后续线程需要释放) // CONDITION ( -2 线程正在等待条件) // PROPAGATE ( -3 releaseShared 应该被传播到其他节点) for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 定位到需要唤醒的节点后 进行 unpark if (s != null) LockSupport.unpark(s.thread); }
流程分析:
- 找到当前节点的前一个非无效节点 pred;
- 当前节点如果是尾节点,则将最后一个有效节点设置为尾节点,并将 predNext 设置为空;
- pred 不是头节点 && ( pred的状态是 SIGNAL || pred 的状态设置为 SIGNAL 成功 ) && pred 的绑定线程不为空;
- 其他情况。
下面分别画图:
Q: 通过图可以看出来,只操作了 next 指针,但是没有操作 prev 指针,这是为什么呢?
A: 在 出队列:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法中,shouldParkAfterFailedAcquire
方法会判断前一个节点的状态,同时取消在队列中当前节点前面无效的节点。这时候会移除之前的无效节点,此处也是为了防止指向一个已经被移除的节点。同时保证 prev 的稳定,有利于从 tail 开始遍历列表,这块在 unparkSuccessor(node);
中也可以看到是从后往前表里列表。
Q: unparkSuccessor(Node node) 为什么从后往前遍历?
A:
在 addWaiter(Node.EXCLUSIVE)
插入新节点时,使用的是 尾插法
,看红框部分,此时有可能还未指向next。
Q: node.next = node; 这块导致 head不是指向最新节点,链表不就断了么?A: acquireQueued 方法介绍中,里面有个循环,会不断尝试获取资源,成功之后会设置为 head。并且在 shouldParkAfterFailedAcquire 中也会清除当前节点前的无效节点。
释放独占资源 release
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
以独占模式释放。 通过释放一个或多个线程,如果实现tryRelease返回true。 这种方法可以用来实现方法Lock.unlock 。
- tryRelease(arg) 操作释放资源,同样是由子类实现,后面介绍子类时会进行说明。返回 true 说明资源现在已经没有线程持有了,其他节点可以尝试获取;
- 释放成功,且 head != null && h.waitStatus != 0, 会继续执行 unparkSuccessor(h);
- 这块会看到 只要 tryRelease(arg) 操作释放资源成功, 后面无论执行是否成功,都会返回 true,unparkSuccessor(h) 相当于只是附加操作。