Java AQS 实现——排他模式

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: 本文着重介绍 AQS 的排他模式的实现方式。

引言

本文着重介绍 AQS 的排他模式的实现方式。所有关于 Java 并发的文章均收录于<Java并发系列文章>

排他模式

获取资源

入队操作介绍完之后,我们来看一下什么情况下需要执行入队操作,我们先从排他模式说起。下面的 acquire 是 AQS 提供的一个以排他模式获取资源的函数,我们可以看到它的执行流程是:

  1. 先尝试获取资源 tryAcquire,tryAcquire 是一个抽象函数,看完前面的锁的分类部分大家应该对它比较熟悉,因为通过 AQS 实现的各类锁实际上就是通过对 tryAcquire 这类抽象函数的覆写来达到各种锁的效果的。
  2. 如果尝试加锁失败,也就是说当前该资源已经被加锁了,就通过 addWaiter 将当前线程添加到同步队列中,注意参数是 Node.EXCLUSIVE 意味着排它锁。
  3. 加入到同步队列后,开始执行 acquireQueued 函数,猜一下应该能猜到这里面一定是进行睡眠等待的逻辑
/**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

下面的就是 acquireQueued 的代码。

  1. 这里面有一个循环,它会不断地获取当前节点的前序节点,如果前序节点是 head 节点(也就是那个虚拟节点,还记得吗,虚拟节点不保存有效数据,只用作指针),这里 head 节点充当一个标志位的效果,如果一个节点的前序节点是 head,那么该节点就排在队列的第一位。
  2. 如果 p == head ,我们就需要再调用一次 tryAcquire 尝试获取锁
  3. 如果第二步成功的话,当前线程已经获得到锁了,这时候要将 head 指针进行修改,可以看到 setHead 并没有使用到 CAS 指令,因为能执行到 setHead 函数的线程相当于已经获得到同步资源了,不存在竞争
  4. 在 setHead 函数中,对当前 Node 的 thread 和 prev 进行了清除,因为这时候该 Node 已经扮演了虚拟节点的角色,有必要把虚拟节点中用不到的属性进行清除
  5. 如果 CAS 执行失败(这里有很多种可能,比如被中断了,或者是刚入队,又或者是虚假唤醒(后面介绍)),则检查是否需要继续进入睡眠,一般来说如果前序节点的状态成功改为 SIGNAL 之后(但是改成 SIGNAL 之后还会再尝试获取一次锁,失败之后才会睡眠,这是防止死等的重中之重),就可以进入等待了,SIGNAL 表明当前节点肩负着唤醒下一个节点的责任,除此之外,在检查是否需要睡眠时,如果发现前序节点的请求已经被取消,则删除该节点。
  6. 如果发现自己确实需要睡眠,则会通过 park 函数进入等待状态,因为使用的是 park 所以不需要担心,别的线程先执行唤醒之后,当前线程再进入等待的情况,因为在这种情况下 park 函数会直接返回不会进行等待
  7. 最后从等待状态中恢复过来之后,检查是否是因为中断而唤醒的,是的话,就记录一下,在返回的时候以返回值的形式告诉调用者。可见 acquire 在遇到中断时,不会抛出 InterruptedException 异常,而是循环重试。如果想要达到被中断时立即抛出异常的效果,可以使用 acquireInterruptibly, 它的实现逻辑和 acquire 基本相同,主要的区别就是 park 被中断时,会抛出 InterruptedException
/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {// 自己是排在最前面的节点,尝试获取锁
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // 永远执行不到,因为抛出的异常都被 parkAndCheckInterrupt 中的 Thread.interrupted() 吞掉了
            cancelAcquire(node);
    }
}

/**
 * Sets head of queue to be node, thus dequeuing. Called only by
 * acquire methods.  Also nulls out unused fields for sake of GC
 * and to suppress unnecessary signals and traversals.
 */
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops.  Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 前序节点的状态已经是 SIGNAL 了,可以进入等待状态
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * 前序节点被取消,移除被取消的节点
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 修改前序节点为 SIGNAL,这时候当前线程还没进入等待状态,我们需要重新判断一下现在自己是不是第一个,是的话就不用等待了
         * 改成 SIGNAL 之后还会再尝试获取一次锁,失败之后才会睡眠,这是防止死等的重中之重,考虑如下情况 ThreadB-tryAcquire->ThreadA-Release->ThreadA-CheckStatus(!=SIGNAL)->ThreadB-ChangeStatus2SIGNAL
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

