jdk11源码--ReentrantLock之Condition源码分析

简介: jdk11 ReentrantLock之Condition源码分析

概述

jdk11源码-ReentrantLock源码一文中分析了ReentrantLock源码。里面有讲述在多个线程加入队列时的AQS内部状态:
在这里插入图片描述
==简单来说:condition的await和signal操作就是将node节点在这两个队列中转移的过程,这里重点关注waitstatus和nextwaiter两个字段。后面会逐行代码分析==
在这里插入图片描述

创建Condition

一个ReentrantLock可以创建多个Condition
Condition condition = lock.newCondition();
实际是创建一个ConditionObject对象,ConditionObject的定义在AbstractQueuedSynchronizer中。

nextWaiter

在之前的文章中介绍了,一个node对象中有两个重要的对象属性:

volatile int waitStatus;
Node nextWaiter;

waitStatus已经在jdk11源码-ReentrantLock源码一文中讲述。这里着重说一下nextWaiter的含义。
nextWaiter总共有三种类型的值:

  • Node.SHARED 共享模式
  • Node.EXCLUSIVE 独占模式
  • condition队列中下一个等待节点

condition queues condition队列仅在独占模式有效,使用一个简单的链接队列来保存因condition而等待的节点(线程)。这些节点可以被转移到AQS队列中重新获取锁。

awit

condition.await();使当前线程阻塞

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();//将waiter加入到condition等待队列(condition queue)
    int savedState = fullyRelease(node);//将当前线程占用的state锁资源全部释放。目的是为了将该线程从AQS队列中移除。
    int interruptMode = 0;
    //isOnSyncQueue:在AQS队列中返回true,否则返回false
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);//执行这里,说明当期线程不在AQS队列中,则需要被park挂起。

        //这里是被唤醒后的执行逻辑
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//检查在park期间,是否被中断,根据具体情况来决定抛异常还是继续中断
            break;
    }
    
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // 清除被取消的节点
        unlinkCancelledWaiters();
    if (interruptMode != 0)//不等于0,表示park期间被中断过
        //当前线程被唤醒后,通过interruptMode来决定是继续中断还是抛异常
        reportInterruptAfterWait(interruptMode);
}

