资深大厂JAVA架构师带你剖析Condition源码

简介: Condition是JUC里面提供于控制线程释放锁, 然后进行等待其他获取锁的线程发送 signal 信号来进行唤醒的工具类.
  1. Condition 定义

Condition是JUC里面提供于控制线程释放锁, 然后进行等待其他获取锁的线程发送 signal 信号来进行唤醒的工具类.
主要特点:

Condition内部主要是由一个装载线程节点 Node 的 Condition Queue 实现
对 Condition 的方法(await, signal等) 的调用必需是在本线程获取了独占锁的前提下
因为 操作Condition的方法的前提是获取独占锁, 所以 Condition Queue 内部是一条不支持并发安全的单向 queue (这是相对于 AQS 里面的 Sync Queue)

先看一下一个常见的 demo

import org.apache.log4j.Logger;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**

  • 此demo用于测试 condition
  • Created by xujiankang on 2017/2/8.
    */
    public class ConditionTest {

    private static final Logger logger = Logger.getLogger(ConditionTest.class);

    static final Lock lock = new ReentrantLock();
    static final Condition condition = lock.newCondition();

    public static void main(String[] args) throws Exception{

     final Thread thread1 = new Thread("Thread 1 "){
         @Override
         public void run() {
             lock.lock(); // 线程 1获取 lock
             logger.info(Thread.currentThread().getName() + " 正在运行 .....");
    
             try {
                 Thread.sleep(2 * 1000);
                 logger.info(Thread.currentThread().getName() + " 停止运行, 等待一个 signal ");
                 condition.await(); // 调用 condition.await 进行释放锁, 将当前节点封装成一个 Node 放入 Condition Queue 里面, 等待唤醒
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             logger.info(Thread.currentThread().getName() + " 获取一个 signal, 继续执行 ");
             lock.unlock(); // 释放锁
         }
     };
    
     thread1.start();  // 线程 1 线运行
    
     Thread.sleep(1 * 1000);
    
     Thread thread2 = new Thread("Thread 2 "){
         @Override
         public void run() {
             lock.lock();        // 线程 2获取lock
             logger.info(Thread.currentThread().getName() + " 正在运行.....");
             thread1.interrupt(); // 对线程1 进行中断 看看中断后会怎么样? 结果 线程 1还是获取lock, 并且最后还进行 lock.unlock()操作
    
             try {
                 Thread.sleep(2 * 1000);
             }catch (Exception e){
    
             }
             condition.signal(); // 发送唤醒信号 从 AQS 的 Condition Queue 里面转移 Node 到 Sync Queue
             logger.info(Thread.currentThread().getName() + " 发送一个 signal ");
             logger.info(Thread.currentThread().getName() + " 发送 signal 结束");
             lock.unlock(); // 线程 2 释放锁
         }
     };
    
     thread2.start();
    

    }

}

整个执行步骤

线程 1 开始执行, 获取 lock, 然后开始睡眠 2秒
当线程1睡眠到 1秒时, 线程2开始执行, 但是lock被线程1获取, 所以 等待
线程 1 睡足2秒 调用 condition.await() 进行锁的释放, 并且将 线程1封装成一个 node 放到 condition 的 Condition Queue里面, 等待其他获取锁的线程给他 signal, 或对其进行中断(中断后可以到 Sync Queue里面进而获取 锁)
线程 2 获取锁成功, 中断 线程1, 线程被中断后, node 从 Condition Queue 转移到 Sync Queue 里面, 但是 lock 还是被 线程2获取者, 所以 node呆在 Sync Queue 里面等待获取 lock
线程 2睡了 2秒, 开始 用signal唤醒 Condition Queue 里面的节点(此时代表 线程1的node已经到 Sync Queue 里面)
线程 2释放lock, 并且在 Sync Queue 里面进行唤醒等待获取锁的节点 node
线程1 得到唤醒, 获取锁
线程1 释放锁

执行结果

[2017-02-08 22:43:09,557] INFO Thread 1 (ConditionTest.java:26) - Thread 1 正在运行 .....
[2017-02-08 22:43:11,565] INFO Thread 1 (ConditionTest.java:30) - Thread 1 停止运行, 等待一个 signal
[2017-02-08 22:43:11,565] INFO Thread 2 (ConditionTest.java:48) - Thread 2 正在运行.....
java.lang.InterruptedException
[2017-02-08 22:43:13,566] INFO Thread 2 (ConditionTest.java:57) - Thread 2 发送一个 signal
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
[2017-02-08 22:43:13,566] INFO Thread 2 (ConditionTest.java:58) - Thread 2 发送 signal 结束
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
[2017-02-08 22:43:13,567] INFO Thread 1 (ConditionTest.java:35) - Thread 1 获取一个 signal, 继续执行
at com.lami.tuomatuo.search.base.concurrent.aqs.ConditionTest$1.run(ConditionTest.java:31)

  1. Condition 构造函数级基本属性

主要是Condition Queue 的头尾节点(这里头尾节点不需要进行初始化)

/ First node of condition queue */
/
Condition Queue 里面的头节点 /
private transient Node firstWaiter;
/** Last node of condition queue
/
/* Condition Queue 里面的尾节点 /
private transient Node lastWaiter;

/ Creates a new {@code ConditionObject} instance */
/
构造函数 */
public ConditionObject(){}

  1. Condition Queue enqueue节点方法 addConditionWaiter

addConditionWaiter方法主要用于调用 Condition.await 时将当前节点封装成 一个Node, 加入到 Condition Queue里面

大家可以注意下, 下面对 Condition Queue 的操作都没考虑到 并发(Sync Queue 的队列是支持并发操作的), 这是为什么呢? 因为在进行操作 Condition 是当前的线程已经获取了AQS的独占锁, 所以不需要考虑并发的情况

/**

  • Adds a new waiter to wait queue
  • 将当前线程封装成一个 Node 节点 放入大 Condition Queue 里面
  • 大家可以注意到, 下面对 Condition Queue 的操作都没考虑到 并发(Sync Queue 的队列是支持并发操作的), 这是为什么呢? 因为在进行操作 Condition 是当前的线程已经获取了AQS的独占锁, 所以不需要考虑并发的情况
  • @return
    */
    private Node addConditionWaiter(){
    Node t = lastWaiter; // 1. Condition queue 的尾节点
    // If lastWaiter is cancelled, clean out // 2.尾节点已经Cancel, 直接进行清除,
                                                       //    这里有1个问题, 1 何时出现t.waitStatus != Node.CONDITION -> 在对线程进行中断时 ConditionObject -> await -> checkInterruptWhileWaiting -> transferAfterCancelledWait "compareAndSetWaitStatus(node, Node.CONDITION, 0)" <- 导致这种情况一般是 线程中断或 await 超时
                                                       //    一个注意点: 当Condition进行 awiat 超时或被中断时, Condition里面的节点是没有被删除掉的, 需要其他 await 在将线程加入 Condition Queue 时调用addConditionWaiter而进而删除, 或 await 操作差不多结束时, 调用 "node.nextWaiter != null" 进行判断而删除 (PS: 通过 signal 进行唤醒时 node.nextWaiter 会被置空, 而中断和超时时不会)
    
    if(t != null && t.waitStatus != Node.CONDITION){
     unlinkCancelledWaiters();                        // 3. 调用 unlinkCancelledWaiters 对 "waitStatus != Node.CONDITION" 的节点进行删除(在Condition里面的Node的waitStatus 要么是CONDITION(正常), 要么就是 0 (signal/timeout/interrupt))
     t = lastWaiter;                                // 4. 获取最新的 lastWaiter
    
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION); // 5. 将线程封装成 node 准备放入 Condition Queue 里面
    if(t == null){
     firstWaiter = node;                           // 6 .Condition Queue 是空的
    
    }else{
     t.nextWaiter = node;                          // 7. 最加到 queue 尾部
    
    }
    lastWaiter = node; // 8. 重新赋值 lastWaiter
    return node;
    }
  1. Condition 唤醒 first节点方法 doSignal

这里的唤醒指的是将节点从 Condition Queue 转移到 Sync Queue 里面

/**

  • Removes and transfers nodes until hit non-cancelled one or
  • null. Split out from signal in part to encourage compilers
  • to inline the case of no waiters
  • @param first
    /
    /*
  • 唤醒 Condition Queue 里面的头节点, 注意这里的唤醒只是将 Node 从 Condition Queue 转到 Sync Queue 里面(这时的 Node 也还是能被 Interrupt)
    */
    private void doSignal(Node first){
    do{
     if((firstWaiter = first.nextWaiter) == null){ // 1. 将 first.nextWaiter 赋值给 nextWaiter 为下次做准备
         lastWaiter = null;                          // 2. 这时若 nextWaiter == null, 则说明 Condition 为空了, 所以直接置空 lastWaiter
     }
     first.nextWaiter = null;                        // 3.  first.nextWaiter == null 是判断 Node 从 Condition queue 转移到 Sync Queue 里面是通过 signal 还是 timeout/interrupt
    
    }while(!transferForSignal(first) && (first = firstWaiter) != null); // 4. 调用 transferForSignal将 first 转移到 Sync Queue 里面, 返回不成功的话, 将 firstWaiter 赋值给 first
    }
  1. Condition 唤醒 所有 节点方法 doSignalAll

/**

  • Removes and transfers all nodes
  • @param first (non-null) the first node on condition queue
    /
    /*
  • 唤醒 Condition Queue 里面的所有的节点
    */
    private void doSignalAll(Node first){
    lastWaiter = firstWaiter = null; // 1. 将 lastWaiter, firstWaiter 置空
    do{
     Node next = first.nextWaiter;        // 2. 初始化下个换新的节点
     first.nextWaiter = null;            // 3.  first.nextWaiter == null 是判断 Node 从 Condition queue 转移到 Sync Queue 里面是通过 signal 还是 timeout/interrupt
     transferForSignal(first);             // 4. 调用  transferForSignal将 first 转移到 Sync Queue 里面
     first = next;                         // 5. 开始换新 next 节点
    
    }while(first != null);
    }
  1. Condition 删除取消节点的方法 unlinkCancelledWaiters

一般的节点都会被 signal 唤醒, 从 Condition Queue 转移到 Sync Queue, 而若遇到 interrupt 或 等待超时, 则直接改变 node 的状态(从 CONDITION 变成 0), 并直接放入 Sync 里面, 而不清理Condition Queue 里面的节点, 所以需要下面的函数

/**

  • http://czj4451.iteye.com/blog/1483264
    *
  • Unlinks cancelled waiter nodes from condition queue
  • Called only while holding lock. This is called when
  • cancellation occured during condition wait, and upon
  • insertion of a new waiter when lastWaiter is seen to have
  • been cancelled. This method is needed to avoid garbage
  • retention in the absence of signals. So even though it may
  • require a full traversal, it comes intot play when
  • timeouts or cancellations all nodes rather than stoppping at a
  • particular target to unlink all pointers to garbege nodes
  • without requiring many re-traversals during cancellation
  • storms
    /
    /*
  • 在 调用 addConditionWaiter 将线程放入 Condition Queue 里面时 或 awiat 方法获取 差不多结束时 进行清理 Condition queue 里面的因 timeout/interrupt 而还存在的节点
  • 这个删除操作比较巧妙, 其中引入了 trail 节点, 可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点
    */
    private void unlinkCancelledWaiters(){
    Node t = firstWaiter;
    Node trail = null;
    while(t != null){
     Node next = t.nextWaiter;               // 1. 先初始化 next 节点
     if(t.waitStatus != Node.CONDITION){   // 2. 节点不有效, 在Condition Queue 里面 Node.waitStatus 只有可能是 CONDITION 或是 0(timeout/interrupt引起的)
         t.nextWaiter = null;               // 3. Node.nextWaiter 置空
         if(trail == null){                  // 4. 一次都没有遇到有效的节点
             firstWaiter = next;            // 5. 将 next 赋值给 firstWaiter(此时 next 可能也是无效的, 这只是一个临时处理)
         }else{
             trail.nextWaiter = next;       // 6. next 赋值给 trail.nextWaiter, 这一步其实就是删除节点 t
         }
         if(next == null){                  // 7. next == null 说明 已经 traverse 完了 Condition Queue
             lastWaiter = trail;
         }
     }else{
         trail = t;                         // 8. 将有效节点赋值给 trail
     }
     t = next;
    
    }
    }

毫无疑问, 这是一段非常精巧的queue节点删除, 主要还是在 节点 trail 上, trail 节点可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点

  1. Condition 唤醒首节点方法 signal

/**

  • Moves the longest-waiting thread, if one exists, from the
  • wait queue for this condition to the wait queue for the
  • owning lock
    *
  • @throws IllegalMonitorStateException if{@link #isHeldExclusively()}
  • returns {@code false}
    /
    /*
  • 将 Condition queue 的头节点转移到 Sync Queue 里面
  • 在进行调用 signal 时, 当前的线程必须获取了 独占的锁
    */
    @Override
    public void signal() {
    if(!isHeldExclusively()){ // 1. 判断当前的线程是否已经获取 独占锁

     throw new IllegalMonitorStateException();
    

    }
    Node first = firstWaiter;
    if(first != null){

     doSignal(first);           // 2. 调用 doSignal 进行转移
    

    }
    }

    上述面试题答案都整理成文档笔记。 也还整理了一些面试资料&最新2020收集的一些大厂的面试真题(都整理成文档,小部分截图),有需要的可以 点击进入暗号:csdn 。

  1. Condition 唤醒所有节点方法 signalAll

/**

  • Moves all threads from the wait queue for this condition to
  • the wait queue for the owning lock
    *
  • @throws IllegalMonitorStateException if {@link #isHeldExclusively()}
  • return {@code false}
    /
    /*
  • 将 Condition Queue 里面的节点都转移到 Sync Queue 里面
    */
    public final void signalAll(){
    if(!isHeldExclusively()){
     throw new IllegalMonitorStateException();
    
    }
    Node first = firstWaiter;
    if(first != null){
     doSignalAll(first);
    
    }
    }
  1. Condition 释放锁进行等待方法 awaitUninterruptibly

awaitUninterruptibly 方法是一个不响应 中断的方法
整个流程

将当前的线程封装成 Node 加入到 Condition 里面
丢到当前线程所拥有的 独占锁
等待 其他获取 独占锁的线程的唤醒, 唤醒从 Condition Queue 到 Sync Queue 里面, 进而获取 独占锁
最后获取 lock 之后, 在根据线程唤醒的方式(signal/interrupt) 进行处理

/**

  • Implements uninterruptible condition wait
  • Save lock state return by {@link #getState()}

  • *
  • Invoke {@link #release(int)} with saved state as argument,
  • throwing IllegalMonitoringStateException if it fails
  • Block until signalled
  • Reacquire by invoking specified version of
  • {@link #acquire(int)} with saved state as argument

  • /
    /*
  • 不响应线程中断的方式进行 await
    */
    public final void awaitUninterruptibly(){
    Node node = addConditionWaiter(); // 1. 将当前线程封装成一个 Node 放入 Condition Queue 里面
    int savedState = fullyRelease(node); // 2. 释放当前线程所获取的所有的独占锁(PS: 独占的锁支持重入), 等等, 为什么要释放呢? 以为你调用 awaitUninterruptibly 方法的前提就是你已经获取了 独占锁
    boolean interrupted = false; // 3. 线程中断标识
    while(!isOnSyncQueue(node)){ // 4. 这里是一个 while loop, 调用 isOnSyncQueue 判断当前的 Node 是否已经被转移到 Sync Queue 里面
    LockSupport.park(this);            // 5. 若当前 node 不在 sync queue 里面, 则先 block 一下等待其他线程调用 signal 进行唤醒; (这里若有其他线程对当前线程进行 中断的换, 也能进行唤醒)
     if(Thread.interrupted()){         // 6. 判断这是唤醒是 signal 还是 interrupted(Thread.interrupted()会清楚线程的中断标记, 但没事, 我们有步骤7中的interrupted进行记录)
         interrupted = true;           // 7. 说明这次唤醒是被中断而唤醒的,这个标记若是true的话, 在 awiat 离开时还要 自己中断一下(selfInterrupt), 其他的函数可能需要线程的中断标识
     }
    
    }
    if(acquireQueued(node, savedState) || interrupted){ // 8. acquireQueued 返回 true 说明线程在 block 的过程中式被 inetrrupt 过(其实 acquireQueued 返回 true 也有可能其中有一次唤醒是 通过 signal)
     selfInterrupt();                 // 9. 自我中断, 外面的线程可以通过这个标识知道, 整个 awaitUninterruptibly 运行过程中 是否被中断过
    
    }
    }
  1. Condition 中断标示

下面两个是用于追踪 调用 awaitXXX 方法时线程有没有被中断过
主要的区别是

  1. REINTERRUPT: 代表线程是在 signal 后被中断的 (REINTERRUPT = re-interrupt 再次中断 最后会调用 selfInterrupt)
  2. THROW_IE: 代表在接受 signal 前被中断的, 则直接抛出异常 (Throw_IE = throw inner exception)

/**

  • For interruptible waits, we need to track whether to throw
  • InterruptedException, if interrupted while blocked on
  • condition, versus reinterrupt current thread, if
  • interrupted while blocked waiting to re-acquire
    /
    /*
  • 下面两个是用于追踪 调用 awaitXXX 方法时线程有没有被中断过
  • 主要的区别是
  • REINTERRUPT: 代表线程是在 signal 后被中断的 (REINTERRUPT = re-interrupt 再次中断 最后会调用 selfInterrupt)
  • THROW_IE: 代表在接受 signal 前被中断的, 则直接抛出异常 (Throw_IE = throw inner exception)
    /
    /** Mode meaning to reinterrupt on exit from wait
    /
    / 在离开 awaitXX方法, 退出前再次 自我中断 (调用 selfInterrupt)*/
    private static final int REINTERRUPT = 1;
    /
    Mode meaning to throw InterruptedException on exit from wait /
    /** 在离开 awaitXX方法, 退出前再次, 以为在 接受 signal 前被中断, 所以需要抛出异常
    /
    private static final int THROW_IE = -1;
  1. Condition 中断处理方法

该方法主要是查 在 awaitXX 方法中的这次唤醒是否是中断引起的, 若是中断引起的, 就进行 Node 的转移

/**

  • Checks for interrupt, returning THROW_IE if interrupted
  • before signalled, REINTERRUPT if after signalled, or
  • 0 if not interrupted
    /
    /*
  • 检查 在 awaitXX 方法中的这次唤醒是否是中断引起的
  • 若是中断引起的, 则将 Node 从 Condition Queue 转移到 Sync Queue 里面
  • 返回值的区别:
  • 0: 此次唤醒是通过 signal -> LockSupport.unpark
  • THROW_IE: 此次的唤醒是通过 interrupt, 并且 在 接受 signal 之前
  • REINTERRUPT: 线程的唤醒是 接受过 signal 而后又被中断
    */
    private int checkInterruptWhileWaiting(Node node){
    return Thread.interrupted() ?
    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
    
    }
  1. Condition 中断处理方法 reportInterruptAfterWait

/**

  • Throws InterruptedException, reinterrupts current thread, or
  • does nothing, depending on mode
    /
    /*
  • 这个方法是在 awaitXX 方法离开前调用的, 主要是根据
  • interrupMode 判断是抛出异常, 还是自我再中断一下
    */
    private void reportInterruptAfterWait(int interrupMode) throws InterruptedException{
    if(interrupMode == THROW_IE){
     throw new InterruptedException();
    
    }
    else if(interrupMode == REINTERRUPT){
     selfInterrupt();
    
    }
    }
  1. Condition 释放锁 进行等待的方法 await

await 此方法响应中断请求, 当接受到中断请求后会将节点从 Condition Queue 转移到 Sync Queue

/**

  • Implements interruptible condition wait
    *
  • If current thread is interrupted, throw InterruptedException
  • Save lock state returned by {@link #getState()}
  • Invoke {@link #release(int)} with saved state as argument,
  • throwing IllegalMonitorStateException if it fails
  • Blocking until signalled or interrupted
  • Reacquire by invoking specifized version of
  • {@link #acquire(int)} with saved state as argument.
  • If interrupted while blocked in step 4, throw InterruptedException

  • *
  • @throws InterruptedException
    /
    /*
  • 支持 InterruptedException 的 await <- 注意这里即使是线程被中断,
  • 还是需要获取了独占的锁后, 再 调用 lock.unlock 进行释放锁
    */
    @Override
    public final void await() throws InterruptedException {
    if(Thread.interrupted()){ // 1. 判断线程是否中断
     throw new InterruptedException();
    
    }
    Node node = addConditionWaiter(); // 2. 将线程封装成一个 Node 放到 Condition Queue 里面, 其中可能有些清理工作
    int savedState = fullyRelease(node); // 3. 释放当前线程所获取的所有的锁 (PS: 调用 await 方法时, 当前线程是必须已经获取了独占的锁)
    int interruptMode = 0;
    while(!isOnSyncQueue(node)){ // 4. 判断当前线程是否在 Sync Queue 里面(这里 Node 从 Condtion Queue 里面转移到 Sync Queue 里面有两种可能 (1) 其他线程调用 signal 进行转移 (2) 当前线程被中断而进行Node的转移(就在checkInterruptWhileWaiting里面进行转移))
     LockSupport.park(this);                   // 5. 当前线程没在 Sync Queue 里面, 则进行 block
     if((interruptMode = checkInterruptWhileWaiting(node)) != 0){    // 6. 判断此次线程的唤醒是否因为线程被中断, 若是被中断, 则会在checkInterruptWhileWaiting的transferAfterCancelledWait 进行节点的转移; 返回值 interruptMode != 0
         break;                                                      // 说明此是通过线程中断的方式进行唤醒, 并且已经进行了 node 的转移, 转移到 Sync Queue 里面
     }
    
    }
    if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ // 7. 调用 acquireQueued在 Sync Queue 里面进行 独占锁的获取, 返回值表明在获取的过程中有没有被中断过
     interruptMode = REINTERRUPT;
    
    }
    if(node.nextWaiter != null){ // clean up if cancelled // 8. 通过 "node.nextWaiter != null" 判断 线程的唤醒是中断还是 signal, 因为通过中断唤醒的话, 此刻代表线程的 Node 在 Condition Queue 与 Sync Queue 里面都会存在
     unlinkCancelledWaiters();                                  // 9. 进行 cancelled 节点的清除
    
    }
    if(interruptMode != 0){ // 10. "interruptMode != 0" 代表通过中断的方式唤醒线程
     reportInterruptAfterWait(interruptMode);                // 11. 根据 interruptMode 的类型决定是抛出异常, 还是自己再中断一下
    
    }
    }
  1. Condition 释放锁 进行等待的方法 awaitNanos