作为延伸,我们这里带大家回顾一下 ReentrantLock 中公平锁部分对 tryAcquire 的实现。它对大家理解 AQS 如何在 CAS 修改同步队列的情况下(先修改前序指针->CAS 修改尾结点->修复后续指针),以哪种方式访问队列中的数据能够避开数据不同步的风险。简单地说,通过前序指针(prev)访问队列中的数据肯定是安全的。

/**
 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 在尝试获取公平锁时,先会判断队列中是否存在前序节点。
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

从上述代码中可以看到,在尝试获取公平锁时,先会判断队列中是否存在前序节点。之所以这么做是因为,只有发生互斥等待时,才会出现入队等待的情况,如果全是共享模式使用资源的话,队列会一直是空的,大家别急我们接下来就介绍共享模式的实现。这里我们先着重看一下 hasQueuedPredecessors 的实现,它是怎么判断有前序节点的呢:

  1. 首先如果 h != t 是说,队列的不为空,因为 head == tail 时,队列中只存在一个虚拟节点不存在实际的等待线程
  2. 在此基础上,我们还要判断一下当前持有锁的线程是不是自己,如果 head.next == null 说明有其他线程刚执行完入队的setTail工作(因为 h != t),但是前序指针还没修复,这种情况下 head.next == null,说明有别的线程已经持有锁了
  3. 另外一种可能就是 head.next != Thread.currentThread() 这时候队首持有锁的线程不是当前线程,所以存在前序节点

既然前面提到了 acquireInterruptibly 这里我们就简单地说一下,就像前面所说它和 acquire 基本相同,确实如此,从下面的代码中可以看到,它就是遇到中断时将 InterruptedException 外抛。

/**
 * Acquires in exclusive mode, aborting if interrupted.
 * Implemented by first checking interrupt status, then invoking
 * at least once {@link #tryAcquire}, returning on
 * success.  Otherwise the thread is queued, possibly repeatedly
 * blocking and unblocking, invoking {@link #tryAcquire}
 * until success or the thread is interrupted.  This method can be
 * used to implement method {@link Lock#lockInterruptibly}.
 *
 */
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 唯一的区别在这里,这里直接抛出异常,而 acquire 中只是记录一下flag: interrupted = true;
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            // 如果抛出 InterruptedException 异常,则会执行 cancelAcquire
            cancelAcquire(node);
    }
}

当抛出异常时,就会执行到最后面的 cancelAcquire 函数,该函数中负责将前序节点中状态为 CANCELLED 的节点清除,然后再把自己的状态变为 CANCELLED。紧接着是一些清除工作:

  • 如果当前节点是尾结点,则通过 CAS 修改尾结点即可。如果 CAS 失败了也不要紧,因为当前线程状态已经是 CANCELLED 了所以其他线程会把自己清除
  • 如果当前节点不是第一个节点,即 pred != head ,我们要保证前序节点的状态是 SIGNAL,并且前序节点的线程不是当前线程,因为自己不是尾结点,所以自己当前的状态很可能就是 SIGNAL,所以这里我们无论如何要确保前序节点的状态能够修改为 SIGNAL,如果做到了,就可以大胆地通过 CAS 将自己从队列中移除
  • 否则,说明自己可能是头结点,或者前序节点都取消了,也有可能前序节点的线程就是当前线程,那么就只能由自己来唤醒后续的线程了
/**
 * Cancels an ongoing attempt to acquire.
 *
 * @param node the node
 */
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

