别走!这里有个笔记:图文讲解 AQS ,一起看看 AQS 的源码……(图文较长)(一)

简介: AbstractQueuedSynchronizer 抽象队列同步器,简称 AQS 。是在 JUC 包下面一个非常重要的基础组件,JUC 包下面的并发锁 ReentrantLock CountDownLatch 等都是基于 AQS 实现的。所以想进一步研究锁的底层原理,非常有必要先了解 AQS 的原理。


前言


AbstractQueuedSynchronizer 抽象队列同步器,简称 AQS 。是在 JUC 包下面一个非常重要的基础组件,JUC 包下面的并发锁 ReentrantLock CountDownLatch 等都是基于 AQS 实现的。所以想进一步研究锁的底层原理,非常有必要先了解 AQS 的原理。


介绍


先看下 AQS 的类图结构,以及源码注释,有一定的大概了解之后再从源码入手,一步一步研究它的底层原理。

" 源码注释


提供了实现阻塞锁和相关同步器依靠先入先出(FIFO)等待队列(信号量,事件等)的框架。 此类中设计了一个对大多数基于 AQS 的同步器有用的原子变量来表示状态(state)。 子类必须定义 protected 方法来修改这个 state,并且定义 state 值在对象中的具体含义是 acquired 或 released。 考虑到这些,在这个类中的其他方法可以实现所有排队和阻塞机制。 子类可以保持其他状态字段,但只能使用方法 getState 、setState 和 compareAndSetState 以原子方式更新 state 。


子类应被定义为用于实现其封闭类的同步性能的非公共内部辅助类。 类AbstractQueuedSynchronizer没有实现任何同步接口。 相反,它定义了一些方法,如 acquireInterruptibly 可以通过具体的锁和相关同步器来调用适当履行其公共方法。


此类支持独占模式和共享模式。 在独占模式下,其他线程不能获取成功,共享模式下可以(但不一定)获取成功。 此类不“理解”,在机械意义上这些不同的是,当共享模式获取成功,则下一个等待的线程(如果存在)也必须确定它是否能够获取。 线程在不同模式下的等待共享相同的FIFO队列。 通常情况下,实现子类只支持其中一种模式,但同时使用两种模式也可以,例如ReadWriteLock 。 仅共享模式不需要定义支持未使用的模式的方法的子类。


这个类中定义了嵌套类 AbstractQueuedSynchronizer.ConditionObject ,可用于作为一个 Condition 由子类实现,并使用 isHeldExclusively 方法说明当前线程是否以独占方式进行,release()、 getState() acquire() 方法用于操作 state 原子变量。

此类提供检查和监视内部队列的方法,以及类似方法的条件对象。 根据需要进使用以用于它们的同步机制。


要使用这个类用作同步的基础上,需要重新定义以下方法,如使用,通过检查和或修改 getState 、setState 或 compareAndSetState 方法:

tryAcquire tryRelease tryAcquireShared tryReleaseShared isHeldExclusively

"


通过上面的注释可以得出大概的印象:

  1. 内部依靠先入先出(FIFO) 等待队列。
  2. 存在 state 表示状态信息。state 值只能用 getState 、setState 和 compareAndSetState 方法以原子方式更新。
  3. 支持独占模式和共享模式,但具体需要子类实现具体支持哪种模式。
  4. 嵌套 AbstractQueuedSynchronizer.ConditionObject 可以作为 Condition 由子类实现。
  5. 子类需要重新定义 tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared、isHeldExclusively 方法。


队列节点 Node

Node节点,包含以下元素:

元素 含义
prev 上一个节点
next 下一个节点
thread 持有线程
waitStatus 节点状态
nextWaiter 下一个处于 CONDITION 状态的节点


组合成等待队列则如下:


下面是等待队列节点的 Node 属性:

static final class Node {
    // 节点正在共享模式下等待的标记
    static final Node SHARED = new Node();
    // 指示节点正在以独占模式等待的标记
    static final Node EXCLUSIVE = null;
    // 指示线程已取消
    static final int CANCELLED =  1;
    // 指示后续线程需要唤醒
    static final int SIGNAL    = -1;
    // 指示线程正在等待条件
    static final int CONDITION = -2;
    // 指示下一次acquireShared应该无条件传播
    static final int PROPAGATE = -3;
    /**
     * 状态字段,仅使用以下值
     * SIGNAL -1 :当前节点释放或者取消时,必须 unpark 他的后续节点。
     * CANCELLED 1 :由于超时(timeout)或中断(interrupt),该节点被取消。节点永远不会离开此状态。特别是,具有取消节点的线程永远不会再次阻塞。
     * CONDITION -2 :该节点目前在条件队列。 但它不会被用作同步队列节点,直到转移,转移时的状态将被设置为 0 。
     * PROPAGATE -3 :releaseShared 应该被传播到其他节点。 
     * 0:都不是
     * 值以数字表示以简化使用,大多数时候可以检查符号(是否大于0)以简化使用
     */
    volatile int waitStatus;
    // 上一个节点
    volatile Node prev;
    // 下一个节点
    volatile Node next;
    // 节点持有线程
    volatile Thread thread;
    // 链接下一个等待条件节点,或特殊值共享
    Node nextWaiter;
    // 节点是否处于 共享状态 是, 返回 true
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    // 返回前一个节点, 使用时 前一个节点不能为空
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {    // Used to establish initial head or SHARED marker
    }
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}


在 Node 节点中需要重点关注 waitStatus

  1. 默认状态为 0;
  2. waitStatus > 0 (CANCELLED 1) 说明该节点超时或者中断了,需要从队列中移除;
  3. waitStatus = -1 SIGNAL 当前线程的前一个节点的状态为 SIGNAL,则当前线程需要阻塞(unpark);
  4. waitStatus = -2 CONDITION -2 :该节点目前在条件队列;
  5. waitStatus = -3 PROPAGATE -3 :releaseShared 应该被传播到其他节点,在共享锁模式下使用。

了解完 Node 的结构之后,再了解下 AQS 结构,并从常用方法入手,逐步了解具体实现逻辑。


AbstractQueuedSynchronizer

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    // 等待队列的头,延迟初始化。 除了初始化,它是仅经由方法setHead修改。 注意:如果头存在,其waitStatus保证不会是 CANCELLED 状态
    private transient volatile Node head;
    // 等待队列的尾部,延迟初始化。 仅在修改通过方法ENQ添加新节点等待。
    private transient volatile Node tail;
    // 同步状态 
    private volatile int state;
    // 获取状态
    protected final int getState() {
        return state;
    }
    // 设置状态值
    protected final void setState(int newState) {
        state = newState;
    }
    // 原子更新状态值
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
}


在 AQS 中主要参数为:

参数 含义
head 等待队列头
tail 等待队列尾
state 同步状态


通过注释了解到,在 AQS 里主要分为两种操作模式,分别是:独占模式、共享模式,下面分别从两个不同的角度去分析源码。

操作 含义
acquire 以独占模式获取,忽略中断。 通过调用至少一次实施tryAcquire ,在成功时返回。 否则,线程排队,可能重复查封和解封,调用tryAcquire直到成功为止。 这种方法可以用来实现方法Lock.lock 。
release 以独占模式释放。 通过疏通一个或多个线程,如果实现tryRelease返回true。 这种方法可以用来实现方法Lock.unlock 。
acquireShared 获取在共享模式下,忽略中断。 通过至少一次第一调用实现tryAcquireShared ,在成功时返回。 否则,线程排队,可能重复查封和解封,调用tryAcquireShared直到成功为止。
releaseShared 以共享模式释放。 通过疏通一个或多个线程,如果实现tryReleaseShared返回true。

无论是共享模式还是独占模式在这里面都会用到 addWaiter 方法,将当前线程及模式创建排队节点。


独占模式

获取独占资源 acquire

public final void acquire(int arg) {
    // tryAcquire 尝试获取 state,获取失败则会加入到队列
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}


