一 同步锁
在并发编程中,我们经常用到的是synchronized
和ReentrantLock
。其中,synchronized
是jvm
内置锁,而ReentrantLock
位于java.util.concurrent
包下(以下简称JUC
),ReentrantLock
是基于AbstractQueuedSynchronizer
(以下简称AQS
)同步器框架实现的,本文主要来介绍AQS
的内部实现及在JUC
中基于AQS
实现的相关类。
二 AQS内部实现
AQS
是一个抽象类,内部维护一个state
变量(代表共享资源)、一个FIFO
等待队列(用来获取共享资源、线程排队管理等)。AQS
定义了两种访问共享资源的方式:Exclusive
(独占方式,每次只有一个线程访问资源,如:ReentrantLock
)、Share
(共享方式,多个线程可以同时访问资源,如:Semaphore
、CountDownLatch
等)。不管是独占方式还是共享方式,其具体实现只需要实现共享资源state
的获取和释放即可,等待队列的相关操作(如获取锁失败入队、唤醒出队等),AQS
已经实现好了。
注:虽然AQS
提供了独占和共享两种方式访问共享资源,但是两者并不一定是互斥的,还可以是独占和共享共存的方式访问共享资源,如ReentrantReadWriteLock
(后面单独分析)。
1、等待队列
Node
是AQS
中的一个静态内部类,上面说到AQS中维护了一个FIFO的双端等待队列,当获取锁失败时,AQS会将当前线程及等待状态等信息构造成一个节点Node,加入到等待队列的队尾,并且当前线程变成阻塞状态,等待唤醒。
//等待队列的Head节点
private transient volatile Node head;
//等待队列的Tail节点
private transient volatile Node tail;
static final class Node {
//共享锁对应节点
static final Node SHARED = new Node();
//独占锁对应节点
static final Node EXCLUSIVE = null;
//等待状态
volatile int waitStatus;
Node nextWaiter;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//当前线程
volatile Thread thread;
final boolean isShared() {
return nextWaiter == SHARED;
}
//找到当前节点的前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {}
//在addWaiter()时调用
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
//在Condition中调用
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
- waitStatus:当前线程在等待队列中的状态
waitStatus | 值 | 功能 |
---|---|---|
SIGNAL | -1 | 当前节点的后继节点被阻塞(通过park )时,设置其前驱节点的状态为SIGNAL 。标记为SIGNAL 节点的线程释放锁时就会通知后继节点,使得后继节点所在的线程被唤醒。这个状态一般是后继节点来设置前驱节点的。 |
CANCELLED | 1 | 由于超时(timeout )或中断(interrupt ),队列中此节点被取消。节点进入此状态后不会再变化。 |
CONDITION | -2 | 此节点当前位于条件队列(condition queue )中。当其他线程对这个Condition 调用signal 方法后,它会被转移同步队列(sync queue )中。 |
PROPAGATE | -3 | 已发布的节点应该传播到其他节点。这在doReleaseShared 中设置(仅针对head 节点),以确保传播继续进行,即使其他操作已经介入。 |
0 | 0 | 以上的都不是,默认初始无锁状态 |
- prev :等待队列(
Sync Queue
)中前驱节点 - next :等待队列(
Sync Queue
)中后继节点 - thread : 当前线程
- nextWaiter:条件队列(
Condition Queue
)的后继节点
注:等待队列(Sync Queue)在独占模式、共享模式中都会使用到,条件队列(Condition Queue)只会在独占锁中使用。
如:
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
condition.await();
condition.signal();
通过ReentrantLock
可以构造Condition
,调用condition的await/signal
方法时会把当前线程在Condition Queue
中执行入队/出队操作。
当调用condition.await()
时,Node
节点就会从Sync Queue
中进入到Condition Queue
中,且对应的pre、next
都会被置为null
,waitStatus
变为CONDITION
,并通过nextWaiter
形成链表;
而当调用condition.signal()
时,Node
又会从Condition Queue
进入到Sync Queue
中,pre、next
重新设置,waitStatus
根据当前状态进行设置,nextWaiter
会被置为null
。
注:当调用多个lock.newCondition()
时,就会产生多个Condition
队列,即一个ReentrantLock
可以对应多个Condition Queue
。
2、状态指示器state
state
表示当前锁的状态,state=0
是初始状态,即无锁状态; 当state>0
表示已经有线程获得了锁,当同一个线程多次获得同步锁时(可重入性),state
会递增,而在释放锁的时候,state
会进行递减直到state=0
时,其他线程才有资格获取锁。独占模式下只有一个线程能够获取同步锁,而共享模式下可以有多个线程可以获取同步锁。
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
//通过CAS方式更新state值
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
继承AQS
的同步器主要实现:
独占锁需要实现:
isHeldExclusively()
:是否是独占资源,当用到condition
时需要实现它。tryAcquire(int arg)
:独占锁方式,尝试获取资源,成功返回true
,否则返回false
。tryRelease(int arg)
:独占锁方式,释放资源,如果释放操作使得所有获取同步器时被阻塞的线程恢复执行,那么返回的是true
,否则返回false
。
共享锁需要实现:
tryAcquireShared(int arg)
:共享方式获取资源,负数表示获取操作失败;0代表成功,但无剩余资源;正数代表成功,且有剩余资源。tryReleaseShared(int arg)
:共享方式尝试释放资源,如果释放后允许唤醒后续节点返回true
,否则返回false
。
我们来看下上述方法在AQS
中的实现:
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
无论是独占锁还是共享锁,获取锁和释放锁在AQS
中都是没有实现的,需要在自定义同步器中实现
独占锁相关:
acquire(int arg)
: 独占模式下尝试获取共享资源,忽略中断操作。获取成功返回true
,否则线程进入等待队列中,等待被唤醒后继续尝试获取共享资源直到成功为止。本方法可以在实现Lock.lock()
中使用。acquire()
在AQS
中的源码:
//tryAcquire尝试获取资源(在子类中实现),如成功直接返回,否则通过addWaiter及acquireQueued将该线程加入到等待队列的队尾,标记为独占模式,如果该线程在等待过程中中断过,acquireQueued方法会返回true,否则返回false。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果tail为空(即队列为空),或通过CAS设置node到队尾失败,继续通过enq()循环设置node到队尾,直至成功为止
enq(node);
return node;
}
//将node设置到队列尾部
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
*1、当前node节点的前驱节点是SIGNAL状态,返回true,意味着当前线程会被挂起,阻塞等待。
*2、如果前驱节点是CANCEL状态(waitStatus>0),跳过此节点并删除,继续往上找,直到找到不是CANCEL状态的节点作为其前驱节点为止;如果前驱节点是0或者PROPAGATE状态,则将前驱节点设置为SIGNAL状态,则在下一次执行循环时shouldParkAfterFailedAcquire可以直接返回true,挂起线程。
**/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
acquireInterruptibly(int arg)
: 同acquire
方法,区别在于如果收到中断通知会直接中断。本方法可以在Lock.lockInterruptibly()
中使用。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
三 JUC中基于AQS的相关实现
基于AQS
构建的同步器类中,最基本的操作就是各种形式的获取操作和释放操作。获取操作是一种依赖状态的操作,通常会阻塞;释放操作时被阻塞的线程会重新开始执行。前面也说到了,AQS
通过管理state
变量来进行相关操作,跟state
相关的方法有getState
、setState
、compareAndSetState
,并且state
在不同实现AQS的子类中代表的意思也各不相同。如:state 在 ReentrantLock中代表的是锁所有者线程重复获取该锁的次数;Semaphore 中 state代表剩余的许可数量;FutureTask中state用来表示任务的状态(未开始、正在运行、已完成、已取消)。
ReentrantLock
:只支持独占方式获取锁,所以内部只实现了tryAcquire
、tryRelease
、isHeldExclusively
。ReentrantLock
利用AQS
对多个条件变量、多个等待线程集内置支持。Lock.newCondition
返回一个新的ConditionObject
实例。CountDownLatch
:用于保存当前许可的数量。获取操作意味着“等待并直到闭锁到达结束状态”Semaphore
:Semaphore
翻译为信号量,是一种共享锁,可以同时允许一个或多个线程同时共享资源。CyclicBarrier
:CyclicBarrier
意为循环栅栏,可以实现一组线程等待至某个状态之后再全部继续执行。当所有线程执行完毕后,CyclicBarrier
还可以继续被重用。ReentrantReadWriteLock
:ReentrantReadWriteLock
是独占锁(写锁)、共享锁(读锁)可以同时存在的一种读写锁,在读操作远大于写操作的场景中,能实现更好的并发性。ThreadPoolExecutor
:线程池,内部实现使用的ReentrantLock
。另外Android
中AsyncTask
内部实现即是ThreadPoolExecutor
。
四 总结
获取锁的流程:
AQS
的模板方法acquire
通过调用子类自定义实现的tryAcquire
获取锁;- 如果获取锁失败,通过
addWaiter
方法将线程构造成Node
节点插入到同步队列队尾; - 在
acquirQueued
方法中以自旋的方法尝试获取锁,如果失败则判断是否需要将当前线程阻塞,如果需要阻塞则最终执行LockSupport(Unsafe)
中的native API
来实现线程阻塞。
释放锁的流程:
- 首先获取当前节点(实际上传入的是
head
节点)的状态,如果head
节点的下一个节点是null
,或者下一个节点的状态为CANCEL
(不用再唤醒了),则直接从等待队列的尾部开始遍历,一直遍历到最前面的waitStatus
小于等于 0 的节点(大于0的状态是CANCEL
状态)。 - 如果最终遍历到的节点不为
null
,再调用LockSupport.unpark
方法,调用底层方法唤醒线程。
五 参考
【1】并发Lock之AQS(AbstractQueuedSynchronizer)详解
【2】[逐行分析AQS源码(1)——独占锁的获取
](https://segmentfault.com/a/1190000015739343)