阿里P7架构师带你深入剖析synchronized的实现原理

简介: 显示锁ReentrantLock的内部同步依赖于AQS(AbstractQueuedSynchronizer),因此,分析ReentrantLock必然涉及AQS。

显示锁ReentrantLock的内部同步依赖于AQS(AbstractQueuedSynchronizer),因此,分析ReentrantLock必然涉及AQS。

本文假设读者已熟练掌握AQS的基本原理(参考AQS的基本原理),通过分析ReentrantLock#lock()与ReentrantLock#unlock()的实现原理,用实例帮助读者理解AQS的等待队列模型。

JDK版本:oracle java 1.8.0_102

接口声明

public interface Lock {
void lock();

void lockInterruptibly() throws InterruptedException;

boolean tryLock();

boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

void unlock();

Condition newCondition();

}

ReentrantLock对标内置锁,实现了Lock接口。忽略Condition相关,主要提供lock、unlock两种语义,和两种语义的衍生品。
实现原理
继承AQS

本文中的“继承”指“扩展extend”。

AQS复习

AQS并提供了多个未实现的protected方法,留给作者覆写以开发不同的同步器:

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
...
}

而其他非私有方法则使用final修饰,禁止子类覆写。
继承

ReentrantLock支持公平、非公平两种策略,并通过继承AQS实现了对应两种策略的同步器NonfairSync与FairSync。ReentrantLock默认使用非公平策略,即NonfairSync:

public ReentrantLock() {
    sync = new NonfairSync();
}
...
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        ...
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
    abstract void lock();

    final boolean nonfairTryAcquire(int acquires) {
        ...
    }

    protected final boolean tryRelease(int releases) {
        ...
    }

    final ConditionObject newCondition() {
        return new ConditionObject();
    }
    ...
}
...

先不追究细节。下面以默认的非公平策略为例,讲解lock和unlock的实现。
lock

public void lock() {
    sync.lock();
}

非公平策略下,sync指向一个NonfairSync实例。

static final class NonfairSync extends Sync {
    ...
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    ...
}

ReentrantLock用state表示“所有者线程已经重复获取该锁的次数”。当state等于0时,表示当前没有线程持有该锁,因此,将state CAS设置为1,并记录排他的所有者线程ownerThread(ownerThread只会在0->1及1->0两次状态转换中修改);否则,state必然大于0,则尝试再获取一次锁。ownerThread将在state大于0时,用于判断重入性。

    排他性:如果线程T1已经持有锁L,则不允许除T1外的任何线程T持有该锁L
    重入性:如果线程T1已经持有锁L,则允许线程T1多次获取锁L,更确切的说,获取一次后,可多次进入锁。

二者结合,描述了ReentrantLock的一个性质:允许ownerThread重入,不允许其他线程进入或重入。

acquire

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
...
}

改写:

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
public final void acquire(int arg) {
if (tryAcquire(arg)) {
return;
}
Node newNode = addWaiter(Node.EXCLUSIVE);
boolean interrupted = acquireQueued(newNode, arg);
if (interrupted) {
selfInterrupt();
}
}
...
}

首先,通过tryAcquire()尝试获取锁。按照AQS的约定,tryAcquire()返回true表示获取成功,可直接返回;否则获取失败。如果获取失败,则向等待队列中添加一个独占模式的节点,并通过acquireQueued()阻塞的等待该节点被调用(即当前线程被唤醒)。如果是因为被中断而唤醒的,则复现中断信号。

tryAcquire

NonfairSync覆写了AQS#tryAcquire():

static final class NonfairSync extends Sync {
    ...
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    ...
}
abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    ...
}

12-17行重复了NonfairSync#lock()中state=0时的状态转换。18行进行排他性判断,如果当前线程等于ownerThread,则直接返回false。否则,19-22行进行重入,state加1(acquires=1),表示所有者线程重复获取该锁的次数增加1。