awaitNanos 具有超时功能, 与响应中断的功能, 不管中断还是超时都会 将节点从 Condition Queue 转移到 Sync Queue

/**

  • Impelemnts timed condition wait
    *
  • If current thread is interrupted, throw InterruptedException
  • Save lock state returned by {@link #getState()}
  • Invoke {@link #release(int)} with saved state as argument,
  • throwing IllegalMonitorStateException if it fails
  • Block until aignalled, interrupted, or timed out
  • Reacquire by invoking specified version of
  • {@link #acquire(int)} with saved state as argument
  • If interrupted while blocked in step 4, throw InterruptedException

  • /
    /*
  • 所有 awaitXX 方法其实就是
    1. 将当前的线程封装成 Node 加入到 Condition 里面
    1. 丢到当前线程所拥有的 独占锁,
    1. 等待 其他获取 独占锁的线程的唤醒, 唤醒从 Condition Queue 到 Sync Queue 里面, 进而获取 独占锁
    1. 最后获取 lock 之后, 在根据线程唤醒的方式(signal/interrupt) 进行处理
    1. 最后还是需要调用 lock./unlock 进行释放锁
      */
      @Override
      public final long awaitNanos(long nanosTimeout) throws InterruptedException {
      if(Thread.interrupted()){ // 1. 判断线程是否中断
      throw new InterruptedException();
      }
      Node node = addConditionWaiter(); // 2. 将线程封装成一个 Node 放到 Condition Queue 里面, 其中可能有些清理工作
      int savedState = fullyRelease(node); // 3. 释放当前线程所获取的所有的锁 (PS: 调用 await 方法时, 当前线程是必须已经获取了独占的锁)
      final long deadline = System.nanoTime() + nanosTimeout; // 4. 计算 wait 的截止时间
      int interruptMode = 0;
      while(!isOnSyncQueue(node)){ // 5. 判断当前线程是否在 Sync Queue 里面(这里 Node 从 Condtion Queue 里面转移到 Sync Queue 里面有两种可能 (1) 其他线程调用 signal 进行转移 (2) 当前线程被中断而进行Node的转移(就在checkInterruptWhileWaiting里面进行转移))
      if(nanosTimeout <= 0L){ // 6. 等待时间超时(这里的 nanosTimeout 是有可能 < 0),
       transferAfterCancelledWait(node);                 //  7. 调用 transferAfterCancelledWait 将 Node 从 Condition 转移到 Sync Queue 里面
       break;
      
      }
      if(nanosTimeout >= spinForTimeoutThreshold){ // 8. 当剩余时间 < spinForTimeoutThreshold, 其实函数 spin 比用 LockSupport.parkNanos 更高效
       LockSupport.parkNanos(this, nanosTimeout);       // 9. 进行线程的 block
      
      }
      if((interruptMode = checkInterruptWhileWaiting(node)) != 0){ // 10. 判断此次线程的唤醒是否因为线程被中断, 若是被中断, 则会在checkInterruptWhileWaiting的transferAfterCancelledWait 进行节点的转移; 返回值 interruptMode != 0
       break;                                                     // 说明此是通过线程中断的方式进行唤醒, 并且已经进行了 node 的转移, 转移到 Sync Queue 里面
      
      }
      nanosTimeout = deadline - System.nanoTime(); // 11. 计算剩余时间
      }

    if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ // 12. 调用 acquireQueued在 Sync Queue 里面进行 独占锁的获取, 返回值表明在获取的过程中有没有被中断过

    interruptMode = REINTERRUPT;
    

    }
    if(node.nextWaiter != null){ // 13. 通过 "node.nextWaiter != null" 判断 线程的唤醒是中断还是 signal, 因为通过中断唤醒的话, 此刻代表线程的 Node 在 Condition Queue 与 Sync Queue 里面都会存在

    unlinkCancelledWaiters();                                      // 14. 进行 cancelled 节点的清除
    

    }
    if(interruptMode != 0){ // 15. "interruptMode != 0" 代表通过中断的方式唤醒线程

    reportInterruptAfterWait(interruptMode);                      // 16. 根据 interruptMode 的类型决定是抛出异常, 还是自己再中断一下
    

    }
    return deadline - System.nanoTime(); // 17 这个返回值代表是 通过 signal 还是 超时
    }

  1. Condition 释放锁 进行等待的方法 awaitUntil

