AQS源码探究_06 Conditon条件队列(await方法、线程入队与挂起、signal方法)

简介: AQS源码探究_06 Conditon条件队列(await方法、线程入队与挂起、signal方法)

提示:读源码的时候尽量自己点开源码跟着博客注释一起看,不然容易迷路~

1、条件队列流程图

image.png

2、Condition接口

public interface Condition {
  // 线程等待,可抛出中断异常(可以响应中断)
  void await() throws InterruptedException;
  // 线程等待,但是不可响应中断
  void awaitUninterruptibly();
  // 线程超时等待
  boolean await(long time, TimeUnit unit) throws InterruptedException;
  // 线程等待,直到收到被唤醒、或收到中断信号、或到了指定的截止时间日期自动唤醒
  boolean awaitUntil(Date deadline) throws InterruptedException;
  // 线程唤醒
  void signal();
  // 唤醒所有线程
  void signalAll();
}

3、AQS中实现Condition接口的内部类ConditionObject

// 位于AQS中的内部类:
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    // 队列的第一个等待者节点:头等待者
    private transient Node firstWaiter;
    // 队列的最后一个等待者节点:尾等待者
    private transient Node lastWaiter;
    /**
     * 无参构造函数:
     * 在ReentrantLock中的newCondition()方法内,借助ReentrantLock的静态内部类Sync,去调用
     * newCondition方法实例化ConditionObject
     */
    public ConditionObject() { }
    ...
}


接下来分析一下ConditionObject的内部方法:

3.1 await线程等待方法