实际上,NonfairSync#lock()不需要特殊处理state=0时的状态转换。可通过NonfairSync#tryAcquire()、Sync#nonfairTryAcquire()完成。

为什么19-22行不需要同步

注意,如果18行判断当前线程等于ownerThread,则根据程序顺序规则,19-22行不需要同步。因为同一线程中,第二次调用NonfairSync#tryAcquire()时(会进入19-22行),第一次调用锁写入的state、ownerThread一定是可见的。

为什么要用state表示重入次数

如果没有记录重入次数,则第一次释放锁时,会一次性把ownerThread多次重入的锁都释放掉,而此时“锁中的代码”还没有执行完成,造成混乱。

addWaiter

如果tryAcquire()获取失败,则要通过AQS#addWaiter()向等待队列中添加一个独占模式的节点,并返回该节点:

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
...
}

AQS#enq()中重复了9-15行的逻辑,直接看enq()。

如果尾指针为null,则头指针也一定为null,表示等待队列未初始化,就CAS初始化队列(常见于无阻塞队列的设计中,如源码|并发一枝花之BlockingQueue),然后继续循环。如果尾指针非null,则队列已初始化,就CAS尝试在尾节点后插入新的节点node。

在插入过程中,会出现“node.prev指向旧的尾节点,但旧的尾节点.next为null未指向node(尽管,尾指针指向node)”的状态,即“队列在prev方向一致,next方向不一致”。记住该状态,分析ReentrantLock#unlock()时会用到。

最后,enq()返回旧的尾节点。但外层的AQS#addWaiter()仍然返回新节点node。

队列刚完成初始化时,存在一个dummy node。插入节点时,tail后移指向新节点,head不变仍然指向dummy node。直到调用AQS#acquireQueued()时,head才会后移,消除了dummy node,后面分析。

acquireQueued

插入新节点node后,通过AQS#acquireQueued()阻塞的等待该节点被调用(即当前线程被唤醒):

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
...
}

该方法是lock过程的核心难点,需要结合AQS#addWaiter()理解AQS内部基于等待队列的同步模型。

AQS的核心是状态依赖,可概括为两条规则:

当状态还没有满足的时候,节点会进入等待队列。
特别的,获取成功的节点成为队列的头结点。

首先,AQS#addWaiter()会将新节点node加入队尾(维护规则“当状态还没有满足的时候,节点会进入等待队列”),然后,AQS#acquireQueued()检查node的前继节点是否是头节点。如果是,则尝试获取锁;如果不是,或获取所失败,都会尝试阻塞等待。

如果11行获取锁成功,则更新头节点(维护规则“获取成功的节点成为队列的头结点”),修改failed标志,并返回interrupted标志。interrupted初始化为false,可能在17-19行被修改。

初始化队列后的第一次更新头结点,直接setHead消除了dummy node。消除之后,实际节点代替了dummy node的作用,但与dummy node不同的是,该节点是持有锁的。

如果11行判断前继节点不是头节点或获取锁失败,则进入17-19行。AQS.shouldParkAfterFailedAcquire()判断是否需要阻塞等待,如果需要,则通过AQS#parkAndCheckInterrupt()阻塞等待,直到被唤醒或被中断。

shouldParkAfterFailedAcquire

AQS.shouldParkAfterFailedAcquire()根据pred.waitStatus判断新节点node是否应该被阻塞:

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
...
}

AQS#addWaiter()构造新节点时,pred.waitStatus使用了默认值0。此时,进入14-16行,CAS设置pred.waitStatus为SIGNAL==-1。最后返回false。

回到AQS#acquireQueued()中后,由于AQS#parkAndCheckInterrupt()返回false,循环会继续进行。假设node的前继节点pred仍然不是头结点或锁获取失败,则会再次进入AQS#parkAndCheckInterrupt()。上一轮循环中,已经将pred.waitStatus设置为SIGNAL==-1,则这次会进入7-8行,直接返回true,表示应该阻塞。

什么时候会遇到ws > 0的case呢?当pred所维护的获取请求被取消时,pred.waitStatus会被设置为CANCELLED==1,从而进入9-14行。改写:

if (ws > 0) {
do {
pred = pred.prev;
node.prev = pred;
} while (pred.waitStatus > 0);
pred.next = node;
}

逻辑很简单,循环移除所有被取消的前继节点pred,直到找到未被取消的pred。移除所有被取消的前继节点后,直接返回false。

注意,在执行6行之前,队列处于“node.prev指向最新的前继节点,但pred.next指向已经移除的后继节点”的状态,即“队列在prev方向一致,next方向不一致”。记住该状态,分析ReentrantLock#unlock()时会用到。

此处不需要检查前继节点是否为null。因为等待队列的头结点要么是dummy node,满足dummy.waitStatus==0;要么是刚替换的real node,满足real.waitStatus==0;要么是后继节点已经阻塞的节点,满足real.waitStatus==SIGNAL==-1。则最晚遍历到头结点时,一定会退出循环,不会出现pred为null的情况。

回到AQS#acquireQueued()后,重新检查前继节点是否为头节点,并作出相应处理。

经过多次循环执行AQS.shouldParkAfterFailedAcquire()后,等待队列趋于稳定。最终的稳定状态为:

除了头节点,剩余节点都会返回true,表示需要阻塞等待。
除了尾节点,剩余节点都满足waitStatus==SIGNAL,表示释放后需要唤醒后继节点。

parkAndCheckInterrupt

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
...
}

AQS#parkAndCheckInterrupt()借助LockSupport.park()实现阻塞等待。最后调用Thread.interrupted()检查是否被中断,并清除中断状态,并返回中断标志。

如果是被中断的,则需要在外层AQS#acquireQueued()中重新设置中断标志interrupted,并在下一次循环中返回。然后在更外层的AQS#acquire()中调用AQS.selfInterrupt()重放中断。

为什么不能直接在AQS#parkAndCheckInterrupt()返回后中断?因为返回中转标志能提供更大的灵活性,外界可以自行决定是即时重放、稍后重放还是压根不重放。Condition在得知AQS#acquireQueued()是被中断的之后,便没有直接复现中断,而是根据REINTERRUPT配置决定是否重放。

cancelAcquire

最后,如果在执行AQS#acquire()的过程中抛出任何异常,则取消任务:

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
private void cancelAcquire(Node node) {
...
node.waitStatus = Node.CANCELLED;
...
}
...
}

因此,如果指考虑ReentrantLock#lock()方法的话,那么被标记为CACELLED状态的节点一定在获取锁时抛出了异常,AQS.shouldParkAfterFailedAcquire()中清理了这部分CACELLED节点。

超时版ReentrantLock#tryLock()中,还可以由于超时触发取消。

lock小结

ReentrantLock#lock()收敛后,AQS内部的等待队列如图:

image.png
unlock

ReentrantLock#unlcok()与ReentrantLock#lcok()是对偶的。

public void unlock() {
    sync.release(1);
}

获取锁以单位1进行,释放锁时也以单位1进行。
release

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
...
}

首先,通过tryRelease()尝试释放锁。按照AQS的约定,tryRelease()返回true表示已完全释放,可唤醒所有阻塞线程;否则没有完全释放,不需要唤醒。如果已完全释放,则只需要唤醒头结点的后继节点,该节点的ownerThread必然与头结点不同(如果相同,则之前lock时能够重入,不需要排队);否则没有完全释放,不需要唤醒任何节点。

对于独占锁,“完全释放”表示ownerThread的所有重入操作均已结束。

解释8行的判断

如果h == null,则队列还未初始化(回忆AQS#enq())。如果h.waitStatus == 0,则要么刚刚初始化队列,只有一个dummy node,没有后继节点(回忆AQS#enq());要么后继节点还没被阻塞,不需要唤醒(回忆等待队列的稳定状态)。

tryRelease

对照tryAcquire()分析tryRelease():

abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    ...
}