/**

  • Implements absolute timed condition wait
  • If current thread is interrupted, throw InterruptedException
  • Save lock state returned by {@link #getState()}
  • Invoke {@link #release(int)} with saved state as argument,
  • throwing IllegalMonitorStateException if it fails
  • Block until signalled, interrupted, or timed out
  • Reacquire by invoking specialized version of
  • {@link #acquire(int)} with saved state as argument
  • if interrupted while blocked in step 4, throw InterruptedException
  • If timeed out while blocked in step 4, return false, else true

  • /
    /*
  • 所有 awaitXX 方法其实就是
    1. 将当前的线程封装成 Node 加入到 Condition 里面
    1. 丢到当前线程所拥有的 独占锁,
    1. 等待 其他获取 独占锁的线程的唤醒, 唤醒从 Condition Queue 到 Sync Queue 里面, 进而获取 独占锁
    1. 最后获取 lock 之后, 在根据线程唤醒的方式(signal/interrupt) 进行处理
    1. 最后还是需要调用 lock./unlock 进行释放锁
      *
  • awaitUntil 和 awaitNanos 差不多
    */
    @Override
    public boolean awaitUntil(Date deadline) throws InterruptedException {
    long abstime = deadline.getTime(); // 1. 判断线程是否中断
    if(Thread.interrupted()){

    throw new InterruptedException();
    

    }
    Node node = addConditionWaiter(); // 2. 将线程封装成一个 Node 放到 Condition Queue 里面, 其中可能有些清理工作
    int savedState = fullyRelease(node); // 3. 释放当前线程所获取的所有的锁 (PS: 调用 await 方法时, 当前线程是必须已经获取了独占的锁)
    boolean timeout = false;
    int interruptMode = 0;
    while(!isOnSyncQueue(node)){ // 4. 判断当前线程是否在 Sync Queue 里面(这里 Node 从 Condtion Queue 里面转移到 Sync Queue 里面有两种可能 (1) 其他线程调用 signal 进行转移 (2) 当前线程被中断而进行Node的转移(就在checkInterruptWhileWaiting里面进行转移))

    if(System.currentTimeMillis() > abstime){                          // 5. 计算是否超时
        timeout = transferAfterCancelledWait(node);                    //  6. 调用 transferAfterCancelledWait 将 Node 从 Condition 转移到 Sync Queue 里面
        break;
    }
    LockSupport.parkUntil(this, abstime);                              // 7. 进行 线程的阻塞
    if((interruptMode = checkInterruptWhileWaiting(node)) != 0){       // 8. 判断此次线程的唤醒是否因为线程被中断, 若是被中断, 则会在checkInterruptWhileWaiting的transferAfterCancelledWait 进行节点的转移; 返回值 interruptMode != 0
        break;                                                         // 说明此是通过线程中断的方式进行唤醒, 并且已经进行了 node 的转移, 转移到 Sync Queue 里面
    }
    

    }

    if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ // 9. 调用 acquireQueued在 Sync Queue 里面进行 独占锁的获取, 返回值表明在获取的过程中有没有被中断过

    interruptMode = REINTERRUPT;
    

    }
    if(node.nextWaiter != null){ // 10. 通过 "node.nextWaiter != null" 判断 线程的唤醒是中断还是 signal, 因为通过中断唤醒的话, 此刻代表线程的 Node 在 Condition Queue 与 Sync Queue 里面都会存在

    unlinkCancelledWaiters();                                         // 11. 进行 cancelled 节点的清除
    

    }
    if(interruptMode != 0){ // 12. "interruptMode != 0" 代表通过中断的方式唤醒线程

    reportInterruptAfterWait(interruptMode);                        // 13. 根据 interruptMode 的类型决定是抛出异常, 还是自己再中断一下
    

    }

    return !timeout; // 13. 返回是否通过 interrupt 进行线程的唤醒
    }

  1. Condition 的 instrumentation 方法