//将waiter加入到等待队列
private Node addConditionWaiter() {
    if(!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node t = lastWaiter;
    // 如果 lastWaiter 是取消状态(waitStatus != Node.CONDITION),那么将其清除
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();//删除condition队列中所有被取消的节点
        t = lastWaiter;
    }

    //新建一个waitStatus是Node.CONDITION的节点,表示当前节点(线程)的等待状态是condition。
    Node node = new Node(Node.CONDITION);
    
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

addConditionWaiter方法是将当前线程加入到condition的等待队列,lastWaiter 指向当前最新加入的node。

final int fullyRelease(Node node) {
    try {
        int savedState = getState();
        if (release(savedState))
            return savedState;
        throw new IllegalMonitorStateException();
    } catch (Throwable t) {
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}

fullyRelease方法有必要看一下,他首先获取state的值savedState,然后执行release(savedState),release方法之前已经分析过了,他会释放savedState数量的资源。这里也就是将当前线程锁定的资源全部释放,即当前线程释放独占锁。
也比较容易理解,condition只与独占锁配合使用,一个线程获取锁,但是被condition await以后挂起,肯定需要释放锁,其他线程才有机会获取锁而继续执行。

==根据前面的分析,condition的await会将线程node添加到condition队列,然后从AQS队列移除。在signal时,会将该线程node添加到AQS阻塞队列中。有关signal后面会分析。==
在释放了独占锁以后,就通过isOnSyncQueue方法循环检查当前线程(node)是否在AQS队列中,如果已经从AQS队列中移除,那么就可以放心的将其park了。

在park期间,线程有可能被中断或者unpark唤醒。那么就要判断接下来是需要继续park还是抛异常还是去重新获取锁。

final boolean isOnSyncQueue(Node node) {
    //如果当前线程node的状态是CONDITION或者node.prev为null时说明已经在Condition队列中了,所以返回false;
    //如果node添加到AQS阻塞队列中,那么他的waitstats会被初始化为0,或者被修改为-1,-3,肯定不是condition(-2)
    //如果node添加到AQS阻塞队列中,那么他的prev肯定不为空,至少也是head节点
    if (node.waitStatus == Node.CONDITION || node.prev == null) return false;
    //如果node有后继节点,那么他肯定在队列中。因为前面分析了,condition队列是不会设置next字段值的
    if (node.next != null) return true;
    
    /*
    * 执行到这里,说明node的waitStatus 不是CONDITION ,prev肯定也不是null。并且next肯定为null。
    * 第一次看这个代码,肯定会蒙圈,怎么可能会出现这种不一致的情况呢
    * 这种情况是因为在将node添加到AQS阻塞队列时,采用的CAS策略。CAS就有可能失败,所以会出现这种临时的不一致行为。
    * 在下面分析signal时会看到,signal会调用AbstractQueuedSynchronizer#enq方法,这个方法会先设置prev,然后再CAS设置tail和next。
    */
    return findNodeFromTail(node);
}

//新添加的节点都是加在队尾,所以从后向前找效率更高
private boolean findNodeFromTail(Node node) {
    for (Node p = tail;;) {
        if (p == node) return true;
        if (p == null) return false;
        p = p.prev;
    }
}

大家可以看到在AQS分析过程中个,大部分遍历循环查找都是从tail开始向前查找的。这是因为新加入的节点都加在队尾,从后往前找效率更高。

await方法中包含被唤醒后的执行逻辑,这个在分析为signal以后再看。

signal

public final void signal() {
    //isHeldExclusively()就一句话,判断互斥锁是不是当前线程加的。getExclusiveOwnerThread() == Thread.currentThread()
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)//唤醒condition队列中的第一个等待节点
        doSignal(first);
}

//从condition队列头往后找到一个没有被取消的节点,对其进行唤醒操作。
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
    //上面while条件表示:移除失败,并且condition队列还有后继节点
}

//将node节点从condition队列移动到AQS阻塞队列,成功返回true。
final boolean transferForSignal(Node node) {
    //设置失败,说明node被取消了。返回false,那么在上层循环中会继续查找下一个节点
    //设置成功,则node的waitstatus=0,再加上上面他的nextWaiter 被设置为null,也就是从condition队列中移除了。
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    //将node以CAS方式添加到AQS队尾
    //注意这里返回的p是老的队尾,也就是新加入node的前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    //ws > 0说明前驱节点被取消;
    //ws<=0,那么需要将新加入节点的前驱节点waitstatus设置为Node.SIGNAL
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);//唤醒
    return true;
}
  • doSignal方法中,第一次循环时,first执行condition队列的头结点,firstWaiter 指向第二个节点。如果后面没有节点了,那么将lastWaiter 设置为null。然后将first的nextWaiter 设置为null,因为他马上就从condition队列中移除了。
  • 当移除失败,并且condition队列还有后继节点时会进行循环查找下一个没有被取消的节点进行唤醒。
  • 通过代码分析,从condition队列中移除的操作分两步:一个是将node节点的nextWaiter设置为null,第二是将他的waitStatus设置为初始值0。

一句话总结唤醒操作signal的工作:将线程node从condition队列转移到AQS队列中。

signal唤醒后的操作

从代码中可以看到,其实signal只是将node从condition队列移动到AQS队列,并没有主动调用LockSupport.unpark方法,还是依赖于AQS自身的机制真正unpark。

先看一下有哪些情况会让线程停止park继续往下执行:

  1. 另一个线程调用了signal方法,将node节点从condition队列转移到AQS阻塞队列,然后获取了锁(unpark)
  2. 另外一个线程对这个线程进行了中断
  3. 上面signal中的transferForSignal方法中有提到:前驱节点被取消 或者 修改AQS队列的前驱节点waitstatus设置为Node.SIGNAL时失败,这两种情况会调用unpark。

