AbstractQueuedSynchronizer 笔记

简介: AbstractQueuedSynchronizer 笔记

AbstractQueuedSynchronizer

AQS是一个内部维护了先进先出队列+标识(数字)的模型,标识使用CAS模式修改,作为多线程工具的基础组件

AQS队列结构.png

属性

属性名 说明
volatile Node head 为头节点会清理Node中关联的pre,thread避免GC不回收
volatile Node tail 尾节点
volatile int state 0为空闲其它组件按需使用,使用cas来赋值,
Thread exclusiveOwnerThread 持有线程

state为volatile的int,不同的业务场景按需实现,

独占模式:

ReentrantLock.Sync中state为0表示未锁定>0表示被几个线程持有

ThreadPoolExecutor.Worker中state为0表示未执行

共享模式:

CountDownLatch.Sync中state为初始化时指定,表示有多少个线程可持有,

Semaphore.Sync中state与CountDownLatch相同

混合模式:

ReentrantReadWriteLock.Sync中state为共享锁+独占锁组合 通过位运算16位来分割,最大的读锁写锁个数为65535

关键方法

独占模式

持有资源:acquire

acquire: 独占模式获取,独占模式即只有一个线程可更新state.忽略中断标识,在获取之后响应中断。

acquireInterruptibly:独占模式获取,线程标识为中断则抛出异常

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

AQS-acquire.png

tryAcquire:子类按需实现,使用独占模式更新state,增加state,成功返回true失败返回false

中断后不会正确响应park,所以需要重置线程中断标识,并在unpark之后进行中断补偿

释放资源:release

release:以独占模式释放资源

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

AQS-release.png

tryRelease:子类按需实现,使用独占模式更新state,减少state,并处理对应独占线程

共享模式

持有资源:acquireShared

  • acquireShared 共享模式获取,忽略中断线程,在获取之后相应中断。
  • acquireSharedInterruptibly 共享模式获取,响应中断,线程中断则抛出异常。
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

AQS-acquireShared.png

tryAcquireShared:子类按需实现,对返回值有如下要求:

负值:失败。 零:共享模式下的获取成功,但是没有后续共享模式下的获取可以成功。 正值: 如果共享模式下的获取成功并且后续共享模式下的获取也可能成功,则为正值,在这种情况下,后续的等待线程必须检查可用性。 (对三个不同返回值的支持使该方法可以在仅有时进行获取的情况下使用。)成功后,就已经获取了此对象。

释放资源:releaseShared

  • releaseShared:以共享模式释放资源
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

AQS-releaseShared.png

tryReleaseShared:子类按需实现,使用共享模式更新state,减少state

其它关键方法

检查并更新节点状态:shouldParkAfterFailedAcquire

在park线程之前的判断,当前置节点为取消时更新前置节点

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

唤醒后续节点:unparkSuccessor

唤醒后续节点,并清理取消的节点,

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

共享模式设置队列头:setHeadAndPropagate

共享模式下多个线程同时持有资源,头节点会频繁变化需要及时释放资源

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

Node

属性名 说明
int waitStatus 1:CANCELLED,表示当前的线程被取消;<br/>-1:SIGNAL,后续节点需要unpark;<br/>-2:CONDITION,表示当前节点在等待condition,也就是在condition队列中;<br/>-3:PROPAGATE,A releaseShared应该传播到其他节点。 在doReleaseShared中对此进行了设置(仅适用于头节点),以确保传播继续进行,即使此后进行了其他操作也是如此。 <br/> 0:表示当前节点在sync队列中,等待着获取锁。
Node prev 前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。
Node next 后继节点。
Thread thread 入队列时的当前线程。
Node nextWaiter 为NULL表示为独占模式

PROPAGATE:共享模式中会通过状态是否小于0来判断是否需要唤醒后续节点,共享模式下多个线程可同时持有state变更,waitStatus会频繁从0切换为SIGNAL,区分SIGNAL增加的中间状态所以称为传播值

目录
打赏
0
0
0
0
891
分享
相关文章
Exploring Java's AbstractQueuedSynchronizer (AQS)
AbstractQueuedSynchronizer (AQS) is a key building block in Java's concurrent programming paradigm. With its powerful capabilities, it simplifies the development of custom synchronizers and facilitates efficient thread synchronization and control. Understanding AQS is essential for any Java develope
72 1
|
10月前
|
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer
75 0
|
10月前
|
深入探索:AbstractQueuedSynchronizer 同步器的神秘面纱
深入探索:AbstractQueuedSynchronizer 同步器的神秘面纱
第二季:6CountDownLatch/CyclicBarrier/Semaphore使用过吗?【Java面试题】
第二季:6CountDownLatch/CyclicBarrier/Semaphore使用过吗?【Java面试题】
53 0
深入浅出,从 ReentrantLock 到 AQS | Java(上)
对于非Java后端同学来说,没听过倒也不是什么太过分的事,但是如果你深入学习过 Java 并发相关,那么肯定会去了解各种锁,而作为一个 有志青年 的你必然会在心里来一句,为什么加了锁就可以同步 ? 此时必然也会看到 AQS 的影子。
146 0
JUC系列学习(二):AbstractQueuedSynchronizer同步器框架及相关实现类
在并发编程中,我们经常用到的是`synchronized`和`ReentrantLock`。其中,`synchronized`是`jvm`内置锁,而`ReentrantLock`位于`java.util.concurrent`包下(以下简称`JUC`),`ReentrantLock`是基于`AbstractQueuedSynchronizer`(以下简称`AQS`)同步器框架实现的,本文主要来介绍`AQS`的内部实现及在`JUC`中基于`AQS`实现的相关类。
103 0
Java并发之AbstractQueuedSynchronizer(AQS)详解
Java并发之AbstractQueuedSynchronizer(AQS)详解
Java并发之AbstractQueuedSynchronizer(AQS)详解
Java并发编程 - AQS 简介(AbstractQueuedSynchronizer)
Java并发编程 - AQS 简介(AbstractQueuedSynchronizer)
168 0
Java并发编程 - AQS 简介(AbstractQueuedSynchronizer)
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等