/**

  • Returns true if this condition was created by the given
  • synchronization object
    /
    /**判断当前 condition 是否获取 独占锁
    /
    final boolean isOwnedBy(KAbstractOwnableSynchronizer sync){
    return sync == KAbstractQueuedSynchronizer.this;
    }

/**

  • Quires whether any threads are waiting on this condition
  • Implements {@link KAbstractOwnableSynchronizer#"hasWaiters(ConditionObject)}
    *
  • @return {@code true} if there are any waiting threads
  • @throws IllegalMonitorStateException if {@link #isHeldExclusively()}
  • returns {@code false}
    /
    /*
  • 查看 Condition Queue 里面是否有 CONDITION 的节点
    */
    protected final boolean hasWaiters(){
    if(!isHeldExclusively()){
     throw new IllegalMonitorStateException();
    
    }
    for(Node w = firstWaiter; w != null; w = w.nextWaiter ){
     if(w.waitStatus == Node.CONDITION){
         return true;
     }
    
    }
    return false;
    }

/**

  • Returns an estimate of the number of threads waiting on
  • this condition
  • Implements {@link KAbstractOwnableSynchronizer#"getWaitQueueLength()}
    *
  • @return the estimated number of waiting threads
  • @throws IllegalMonitorStateException if {@link #isHeldExclusively()}
  • return {@code false}
    /
    /*
  • 获取 Condition queue 里面的 CONDITION 的节点的个数
    */
    protected final int getWaitQueueLength(){
    if(!isHeldExclusively()){
     throw new IllegalMonitorStateException();
    
    }
    int n = 0;
    for(Node w = firstWaiter; w != null; w = w.nextWaiter){
     if(w.waitStatus == Node.CONDITION){
         ++n;
     }
    
    }
    return n;
    }