回到await方法,看唤醒后继续执行的逻辑

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        
        //唤醒后继续执行
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

/*
* 如果中断了,检查中断是发送在signalled之前还是之后,
* 如果中断发生在signalled之前,返回THROW_IE
* 如果中断发生在signalled之后,返回REINTERRUPT
* 没有中断返回0
* 
* 注意:这里如果中断了,会首先会尝试将节点转移到AQS阻塞队列
*/
private int checkInterruptWhileWaiting(Node node) {
   return Thread.interrupted() ?
       (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
       0;
}

/*
* 该方法只有线程中断才会执行
* 尝试将节点转移到AQS队列。如果在signalled前取消了线程,返回true,上层checkInterruptWhileWaiting方法会抛出异常标识
* 
*/
final boolean transferAfterCancelledWait(Node node) {
    //如果这里CAS成功,说明node节点当前的状态时CONDITION,也就是说中断操作是在signalled前发生的。
    //因为:1、只有中断才会调用这个方法。2、上面doSignal方法中会在signal过程中将其状态由CONDITION修改为0。
   if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
       enq(node);//将node节点加在AQS阻塞队列的队尾
       return true;
   }

    /*
    * 执行到这里说明上面CAS失败,说明中断发生在signalled之后。
    * signal方法中会将node以CAS方式添加到AQS队尾,执行enq方法,但是走到这里时enq方法可能还没有执行完,也就是node节点还没有完全加入到AQS队列中,所以这里自旋等待其完成。
    */
   while (!isOnSyncQueue(node))
       Thread.yield();
   return false;
}

上面将node从condition队列转移到AQS阻塞队列中以后,就开始尝试获取锁【==注意:无论是否中断,都会将其转移到AQS阻塞队列==】。

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;//获取锁过程中被中断过,并且interruptMode 不是抛异常,那么重新设置中断标识。

acquireQueued方法用于获取锁资源。由于在await的时候,释放了savedState数量的资源,所以这里需要获取savedState数量的资源。acquireQueued返回结果是是否中断过,这里的中断是指在获取锁资源的过程中是否被中断过,与interruptMode 无关。
获取锁过程中被中断过,并且interruptMode 不是抛异常,那么重新设置中断标识。

接下来执行

if (node.nextWaiter != null) 
    unlinkCancelledWaiters();//清理condition队列中的被清除的节点

