多线程进阶学习10------AQS详解(2)

简介: 多线程进阶学习10------AQS详解(2)

独占锁释放锁

当前线程获取到锁并执行了相应逻辑之后,就需要释放锁,使得后续结点能够继续获取锁。通过调用AQS的release(int arg)模版方法可以独占式的释放锁,在该方法大概步骤如下:

  1. 尝试使用tryRelease(arg)释放锁,该方法在最开始我们就讲过,是自己实现的方法,通常来说就是将state值为0或者减少、清除当前获得锁的线程等等,如果符合自己的逻辑,锁释放成功则返回true,否则返回false;
  2. 如果tryRelease释放成功返回true,判断如果head不为null且head的状态不为0,那么尝试调用unparkSuccessor方法唤醒头结点之后的一个非取消状态(非CANCELLED状态)的后继结点,让其可以进行锁获取。返回true,方法结束;
  3. 如果tryRelease释放失败,那么返回false,方法结束。
/**
 * 独占式的释放同步状态
 *
 * @param arg 参数
 * @return 释放成功返回true, 否则返回false
 */
public final boolean release(int arg) {
    /*tryRelease释放同步状态,该方法是自己重写实现的方法
    释放成功将返回true,否则返回false或者自己实现的逻辑*/
    if (tryRelease(arg)) {
        //获取头结点
        Node h = head;
        //如果头结点不为null并且状态不等于0
        if (h != null && h.waitStatus != 0)
            /*那么唤醒头结点的一个出于等待锁状态的后继结点
             * 该方法在acquire中已经讲过了
             * */
            unparkSuccessor(h);
        return true;
    }
    return false;
}

unparkSuccessor唤醒后继结点

unparkSuccessor用于唤醒参数结点的某个非取消的后继结点,该方法在很多地方法都被调用,大概步骤:

  1. 如果当前结点的状态小于0,那么CAS设置为0,表示后继结点可以继续尝试获取锁
  2. 如果当前结点的后继s为null或者状态为取消CANCELLED,则将s先指向null;然后从tail开始到node之间倒序向前查找,找到离tail最远的非取消结点赋给s。需要从后向前遍历,因为同步队列只保证结点前驱关系的正确性。
  3. 如果s不为null,那么状态肯定不是取消CANCELLED,则直接唤醒s的线程,调用LockSupport.unpark方法唤醒,被唤醒的结点将从被park的位置继续执行!
/**
 * 唤醒指定结点的后继结点
 *
 * @param node 指定结点
 */
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    /*
     * 1)  如果当前结点的状态小于0,那么CAS设置为0,表示后继结点线程可以先尝试获锁,而不是直接挂起。
     * */
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    //先获取node的直接后继
    Node s = node.next;
    /*
     * 2)  如果s为null或者状态为取消CANCELLED,则从tail开始到node之间倒序向前查找,找到离tail最远的非取消结点赋给s。
     * */
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    /*
     * 3)如果s不为null,那么状态肯定不是取消CANCELLED,则直接唤醒s的线程,调用LockSupport.unpark方法唤醒,被唤醒的结点将从被park的位置向后执行!
     * */
    if (s != null)
        LockSupport.unpark(s.thread);
}

其他方法

至于其他的加锁可中断,和加锁可超时,其实都和我们之前的那些基础api有共通点,就不细说了

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    //如果当前线程被中断,直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //尝试获取锁
    if (!tryAcquire(arg))
        //如果没获取到,那么调用AQS 可被中断的方法
        doAcquireInterruptibly(arg);
}
/**
 * 独占式超时获取锁,支持中断
 *
 * @param arg          参数
 * @param nanosTimeout 超时时间,纳秒
 * @return 是否获取锁成功
 * @throws InterruptedException 如果被中断,则抛出InterruptedException异常
 */
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    //如果当前线程被中断,直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //同样调用tryAcquire尝试获取锁,如果获取成功则直接返回true
    //否则调用doAcquireNanos方法挂起指定一段时间,该短时间内获取到了锁则返回true,超时还未获取到锁则返回false
    return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);//底层原理就是调用parkNacos()
}

共享锁

读懂了独占锁,共享锁就比较简单了。

1.共享锁的模式是shared的。