唤醒的过程也很简单:

  1. 如果当前节点状态不是 CANCELLED 就清除状态
  2. 然后先看一下 next 指针指向的节点是否需要被唤醒,next == null 代表了可能发生的前序指针和后续指针不同步,s.waitStatus > 0 表示后继节点已被取消,这时候我们就需要找到下一个需要被唤醒的节点。
  3. 我们需要从尾结点出发,逐个向前找,因为前序指针肯定是安全的
  4. 如果找到了需要被唤醒的线程,就执行 unpark 唤醒它
/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

释放资源

介绍完资源的获取,我们再来看看资源的释放流程。

  1. 首先,调用 tryRelease 函数,它也是一个抽象函数,在上层实现中一般会进行必要的检查,比如检查持有锁的线程是否是当前线程等,就比如如下 ReentrantLock 中对 tryRelease 的实现。
  2. 释放成功后,判断当前同步队列是否为空,不为空并且当前线程承担唤醒职责时(waitStatus < 0),因为当前线程能够成功执行 tryRelease,所以当前线程的 waitStatus 不会是 CANCELLED,剩下的状态 SIGNAL 是需要承担唤醒职责的,CONDITION 和 PROPAGATE 我们后面介绍。
  3. 具体的唤醒函数 unparkSuccessor 我们前面刚介绍过,这里就不再赘述了