在独占模式下会尝试获取 state,当获取失败时会调用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。

  1. tryAcquire(arg),尝试获取 state 这块由子类自己实现,不同的子类逻辑不同,这块在介绍子类代码时会说明。
  2. 获取 state 失败后,会进行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),这块代码可以拆分为两块:addWaiter(Node.EXCLUSIVE),acquireQueued(node, arg)。
  3. addWaiter(Node.EXCLUSIVE) 返回的是当前新创建的节点。
  4. acquireQueued(node, arg) 线程获取锁失败,使用 addWaiter(Node.EXCLUSIVE) 放入等待队列,而 acquireQueued(node, arg) 使用循环,不断的为队列中的节点去尝试获取资源,直到获取成功或者被中断。


总结获取资源主要分为三步:

  1. 尝试获取资源
  2. 入队列
  3. 出队列


尝试获取资源 tryAcquire(arg),由子类实现,那下面则着手分别分析 入队列出队列


入队列:addWaiter(Node.EXCLUSIVE)

使用 addWaiter(Node.EXCLUSIVE) 方法将节点插入到队列中,步骤如下:

  1. 根据传入的模式创建节点
  2. 判断尾节点是否存在,不存在则需要使用 enq(node) 方法初始化节点,存在则直接尝试插入尾部。
  3. 尝试插入尾部时使用 CAS 插入,防止并发情况,如果插入失败,会调用 enq(node) 自旋直到插入。
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 定位到队列末尾的 node
    Node pred = tail;
    if (pred != null) {
        // 新节点的上一个节点 指向尾节点
        node.prev = pred;
        // 使用 CAS 设置尾节点,tail 如果等于 pred 则更新为 node
        if (compareAndSetTail(pred, node)) {
            // 更新成功则将 pred 的下一个节点指向 node
            pred.next = node;
            return node;
        }
    }
    // 尾节点没有初始化,或竞争失败,自旋
    enq(node);
    return node;
}
/**
 * tailOffset 也就是成员变量 tail 的值
 * 此处相当于:比较 tail 的值和 expect 的值是否相等, 相等则更新为 update
 */
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 尾节点为空 需要初始化头节点,此时头尾节点是一个
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 不为空 循环赋值
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

看完代码和注释肯定还是有点模糊,现在用图一步一步进行说明。


因为根据初始尾节点是否为空分为两种情况,这里使用两幅图:

  1. 第一幅为第一次添加节点,此时 head 会延迟初始化;
  2. 第二幅图为已经存在队列,进行插入节点;
  3. 注意看代码,enq 方法返回的是之前的尾节点
  4. addWaiter 方法 返回的是当前插入的新创建的节点

节点添加到队列之后,返回当前节点,而下一步则需要调用方法 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 不断的去获取资源。


出队列:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

方法会通过循环不断尝试获取拿到资源,直到成功。代码如下:

final boolean acquireQueued(final Node node, int arg) {
    // 是否拿到资源
    boolean failed = true;
    try {
        // 中断状态
        boolean interrupted = false;
        // 无限循环
        for (;;) {
            // 当前节点之前的节点
            final Node p = node.predecessor();
            // 前一个节点是头节点, 说明当前节点是 头节点的 next 即真实的第一个数据节点 (因为 head 是虚拟节点)
            // 然后再尝试获取资源
            if (p == head && tryAcquire(arg)) {
                // 获取成功之后 将头指针指向当前节点
                setHead(node); 
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // p 不是头节点, 或者 头节点未能获取到资源 (非公平情况下被别的节点抢占) 
            // 判断 node 是否要被阻塞,
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. 不断获取本节点的上一个节点是否为 head,因为 head 是虚拟节点,如果当前节点的上一个节点是 head 节点,则当前节点为 第一个数据节点
  2. 第一个数据节点不断的去获取资源,获取成功,则将 head 指向当前节点;
  3. 当前节点不是头节点,或者 tryAcquire(arg) 失败(失败可能是非公平锁)。这时候需要判断前一个节点状态决定当前节点是否要被阻塞(前一个节点状态是否为 SIGNAL)。
/**
 * 根据上一个节点的状态,判断当前线程是否应该被阻塞
 * SIGNAL -1 :当前节点释放或者取消时,必须 unpark 他的后续节点。
 * CANCELLED 1 :由于超时(timeout)或中断(interrupt),该节点被取消。节点永远不会离开此状态。特别是,具有取消节点的线程永远不会再次阻塞。
 * CONDITION -2 :该节点目前在条件队列。 但它不会被用作同步队列节点,直到转移,转移时的状态将被设置为 0 。
 * PROPAGATE -3 :releaseShared 应该被传播到其他节点。 
 * 0:都不是
 *
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前一个节点的等待状态
    int ws = pred.waitStatus;
    // 前一个节点需要 unpark 后续节点
    if (ws == Node.SIGNAL)
        return true;
    // 当前节点处于取消状态
    if (ws > 0) {
        do {
            // 将取消的节点从队列中移除
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 设置前一个节点为 SIGNAL 状态
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


shouldParkAfterFailedAcquire 方法中,会判断前一个节点的状态,同时取消在队列中当前节点前面无效的节点。


再继续阅读 出队列 acquireQueued 方法,发现有一个 finally 会判断状态后执行 cancelAcquire(node); ,也就是上面流程图中下面的红色方块。

cancelAcquire(Node node)
final boolean acquireQueued(final Node node, int arg) {
    // 是否拿到资源
    boolean failed = true;
    try {
        // 省略
        // 在 finally 会将当前节点置为取消状态
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private void cancelAcquire(Node node) {
    // 节点不存在 直接返回
    if (node == null)
        return;
    // 取消节点关联线程
    node.thread = null;
    //跳过已经取消的节点,获取当前节点之前的有效节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 获取当前节点之前的有效节点的下一个节点
    Node predNext = pred.next;
    // 当前节点设置为取消
    node.waitStatus = Node.CANCELLED;
    // 当前节点如果是尾节点,则将最后一个有效节点设置为尾节点,并将 predNext 设置为空
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // pred 不是头节点(node 的上一个有效节点 不是 head) && ( pred的状态是 SIGNAL ||  pred 的状态设置为 SIGNAL 成功 ) && pred 的绑定线程不为空
        if (pred != head && 
        ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 
        pred.thread != null) {
            // 当前节点的后继节点
            Node next = node.next;
            // 后继节点不为空 且 状态有效 将 pred 的 后继节点设置为 当前节点的后继节点
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // node 的上一个有效节点 是 head, 或者其他情况 唤醒当前节点的下一个有效节点
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}
private void unparkSuccessor(Node node) {
    // 判断当前节点状态
    int ws = node.waitStatus;
    if (ws < 0)
        // 将节点状态更新为 0 
        compareAndSetWaitStatus(node, ws, 0);
    // 下一个节点, 一般是下一个节点应该就是需要唤醒的节点,即颁发证书。
    Node s = node.next;
    // 大于 0  CANCELLED : 线程已取消
    // 但是有可能 后继节点 为空或者被取消了。
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从尾节点开始遍历,直到定位到 t.waitStatus <= 0 的节点
        // 定位到后并不会停止,会继续执行,相当于找到最开始的那个需要唤醒的节点
        // t.waitStatus <= 0 : SIGNAL( -1 后续线程需要释放) 
        //                     CONDITION ( -2 线程正在等待条件) 
        //                     PROPAGATE ( -3 releaseShared 应该被传播到其他节点)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 定位到需要唤醒的节点后 进行 unpark
    if (s != null)
        LockSupport.unpark(s.thread);
}


流程分析:

  1. 找到当前节点的前一个非无效节点 pred;
  2. 当前节点如果是尾节点,则将最后一个有效节点设置为尾节点,并将 predNext 设置为空;
  3. pred 不是头节点 && ( pred的状态是 SIGNAL || pred 的状态设置为 SIGNAL 成功 ) && pred 的绑定线程不为空;
  4. 其他情况。


下面分别画图:


Q: 通过图可以看出来,只操作了 next 指针,但是没有操作 prev 指针,这是为什么呢?

A:出队列:acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法中,shouldParkAfterFailedAcquire 方法会判断前一个节点的状态,同时取消在队列中当前节点前面无效的节点。这时候会移除之前的无效节点,此处也是为了防止指向一个已经被移除的节点。同时保证 prev 的稳定,有利于从 tail 开始遍历列表,这块在 unparkSuccessor(node); 中也可以看到是从后往前表里列表。


Q: unparkSuccessor(Node node) 为什么从后往前遍历?

A:

addWaiter(Node.EXCLUSIVE) 插入新节点时,使用的是 尾插法,看红框部分,此时有可能还未指向next。


Q: node.next = node; 这块导致 head不是指向最新节点,链表不就断了么?A: acquireQueued 方法介绍中,里面有个循环,会不断尝试获取资源,成功之后会设置为 head。并且在 shouldParkAfterFailedAcquire 中也会清除当前节点前的无效节点。


释放独占资源 release

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

以独占模式释放。 通过释放一个或多个线程,如果实现tryRelease返回true。 这种方法可以用来实现方法Lock.unlock 。

  1. tryRelease(arg) 操作释放资源,同样是由子类实现,后面介绍子类时会进行说明。返回 true 说明资源现在已经没有线程持有了,其他节点可以尝试获取;
  2. 释放成功,且 head != null && h.waitStatus != 0, 会继续执行 unparkSuccessor(h);
  3. 这块会看到 只要 tryRelease(arg) 操作释放资源成功, 后面无论执行是否成功,都会返回 true,unparkSuccessor(h) 相当于只是附加操作。
目录
相关文章
|
3天前
|
Java 网络虚拟化
从源码全面解析LinkedBlockingQueue的来龙去脉
从源码全面解析LinkedBlockingQueue的来龙去脉
|
3天前
|
数据采集 安全 Java
Java并发编程学习12-任务取消(上)
【5月更文挑战第6天】本篇介绍了取消策略、线程中断、中断策略 和 响应中断的内容
30 4
Java并发编程学习12-任务取消(上)
|
3天前
|
Java 编译器
从源码全面解析 ArrayBlockingQueue 的来龙去脉
从源码全面解析 ArrayBlockingQueue 的来龙去脉
|
设计模式 算法
深入了解ReentrantLock源码附带图文分析
ReentrantLock源码图文分析
82 0
深入了解ReentrantLock源码附带图文分析
|
机器学习/深度学习 安全 Java
java并发原理实战(10)--AQS 和公平锁分析
java并发原理实战(10)--AQS 和公平锁分析
100 0
java并发原理实战(10)--AQS 和公平锁分析
别走!这里有个笔记:图文讲解 AQS ,一起看看 AQS 的源码……(图文较长)(二)
AbstractQueuedSynchronizer 抽象队列同步器,简称 AQS 。是在 JUC 包下面一个非常重要的基础组件,JUC 包下面的并发锁 ReentrantLock CountDownLatch 等都是基于 AQS 实现的。所以想进一步研究锁的底层原理,非常有必要先了解 AQS 的原理。
95 0
|
设计模式 IDE 算法
万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)(上)
万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)
万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)(上)
万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)(下)
万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)(下)
万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)(下)
|
算法 Java API
硬核干货:5W字17张高清图理解同步器框架AbstractQueuedSynchronizer(下)
Doug Lea大神编写AQS是有严谨的理论基础的,他的个人博客上有一篇论文《The java.util.concurrent Synchronizer Framewor》,可以在互联网找到相应的译文《JUC同步器框架》,如果想要深入研究AQS必须要理解一下该论文的内容,然后结合论文内容详细分析一下AQS的源码实现。本文在阅读AQS源码的时候选用的JDK版本是JDK11。
138 0
硬核干货:5W字17张高清图理解同步器框架AbstractQueuedSynchronizer(下)
|
存储 算法 Java
硬核干货:5W字17张高清图理解同步器框架AbstractQueuedSynchronizer(上)
Doug Lea大神编写AQS是有严谨的理论基础的,他的个人博客上有一篇论文《The java.util.concurrent Synchronizer Framewor》,可以在互联网找到相应的译文《JUC同步器框架》,如果想要深入研究AQS必须要理解一下该论文的内容,然后结合论文内容详细分析一下AQS的源码实现。本文在阅读AQS源码的时候选用的JDK版本是JDK11。
151 1
硬核干货:5W字17张高清图理解同步器框架AbstractQueuedSynchronizer(上)