1.简介
AbstractQueuedSynchronizer (抽象队列同步器,以下简称 AQS)出现在 JDK 1.5 中,由大师 Doug Lea 所创作。AQS 是很多同步器的基础框架,比如 ReentrantLock、CountDownLatch 和 Semaphore 等都是基于 AQS 实现的。除此之外,我们还可以基于 AQS,定制出我们所需要的同步器。
AQS 的使用方式通常都是通过内部类继承 AQS 实现同步功能,通过继承 AQS,可以简化同步器的实现。如前面所说,AQS 是很多同步器实现的基础框架。弄懂 AQS 对理解 Java 并发包里的组件大有裨益,这也是我学习 AQS 并写出这篇文章的缘由。另外,需要说明的是,AQS 本身并不是很好理解,细节很多。在看的过程中药有一定的耐心,做好看多遍的准备。好了,其他的就不多说了,开始进入正题吧。
2.原理概述
在 AQS 内部,通过维护一个FIFO 队列
来管理多线程的排队工作。在公平竞争的情况下,无法获取同步状态的线程将会被封装成一个节点,置于队列尾部。入队的线程将会通过自旋的方式获取同步状态,若在有限次的尝试后,仍未获取成功,线程则会被阻塞住。大致示意图如下:
当头结点释放同步状态后,且后继节点对应的线程被阻塞,此时头结点
线程将会去唤醒后继节点线程。后继节点线程恢复运行并获取同步状态后,会将旧的头结点从队列中移除,并将自己设为头结点。大致示意图如下:
3.重要方法介绍
本节将介绍三组重要的方法,通过使用这三组方法即可实现一个同步组件。
第一组方法是用于访问/设置同步状态的,如下:
方法 | 说明 |
---|---|
int getState() | 获取同步状态 |
void setState() | 设置同步状态 |
boolean compareAndSetState(int expect, int update) | 通过 CAS 设置同步状态 |
第二组方需要由同步组件覆写。如下:
方法 | 说明 |
---|---|
boolean tryAcquire(int arg) | 独占式获取同步状态 |
boolean tryRelease(int arg) | 独占式释放同步状态 |
int tryAcquireShared(int arg) | 共享式获取同步状态 |
boolean tryReleaseShared(int arg) | 共享式私房同步状态 |
boolean isHeldExclusively() | 检测当前线程是否获取独占锁 |
第三组方法是一组模板方法,同步组件可直接调用。如下:
方法 | 说明 |
---|---|
void acquire(int arg) | 独占式获取同步状态,该方法将会调用 tryAcquire 尝试获取同步状态。获取成功则返回,获取失败,线程进入同步队列等待。 |
void acquireInterruptibly(int arg) | 响应中断版的 acquire |
boolean tryAcquireNanos(int arg,long nanos) | 超时+响应中断版的 acquire |
void acquireShared(int arg) | 共享式获取同步状态,同一时刻可能会有多个线程获得同步状态。比如读写锁的读锁就是就是调用这个方法获取同步状态的。 |
void acquireSharedInterruptibly(int arg) | 响应中断版的 acquireShared |
boolean tryAcquireSharedNanos(int arg,long nanos) | 超时+响应中断版的 acquireShared |
boolean release(int arg) | 独占式释放同步状态 |
boolean releaseShared(int arg) | 共享式释放同步状态 |
上面列举了一堆方法,看似繁杂。但稍微理一下,就会发现上面诸多方法无非就两大类:一类是独占式获取和释放共享状态,另一类是共享式获取和释放同步状态。至于这两类方法的实现细节,我会在接下来的章节中讲到,继续往下看吧。
4.源码分析
4.1 节点结构
在并发的情况下,AQS 会将未获取同步状态的线程将会封装成节点,并将其放入同步队列尾部。同步队列中的节点除了要保存线程,还要保存等待状态。不管是独占式还是共享式,在获取状态失败时都会用到节点类。所以这里我们要先看一下节点类的实现,为后面的源码分析进行简单铺垫。源码如下:
static final class Node {
/** 共享类型节点,标记节点在共享模式下等待 */
static final Node SHARED = new Node();
/** 独占类型节点,标记节点在独占模式下等待 */
static final Node EXCLUSIVE = null;
/** 等待状态 - 取消 */
static final int CANCELLED = 1;
/**
* 等待状态 - 通知。某个节点是处于该状态,当该节点释放同步状态后,
* 会通知后继节点线程,使之可以恢复运行
*/
static final int SIGNAL = -1;
/** 等待状态 - 条件等待。表明节点等待在 Condition 上 */
static final int CONDITION = -2;
/**
* 等待状态 - 传播。表示无条件向后传播唤醒动作,详细分析请看第五章
*/
static final int PROPAGATE = -3;
/**
* 等待状态,取值如下:
* SIGNAL,
* CANCELLED,
* CONDITION,
* PROPAGATE,
* 0
*
* 初始情况下,waitStatus = 0
*/
volatile int waitStatus;
/**
* 前驱节点
*/
volatile Node prev;
/**
* 后继节点
*/
volatile Node next;
/**
* 对应的线程
*/
volatile Thread thread;
/**
* 下一个等待节点,用在 ConditionObject 中
*/
Node nextWaiter;
/**
* 判断节点是否是共享节点
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 获取前驱节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
/** addWaiter 方法会调用该构造方法 */
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
/** Condition 中会用到此构造方法 */
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
4.2 独占模式分析
4.2.1 获取同步状态
独占式获取同步状态时通过 acquire 进行的,下面来分析一下该方法的源码。如下:
/**
* 该方法将会调用子类复写的 tryAcquire 方法获取同步状态,
* - 获取成功:直接返回
* - 获取失败:将线程封装在节点中,并将节点置于同步队列尾部,
* 通过自旋尝试获取同步状态。如果在有限次内仍无法获取同步状态,
* 该线程将会被 LockSupport.park 方法阻塞住,直到被前驱节点唤醒
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/** 向同步队列尾部添加一个节点 */
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试以快速方式将节点添加到队列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 快速插入节点失败,调用 enq 方法,不停的尝试插入节点
enq(node);
return node;
}
/**
* 通过 CAS + 自旋的方式插入节点到队尾
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 设置头结点,初始情况下,头结点是一个空节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
/*
* 将节点插入队列尾部。这里是先将新节点的前驱设为尾节点,之后在尝试将新节点设为尾节
* 点,最后再将原尾节点的后继节点指向新的尾节点。除了这种方式,我们还先设置尾节点,
* 之后再设置前驱和后继,即:
*
* if (compareAndSetTail(t, node)) {
* node.prev = t;
* t.next = node;
* }
*
* 但但如果是这样做,会导致一个问题,即短时内,队列结构会遭到破坏。考虑这种情况,
* 某个线程在调用 compareAndSetTail(t, node)成功后,该线程被 CPU 切换了。此时
* 设置前驱和后继的代码还没带的及执行,但尾节点指针却设置成功,导致队列结构短时内会
* 出现如下情况:
*
* +------+ prev +-----+ +-----+
* head | | <---- | | | | tail
* | | ----> | | | |
* +------+ next +-----+ +-----+
*
* tail 节点完全脱离了队列,这样导致一些队列遍历代码出错。如果先设置
* 前驱,在设置尾节点。及时线程被切换,队列结构短时可能如下:
*
* +------+ prev +-----+ prev +-----+
* head | | <---- | | <---- | | tail
* | | ----> | | | |
* +------+ next +-----+ +-----+
*
* 这样并不会影响从后向前遍历,不会导致遍历逻辑出错。
*
* 参考:
* https://www.cnblogs.com/micrari/p/6937995.html
*/
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* 同步队列中的线程在此方法中以循环尝试获取同步状态,在有限次的尝试后,
* 若仍未获取锁,线程将会被阻塞,直至被前驱节点的线程唤醒。
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 循环获取同步状态
for (;;) {
final Node p = node.predecessor();
/*
* 前驱节点如果是头结点,表明前驱节点已经获取了同步状态。前驱节点释放同步状态后,
* 在不出异常的情况下, tryAcquire(arg) 应返回 true。此时节点就成功获取了同
* 步状态,并将自己设为头节点,原头节点出队。
*/
if (p == head && tryAcquire(arg)) {
// 成功获取同步状态,设置自己为头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*
* 如果获取同步状态失败,则根据条件判断是否应该阻塞自己。
* 如果不阻塞,CPU 就会处于忙等状态,这样会浪费 CPU 资源
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
/*
* 如果在获取同步状态中出现异常,failed = true,cancelAcquire 方法会被执行。
* tryAcquire 需同步组件开发者覆写,难免不了会出现异常。
*/
if (failed)
cancelAcquire(node);
}
}
/** 设置头节点 */
private void setHead(Node node) {
// 仅有一个线程可以成功获取同步状态,所以这里不需要进行同步控制
head = node;
node.thread = null;
node.prev = null;
}
/**
* 该方法主要用途是,当线程在获取同步状态失败时,根据前驱节点的等待状态,决定后续的动作。比如前驱
* 节点等待状态为 SIGNAL,表明当前节点线程应该被阻塞住了。不能老是尝试,避免 CPU 忙等。
* —————————————————————————————————————————————————————————————————
* | 前驱节点等待状态 | 相应动作 |
* —————————————————————————————————————————————————————————————————
* | SIGNAL | 阻塞 |
* | CANCELLED | 向前遍历, 移除前面所有为该状态的节点 |
* | waitStatus < 0 | 将前驱节点状态设为 SIGNAL, 并再次尝试获取同步状态 |
* —————————————————————————————————————————————————————————————————
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
/*
* 前驱节点等待状态为 SIGNAL,表示当前线程应该被阻塞。
* 线程阻塞后,会在前驱节点释放同步状态后被前驱节点线程唤醒
*/
if (ws == Node.SIGNAL)
return true;
/*
* 前驱节点等待状态为 CANCELLED,则以前驱节点为起点向前遍历,
* 移除其他等待状态为 CANCELLED 的节点。
*/
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 等待状态为 0 或 PROPAGATE,设置前驱节点等待状态为 SIGNAL,
* 并再次尝试获取同步状态。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 调用 LockSupport.park 阻塞自己
LockSupport.park(this);
return Thread.interrupted();
}
/**
* 取消获取同步状态
*/
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 前驱节点等待状态为 CANCELLED,则向前遍历并移除其他为该状态的节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 记录 pred 的后继节点,后面会用到
Node predNext = pred.next;
// 将当前节点等待状态设为 CANCELLED
node.waitStatus = Node.CANCELLED;
/*
* 如果当前节点是尾节点,则通过 CAS 设置前驱节点 prev 为尾节点。设置成功后,再利用 CAS 将
* prev 的 next 引用置空,断开与后继节点的联系,完成清理工作。
*/
if (node == tail && compareAndSetTail(node, pred)) {
/*
* 执行到这里,表明 pred 节点被成功设为了尾节点,这里通过 CAS 将 pred 节点的后继节点
* 设为 null。注意这里的 CAS 即使失败了,也没关系。失败了,表明 pred 的后继节点更新
* 了。pred 此时已经是尾节点了,若后继节点被更新,则是有新节点入队了。这种情况下,CAS
* 会失败,但失败不会影响同步队列的结构。
*/
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 根据条件判断是唤醒后继节点,还是将前驱节点和后继节点连接到一起
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
/*
* 这里使用 CAS 设置 pred 的 next,表明多个线程同时在取消,这里存在竞争。
* 不过此处没针对 compareAndSetNext 方法失败后做一些处理,表明即使失败了也
* 没关系。实际上,多个线程同时设置 pred 的 next 引用时,只要有一个能设置成
* 功即可。
*/
compareAndSetNext(pred, predNext, next);
} else {
/*
* 唤醒后继节点对应的线程。这里简单讲一下为什么要唤醒后继线程,考虑下面一种情况:
* head node1 node2 tail
* ws=0 ws=1 ws=-1 ws=0
* +------+ prev +-----+ prev +-----+ prev +-----+
* | | <---- | | <---- | | <---- | |
* | | ----> | | ----> | | ----> | |
* +------+ next +-----+ next +-----+ next +-----+
*
* 头结点初始状态为 0,node1、node2 和 tail 节点依次入队。node1 自旋过程中调用
* tryAcquire 出现异常,进入 cancelAcquire。head 节点此时等待状态仍然是 0,它
* 会认为后继节点还在运行中,所它在释放同步状态后,不会去唤醒后继等待状态为非取消的
* 节点 node2。如果 node1 再不唤醒 node2 的线程,该线程面临无法被唤醒的情况。此
* 时,整个同步队列就回全部阻塞住。
*/
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
/*
* 通过 CAS 将等待状态设为 0,让后继节点线程多一次
* 尝试获取同步状态的机会
*/
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
/*
* 这里如果 s == null 处理,是不是表明 node 是尾节点?答案是不一定。原因之前在分析
* enq 方法时说过。这里再啰嗦一遍,新节点入队时,队列瞬时结构可能如下:
* node1 node2
* +------+ prev +-----+ prev +-----+
* head | | <---- | | <---- | | tail
* | | ----> | | | |
* +------+ next +-----+ +-----+
*
* node2 节点为新入队节点,此时 tail 已经指向了它,但 node1 后继引用还未设置。
* 这里 node1 就是 node 参数,s = node1.next = null,但此时 node1 并不是尾
* 节点。所以这里不能从前向后遍历同步队列,应该从后向前。
*/
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒 node 的后继节点线程
LockSupport.unpark(s.thread);
}
到这里,独占式获取同步状态的分析就讲完了。如果仅分析获取同步状态的大致流程,那么这个流程并不难。但若深入到细节之中,还是需要思考思考。这里对独占式获取同步状态的大致流程做个总结,如下:
- 调用 tryAcquire 方法尝试获取同步状态
- 获取成功,直接返回
- 获取失败,将线程封装到节点中,并将节点入队
- 入队节点在 acquireQueued 方法中自旋获取同步状态
- 若节点的前驱节点是头节点,则再次调用 tryAcquire 尝试获取同步状态
- 获取成功,当前节点将自己设为头节点并返回
- 获取失败,可能再次尝试,也可能会被阻塞。这里简单认为会被阻塞。
上面的步骤对应下面的流程图:
上面流程图参考自《Java并发编程》第128页图 5-5,这里进行了重新绘制,并做了一定的修改。
4.2.2 释放同步状态
相对于获取同步状态,释放同步状态的过程则要简单的多,这里简单罗列一下步骤:
- 调用 tryRelease(arg) 尝试释放同步状态
- 根据条件判断是否应该唤醒后继线程
就两个步骤,下面看一下源码分析。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
/*
* 这里简单列举条件分支的可能性,如下:
* 1. head = null
* head 还未初始化。初始情况下,head = null,当第一个节点入队后,head 会被初始
* 为一个虚拟(dummy)节点。这里,如果还没节点入队就调用 release 释放同步状态,
* 就会出现 h = null 的情况。
*
* 2. head != null && waitStatus = 0
* 表明后继节点对应的线程仍在运行中,不需要唤醒
*
* 3. head != null && waitStatus < 0
* 后继节点对应的线程可能被阻塞了,需要唤醒
*/
if (h != null && h.waitStatus != 0)
// 唤醒后继节点,上面分析过了,这里不再赘述
unparkSuccessor(h);
return true;
}
return false;
}
4.3 共享模式分析
与独占模式不同,共享模式下,同一时刻会有多个线程获取共享同步状态。共享模式是实现读写锁中的读锁、CountDownLatch 和 Semaphore 等同步组件的基础,搞懂了,再去理解一些共享同步组件就不难了。
4.3.1 获取同步状态
共享类型的节点获取共享同步状态后,如果后继节点也是共享类型节点,当前节点则会唤醒后继节点。这样,多个节点线程即可同时获取共享同步状态。
public final void acquireShared(int arg) {
// 尝试获取共享同步状态,tryAcquireShared 返回的是整型
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 这里和前面一样,也是通过有限次自旋的方式获取同步状态
for (;;) {
final Node p = node.predecessor();
/*
* 前驱是头结点,其类型可能是 EXCLUSIVE,也可能是 SHARED.
* 如果是 EXCLUSIVE,线程无法获取共享同步状态。
* 如果是 SHARED,线程则可获取共享同步状态。
* 能不能获取共享同步状态要看 tryAcquireShared 具体的实现。比如多个线程竞争读写
* 锁的中的读锁时,均能成功获取读锁。但多个线程同时竞争信号量时,可能就会有一部分线
* 程因无法竞争到信号量资源而阻塞。
*/
if (p == head) {
// 尝试获取共享同步状态
int r = tryAcquireShared(arg);
if (r >= 0) {
// 设置头结点,如果后继节点是共享类型,唤醒后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 这个方法做了两件事情:
* 1. 设置自身为头结点
* 2. 根据条件判断是否要唤醒后继节点
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 设置头结点
setHead(node);
/*
* 这个条件分支由 propagate > 0 和 h.waitStatus < 0 两部分组成。
* h.waitStatus < 0 时,waitStatus = SIGNAL 或 PROPAGATE。这里仅依赖
* 条件 propagate > 0 判断是否唤醒后继节点是不充分的,至于原因请参考第五章
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
/*
* 节点 s 如果是共享类型节点,则应该唤醒该节点
* 至于 s == null 的情况前面分析过,这里不在赘述。
*/
if (s == null || s.isShared())
doReleaseShared();
}
}
/**
* 该方法用于在 acquires/releases 存在竞争的情况下,确保唤醒动作向后传播。
*/
private void doReleaseShared() {
/*
* 下面的循环在 head 节点存在后继节点的情况下,做了两件事情:
* 1. 如果 head 节点等待状态为 SIGNAL,则将 head 节点状态设为 0,并唤醒后继节点
* 2. 如果 head 节点等待状态为 0,则将 head 节点状态设为 PROPAGATE,保证唤醒能够正
* 常传播下去。关于 PROPAGATE 状态的细节分析,后面会讲到。
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
/*
* ws = 0 的情况下,这里要尝试将状态从 0 设为 PROPAGATE,保证唤醒向后
* 传播。setHeadAndPropagate 在读到 h.waitStatus < 0 时,可以继续唤醒
* 后面的节点。
*/
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
到这里,共享模式下获取同步状态的逻辑就分析完了,不过我这里只做了简单分析。相对于独占式获取同步状态,共享式的情况更为复杂。独占模式下,只有一个节点线程可以成功获取同步状态,也只有获取已同步状态节点线程才可以释放同步状态。但在共享模式下,多个共享节点线程可以同时获得同步状态,在一些线程获取同步状态的同时,可能还会有另外一些线程正在释放同步状态。所以,共享模式更为复杂。这里我的脑力跟不上了,没法面面俱到的分析,见谅。
最后说一下共享模式下获取同步状态的大致流程,如下:
- 获取共享同步状态
- 若获取失败,则生成节点,并入队
- 如果前驱为头结点,再次尝试获取共享同步状态
- 获取成功则将自己设为头结点,如果后继节点是共享类型的,则唤醒
- 若失败,将节点状态设为 SIGNAL,再次尝试。若再次失败,线程进入等待状态
4.3.2 释放共享状态
释放共享状态主要逻辑在 doReleaseShared 中,doReleaseShared 上节已经分析过,这里就不赘述了。共享节点线程在获取同步状态和释放同步状态时都会调用 doReleaseShared,所以 doReleaseShared 是多线程竞争集中的地方。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
5.PROPAGATE 状态存在的意义
AQS 的节点有几种不同的状态,这个在 4.1 节介绍过。在这几个状态中,PROPAGATE 的用途可能是最不好理解的。网上包括一些书籍关于该状态的叙述基本都是一句带过,也就是 PROPAGATE 字面意义,即向后传播唤醒动作。至于怎么传播,鲜有资料说明过。不过,好在最终我还是找到了一篇详细叙述了 PROPAGATE 状态的文章。在博客园上,博友 活在夢裡 在他的文章 AbstractQueuedSynchronizer源码解读 对 PROPAGATE,以及其他的一些细节进行了说明,很有深度。在钦佩之余,不由得感叹作者思考的很深入。在征得他的同意后,我将在本节中引用他文章中对 PROPAGATE 状态说明的部分,并进行一定的补充说明。这里感谢作者 活在夢裡 的精彩分享,若不参考他的文章,我的这篇文章内容会比较空洞。好了,其他的不多说了,继续往下分析。
在本节中,将会说明两个个问题,如下:
- PROPAGATE 状态用在哪里,以及怎样向后传播唤醒动作的?
- 引入 PROPAGATE 状态是为了解决什么问题?
这两个问题将会在下面两节中分别进行说明。
5.1 利用 PROPAGATE 传播唤醒动作
PROPAGATE 状态是用来传播唤醒动作的,那么它是在哪里进行传播的呢?答案是在setHeadAndPropagate
方法中,这里再来看看 setHeadAndPropagate 方法的实现:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
大家注意看 setHeadAndPropagate 方法中那个长长的判断语句,其中有一个条件是h.waitStatus < 0
,当 h.waitStatus = SIGNAL(-1) 或 PROPAGATE(-3) 是,这个条件就会成立。那么 PROPAGATE 状态是在何时被设置的呢?答案是在doReleaseShared
方法中,如下:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {...}
// 如果 ws = 0,则将 h 状态设为 PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
...
}
}
再回到 setHeadAndPropagate 的实现,该方法既然引入了h.waitStatus < 0
这个条件,就意味着仅靠条件propagate > 0
判断是否唤醒后继节点线程的机制是不充分的。至于为啥不充分,请继续往看下看。
5.2 引入 PROPAGATE 所解决的问题
PROPAGATE 的引入是为了解决一个 BUG -- JDK-6801020,复现这个 BUG 的代码如下:
import java.util.concurrent.Semaphore;
public class TestSemaphore {
private static Semaphore sem = new Semaphore(0);
private static class Thread1 extends Thread {
@Override
public void run() {
sem.acquireUninterruptibly();
}
}
private static class Thread2 extends Thread {
@Override
public void run() {
sem.release();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000000; i++) {
Thread t1 = new Thread1();
Thread t2 = new Thread1();
Thread t3 = new Thread2();
Thread t4 = new Thread2();
t1.start();
t2.start();
t3.start();
t4.start();
t1.join();
t2.join();
t3.join();
t4.join();
System.out.println(i);
}
}
}
根据 BUG 的描述消息可知 JDK 6u11,6u17 两个版本受到影响。那么,接下来再来看看引起这个 BUG 的代码 -- JDK 6u17 中 setHeadAndPropagate 和 releaseShared 两个方法源码,如下:
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
/*
* Don't bother fully figuring out successor. If it
* looks null, call unparkSuccessor anyway to be safe.
*/
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
// 和 release 方法的源码基本一样
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
下面来简单说明 TestSemaphore 这个类的逻辑。这个类持有一个数值为 0 的信号量对象,并创建了4个线程,线程 t1 和 t2 用于获取信号量,t3 和 t4 则是调用 release() 方法释放信号量。在一般情况下,TestSemaphore 这个类的代码都可以正常执行。但当有极端情况出现时,可能会导致同步队列挂掉。这里演绎一下这个极端情况,考虑某次循环时,队列结构如下:
- 时刻1:线程 t3 调用 unparkSuccessor 方法,head 节点状态由 SIGNAL(-1) 变为
0
,并唤醒线程 t1。此时信号量数值为1。 - 时刻2:线程 t1 恢复运行,t1 调用 Semaphore.NonfairSync 的 tryAcquireShared,返回
0
。然后线程 t1 被切换,暂停运行。 - 时刻3:线程 t4 调用 releaseShared 方法,因 head 的状态为
0
,所以 t4 不会调用 unparkSuccessor 方法。 - 时刻4:线程 t1 恢复运行,t1 成功获取信号量,调用 setHeadAndPropagate。但因为 propagate = 0,线程 t1 无法调用 unparkSuccessor 唤醒线程 t2,t2 面临无线程唤醒的情况。因为 t2 无法退出等待状态,所以 t2.join 会阻塞主线程,导致程序挂住。
下面再来看一下修复 BUG 后的代码,根据 BUG 详情页显示,该 BUG 在 JDK 1.7 中被修复。这里找一个 JDK 7 较早版本(JDK 7u10)的代码看一下,如下:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
在按照上面的代码演绎一下逻辑,如下:
- 时刻1:线程 t3 调用 unparkSuccessor 方法,head 节点状态由 SIGNAL(-1) 变为
0
,并唤醒线程t1。此时信号量数值为1。 - 时刻2:线程 t1 恢复运行,t1 调用 Semaphore.NonfairSync 的 tryAcquireShared,返回
0
。然后线程 t1 被切换,暂停运行。 - 时刻3:线程 t4 调用 releaseShared 方法,检测到
h.waitStatus = 0
,t4 将头节点等待状态由0
设为PROPAGATE(-3)
。 - 时刻4:线程 t1 恢复运行,t1 成功获取信号量,调用 setHeadAndPropagate。因 propagate = 0,
propagate > 0
条件不满足。而h.waitStatus = PROPAGATE(-3)
,所以条件h.waitStatus < 0
成立。进而,线程 t1 可以唤醒线程 t2,完成唤醒动作的传播。
到这里关于状态 PROPAGATE 的内容就讲完了。最后,简单总结一下本章开头提的两个问题。
问题一:PROPAGATE 状态用在哪里,以及怎样向后传播唤醒动作的?
答:PROPAGATE 状态用在 setHeadAndPropagate。当头节点状态被设为 PROPAGATE 后,后继节点成为新的头结点后。若 propagate > 0
条件不成立,则根据条件h.waitStatus < 0
成立与否,来决定是否唤醒后继节点,即向后传播唤醒动作。
问题二:引入 PROPAGATE 状态是为了解决什么问题?
答:引入 PROPAGATE 状态是为了解决并发释放信号量所导致部分请求信号量的线程无法被唤醒的问题。
声明:
本章内容是在博友 活在夢裡 的文章 AbstractQueuedSynchronizer源码解读 基础上,进行了一定的补充说明。本章所参考的观点已经过原作者同意,为避免抄袭嫌疑,特此声明。
6.总结
到这里,本文就差不多结束了。如果大家从头看到尾,到这里就可以放松一下了。写到这里,我也可以放松一下了。这篇文章总共花费了我十多天的空闲时间,确实不容易。本来我只打算讲一下基本原理,但知道后来看到本文多次推荐的那篇文章。那篇文章给我的第一感觉是,作者很厉害。第二感觉是,我也要写出一篇较为深入的 AQS 分析文章。虽然写出来也不能怎么样,水平也不会因此提高多少,也不会去造个类似的轮子。但是写完后,确实感觉很有成就感。本文的最后,来说一下如何学习 AQS 原理。AQS 的大致原理不是很难理解,所以一开始不建议纠结细节,应该先弄懂它的大致原理。在此基础上,再去分析一些细节,分析细节时,要从多线程的角度去考虑。比如,有点地方 CAS 失败后要重试,有的不用重试。总体来说 AQS 的大致原理容易理解,细节部分比较复杂。很多细节要在脑子里演绎一遍,好好思考才能想通,有点烧脑。另外因为文章篇幅的问题,关于 AQS ConditionObject 部分的分析将会放在下一篇文章中进行。
最后,再向 AQS 的作者 Doug Lea 致以崇高的敬意。仅尽力弄懂 AQS 的原理都很难了,可想而知,实现 AQS 的难度有多大。
限于本人的能力,加之深入分析 AQS 本身就比较有难度。所以文中难免会有错误出现,如果不慎翻车,请见谅。也欢迎在评论区指明这些错误,感谢。
参考
本文在知识共享许可协议 4.0 下发布,转载需在明显位置处注明出处
作者:coolblog.xyz
本文同步发布在我的个人博客:http://www.coolblog.xyz/?r=cb
本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。