5-6行很重要。如果存在某线程持有锁,则可以检查unlock是否被ownerThread触发;如果不存在线程持有锁,则ownerThread==null,可以检查是否在未lock的情况下进行unlock,或者重复执行了unlock。

因此,使用ReentrantLock时,try-finally要这么写:

Lock lock = new ReentrantLock();
lock.lock();
try {
 // do sth
} finally {
 lock.unlcok();
}

确保在调用lock()成功之后,才能调用unlock()。

接下来,8行判断是否将要进行1->0的状态转换,如果是,则可以完全释放锁,将ownerThread置为null。然后设置state。

最后,返回是否可完全释放的标志free。

可见性问题

为了抓住核心功能,前面一直忽略了一个很重要的问题——可见性。忽略可见性问题的话,阅读源码基本没有影响,但自己实现同步器时将带来噩梦。

以此处为例,是应该先执行8-11行,还是先执行12行呢?或者是无所谓呢?

为保障可见性,必须先执行8-11行,再执行12行。因为exclusiveOwnerThread的可见性要借助于(piggyback)于volatile变量state:

...
private transient Thread exclusiveOwnerThread;
...
private volatile int state;
...

配套的,也必须先读state,再读exclusiveOwnerThread:

abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
    final boolean nonfairTryAcquire(int acquires) {
        ...
        int c = getState(); // 先读state
        if (c == 0) {
            ...
        }
        else if (current == getExclusiveOwnerThread()) {    // 再读exclusiveOwnerThread
            ...
        }
        return false;
    }
    ...
}

核心是三条Happens-Before规则:

程序顺序规则:如果程序中操作A在操作B之前,那么在线程中操作A将在操作B之前执行。
传递性:如果操作A在操作B之前执行,并且操作B在操作C之前执行,那么操作A必须在操作C之前执行。
volatile变量规则:对volatile变量的写入操作必须在对该变量的读操作之前执行。

具体来说,“先写exclusiveOwnerThread再写state;先读state再读exclusiveOwnerThread”的方案,保证了“在读state之后,发生在写state之前的写exclusiveOwnerThread操作对发生在读state之后的读exclusiveOwnerThread操作一定是可见的”。

程序顺序规则、传递性两条基本规则,经常与监视器锁规则、volatile变量规则显示的搭配,一定要掌握。

相对的,线程启动规则、线程结束规则、中断规则、终结器规则则通常被隐式的使用。

unparkSuccessor

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
...

}

对lock过程的分析中,我们得知,队列中所有节点的waitStatus要么为0,要么为SIGNAL==-1。当node.waitStatus==SIGNAL时,表示node的后继节点s已被阻塞或正在被阻塞。现在需要唤醒s,则7-8行将node.waitStatus置0。

注意,node.waitStatus一定不为CANCELLED==1,因为如果lock()方法没有执行成功,就无法通过unlock()方法调用AQS#unparkSuccessor()。

接下来,10-18行从尾节点向前遍历,找到node后最靠近node的未取消的节点,如果存在该节点s(s!=null),就唤醒s.thread以竞争锁。

一致性问题

为什么要从尾节点向前遍历,而不能从node向后遍历?这是因为,AQS中的等待队列基于一个弱一致性双向链表实现,允许某些时刻下,队列在prev方向一致,next方向不一致。

理想情况下,队列每时每刻都处于一致的状态(强一致性模型),从node向后遍历找第一个未取消节点是更高效的做法。然而,维护一致性通常需要牺牲部分性能,为了进一步的提升性能,脑洞大开的神牛们想出了各种高性能的弱一致性模型。尽管模型允许了更多弱一致状态,但所有弱一致状态都在控制之下,不会出现一致性问题。

回忆lock过程的分析,有两个地方出现了这个弱一致状态:

AQS#enq()插入新节点(包括AQS#addWaiter())的过程中,旧的尾节点next为null未指向新节点。对应条件s == null。如图:

image.png

