我画了35张图就是为了让你深入 AQS(一)

简介: 谈到并发,我们不得不说AQS(AbstractQueuedSynchronizer),所谓的AQS即是抽象的队列式的同步器,内部定义了很多锁相关的方法,我们熟知的ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore等都是基于AQS来实现的。

  前言

谈到并发,我们不得不说AQS(AbstractQueuedSynchronizer),所谓的AQS即是抽象的队列式的同步器,内部定义了很多锁相关的方法,我们熟知的ReentrantLockReentrantReadWriteLockCountDownLatchSemaphore等都是基于AQS来实现的。

我们先看下AQS相关的UML图:

0.png思维导图(高清无损 AV 画质长图.pdf 关注公众号回复 AQS 获取)1.png



AQS实现原理

AQS中 维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。

这里volatile能够保证多线程下的可见性,当state=1则代表当前对象锁已经被占有,其他线程来加锁时则会失败,加锁失败的线程会被放入一个FIFO的等待队列中,比列会被UNSAFE.park()操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。

另外state的操作都是通过CAS来保证其并发修改的安全性。

具体原理我们可以用一张图来简单概括:

2.png


AQS 中提供了很多关于锁的实现方法,

  • getState():获取锁的标志state值
  • setState():设置锁的标志state值
  • tryAcquire(int):独占方式获取锁。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式释放锁。尝试释放资源,成功则返回true,失败则返回false。

这里还有一些方法并没有列出来,接下来我们以ReentrantLock作为突破点通过源码和画图的形式一步步了解AQS内部实现原理。


目录结构

文章准备模拟多线程竞争锁、释放锁的场景来进行分析AQS源码:

三个线程(线程一、线程二、线程三)同时来加锁/释放锁

目录如下:

  • 线程一加锁成功时AQS内部实现
  • 线程二/三加锁失败时AQS中等待队列的数据模型
  • 线程一释放锁及线程二获取锁实现原理
  • 通过线程场景来讲解公平锁具体实现原理
  • 通过线程场景来讲解Condition中await()signal()实现原理

这里会通过画图来分析每个线程加锁、释放锁后AQS内部的数据结构和实现原理


场景分析

线程一加锁成功

如果同时有三个线程并发抢占锁,此时线程一抢占锁成功,线程二线程三抢占锁失败,具体执行流程如下:

3.png

此时AQS内部数据为:

4.png


线程二线程三加锁失败:

5.png


有图可以看出,等待队列中的节点Node是一个双向链表,这里SIGNALNodewaitStatus属性,Node中还有一个nextWaiter属性,这个并未在图中画出来,这个到后面Condition会具体讲解的。

具体看下抢占锁代码实现:

java.util.concurrent.locks.ReentrantLock .NonfairSync:

static final class NonfairSync extends Sync {
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

这里使用的ReentrantLock非公平锁,线程进来直接利用CAS尝试抢占锁,如果抢占成功state值回被改为1,且设置对象独占锁线程为当前线程。如下所示:

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

线程二抢占锁失败

我们按照真实场景来分析,线程一抢占锁成功后,state变为1,线程二通过CAS修改state变量必然会失败。此时AQSFIFO(First In First Out 先进先出)队列中数据如图所示:

6.png

我们将线程二执行的逻辑一步步拆解来看:

java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire():

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

先看看tryAcquire()的具体实现:java.util.concurrent.locks.ReentrantLock .nonfairTryAcquire():

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

nonfairTryAcquire()方法中首先会获取state的值,如果不为0则说明当前对象的锁已经被其他线程所占有,接着判断占有锁的线程是否为当前线程,如果是则累加state值,这就是可重入锁的具体实现,累加state值,释放锁的时候也要依次递减state值。

如果state为0,则执行CAS操作,尝试更新state值为1,如果更新成功则代表当前线程加锁成功。

线程二为例,因为线程一已经将state修改为1,所以线程二通过CAS修改state的值不会成功。加锁失败。

线程二执行tryAcquire()后会返回false,接着执行addWaiter(Node.EXCLUSIVE)逻辑,将自己加入到一个FIFO等待队列中,代码实现如下:

java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter():

private Node addWaiter(Node mode) {    
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

这段代码首先会创建一个和当前线程绑定的Node节点,Node为双向链表。此时等待对内中的tail指针为空,直接调用enq(node)方法将当前线程加入等待队列尾部:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

第一遍循环时tail指针为空,进入if逻辑,使用CAS操作设置head指针,将head指向一个新创建的Node节点。此时AQS中数据:

7.png


执行完成之后,headtailt都指向第一个Node元素。

接着执行第二遍循环,进入else逻辑,此时已经有了head节点,这里要操作的就是将线程二对应的Node节点挂到head节点后面。此时队列中就有了两个Node节点:

8.png

addWaiter()方法执行完后,会返回当前线程创建的节点信息。继续往后执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)逻辑,此时传入的参数为线程二对应的Node节点信息:

java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued():

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndChecknIterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

acquireQueued()这个方法会先判断当前传入的Node对应的前置节点是否为head,如果是则尝试加锁。加锁成功过则将当前节点设置为head节点,然后空置之前的head节点,方便后续被垃圾回收掉。

如果加锁失败或者Node的前置节点不是head节点,就会通过shouldParkAfterFailedAcquire方法 将head节点的waitStatus变为了SIGNAL=-1,最后执行parkAndChecknIterrupt方法,调用LockSupport.park()挂起当前线程。

此时AQS中的数据如下图:

9.png

此时线程二就静静的待在AQS的等待队列里面了,等着其他线程释放锁来唤醒它。

线程三抢占锁失败

看完了线程二抢占锁失败的分析,那么再来分析线程三抢占锁失败就很简单了,先看看addWaiter(Node mode)方法:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

此时等待队列的tail节点指向线程二,进入if逻辑后,通过CAS指令将tail节点重新指向线程三。接着线程三调用enq()方法执行入队操作,和上面线程二执行方式是一致的,入队后会修改线程二对应的Node中的waitStatus=SIGNAL。最后线程三也会被挂起。此时等待队列的数据如图:

11.png

相关文章
|
8月前
|
Java Perl
我画了近百张图来理解红黑树
我画了近百张图来理解红黑树
70 2
|
8月前
|
存储 安全 Java
《吊打面试官》从根上剖析ReentrantLock的来龙去脉
《吊打面试官》从根上剖析ReentrantLock的来龙去脉
|
8月前
|
消息中间件 安全 算法
通透!从头到脚讲明白线程锁
线程锁在分布式应用中是重中之重,当谈论线程锁时,通常指的是在多线程编程中使用的同步机制,它可以确保在同一时刻只有一个线程能够访问共享资源,从而避免竞争条件和数据不一致性问题。
339 0
|
8月前
|
Java API 开发者
高逼格面试:线程封闭,新名词√
高逼格面试:线程封闭,新名词√
66 0
|
存储 缓存 监控
|
设计模式 Java API
终于弄懂AQS了
大家好,我是三友~~ 相信大家对Java中的Lock锁应该不会陌生,比如ReentrantLock,锁主要是用来解决解决多线程运行访问共享资源时的线程安全问题。那你是不是很好奇,这些Lock锁api是如何实现的呢?本文就是来探讨一下这些Lock锁底层的AQS(AbstractQueuedSynchronizer)到底是如何实现的。
|
存储 监控 Java
7000字+24张图带你彻底弄懂线程池(2)
7000字+24张图带你彻底弄懂线程池(2)
|
消息中间件 缓存 监控
7000字+24张图带你彻底弄懂线程池(1)
7000字+24张图带你彻底弄懂线程池(1)
【10张图】管程内部,进去看看
【10张图】管程内部,进去看看
171 0
【10张图】管程内部,进去看看
|
设计模式 算法 Java

热门文章

最新文章