/**
 * Releases in exclusive mode.  Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

AQS 框架使用

看到这里,不知道大家对 AQS 的使用流程有没有体会,我的理解是在使用它时,一般我们需要覆写 tryAcquire 和 tryRelease 这类函数,它们是直接修改互斥资源 state 的函数,AQS 通过将具体的状态修改职责移交到子类中,能让子类实现各种类型的锁,就比如前一章我们介绍的那些。

而子类向外暴露 lock 和 unlock 函数时,又直接使用 AQS 中的 acquire 和 releas 函数,因为这些函数中封装了尝试加锁过程和加锁失败入队等待过程。

public void lock()        { acquire(1); }
public void unlock()      { release(1); }

总结一下就是 AQS 对变化报开放的态度,你可以通过它完成各种同步策略,同时对与那些样板代码,都已经被它封装在了自己内部,并使用 final 关键字修饰,例如 acquire 和 release。

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析%E7%9B%B8%E5%85%B3%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/)
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理

相关文章
|
3月前
|
存储 Java
JAVA并发编程AQS原理剖析
很多小朋友面试时候,面试官考察并发编程部分,都会被问:说一下AQS原理。面对并发编程基础和面试经验,专栏采用通俗简洁无废话无八股文方式,已陆续梳理分享了《一文看懂全部锁机制》、《JUC包之CAS原理》、《volatile核心原理》、《synchronized全能王的原理》,希望可以帮到大家巩固相关核心技术原理。今天我们聊聊AQS....
|
3月前
|
存储 Java 开发者
【Java新纪元启航】JDK 22:解锁未命名变量与模式,让代码更简洁,思维更自由!
【9月更文挑战第7天】JDK 22带来的未命名变量与模式匹配的结合,是Java编程语言发展历程中的一个重要里程碑。它不仅简化了代码,提高了开发效率,更重要的是,它激发了我们对Java编程的新思考,让我们有机会以更加自由、更加创造性的方式解决问题。随着Java生态系统的不断演进,我们有理由相信,未来的Java将更加灵活、更加强大,为开发者们提供更加广阔的舞台。让我们携手并进,共同迎接Java新纪元的到来!
74 11
|
3月前
|
设计模式 Java
Java设计模式-工厂方法模式(4)
Java设计模式-工厂方法模式(4)
|
4月前
|
消息中间件 Java
【实战揭秘】如何运用Java发布-订阅模式,打造高效响应式天气预报App?
【8月更文挑战第30天】发布-订阅模式是一种消息通信模型,发送者将消息发布到公共队列,接收者自行订阅并处理。此模式降低了对象间的耦合度,使系统更灵活、可扩展。例如,在天气预报应用中,`WeatherEventPublisher` 类作为发布者收集天气数据并通知订阅者(如 `TemperatureDisplay` 和 `HumidityDisplay`),实现组件间的解耦和动态更新。这种方式适用于事件驱动的应用,提高了系统的扩展性和可维护性。
81 2
|
4月前
|
Java
"揭秘Java IO三大模式:BIO、NIO、AIO背后的秘密!为何AIO成为高并发时代的宠儿,你的选择对了吗?"
【8月更文挑战第19天】在Java的IO编程中,BIO、NIO与AIO代表了三种不同的IO处理机制。BIO采用同步阻塞模型,每个连接需单独线程处理,适用于连接少且稳定的场景。NIO引入了非阻塞性质,利用Channel、Buffer与Selector实现多路复用,提升了效率与吞吐量。AIO则是真正的异步IO,在JDK 7中引入,通过回调或Future机制在IO操作完成后通知应用,适合高并发场景。选择合适的模型对构建高效网络应用至关重要。
94 2
|
4月前
|
设计模式 XML 存储
【二】设计模式~~~创建型模式~~~工厂方法模式(Java)
文章详细介绍了工厂方法模式(Factory Method Pattern),这是一种创建型设计模式,用于将对象的创建过程委托给多个工厂子类中的某一个,以实现对象创建的封装和扩展性。文章通过日志记录器的实例,展示了工厂方法模式的结构、角色、时序图、代码实现、优点、缺点以及适用环境,并探讨了如何通过配置文件和Java反射机制实现工厂的动态创建。
【二】设计模式~~~创建型模式~~~工厂方法模式(Java)
|
4月前
|
设计模式 XML Java
【一】设计模式~~~创建型模式~~~简单工厂模式(Java)
文章详细介绍了简单工厂模式(Simple Factory Pattern),这是一种创建型设计模式,用于根据输入参数的不同返回不同类的实例,而客户端不需要知道具体类名。文章通过图表类的实例,展示了简单工厂模式的结构、时序图、代码实现、优缺点以及适用环境,并提供了Java代码示例和扩展应用,如通过配置文件读取参数来实现对象的创建。
【一】设计模式~~~创建型模式~~~简单工厂模式(Java)
|
3月前
|
JSON Java UED
uniapp:使用DCloud的uni-push推送消息通知(在线模式)java实现
以上展示了使用Java结合DCloud的uni-push进行在线消息推送的基本步骤和实现方法。实际部署时,可能需要依据实际项目的规模,业务场景及用户基数进行必要的调整和优化,确保消息推送机制在保证用户体验的同时也满足业务需求。
235 0
|
4月前
|
开发者 C# 存储
WPF开发者必读:资源字典应用秘籍,轻松实现样式与模板共享,让你的WPF应用更上一层楼!
【8月更文挑战第31天】在WPF开发中,资源字典是一种强大的工具,用于共享样式、模板、图像等资源,提高了应用的可维护性和可扩展性。本文介绍了资源字典的基础知识、创建方法及最佳实践,并通过示例展示了如何在项目中有效利用资源字典,实现资源的重用和动态绑定。
116 0
|
4月前
|
Java 开发者
解锁Java并发编程的秘密武器!揭秘AQS,让你的代码从此告别‘锁’事烦恼,多线程同步不再是梦!
【8月更文挑战第25天】AbstractQueuedSynchronizer(AQS)是Java并发包中的核心组件,作为多种同步工具类(如ReentrantLock和CountDownLatch等)的基础。AQS通过维护一个表示同步状态的`state`变量和一个FIFO线程等待队列,提供了一种高效灵活的同步机制。它支持独占式和共享式两种资源访问模式。内部使用CLH锁队列管理等待线程,当线程尝试获取已持有的锁时,会被放入队列并阻塞,直至锁被释放。AQS的巧妙设计极大地丰富了Java并发编程的能力。
50 0