/**

  • Returns a collection containing those threads that may be
  • waiting on this Condition
  • Implements {@link KAbstractOwnableSynchronizer#'getWaitingThreads}
    *
  • @return the collection of thread
  • @throws IllegalMonitorStateException if {@link #isHeldExclusively()}
  • returns {@code false}
    /
    /*
  • 获取 等待的线程
    */
    protected final Collection getWaitingThreads(){
    if(!isHeldExclusively()){
     throw new IllegalMonitorStateException();
    
    }
    ArrayList list = new ArrayList<>();
    for(Node w = firstWaiter; w != null; w = w.nextWaiter){
     if(w.waitStatus == Node.CONDITION){
         Thread t = w.thread;
         if(t != null){
             list.add(t);
         }
     }
    
    }
    return list;
    }

希望对大家有所帮助,有用的话点赞给我支持!

相关文章
|
1月前
|
XML Java 编译器
Java注解的底层源码剖析与技术认识
Java注解(Annotation)是Java 5引入的一种新特性,它提供了一种在代码中添加元数据(Metadata)的方式。注解本身并不是代码的一部分,它们不会直接影响代码的执行,但可以在编译、类加载和运行时被读取和处理。注解为开发者提供了一种以非侵入性的方式为代码提供额外信息的手段,这些信息可以用于生成文档、编译时检查、运行时处理等。
65 7
|
4天前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
|
28天前
|
存储 JavaScript 前端开发
基于 SpringBoot 和 Vue 开发校园点餐订餐外卖跑腿Java源码
一个非常实用的校园外卖系统,基于 SpringBoot 和 Vue 的开发。这一系统源于黑马的外卖案例项目 经过站长的进一步改进和优化,提供了更丰富的功能和更高的可用性。 这个项目的架构设计非常有趣。虽然它采用了SpringBoot和Vue的组合,但并不是一个完全分离的项目。 前端视图通过JS的方式引入了Vue和Element UI,既能利用Vue的快速开发优势,
111 13
|
24天前
|
机器学习/深度学习 前端开发 算法
婚恋交友系统平台 相亲交友平台系统 婚恋交友系统APP 婚恋系统源码 婚恋交友平台开发流程 婚恋交友系统架构设计 婚恋交友系统前端/后端开发 婚恋交友系统匹配推荐算法优化
婚恋交友系统平台通过线上互动帮助单身男女找到合适伴侣,提供用户注册、个人资料填写、匹配推荐、实时聊天、社区互动等功能。开发流程包括需求分析、技术选型、系统架构设计、功能实现、测试优化和上线运维。匹配推荐算法优化是核心,通过用户行为数据分析和机器学习提高匹配准确性。
77 3
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
60 12
|
1月前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。
|
1月前
|
人工智能 移动开发 安全
家政上门系统用户端、阿姨端源码,java家政管理平台源码
家政上门系统基于互联网技术,整合大数据分析、AI算法和现代通信技术,提供便捷高效的家政服务。涵盖保洁、月嫂、烹饪等多元化服务,支持多终端访问,具备智能匹配、在线支付、订单管理等功能,确保服务透明、安全,适用于家庭生活的各种需求场景,推动家政市场规范化发展。
|
6天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
44 17
|
16天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者