引言
本文着重介绍 AQS 的 Condition 实现方式。所有关于 Java 并发的文章均收录于<Java并发系列文章>。
Condition
介绍完 AQS 的共享模式和互斥模式后,我们来看一看 AQS 是如何实现条件等待的,即 Condition。在 AQS 中通过 ConditionObject 实现 Condition。从 ConditionObject 的核心数据中,我们会发现它内部也会维护一个 Node 的队列。
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
那么这个队列和同步队列有什么联系吗?带着这样的疑问我们走读一下在 Condition 上 await 入队的流程。这里我们以 ReentrantLock 的 Condition 实现作为例子。 ReentrantLock 的 newCondition 函数能够创建一个 Condition 对象。内部实际上就是创建了一个 AQS 的 ConditionObject 类。
// ReentrantLock 的 newCondition 函数能够创建一个 Condition 对象
public Condition newCondition() {
return sync.newCondition();
}
// 内部实际上就是创建了一个 AQS 的 ConditionObject 类
final ConditionObject newCondition() {
return new ConditionObject();
}
await
我们知道,在使用 Condition 对象时,要先持有对应的锁,然后再执行 Condition 的 await 方法。因为 Condition 内部不提供同步保证,我们需要通过 lock 来保护 Condition 的正确性。
- 在 Condition 的 await 函数中,先会将当前线程添加到 Condition 的条件队列中,因为此时持有锁,所以 addConditionWaiter 就是拿到尾指针然后新建一个节点并加入(使用Node的nextWaiter字段保存下一个Node),注意新建节点的状态是 CONDITION
- 添加到条件队列后,它才将锁释放,并保存了之前的锁状态,因为在唤醒时,必须要恢复锁状态
- 紧接着是一个循环,只要当前节点不处于同步队列中,就通过 park 等待,进入到同步队列中,说明当前节点已经从条件队列移除,并开始了尝试获取锁的过程
如果从等待状态中恢复,首先要检查一下是不是被中断了,如果是要根据被打断的时间点做出不同的处理
- 如果是被 Signal 唤醒前被打断了(当前节点状态是 CONDITION),就将当前节点状态改为 0(表示取消等待),并添加到同步队列中,然后返回 THROW_IE,这意味着最终它会抛出一个异常
- 如果是被 Signal 唤醒之后被打断了(当前节点状态不是 CONDITION,说明已经进入了或即将进入同步队列),这里就通过自旋等待进入同步队列即可,CONDITION 代表当前处于条件队列,!CONDITION 代表当前处于同步队列,最后返回了 REINTERRUPT,意味着只会设置线程的中断标志位,不会抛出异常
- 当前节点处于等待队列中时(有可能被Signal唤醒也有可能被打断),开始调用 acquireQueued 等待获取锁
成功获取锁之后,进行简单地清理工作,然后如果当前线程被打断,要根据情况做出不动处理
- Signal 唤醒前被打断了:抛出 InterruptedException
- Signal 唤醒之后被打断: 记录中断标志位
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 先会将当前线程添加到 Condition 的**条件队列**中
Node node = addConditionWaiter();
// 添加到**条件队列**后,它才将锁释放,并保存了之前的锁状态,因为在唤醒时,必须要恢复锁状态
int savedState = fullyRelease(node);
int interruptMode = 0;
// 只要不处于同步队列中,就通过 park 等待
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 如果被打断了,就将当前线程加入到同步队列中,因为从 await 返回时,无论如何我们也要获取到锁,被打断和被 Signal 都一样
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 走到这说明当前节点处于等待队列中时(无论是被Signal唤醒还是被打断),开始调用 acquireQueued 等待获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out. 清除无用的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 因为此时持有锁,所以 addConditionWaiter 就是拿到尾指针然后新建一个节点并加入(使用Node的nextWaiter字段保存下一个Node)
// 注意新建节点的状态是 CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 状态不等于 CONDITION 代表了已经被取消或者已经被Signal唤醒,将其从条件队列中清楚
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? // 检查是否被打断
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
* Transfers node, if necessary, to sync queue after a cancelled wait.
* Returns true if thread was cancelled before being signalled.
*
* @param node the node
* @return true if cancelled before the node was signalled
*/
final boolean transferAfterCancelledWait(Node node) {
// 如果是被 Signal 唤醒前被打断了(当前节点状态是 CONDITION),就将当前节点状态改为 0(表示取消等待),并添加到**同步队列**中,然后返回 true,这意味着最终 await 会抛出一个 InterruptedException
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
// 如果是被 Signal 唤醒之后被打断了(当前节点状态不是 CONDITION,说明已经进入了或即将进入同步队列),这里就通过自旋等待进入同步队列即可
// 最后返回了 false,意味着 await 只会设置线程的中断标志位,不会抛出 InterruptedException
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
signal
接下来,我们看一下通知signal的实现逻辑。
- 我们知道调用 signal 时,也是要求持有锁 lock 的,所以在 signal 中首先会判断是不是当前线程持有锁
然后检查等待队列是不是空,如果不为空,则唤醒第一个线程,唤醒线程的逻辑也很简单
- 首先通过修改 firstWaiter 指针,将条件队列中的队首结点排除
然后将刚才移除的节点移动到同步队列中,因为在这个过程中可能该节点对应的线程被打断,所以当发生这种情况时,我们需要唤醒队列中的下一个节点,直到成功将一个节点移动到同步队列中或者条件队列为空
- 怎么判断一个线程的 await 被打断了呢?就像前面所说的只要节点的状态不是 CONDITION 那它就已经被取消
- 将节点添加到同步队列中还不够,我们需要修改前序节点的状态,使其状态为 SIGNAL 这样才能放心,所以在 transferForSignal 的最后检查了前序节点的状态如果发现该节点已经被取消,或者修改 SIGNAL 失败,就主动唤醒该节点,因为唤醒之后该节点的线程会进入到 acquireQueued 函数中,那里有更完备的删除取消节点,加锁以及等待的处理流程
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
// 我们知道调用 signal 时,也是要求持有锁 lock 的,所以在 signal 中首先会判断是不是当前线程持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
// 首先通过修改 firstWaiter 指针,将条件队列中的队首结点排除
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 然后将刚才移除的节点移动到同步队列中,因为在这个过程中可能该节点对应的线程被打断,所以当发生这种情况时,我们需要唤醒队列中的下一个节点
// 直到成功将一个节点移动到同步队列中或者条件队列为空为止
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
* 只要节点的状态不是 CONDITION 那它就已经被取消
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
最后,我们简单地说一下 signalAll 的实现,它和 signal 的实现基本相同,只不过它会对条件队列中的每一个节点执行 transferForSignal 函数。
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
文章说明
更多有价值的文章均收录于贝贝猫的文章目录
版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。
参考内容
[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析%E7%9B%B8%E5%85%B3%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/)
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理