引言
本文着重介绍 AQS 的排他模式的实现方式。所有关于 Java 并发的文章均收录于<Java并发系列文章>。
排他模式
获取资源
入队操作介绍完之后,我们来看一下什么情况下需要执行入队操作,我们先从排他模式说起。下面的 acquire 是 AQS 提供的一个以排他模式获取资源的函数,我们可以看到它的执行流程是:
- 先尝试获取资源 tryAcquire,tryAcquire 是一个抽象函数,看完前面的锁的分类部分大家应该对它比较熟悉,因为通过 AQS 实现的各类锁实际上就是通过对 tryAcquire 这类抽象函数的覆写来达到各种锁的效果的。
- 如果尝试加锁失败,也就是说当前该资源已经被加锁了,就通过 addWaiter 将当前线程添加到同步队列中,注意参数是 Node.EXCLUSIVE 意味着排它锁。
- 加入到同步队列后,开始执行 acquireQueued 函数,猜一下应该能猜到这里面一定是进行睡眠等待的逻辑
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
下面的就是 acquireQueued 的代码。
- 这里面有一个循环,它会不断地获取当前节点的前序节点,如果前序节点是 head 节点(也就是那个虚拟节点,还记得吗,虚拟节点不保存有效数据,只用作指针),这里 head 节点充当一个标志位的效果,如果一个节点的前序节点是 head,那么该节点就排在队列的第一位。
- 如果 p == head ,我们就需要再调用一次 tryAcquire 尝试获取锁
- 如果第二步成功的话,当前线程已经获得到锁了,这时候要将 head 指针进行修改,可以看到 setHead 并没有使用到 CAS 指令,因为能执行到 setHead 函数的线程相当于已经获得到同步资源了,不存在竞争
- 在 setHead 函数中,对当前 Node 的 thread 和 prev 进行了清除,因为这时候该 Node 已经扮演了虚拟节点的角色,有必要把虚拟节点中用不到的属性进行清除
- 如果 CAS 执行失败(这里有很多种可能,比如被中断了,或者是刚入队,又或者是虚假唤醒(后面介绍)),则检查是否需要继续进入睡眠,一般来说如果前序节点的状态成功改为 SIGNAL 之后(但是改成 SIGNAL 之后还会再尝试获取一次锁,失败之后才会睡眠,这是防止死等的重中之重),就可以进入等待了,SIGNAL 表明当前节点肩负着唤醒下一个节点的责任,除此之外,在检查是否需要睡眠时,如果发现前序节点的请求已经被取消,则删除该节点。
- 如果发现自己确实需要睡眠,则会通过 park 函数进入等待状态,因为使用的是 park 所以不需要担心,别的线程先执行唤醒之后,当前线程再进入等待的情况,因为在这种情况下 park 函数会直接返回不会进行等待
- 最后从等待状态中恢复过来之后,检查是否是因为中断而唤醒的,是的话,就记录一下,在返回的时候以返回值的形式告诉调用者。可见 acquire 在遇到中断时,不会抛出 InterruptedException 异常,而是循环重试。如果想要达到被中断时立即抛出异常的效果,可以使用 acquireInterruptibly, 它的实现逻辑和 acquire 基本相同,主要的区别就是 park 被中断时,会抛出 InterruptedException
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {// 自己是排在最前面的节点,尝试获取锁
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 永远执行不到,因为抛出的异常都被 parkAndCheckInterrupt 中的 Thread.interrupted() 吞掉了
cancelAcquire(node);
}
}
/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前序节点的状态已经是 SIGNAL 了,可以进入等待状态
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* 前序节点被取消,移除被取消的节点
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 修改前序节点为 SIGNAL,这时候当前线程还没进入等待状态,我们需要重新判断一下现在自己是不是第一个,是的话就不用等待了
* 改成 SIGNAL 之后还会再尝试获取一次锁,失败之后才会睡眠,这是防止死等的重中之重,考虑如下情况 ThreadB-tryAcquire->ThreadA-Release->ThreadA-CheckStatus(!=SIGNAL)->ThreadB-ChangeStatus2SIGNAL
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
作为延伸,我们这里带大家回顾一下 ReentrantLock 中公平锁部分对 tryAcquire 的实现。它对大家理解 AQS 如何在 CAS 修改同步队列的情况下(先修改前序指针->CAS 修改尾结点->修复后续指针),以哪种方式访问队列中的数据能够避开数据不同步的风险。简单地说,通过前序指针(prev)访问队列中的数据肯定是安全的。
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 在尝试获取公平锁时,先会判断队列中是否存在前序节点。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
从上述代码中可以看到,在尝试获取公平锁时,先会判断队列中是否存在前序节点。之所以这么做是因为,只有发生互斥等待时,才会出现入队等待的情况,如果全是共享模式使用资源的话,队列会一直是空的,大家别急我们接下来就介绍共享模式的实现。这里我们先着重看一下 hasQueuedPredecessors 的实现,它是怎么判断有前序节点的呢:
- 首先如果 h != t 是说,队列的不为空,因为 head == tail 时,队列中只存在一个虚拟节点不存在实际的等待线程
- 在此基础上,我们还要判断一下当前持有锁的线程是不是自己,如果 head.next == null 说明有其他线程刚执行完入队的setTail工作(因为 h != t),但是前序指针还没修复,这种情况下 head.next == null,说明有别的线程已经持有锁了
- 另外一种可能就是 head.next != Thread.currentThread() 这时候队首持有锁的线程不是当前线程,所以存在前序节点
既然前面提到了 acquireInterruptibly 这里我们就简单地说一下,就像前面所说它和 acquire 基本相同,确实如此,从下面的代码中可以看到,它就是遇到中断时将 InterruptedException 外抛。
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 唯一的区别在这里,这里直接抛出异常,而 acquire 中只是记录一下flag: interrupted = true;
throw new InterruptedException();
}
} finally {
if (failed)
// 如果抛出 InterruptedException 异常,则会执行 cancelAcquire
cancelAcquire(node);
}
}
当抛出异常时,就会执行到最后面的 cancelAcquire 函数,该函数中负责将前序节点中状态为 CANCELLED 的节点清除,然后再把自己的状态变为 CANCELLED。紧接着是一些清除工作:
- 如果当前节点是尾结点,则通过 CAS 修改尾结点即可。如果 CAS 失败了也不要紧,因为当前线程状态已经是 CANCELLED 了所以其他线程会把自己清除
- 如果当前节点不是第一个节点,即 pred != head ,我们要保证前序节点的状态是 SIGNAL,并且前序节点的线程不是当前线程,因为自己不是尾结点,所以自己当前的状态很可能就是 SIGNAL,所以这里我们无论如何要确保前序节点的状态能够修改为 SIGNAL,如果做到了,就可以大胆地通过 CAS 将自己从队列中移除
- 否则,说明自己可能是头结点,或者前序节点都取消了,也有可能前序节点的线程就是当前线程,那么就只能由自己来唤醒后续的线程了
/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
唤醒的过程也很简单:
- 如果当前节点状态不是 CANCELLED 就清除状态
- 然后先看一下 next 指针指向的节点是否需要被唤醒,next == null 代表了可能发生的前序指针和后续指针不同步,s.waitStatus > 0 表示后继节点已被取消,这时候我们就需要找到下一个需要被唤醒的节点。
- 我们需要从尾结点出发,逐个向前找,因为前序指针肯定是安全的
- 如果找到了需要被唤醒的线程,就执行 unpark 唤醒它
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
释放资源
介绍完资源的获取,我们再来看看资源的释放流程。
- 首先,调用 tryRelease 函数,它也是一个抽象函数,在上层实现中一般会进行必要的检查,比如检查持有锁的线程是否是当前线程等,就比如如下 ReentrantLock 中对 tryRelease 的实现。
- 释放成功后,判断当前同步队列是否为空,不为空并且当前线程承担唤醒职责时(waitStatus < 0),因为当前线程能够成功执行 tryRelease,所以当前线程的 waitStatus 不会是 CANCELLED,剩下的状态 SIGNAL 是需要承担唤醒职责的,CONDITION 和 PROPAGATE 我们后面介绍。
- 具体的唤醒函数 unparkSuccessor 我们前面刚介绍过,这里就不再赘述了
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
AQS 框架使用
看到这里,不知道大家对 AQS 的使用流程有没有体会,我的理解是在使用它时,一般我们需要覆写 tryAcquire 和 tryRelease 这类函数,它们是直接修改互斥资源 state 的函数,AQS 通过将具体的状态修改职责移交到子类中,能让子类实现各种类型的锁,就比如前一章我们介绍的那些。
而子类向外暴露 lock 和 unlock 函数时,又直接使用 AQS 中的 acquire 和 releas 函数,因为这些函数中封装了尝试加锁过程和加锁失败入队等待过程。
public void lock() { acquire(1); }
public void unlock() { release(1); }
总结一下就是 AQS 对变化报开放的态度,你可以通过它完成各种同步策略,同时对与那些样板代码,都已经被它封装在了自己内部,并使用 final 关键字修饰,例如 acquire 和 release。
文章说明
更多有价值的文章均收录于贝贝猫的文章目录
版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。
参考内容
[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析%E7%9B%B8%E5%85%B3%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/)
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理