2.如果是共享节点被唤醒,会根据传播状态,继续向后唤醒下一个节点。那会不会把独占节点也唤醒了?

答案是会的,但是独占节点被唤醒后依然会去抢锁tryAcquireShared。这时根据子类的实现,独占节点是抢不到锁的,所以又会继续阻塞。

这样虽然看似耗费性能,实则更加通用。

/**
 * 自旋尝试共享式获取锁,一段时间后可能会挂起
 * 和独占式获取的区别:
 * 1 以共享模式Node.SHARED添加结点
 * 2 获取到锁之后,修改当前的头结点,并将信息传播到后续的结点队列中
 *
 * @param arg 参数
 */
private void doAcquireShared(int arg) {
    /*1 addWaiter方法逻辑,和独占式获取的区别1 :以共享模式Node.SHARED添加结点*/
    final Node node = addWaiter(Node.SHARED);
    /*2 下面就是类似于acquireQueued方法的逻辑
     * 区别在于获取到锁之后acquireQueued调用setHead方法,这里调用setHeadAndPropagate方法
     *  */
    //当前线程获取锁失败的标志
    boolean failed = true;
    try {
        //当前线程的中断标志
        boolean interrupted = false;
        for (; ; ) {
            //获取前驱结点
            final Node p = node.predecessor();
            /*当前驱结点是头结点的时候就会以共享的方式去尝试获取锁*/
            if (p == head) {
                int r = tryAcquireShared(arg);
                /*返回值如果大于等于0,则表示获取到了锁*/
                if (r >= 0) {
                    /*和独占式获取的区别2 :修改当前的头结点,根据传播状态判断是否要唤醒后继结点。*/
                    setHeadAndPropagate(node, r);
                    // 释放掉已经获取到锁的前驱结点
                    p.next = null;
                    /*检查设置中断标志*/
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            /*判断是否应该挂起,以及挂起的方法,和acquireQueued方法的逻辑完全一致,不会响应中断*/
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

再看看释放锁的逻辑,主要是调用doReleaseShared方法,只会唤醒一个头线程。然后唤醒的线程会在doAcquireShared循环获取锁后继续往后唤醒。

/**
 * 共享式获取锁的核心方法,尝试唤醒一个后继线程,被唤醒的线程会尝试获取共享锁,如果成功之后,则又会有可能调用setHeadAndPropagate,将唤醒传播下去。
 * 独占锁只有在一个线程释放所之后才会唤醒下一个线程,而共享锁在一个线程在获取到锁和释放掉锁锁之后,都可能会调用这个方法唤醒下一个线程
 * 因为在共享锁模式下,锁可以被多个线程所共同持有,既然当前线程已经拿到共享锁了,那么就可以直接通知后继结点来获取锁,而不必等待锁被释放的时候再通知。
 */
private void doReleaseShared() {
    /*一个死循环,跳出循环的条件就是最下面的break*/
    for (; ; ) {
        //获取当前的head,每次循环读取最新的head
        Node h = head;
        //如果h不为null且h不为tail,表示队列至少有两个结点,那么尝试唤醒head后继结点线程
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //如果头结点的状态为SIGNAL,那么表示后继结点需要被唤醒
            if (ws == Node.SIGNAL) {
                //尝试CAS设置h的状态从Node.SIGNAL变成0
                //可能存在多线程操作,但是只会有一条成功
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    //失败的线程结束本次循环,继续下一次循环
                    continue;            // loop to recheck cases
                //成功的那一条线程会调用unparkSuccessor方法唤醒head的一个没有取消的后继结点
                //对于一个head,只需要一条线程去唤醒该head的后继就行了。上面的CAS就是保证unparkSuccessor方法对于一个head只执行一次
                unparkSuccessor(h);
            }
            /*
             * 如果h状态为0,那说明后继结点线程已经是唤醒状态了或者将会被唤醒,不需要该线程来唤醒
             * 那么尝试设置h状态从0变成PROPAGATE,如果失败则继续下一次循环,此时设置PROPAGATE状态能保证唤醒操作能够传播下去
             * 因为后继结点成为头结点时,在setHeadAndPropagate方法中能够读取到原head结点的PROPAGATE状态<0,从而让它可以尝试唤醒后继结点(如果存在)
             * */
            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                //失败的线程结束本次循环,继续下一次循环
                continue;                // loop on failed CAS
        }
        // 执行到这一步说明在上面的判断中队列可能只有一个结点,或者unparkSuccessor方法调用完毕,或h状态为PROPAGATE(不需要继续唤醒后继)
        // 再次检查h是否仍然是最新的head,如果不是的话需要再进行循环;如果是的话说明head没有变化,退出循环
        if (h == head)                   // loop if head changed
            break;
    }
}

等待条件与通知流程

lock相当于synchronize,或者是mutex。而condition就好比objectMonitor的方法,或者是操作系统的cond条件变量。

一个lock可以生成多个不同的condition对象,也就是可以有多个不同的条件,对应多个条件队列,能在线程同步中实现更复杂的需求。

public abstract class AbstractQueuedSynchronizer
            extends AbstractOwnableSynchronizer
            implements java.io.Serializable {
    /**
     * 同步队列头节点
     */
    private transient volatile Node head;
    /**
     * 同步队列尾节点
     */
    private transient volatile Node tail;
    /**
     * 同步状态
     */
    private volatile int state;
    /**
     * Node节点的实现
     */
    static final class Node {
        //……
    }
    /**
     * 位于AQS内部的ConditionObject类,就是Condition的实现
     */
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /**
         * 条件队列头结点引用
         */
        private transient Node firstWaiter;
        /**
         * 条件队列尾结点引用
         */
        private transient Node lastWaiter;
        //……
    }
}

ConditionObject中持有条件队列的头结点引用firstWaiter和尾结点引用lastWaiter。

队列内部的结构也是node节点,和AQS中的同步队列不同的是,条件队列是一个单链表,结点之间使用nextWaiter引用维持后继的关系,并不会用到prev, next属性,它们的值都为null,并且没有哨兵结点,大概结构如下:

05e8192efc5b44ffaa7c722a7eaa39d7.png

一个lock可以有多个condition对象,一个同步队列,多个条件队列

await

大概步骤为:

  1. 调用addConditionWaiter方法,将当前线程封装成Node.CONDITION类型的Node结点链接到条件队列尾部,返回新加的结点,该过程中将移除取消等待的结点。
  2. 调用fullyRelease方法,内部会调用通用release一次性释放当前线程所占用的所有的锁(重入锁),并返回取消时的同步状态state 值,这个值会在下次线程唤醒的时候重新用来竞争锁。
  3. 循环,调用isOnSyncQueue方法判断结点是否被转移到了同步队列中:

如果不在同步队列中,那么park挂起当前线程,不在执行后续代码。说明啥,说明条件队列的节点必须被移 动到同步队列。也就是在signal那边肯定有一处代码来进行移动。

  1. 到这一步,结点一定是加入同步队列中了。那么使用acquireQueued自旋获取独占锁,将锁重入次数原封不动的写回去。
/**
 * 位于ConditionObject中的方法
 * 当前线程进入等待状态,直到被通知或中断
 *
 * @throws InterruptedException 如果线程被中断,那么返回并抛出异常
 */
public final void await() throws InterruptedException {
    /*最开始就检查一次,如果当前线程是被中断状态,则清除已中断状态,并抛出异常*/
    if (Thread.interrupted())
        throw new InterruptedException();
    /*当前线程封装成Node.CONDITION类型的Node结点链接到条件队列尾部,返回新加的结点*/
    Node node = addConditionWaiter();
    /*尝试释放当前线程所占用的所有的锁,并保存当前的锁状态*/
    int savedState = fullyRelease(node);
    //中断模式,默认为0 表示没有中断,后面会介绍
    int interruptMode = 0;
/*循环检测,如果当前队列不在同步队列中,那么将当前线程继续挂起,停止执行后续代码,直到被通知/中断;
否则,表示已在同步队列中,直接跳出循环*/
    while (!isOnSyncQueue(node)) {
        //此处线程阻塞
        LockSupport.park(this);
        // 走到这一步说明可能是被其他线程通知唤醒了或者是因为线程中断而被唤醒
        // checkInterruptWhileWaiting检查线程被唤醒的原因,并且使用interruptMode字段记录中断模式
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            //如果是中断状态,则跳出循环,这说明中断状态也会离开条件队列加入同步队列
            break;
        /*如果没有中断,那就是因为signal或者signalAll方法的调用而被唤醒的,并且已经被加入到了同步队列中
         * 在下一次循环时,将不满足循环条件,而自动退出循环*/
    }
    /*
     * 到这一步,结点一定是加入同步队列中了
     * 那么使用acquireQueued自旋获取独占锁,第二个参数就是最开始释放锁时的同步状态,这里要将锁重入次数原封不动的写回去
     * 如果在获取锁的等待过程中被中断,并且之前的中断模式不为THROW_IE(可能是0),那么设置中断模式为REINTERRUPT,
     * 即表示在调用signal或者signalAll方法之后设置的中断状态
     * */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    /*此时已经获取到了锁,那么实际上这个对应的结点就是head结点了
     *但是如果线程是 在调用signal或者signalAll方法之前就因为中断而被唤醒 的情况时,将结点添加到同步队列的的时候,并没有清除在条件队列中的结点引用
     *因此,判断nextWaiter是否不为null,如果是则还需要从条件队列中移除彻底移除这个结点。
     * */
    if (node.nextWaiter != null)
        //这里直接调用unlinkCancelledWaiters方法移除所有waitStatus不为CONDITION的结点
        unlinkCancelledWaiters();
    //如果中断模式不为0,那么调用reportInterruptAfterWait方法对不同的中断模式做出处理
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

signal

大概步骤如下:

1、检查调用signal方法的线程是否是持有锁的线程,如果不是则直接抛出IllegalMonitorStateException异常。

2、调用doSignal方法将等待时间最长的一个结点从条件队列转移至同步队列尾部,然后根据条件可能会尝试唤醒该结点对应的线程。

/**
 * Conditon中的方法
 * 将等待时间最长的结点移动到同步队列,然后unpark唤醒
 *
 * @throws IllegalMonitorStateException 如果当前调用线程不是获取锁的线程,则抛出异常
 */
public final void signal() {
    /*1 首先调用isHeldExclusively检查当前调用线程是否是持有锁的线程
     * isHeldExclusively方法需要我们重写
     * */
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //获取头结点
    Node first = firstWaiter;
    /*2 如果不为null,调用doSignal方法将等待时间最长的一个结点从条件队列转移至同步队列尾部,然后根据条件可能会尝试唤醒该结点对应的线程。*/
    if (first != null)
        doSignal(first);
}
/**
 * AQS中的方法
 * 检测当前线程是否是持有独占锁的线程,该方法AQS没有提供实现(抛出UnsupportedOperationException异常)
 * 通常需要我们自己重写,一般重写如下!
 *
 * @return true 是;false 否
 */
protected final boolean isHeldExclusively() {
    //比较获取锁的线程和当前线程
    return getExclusiveOwnerThread() == Thread.currentThread();
}

dosignal

/**
 * Conditon中的方法
 * 从头结点开始向后遍历,从条件队列中移除等待时间最长的结点,并将其加入到同步队列
 * 在此期间会清理一些遍历时遇到的已经取消等待的结点。
 *
 * @param first 条件队列头结点
 */
private void doSignal(Node first) {
    /*从头结点开始向后遍历,唤醒等待时间最长的结点,并清理一些已经取消等待的结点*/
    do {
        //firstWaiter指向first的后继结点,并且如果为null,则lastWaiter也置为null,表示条件队列没有了结点
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        //first的后继引用置空,这样就将first出队列了
        first.nextWaiter = null;
        /*循环条件
         * 1 调用transferForSignal转移结点,如果转移失败(结点已经取消等待了);
         * 2 则将first赋值为它的后继,并且如果不为null;
         * 满足上面两个条件,则继续循环
         * */
    } while (!transferForSignal(first) &&
            (first = firstWaiter) != null);
}

transferForSignal会尝试将遍历到的结点转移至同步队列中,调用该方法之前并没有显示的判断结点是不是处于等待状态,而是在该方法中通过CAS的结果来判断。

大概步骤为:

1、尝试CAS将结点等待状态从Node.CONDITION更新为0。这里不存在并发的情况,因为调用线程此时已经获取了独占锁,因此如果更改等待状态失败,那说明该结点原本就不是Node.CONDITION状态,表示结点早已经取消等待了,则直接返回false,表示转移失败。

2、CAS成功,则表示该结点是处于等待状态,那么调用enq将结点添加到同步队列尾部,返回添加结点在同步队列中的前驱结点。

3、获取前驱结点的状态ws。如果ws大于0,则表示前驱已经被取消了或者将ws改为Node.SIGNAL失败,表示前驱可能在此期间被取消了,那么调用unpark方法唤醒被转移结点中的线程,好让它从await中的等待中醒来;否则,那就由它的前驱结点在获取锁之后释放锁时再唤醒。返回true。

/**
 * 将结点从条件队列转移到同步队列,并尝试唤醒
 *
 * @param node 被转移的结点
 * @return 如果成功转移,返回true;失败则返回false
 */
final boolean transferForSignal(Node node) {
    /*1 尝试将结点的等待状态变成0,表示取消等待
    如果更改等待状态失败,那说明一定是原本就不是Node.CONDITION状态,表示结点早已经取消等待了,则返回false。
    这里不存在并发的情况,因为调用线程此时已经获取了独占锁*/
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    /*2 将结点添加到同步队列尾部,返回添加结点的前驱结点*/
    Node p = enq(node);
    //获取前驱结点的状态ws
    int ws = p.waitStatus;
    /*3 如果ws大于0 表示前驱已经被取消了 或者 将ws改为Node.SIGNAL失败,表示前驱可能在此期间被取消了
    则调用unpark方法唤醒被转移结点中的线程,好让它从await中的等待唤醒(后续尝试获取锁)
    否则,那就由它的前驱结点获取锁之后释放锁时再唤醒。
    */
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    //返回true
    return true;
}

总结

条件队列中,很多操作都不需要用到cas,是因为条件队列await和signal的前提,就是线程已经获取到了锁,所以不会有很多的并发问题。

d0dae4e4fd6942e7b580a6fcaafbd5a2.png

参考

https://www.yuque.com/snab/java/agbmupgrg4159wx7#ekKlb


相关文章
|
2月前
|
监控 Java 调度
【Java学习】多线程&JUC万字超详解
本文详细介绍了多线程的概念和三种实现方式,还有一些常见的成员方法,CPU的调动方式,多线程的生命周期,还有线程安全问题,锁和死锁的概念,以及等待唤醒机制,阻塞队列,多线程的六种状态,线程池等
134 6
【Java学习】多线程&JUC万字超详解
|
5月前
|
NoSQL Redis
Redis系列学习文章分享---第五篇(Redis实战篇--优惠券秒杀,全局唯一id 添加优惠券 实现秒杀下单 库存超卖问题分析 乐观锁解决超卖 实现一人一单功能 集群下的线程并发安全问题)
Redis系列学习文章分享---第五篇(Redis实战篇--优惠券秒杀,全局唯一id 添加优惠券 实现秒杀下单 库存超卖问题分析 乐观锁解决超卖 实现一人一单功能 集群下的线程并发安全问题)
127 0
|
5月前
|
调度 Python
Python多线程学习优质方法分享
Python多线程学习优质方法分享
26 0
|
5月前
|
安全 API C++
逆向学习Windows篇:C++中多线程的使用和回调函数的实现
逆向学习Windows篇:C++中多线程的使用和回调函数的实现
187 0
|
6月前
|
Java 调度
【JAVA学习之路 | 提高篇】进程与线程(Thread)
【JAVA学习之路 | 提高篇】进程与线程(Thread)
|
6月前
|
安全 Java
java-多线程学习记录
java-多线程学习记录
|
5月前
|
Java
Java线程学习经典例子-读写者演示
Java线程学习经典例子-读写者演示
23 0
|
6月前
|
Java 调度
【JAVA学习之路 | 提高篇】线程的通信
【JAVA学习之路 | 提高篇】线程的通信
|
6月前
|
存储 Java
【JAVA学习之路 | 提高篇】线程安全问题及解决
【JAVA学习之路 | 提高篇】线程安全问题及解决
|
6月前
|
Java
【JAVA学习之路 | 提高篇】创建与启动线程之二(继承Thread类)(实现Runnable接口)
【JAVA学习之路 | 提高篇】创建与启动线程之二(继承Thread类)(实现Runnable接口)