// 线程等待,可抛出中断异常(可以响应中断)
public final void await() throws InterruptedException {
    // 判断当前线程是否是中断状态:
    if (Thread.interrupted())
        // 如果是中断状态,则直接抛出中断异常
        throw new InterruptedException();
    // 把当前线程包装成一个node,放入条件队列中,并返回封装当前线程的node
    Node node = addConditionWaiter();
    // 完全释放当前线程对应的锁(将state置为0):
    // 为什么要释放锁呢? 因为加着锁挂起的时候,除了本线程能唤醒(本线程已挂起),其他线程都没办法唤醒啊~
    int savedState = fullyRelease(node);
    // Condition队列中断状态:
    // 0: 在Condition队列挂起期间,未接收过中断信号~
    // -1: 在Condition队列挂起期间,接收到中断信号~
    // 1: 在Condition队列挂起期间,未接收到中断信号,但是迁移到阻塞队列之后,接收到过中断信号~
    int interruptMode = 0;
    // while循环条件:判断当前node节点是否在阻塞队列中: 
    // isOnSyncQueue返回true:表示当前线程对应的node已经迁移到了阻塞队列中了
    // isOnSyncQueue返回false:说明当前node仍还在条件队列中,需要park挂起~
    while (!isOnSyncQueue(node)) {
        // 挂起当前线程,使其处于等待状态~
        LockSupport.park(this);
        //什么时候会被唤醒?都有几种情况呢?
        // 1.常规路径:外部线程获取到lock之后,调用了 signal()方法 转移条件队列的头节点到 阻塞队列, 当这个节点获取到锁后,会唤醒。
        // 2.转移至阻塞队列后,发现阻塞队列中的前驱节点状态 是 取消状态,此时会唤醒当前节点
        // 3.当前节点挂起期间,被外部线程使用中断唤醒..
        // checkInterruptWhileWaiting: 即使在Condition队列挂起期间,线程发生中断了,对应的node仍然会被迁移到阻塞队列中(等待去获取锁)~
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 执行到这里,说明当前node已经迁移到阻塞队列了
    // acquireQueued:竞争队列的逻辑,线程节点竞争资源
    // 条件一:返回true 表示在阻塞队列中 被外部线程中断唤醒过..
    // 条件二:interruptMode != THROW_IE 成立,说明当前node在条件队列内 未发生过中断
    // 设置interruptMode = REINTERRUPT
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 考虑下 node.nextWaiter != null 条件什么时候成立呢?
    // 其实是node在条件队列内时 如果被外部线程 中断唤醒时,会加入到阻塞队列,但是并未设置nextWaiter = null。
    if (node.nextWaiter != null) // clean up if cancelled
        // 清理条件队列内取消状态的节点..
        unlinkCancelledWaiters();
    // 条件成立:说明挂起期间 发生过中断(1.条件队列内的挂起 2.条件队列之外的挂起)
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
// 把当前线程包装成一个node,放入条件队列中,并返回封装当前线程的node
// 注意:调用await方法的线程都是持锁状态的线程,也就是说:在await方法中调用的addConditionWaiter方法不存在并发问题~
private Node addConditionWaiter() {
    // 获取当前条件队列的尾节点的引用,保存到局部变量t中
    Node t = lastWaiter;
    // 条件1:t != null 成立:说明当前条件队列中,已经有node元素了
    // 条件2:(node在队列中时,它的状态是 CONDITION(-2))
    // t.waitStatus != Node.CONDITION 成立:说明当前node发生中断了...
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 清理条件队列中,所有CONDITION(-2)取消状态的node节点
        unlinkCancelledWaiters();
        // 更新局部变量t 为最新的尾结点的引用~ 
        // 因为unlinkCancelledWaiters方法可能会更改lastWaiter的引用
        t = lastWaiter;
    }
    // 为当前线程创建node节点,设置状态为 CONDITION(-2)
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 如果t == null 条件成立,说明条件队列中没有任何元素,当前线程是进入队列的第一个元素
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    // 更新队尾节点引用指向node
    lastWaiter = node;
    // 返回包装当前线程的node
    return node;
}
// 即使在Condition队列挂起期间,线程发生中断了,对应的node仍然会被迁移到阻塞队列中(等待去获取锁)~
private int checkInterruptWhileWaiting(Node node) {
    // Thread.interrupted() 返回当前线程中断标记位,并且重置当前标记位为false
    return Thread.interrupted() ?
            // transferAfterCancelledWait 这个方法只有在线程是被中断唤醒时才会调用!
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
final boolean transferAfterCancelledWait(Node node) {
    // 条件成立:说明当前node一定是在 条件队列内,因为signal 迁移节点到阻塞队列时,会将节点的状态修改为0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 中断唤醒的node也会被加入到 阻塞队列中!!
        enq(node);
        // true:表示是在条件队列内被中断的.
        return true;
    }
    // 执行到这里有几种情况?
    // 1.当前node已经被外部线程调用 signal 方法将其迁移到 阻塞队列内了。
    // 2.当前node正在被外部线程调用 signal 方法将其迁移至 阻塞队列中 进行中状态..
    while (!isOnSyncQueue(node))
        Thread.yield();
    // false:表示当前节点被中断唤醒时 不在 条件队列了..
    return false;
}
// 清理条件队列内取消状态的节点..(纯链表操作的方法)
private void unlinkCancelledWaiters() {
    // 表示循环当前节点,从链表的第一个节点开始 向后迭代处理.
    Node t = firstWaiter;
    // 当前链表上一个正常状态的node节点
    Node trail = null;
    while (t != null) {
        // 当前节点的下一个节点.
        Node next = t.nextWaiter;
        // 条件成立:说明当前节点状态为 取消状态
        if (t.waitStatus != Node.CONDITION) {
            // 更新nextWaiter为null
            t.nextWaiter = null;
            // 条件成立:说明遍历到的节点还未碰到过正常节点..
            if (trail == null)
                // 更新firstWaiter指针为下个节点就可以
                firstWaiter = next;
            else
                // 让上一个正常节点指向 取消节点的 下一个节点..中间有问题的节点 被跳过去了..
                trail.nextWaiter = next;
            // 条件成立:当前节点为队尾节点了,更新lastWaiter 指向最后一个正常节点 就Ok了
            if (next == null)
                lastWaiter = trail;
        }
        else// 条件不成立执行到else,说明当前节点是正常节点
            trail = t;
        t = next;
    }
}
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    // 条件成立:说明在条件队列内发生过中断,此时await方法抛出中断异常
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    // 条件成立:说明在条件队列外发生的中断,此时设置当前线程的中断标记位 为true
    // 中断处理 交给 你的业务处理。 如果你不处理,那什么事 也不会发生了...
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

3.2 fullyRelease完全释放当前线程锁的方法

// 位于AQS中:
// 完全释放当前线程对应的锁(将state置为0):
final int fullyRelease(Node node) {
    // 完全释放锁是否成功,当failed失败时,说明当前线程是未持有锁调用await方法的线程(错误写法...)
    // 假设失败,在finally代码块中,会将刚刚加入到条件队列的,当前线程对应的node节点状态修改为取消状态
    // 后继线程就会将取消状态的节点给清理出去~ 
    boolean failed = true;
    try {
        // 获取当前线程所持有的state值。
        int savedState = getState();
        // 绝大部分情况下:release这里会返回true
        if (release(savedState)) {
            // 失败标记设置为false
            failed = false;
            // 返回当前线程释放的state值
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

3.3 isOnSyncQueue判断当前node节点是否在阻塞队列中

// 位于AQS中:
// 判断当前node节点是否在阻塞队列中
final boolean isOnSyncQueue(Node node) {
    // 条件一:node.waitStatus == Node.CONDITION 条件成立:说明当前node一定是在条件队列中,
    // 因为signal方法迁移节点到阻塞队列前,会将node的状态设置为0
    // 条件二:前置条件node.waitStatus != Node.CONDITION ===>然后再细分以下几种情况:
    // 1.node.waitStatus == 0 (表示当前节点已经被singal)
    // 2.node.waitStatus == 1 (表示当前线程未持有锁就调用了await方法,最终会将node的状态修改为取消状态...)
    // node.waitStatus == 0 为什么还要判断node.prev == null呢?
    // 因为:signal方法是先修改状态,再迁移
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 执行到这里,会是哪种情况?
    // node.waitStatus != CONDITION 且 node.prev != null  ===> 可以排除掉 node.waitStatus == 1 取消状态..
    // 为什么可以排除取消状态? 因为signal方法是不会把 取消状态的node迁移走的
    // 设置prev引用的逻辑 是 迁移 阻塞队列 逻辑的设置的(enq())
    // 入队的逻辑:1.设置node.prev = tail;   2. cas当前node为 阻塞队列的 tail 尾节点 成功才算是真正进入到 阻塞队列! 3.pred.next = node;
    // 可以推算出,就算prev不是null,也不能说明当前node 已经成功入队到 阻塞队列了。
    // 条件成立:说明当前节点已经成功入队到阻塞队列,且当前节点后面已经有其它node了...
    if (node.next != null)
        return true;
    /**
     * 执行到这里,说明当前节点的状态为:node.prev != null 且 node.waitStatus == 0
     * findNodeFromTail 从阻塞队列的尾巴开始向前遍历查找node,如果查找到 返回true,查找不到返回false
     * 当前node有可能正在signal过程中,正在迁移中...还未完成...
     */
    return findNodeFromTail(node);
}

3.4 signal线程唤醒方法

// 位于AQS中:线程唤醒方法
public final void signal() {
    // 判断调用signal方法的线程是否是独占锁持有线程,如果不是,直接抛出异常..
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 获取条件队列的一个node
    Node first = firstWaiter;
    // 第一个节点不为null,则将第一个节点 进行迁移到阻塞队列的逻辑..
    if (first != null)
        doSignal(first);
}
// 位于AQS中:将第一个节点 进行迁移到阻塞队列 
private void doSignal(Node first) {
    do {
        // firstWaiter = first.nextWaiter因为当前first马上要出条件队列了,
        // 所以更新firstWaiter为 当前节点的下一个节点..
        // 如果当前节点的下一个节点 是 null,说明条件队列只有当前一个节点了...
        // 当前出队后,整个队列就空了..所以需要更新lastWaiter = null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 当前first节点 出 条件队列。断开和下一个节点的关系
        first.nextWaiter = null;
        // transferForSignal(first) 返回boolean类型
        // 返回true 表示当前first节点迁移到阻塞队列成功  返回false 表示迁移失败...
        // while循环 :(first = firstWaiter) != null  
        // 当前first迁移失败,则将first更新为 first.next 继续尝试迁移..
        // 直至迁移某个节点成功,或者 条件队列为null为止。
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

3.4 transferForSignal当前节点迁移到阻塞队列

// 位于AQS:当前节点迁移到阻塞队列:
// 返回true 表示当前first节点迁移到阻塞队列成功  返回false 表示迁移失败...
final boolean transferForSignal(Node node) {
    // cas修改当前节点的状态,修改为0,因为当前节点马上要迁移到 阻塞队列了
    // 成功:当前节点在条件队列中状态正常。
    // 失败: 1.取消状态 (线程await时 未持有锁,最终线程对应的node会设置为 取消状态)
    //       2.node对应的线程 挂起期间,被其它线程使用 中断信号 唤醒过...
    // (就会主队进入到 阻塞队列,这时也会修改状态为0)
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // enq最终会将当前 node 入队到 阻塞队列,p 是当前节点在阻塞队列的 前驱节点.
    Node p = enq(node);
    // ws 前驱节点的状态..
    int ws = p.waitStatus;
    // 条件一:ws > 0 成立:说明前驱节点的状态在阻塞队列中是 取消状态,唤醒当前节点。
    // 条件二:前置条件(ws <= 0),
    // compareAndSetWaitStatus(p, ws, Node.SIGNAL) 返回true 表示设置前驱节点状态为 SIGNAl状态成功
    // compareAndSetWaitStatus(p, ws, Node.SIGNAL) 返回false  ===> 什么时候会false?
    // 当前驱node对应的线程 是 lockInterrupt入队的node时,是会响应中断的,外部线程给前驱线程中断信号之后,前驱node会将
    // 状态修改为 取消状态,并且执行 出队逻辑..
    // 前驱节点状态 只要不是 0 或者 -1 那么,就唤醒当前线程。
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 唤醒当前node对应的线程...回头再说。
        LockSupport.unpark(node.thread);
    return true;
}


相关文章
|
21天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
50 12
|
1月前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
49 7
|
1月前
|
消息中间件 存储 安全
|
2月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
27 3
|
2月前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
23 2
|
2月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
24 1
|
2月前
|
Java 开发者
Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点
【10月更文挑战第20天】Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点,重点解析为何实现Runnable接口更具灵活性、资源共享及易于管理的优势。
44 1
|
2月前
|
Java
在Java多线程编程中,`wait()`和`notify()`方法的相遇如同一场奇妙的邂逅
在Java多线程编程中,`wait()`和`notify()`方法的相遇如同一场奇妙的邂逅。它们用于线程间通信,使线程能够协作完成任务。通过这些方法,生产者和消费者线程可以高效地管理共享资源,确保程序的有序运行。正确使用这些方法需要遵循同步规则,避免虚假唤醒等问题。示例代码展示了如何在生产者-消费者模型中使用`wait()`和`notify()`。
32 1
|
2月前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
49 1
|
2月前
|
Java
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件成立时被唤醒,从而有效解决数据一致性和同步问题。本文通过对比其他通信机制,展示了 `wait()` 和 `notify()` 的优势,并通过生产者-消费者模型的示例代码,详细说明了其使用方法和重要性。
31 1