开篇
这篇文章主要是讲解FairSync公平锁的源码分析,整个内容分为加锁过程、解锁过程,CLH队列等概念。
首先一直困扰我的CLH队列的CLH的缩写我终于明白,看似三个人的人名的首字符缩写"CLH" (Craig, Landin, andHagersten)。
加锁过程主要核心逻辑在于尝试获取锁,获锁失败后进入等待队列,以及进入等待队列的过程是需要进行多次循环判断的。
解锁过程相对加锁过程会简单许多,核心逻辑在释放锁、唤醒下一个等待线程两个过程。
CLH的概念在加锁过程已经提及了,可以一并看看。
java源码 - ReentrantLock
java源码 - ReentrantLock之FairSync
java源码 - ReentrantLock之NonfairSync
java源码 - ReentrantLock图解加锁过程
加锁过程
ReentrantLock的的锁过程如下:
- 1、先尝试获取锁,通过tryAcquire()实现。
- 2、获取锁失败后,线程被包装成Node对象后添加到CLH队列,通过addWaiter()实现。
- 3、添加CLH队列后,逐步的去执行CLH队列的线程,如果当前线程获取到了锁,则返回;否则,当前线程进行休眠,直到唤醒并重新获取锁了才返回。
tryAcquire的操作流程
1、如果锁未占用的情况下:判断当前线程是否处于CLH的首位,如果位于首位就通过原子更新操作设置锁占用。
2、如果锁被占用的情况下:判断当前线程是否是占用锁线程,如果是则实现锁的可重入功能,设置锁占用次数。
static final class FairSync extends Sync {
// lock的入口,内部调用acquire方法实现加锁操作
final void lock() {
// lock的入口
acquire(1);
}
public final void acquire(int arg) {
// 第一步尝试获取锁,成功则返回
// 获取锁失败后通过addWaiter添加到CLH队列的末尾
// 通过acquireQueued判断是否轮到自己唤醒了
// 可以理解为之前没获取锁但是等执行到这里的时候可能锁已经释放了
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// acquires的参数值为1
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取当前锁状态,0表示锁未占用,>0表示被占用
int c = getState();
if (c == 0) {
// 首先判断是不是CLH队列的第一个元素,没有祖先则表示第一个元素
// 然后从unsafe把state设置为1,表示锁被占用
// 设置锁占用线程为当前线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 设置锁占用线程为当前线程
setExclusiveOwnerThread(current);
// 返回锁占用成功
return true;
}
}
// 判断锁占用线程是不是本线程,说明是可重入锁
else if (current == getExclusiveOwnerThread()) {
// 重入锁增加锁定次数
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 设置state会占用次数
setState(nextc);
// 返回锁占用成功
return true;
}
// 否则返回锁占用失败
return false;
}
}
acquire的操作流程
- 1、第一步通过tryAcquire()尝试获取锁,成功则返回
- 2、获取锁失败后通过addWaiter添加到CLH队列的末尾
- 3、添加CLH队列后,通过acquireQueued()方法逐步的去执行CLH队列的线程,如果当前线程获取到了锁则返回;否则当前线程进行休眠,直到唤醒并重新获取锁后返回。
public final void acquire(int arg) {
// 第一步尝试获取锁,成功则返回
// 获取锁失败后通过addWaiter添加到CLH队列的末尾
// 通过acquireQueued判断是否轮到自己唤醒了
// 可以理解为之前没获取锁但是等执行到这里的时候可能锁已经释放了
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter的操作流程
1、将当前线程包装成Node对象。
2、先尝试通过快速失败法尝试在CLH队尾插入Node对象
3、如果快速插入失败后那么就通过enq方法在CLH队尾插入Node对象
private Node addWaiter(Node mode) {
// 将线程包装成为Node对象,便于添加CLH队列
Node node = new Node(Thread.currentThread(), mode);
// 先尝试快速插入到CLH队尾,插入成功就返回Node对象
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 快速插入CLH队尾失败后,通过enq方法实现
enq(node);
return node;
}
// 将Node节点插入CLH队尾的实现
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果CLH队列为空,那么设置Head和Tail都为Node
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 通过unsafe来保证Node插入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued的操作流程
- 1、如果当前节点Node的前驱节点属于head,当前节点属于老二地位通过tryAcquire()尝试获取锁,获取成功后那么就释放原head节点(可以理解为head已经释放锁然后从CLH删除),把当前节点设置为head节点。
- 2、通过shouldParkAfterFailedAcquire()方法判断Node代表的线程是否进入waiting状态,直到被unpark()。
- 3、parkAndCheckInterrupt()方法将当前线程进入waiting状态。
- 4、休眠线程被唤醒的时候会执行 if (p == head && tryAcquire(arg))逻辑判断
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 先判断节点的祖先
final Node p = node.predecessor();
// 如果前驱是head,即该结点已成老二,
// 那么便有资格去尝试获取资源,
// tryAcquire成功说明head已经释放锁
// 休眠线程被唤醒的时候会继续执行这里
if (p == head && tryAcquire(arg)) {
// 设置当前节点为head节点
setHead(node);
// 释放原head节点用于gc回收
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果自己可以休息了,就进入waiting状态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire的操作流程
- 1、如果前置节点处于SIGNAL状态,那么当前线程进入阻塞状态,返回true
- 2、如果前置节点处于ws>0也就是取消状态,那么当前线程节点就往前查找第一个状态处于ws<=0的节点
- 3、如果前置状态ws=0的节点,那么就把前置节点设置为SIGNAL状态
- 4、整个shouldParkAfterFailedAcquire函数是在for()循环当中循环执行的,我们可以想象按照步骤2->3->1的顺序执行,按照前置遍历寻找合适的前置节点,接着发现前置节点ws状态为0后重新设置为SIGNAL,最后发现前置节点状态为SINGAL后休眠线程自身。
- 5、线程从运行态进入waiting状态其实也是经历了一系列的处理过程的。
// shouldParkAfterFailedAcquire外层for循环调用
// 第一次设置Node前置节点状态为SIGNAL
// 下一次循环就前置节点庄为SIGNAL,那么线程自身就需要被阻塞了
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 如果前继节点是SIGNAL状态,则意味这当前线程需要被阻塞。此时,返回true。
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
// ws>0代表线程被取消了
// static final int CANCELLED = 1;
// waitStatus value to indicate thread has cancelled
if (ws > 0) {
// 如果前驱处于取消状态,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
// 状态为0的情况只可能是初始化的时候的默认值
// 当前线程进入等待状态的时候需要设置前置状态为SIGNAL
// SIGNAL状态表示后置线程需要被唤醒
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// parkAndCheckInterrupt()的作用是阻塞当前线程,并且返回“线程被唤醒之后”的中断状态。
LockSupport.park(this);
return Thread.interrupted();
}
Node的介绍
- 1、Node节点作为CLH队列的节点元素,内部包含线程对象
- 2、Node节点包含多种状态,每种状态都在源码中注释了,默认初始化应该为0
- 3、Node节点是一个双向列表的节点,包含前置和后置节点的指针
- 4、Node节点处于AbstractQueuedSynchronizer类当中,其中AbstractQueuedSynchronizer包含state变量标记是否处于锁状态
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
protected AbstractQueuedSynchronizer() { }
/**
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
*/
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
// 当前线程已被取消
static final int CANCELLED = 1;
// “当前线程的后继线程需要被unpark(唤醒)”。
// 一般发生情况是:当前线程的后继线程处于阻塞状态,
// 而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
static final int SIGNAL = -1;
// 当前线程(处在Condition休眠状态)在等待Condition唤醒
static final int CONDITION = -2;
// (共享锁)其它线程获取到“共享锁”,状态为0表示当前线程不属于上面的任何一种状态。
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
}
解锁过程
release过程
- 1、通过tryRelease()方法尝试让当前线程释放锁对象
- 2、通过unparkSuccessor()方法设置当前节点状态ws=0并且唤醒CLH队列中的下一个等待线程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease过程
- 1、如果占用锁线程非当前线程直接抛异常
- 2、递减锁计数后如果值为0那么就释放当前锁占用者
- 3、更新锁状态为未占用,即state为0
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor过程
1、设置当前Node状态为0
2、寻找下一个等待线程节点来唤醒等待线程并通过LockSupport.unpark()唤醒线程
3、寻找下一个等待线程,如果当前Node的下一个节点符合状态就直接进行唤醒,否则从队尾开始进行倒序查找,找到最优先的线程进行唤醒。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 找到状态<0的线程进行唤醒
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
##参考文章
Java多线程:AQS源码分析