注意这里是清理的condition队列中的被取消的节点。
那么什么时候node.nextWaiter 不为空呢?在doSignal方法中会执行first.nextWaiter = null;一行代码,正常情况下来说,nextWaiter 应该都为空才对。
注意这里只是正常情况,别忘了被中断的情况,如果在signal之前就被中断了,执行了上面的acquireQueued方法,转移到了AQS队列中,此时该node节点的nextWaiter 不为空。但是他的waitstatus已经被设置为0.
在这里插入图片描述
unlinkCancelledWaiters比较简单,就是遍历condition队列,清楚被取消(waitStatus != Node.CONDITION))的节点。因为condition队列中的所有节点的waitStatus 肯定都是Node.CONDITION。

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {//waitStatus != Node.CONDITION)  就是被取消的节点
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

最后一步就是根据checkInterruptWhileWaiting方法返回的interruptMode中断标记,来决定抛异常还是给当前线程设置中断标记

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

好了 ,condition分析完毕,还是有点复杂度的。

相关文章
|
3天前
|
数据采集 人工智能 Java
Java产科专科电子病历系统源码
产科专科电子病历系统,全结构化设计,实现产科专科电子病历与院内HIS、LIS、PACS信息系统、区域妇幼信息平台的三级互联互通,系统由门诊系统、住院系统、数据统计模块三部分组成,它管理了孕妇从怀孕开始到生产结束42天一系列医院保健服务信息。
15 4
|
10天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
35 2
|
1月前
|
Java Apache Maven
Java百项管理之新闻管理系统 熟悉java语法——大学生作业 有源码!!!可运行!!!
文章提供了使用Apache POI库在Java中创建和读取Excel文件的详细代码示例,包括写入数据到Excel和从Excel读取数据的方法。
60 6
Java百项管理之新闻管理系统 熟悉java语法——大学生作业 有源码!!!可运行!!!
|
14天前
|
人工智能 监控 数据可视化
Java智慧工地信息管理平台源码 智慧工地信息化解决方案SaaS源码 支持二次开发
智慧工地系统是依托物联网、互联网、AI、可视化建立的大数据管理平台,是一种全新的管理模式,能够实现劳务管理、安全施工、绿色施工的智能化和互联网化。围绕施工现场管理的人、机、料、法、环五大维度,以及施工过程管理的进度、质量、安全三大体系为基础应用,实现全面高效的工程管理需求,满足工地多角色、多视角的有效监管,实现工程建设管理的降本增效,为监管平台提供数据支撑。
32 3
|
19天前
|
运维 自然语言处理 供应链
Java云HIS医院管理系统源码 病案管理、医保业务、门诊、住院、电子病历编辑器
通过门诊的申请,或者直接住院登记,通过”护士工作站“分配患者,完成后,进入医生患者列表,医生对应开具”长期医嘱“和”临时医嘱“,并在电子病历中,记录病情。病人出院时,停止长期医嘱,开具出院医嘱。进入出院审核,审核医嘱与住院通过后,病人结清缴费,完成出院。
57 3
|
25天前
|
JavaScript Java 项目管理
Java毕设学习 基于SpringBoot + Vue 的医院管理系统 持续给大家寻找Java毕设学习项目(附源码)
基于SpringBoot + Vue的医院管理系统,涵盖医院、患者、挂号、药物、检查、病床、排班管理和数据分析等功能。开发工具为IDEA和HBuilder X,环境需配置jdk8、Node.js14、MySQL8。文末提供源码下载链接。
|
28天前
|
移动开发 前端开发 JavaScript
java家政系统成品源码的关键特点和技术应用
家政系统成品源码是已开发完成的家政服务管理软件,支持用户注册、登录、管理个人资料,家政人员信息管理,服务项目分类,订单与预约管理,支付集成,评价与反馈,地图定位等功能。适用于各种规模的家政服务公司,采用uniapp、SpringBoot、MySQL等技术栈,确保高效管理和优质用户体验。
|
1月前
|
JSON 前端开发 Java
震惊!图文并茂——Java后端如何响应不同格式的数据给前端(带源码)
文章介绍了Java后端如何使用Spring Boot框架响应不同格式的数据给前端,包括返回静态页面、数据、HTML代码片段、JSON对象、设置状态码和响应的Header。
141 1
震惊!图文并茂——Java后端如何响应不同格式的数据给前端(带源码)
|
1月前
|
Java
Java基础之 JDK8 HashMap 源码分析(中间写出与JDK7的区别)
这篇文章详细分析了Java中HashMap的源码,包括JDK8与JDK7的区别、构造函数、put和get方法的实现,以及位运算法的应用,并讨论了JDK8中的优化,如链表转红黑树的阈值和扩容机制。
27 1
|
1月前
|
存储 前端开发 Java
Java后端如何进行文件上传和下载 —— 本地版(文末配绝对能用的源码,超详细,超好用,一看就懂,博主在线解答) 文件如何预览和下载?(超简单教程)
本文详细介绍了在Java后端进行文件上传和下载的实现方法,包括文件上传保存到本地的完整流程、文件下载的代码实现,以及如何处理文件预览、下载大小限制和运行失败的问题,并提供了完整的代码示例。
522 1
下一篇
无影云桌面