AQS.shouldParkAfterFailedAcquire()移除CACELLED节点的过程中,中间节点指向已被移除的CACELLED节点。对应条件s.waitStatus > 0。如图:

image.png

因此,从node开始,沿着next方向向后遍历是行不通的。只能从tail开始,沿着prev方向向前遍历,直到找到未取消的节点(s != null),或遍历完node的所有后继子孙(s == null)。

当然,s == null也可能表示node恰好是尾节点,该状态是强一致的,但仍然可以复用该段代码。

unlock小结

ReentrantLock#unlock()收敛后,AQS内部的等待队列如图:

总结

本文的目标是借助对ReentrantLock#lock()与ReentrantLock#unlock()的分析,理解AQS的等待队列模型。

实际上,ReentrantLock还有一些重要的特性和API,如ReentrantLock#lockInterruptibly()、ReentrantLock#newCondition()。后面分先后分析这两个API,加深对AQS的理解。

希望对大家有所帮助,有用的话点赞给我支持!

相关文章
|
5月前
|
监控
阿里商旅账单系统架构设计实践问题之对账模型包括内容问题如何解决
阿里商旅账单系统架构设计实践问题之对账模型包括内容问题如何解决
|
1月前
|
监控 API 调度
开放源代码平台Flynn的架构与实现原理
【10月更文挑战第21天】应用程序的生命周期涉及从开发到运行的复杂过程,包括源代码、构建、部署和运行阶段。
|
7月前
|
消息中间件 分布式计算 中间件
秀出天际!阿里甩出的988页分布式微服务架构进阶神仙手册我粉了
秀出天际!阿里甩出的988页分布式微服务架构进阶神仙手册我粉了
|
5月前
|
数据格式
阿里商旅账单系统架构设计实践问题之系统设计中的清结算系统问题如何解决
阿里商旅账单系统架构设计实践问题之系统设计中的清结算系统问题如何解决
|
7月前
|
存储 运维 5G
基于阿里云数据库 SelectDB 内核 Apache Doris 的实时/离线一体化架构,赋能中国联通 5G 全连接工厂解决方案
数据是 5G 全连接工厂的核心要素,为支持全方位的数据收集、存储、分析等工作的高效进行,联通 5G 全连接工厂从典型的 Lambda 架构演进为 All in [Apache Doris](https://c.d4t.cn/vwDf8R) 的实时/离线一体化架构,并凭借 Doris 联邦查询能力打造统一查询网关,数据处理及查询链路大幅简化,为联通 5G 全连接工厂带来数据时效性、查询响应、存储成本、开发效率全方位的提升。
基于阿里云数据库 SelectDB 内核 Apache Doris 的实时/离线一体化架构,赋能中国联通 5G 全连接工厂解决方案
|
5月前
|
搜索推荐 Java
阿里商旅账单系统架构设计实践问题之需要账单数据表达式引擎问题如何解决
阿里商旅账单系统架构设计实践问题之需要账单数据表达式引擎问题如何解决
|
5月前
|
监控 供应链 搜索推荐
阿里商旅账单系统架构设计实践问题之账单详情数据未同步会带来问题如何解决
阿里商旅账单系统架构设计实践问题之 账单详情数据未同步会带来问题如何解决
|
5月前
|
存储 搜索推荐
阿里商旅账单系统架构设计实践问题之差错处理(平账)的主要目的问题如何解决
阿里商旅账单系统架构设计实践问题之差错处理(平账)的主要目的问题如何解决
|
7月前
|
Android开发
Android Jetpack架构开发组件化应用实战,字节跳动+阿里+华为+腾讯等大厂Android面试题
Android Jetpack架构开发组件化应用实战,字节跳动+阿里+华为+腾讯等大厂Android面试题
|
7月前
|
NoSQL Java 关系型数据库
爱了!阿里高工纯手打金三银四Java架构面试大全,涵盖近年来1000余道大厂面试真题
爱了!阿里高工纯手打金三银四Java架构面试大全,涵盖近年来1000余道大厂面试真题
下一篇
DataWorks