Java的wait()、notify()学习三部曲之一:JVM源码分析

简介: 阅读jvm源码,分析wait()和notify()的具体实现

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码): https://github.com/zq2599/blog_demos

综述

  • Java的wait()、notify()学习三部曲由三篇文章组成,内容分别是:

一、通过阅读openjdk8的源码,分析和理解wait,notify在JVM中的具体执行过程;
二、修改JVM源码,编译构建成新的JVM,把我们感兴趣的参数打印出来,结合具体代码检查和我们的理解是否一致;
三、修改JVM源码,编译构建成新的JVM,按照我们的理解去修改关键参数,看能否达到预期效果;

  • 现在,咱们一起开始既漫长又深入的wait、notify学习之旅吧!

wait()和notify()的通常用法

  • Java多线程开发中,我们常用到wait()和notify()方法来实现线程间的协作,简单的说步骤如下:
  1. A线程取得锁,执行wait(),释放锁;
  2. B线程取得锁,完成业务后执行notify(),再释放锁;
  3. B线程释放锁之后,A线程取得锁,继续执行wait()之后的代码;

关于synchronize修饰的代码块

  • 通常,对于synchronize(lock){...}这样的代码块,编译后会生成monitorenter和monitorexit指令,线程执行到monitorenter指令时会尝试取得lock对应的monitor的所有权(CAS设置对象头),取得后即获取到锁,执行monitorexit指令时会释放monitor的所有权即释放锁;

一个完整的demo

  • 为了深入学习wait()和notify(),先用完整的demo程序来模拟场景吧,以下是源码:
public class NotifyDemo {

    private static void sleep(long sleepVal){
        try{
            Thread.sleep(sleepVal);
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    private static void log(String desc){
        System.out.println(Thread.currentThread().getName() + " : " + desc);
    }

    Object lock = new Object();

    public void startThreadA(){
        new Thread(() -> {
            synchronized (lock){
                log("get lock");
                startThreadB();
                log("start wait");
                try {
                    lock.wait();
                }catch(InterruptedException e){
                    e.printStackTrace();
                }

                log("get lock after wait");
                log("release lock");
            }
        }, "thread-A").start();
    }

    public void startThreadB(){
        new Thread(()->{
            synchronized (lock){
                log("get lock");
                startThreadC();
                sleep(100);
                log("start notify");
                lock.notify();
                log("release lock");

            }
        },"thread-B").start();
    }

    public void startThreadC(){
        new Thread(() -> {
            synchronized (lock){
                log("get lock");
                log("release lock");
            }
        }, "thread-C").start();
    }

    public static void main(String[] args){
        new NotifyDemo().startThreadA();
    }
}
  • 以上就是本次实战用到的demo,代码功能简述如下:
  1. 启动线程A,取得锁之后先启动线程B再执行wait()方法,释放锁并等待;
  2. 线程B启动之后会等待锁,A线程执行wait()之后,线程B取得锁,然后启动线程C,再执行notify唤醒线程A,最后退出synchronize代码块,释放锁;
  3. 线程C启动之后就一直在等待锁,这时候线程B还没有退出synchronize代码块,锁还在线程B手里
  4. 线程A在线程B执行notify()之后就一直在等待锁,这时候线程B还没有退出synchronize代码块,锁还在线程B手里
  5. 线程B退出synchronize代码块,释放锁之后,线程A和线程C竞争锁;
  • 把上面的代码在Openjdk8下面执行,反复执行多次,都得到以下结果:
thread-A : get lock
thread-A : start wait
thread-B : get lock
thread-C : c thread is start
thread-B : start notify
thread-B : release lock
thread-A : after wait, acquire lock again
thread-A : release lock
thread-C : get lock
thread-C : release lock
  • 针对以上结果,问题来了:
  • 第一个问题:
  • 将以上代码反复执行多次,结果都是B释放锁之后A会先得到锁,这又是为什么呢?C为何不能先拿到锁呢?
  • 第二个问题:
  • 线程C自开始就执行了monitorenter指令,它能得到锁是容易理解的,但是线程A呢?在wait()之后并没有没有monitorenter指令,那么它又是如何取得锁的呢?
  • wait()、notify()这些方法都是native方法,所以只有从JVM源码寻找答案了,本次阅读的是openjdk8的源码;

带上问题去看JVM源码

  • 按照demo代码执行顺序,我整理了如下问题,带着这些问题去看JVM源码可以聚焦主线,不要被一些支线的次要的代码卡住(例如一些异常处理,监控和上报等):
  1. 线程A在wait()的时候做了什么?
  2. 线程C启动后,由于此时线程B持有锁,那么线程C此时在干啥?
  3. 线程B在notify()的时候做了什么?
  4. 线程B释放锁的时候做了什么?

源码中最重要的注释信息

  • 在源码中有段注释堪称是整篇文章最重要的说明,请大家始终记住这段信息,处处都用得上:

ObjectWaiter对象存在于WaitSet、EntryList、cxq等集合中,或者正在这些集合中移动

  • 原文如下:

这里写图片描述
请务必记住这三个集合:WaitSet、EntryList、cxq

  • 好了,接下来看源码分析问题吧:

线程A在wait()的时候做了什么

  • 打开hotspot/src/share/vm/runtime/objectMonitor.cpp,看ObjectMonitor::wait方法:

这里写图片描述

  • 如上图所示,有两处代码值得我们注意:
  1. 绿框中将当前线程包装成ObjectWaiter对象,并且状态为TS_WAIT,这里对应的是jstack看到的线程状态WAITING;
  2. 红框中调用了AddWaiter方法,跟进去看下:

这里写图片描述

  • 这个ObjectWaiter对象被放入了_WaitSet中,_WaitSet是个环形双向链表(circular doubly linked list)
  • 回到ObjectMonitor::wait方法接着往下看,会发现关键代码如下图,当前线程通过park()方法开始挂起(suspend):

这里写图片描述

  • 至此,我们把wait()方法要做的事情就理清了:
  1. 包装成ObjectWaiter对象,状态为TS_WAIT;
  2. ObjectWaiter对象被放入_WaitSet中;
  3. 当前线程挂起;

线程B持有锁的时候线程C在干啥

  • 此时的线程C无法进入synchronized{}代码块,用jstack看应该是BLOCKED状态,如下图:

这里写图片描述

  • 我们看看monitorenter指令对应的源码吧,位置:openjdk/hotspot/src/share/vm/interpreter/interpreterRuntime.cpp
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
  thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
  if (PrintBiasedLockingStatistics) {
    Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
  }
  Handle h_obj(thread, elem->obj());
  assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
         "must be NULL or an object");
  if (UseBiasedLocking) {
    // Retry fast entry if bias is revoked to avoid unnecessary inflation
    ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
  } else {
    ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
  }
  assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
         "must be NULL or an object");
#ifdef ASSERT
  thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
  • 上面的代码有个if (UseBiasedLocking)判断,是判断是否使用偏向锁的,本例中的锁显然已经不属于当前线程C了,所以我们还是直接看slow_enter(h_obj, elem->lock(), CHECK)方法吧;
  • 打开openjdk/hotspot/src/share/vm/runtime/synchronizer.cpp:
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
  markOop mark = obj->mark();
  assert(!mark->has_bias_pattern(), "should not see bias pattern here");

  //是否处于无锁状态
  if (mark->is_neutral()) {
    // Anticipate successful CAS -- the ST of the displaced mark must
    // be visible <= the ST performed by the CAS.
    lock->set_displaced_header(mark);
    //无锁状态就去竞争锁
    if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
      TEVENT (slow_enter: release stacklock) ;
      return ;
    }
    // Fall through to inflate() ...
  } else
  if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
    //如果处于有锁状态,就检查是不是当前线程持有锁,如果是当前线程持有的,就return,然后就能执行同步代码块中的代码了
    assert(lock != mark->locker(), "must not re-lock the same lock");
    assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
    lock->set_displaced_header(NULL);
    return;
  }

#if 0
  // The following optimization isn't particularly useful.
  if (mark->has_monitor() && mark->monitor()->is_entered(THREAD)) {
    lock->set_displaced_header (NULL) ;
    return ;
  }
#endif

  // The object header will never be displaced to this lock,
  // so it does not matter what the value is, except that it
  // must be non-zero to avoid looking like a re-entrant lock,
  // and must not look locked either.
  lock->set_displaced_header(markOopDesc::unused_mark());
  //锁膨胀
  ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
  • 线程C在上面代码中的执行顺序如下:
  1. 判断是否是无锁状态,如果是就通过Atomic::cmpxchg_ptr去竞争锁;
  2. 不是无锁状态,就检查当前锁是否是线程C持有;
  3. 不是线程C持有,调用inflate方法开始锁膨胀;
  • ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
  • 来看看锁膨胀的源码:

这里写图片描述

  • 如上图,锁膨胀的代码太长,我们这里只看关键代码吧:
  • 红框中,如果当前状态已经是重量级锁,就通过mark->monitor()方法取得ObjectMonitor指针再返回;
  • 绿框中,如果还不是重量级锁,就检查是否处于膨胀中状态(其他线程正在膨胀中),如果是膨胀中,就调用ReadStableMark方法进行等待,ReadStableMark方法执行完毕后再通过continue继续检查,ReadStableMark方法中还会调用os::NakedYield()释放CPU资源;
  • 如果红框和绿框的条件都没有命中,目前已经是轻量级锁了(不是重量级锁并且不处于锁膨胀状态),可以开始膨胀了,如下图:

这里写图片描述

  • 简单来说,锁膨胀就是通过CAS将监视器对象OjectMonitor的状态设置为INFLATING,如果CAS失败,就在此循环,再走前一副图中的的红框和绿框中的判断,如果CAS设置成功,会继续设置ObjectMonitor中的header、owner等字段,然后inflate方法返回监视器对象OjectMonitor;
  • 看看之前slow_enter方法中,调用inflate方法的代码如下:
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
  • 所以inflate方法返回监视器对象OjectMonitor之后,会立刻执行OjectMonitor的enter方法,这个方法中开始竞争锁了,方法在openjdk/hotspot/src/share/vm/runtime/objectMonitor.cpp文件中:

这里写图片描述

  • 如上图,红框中表示OjectMonitor的enter方法一进来就通过CAS将OjectMonitor的_owner设置为当前线程,绿框中表示设置成功的逻辑,第一个if表示重入锁的逻辑,第二个if表示第一次设置_owner成功,都意味着竞争锁成功,而我们的线程C显然是竞争失败的,会进入下图中的无线循环,反复调用EnterI方法:

这里写图片描述

  • 进入EnterI方法看看:

这里写图片描述

  • 如上图,首先构造一个ObjectWaiter对象node,后面的for(;;)代码块中来是一段非常巧妙的代码,同一时刻可能有多个线程都竞争锁失败走进这个EnterI方法,所以在这个for循环中,用CAS将_cxq地址放入node的_next,也就是把node放到_cxq队列的首位,如果CAS失败,就表示其他线程把node放入到_cxq的首位了,所以通过for循环再放一次,只要成功,此node就一定在最新的_cxq队列的首位。
  • 接下来的代码又是一个无限循环,如下图:

这里写图片描述

  • 从上图可以看出,进入循环后先调用TryLock方法竞争一次锁,如果成功了就退出循环,否则就调用Self->_ParkEvent->park方法使线程挂起,这里有自旋锁的逻辑,也就是park方法带了时间参数,就会在挂起一段时间后自动唤醒,如果不是自旋的条件,就一直挂起等待被其他条件唤醒,线程被唤醒后又会执行TryLock方法竞争一次锁,竞争不到继续这个for循环;
  • 到这里我们已经把线程C在BLOCK的时候的逻辑理清楚了,小结如下:
  1. 偏向锁逻辑,未命中;
  2. 如果是无锁状态,就通过CAS去竞争锁,此处由于锁已经被线程B持有,所以不是无锁状态;
  3. 不是无锁状态,而且锁不是线程C持有,执行锁膨胀,构造OjectMonitor对象;
  4. 竞争锁,竞争失败就将线程加入_cxq队列的首位;
  5. 开始无限循环,竞争锁成功就退出循环,竞争失败线程挂起,等待被唤醒后继续竞争;

线程B在notify()的时候做了什么

  • 接下来该线程B执行notify了,代码是objectMonitor.cpp的ObjectMonitor::notify方法:

这里写图片描述

  • 如上图所示,首先是Policy的赋值,其次是调用DequeueWaiter()方法将_WaitSet队列的第一个值取出并返回,还记得_WaitSet么?所有wait的线程都被包装成ObjectWaiter对象然后放进来了;
  • 接下来对ObjectWaiter对象的处理方式,根据Policy的不同而不同:

Policy == 0:放入_EntryList队列的排头位置;
Policy == 1:放入_EntryList队列的末尾位置;
Policy == 2:_EntryList队列为空就放入_EntryList,否则放入_cxq队列的排头位置;

这里写图片描述

  • 如上图所示,请注意把ObjectWaiter的地址写到_cxq变量的时候要用CAS操作,因为此时可能有其他线程正在竞争锁,竞争失败的时候会将自己包装成ObjectWaiter对象加入到_cxq中;
  • 这里的代码有一处疑问,期待着读着您的指教:如果_EntryList为空,就把ObjectWaiter放入ObjectWaiter中,为什么要这样做呢?
  • Policy == 3:放入_cxq队列中,末尾位置;更新_cxq变量的值的时候,同样要通过CAS注意并发问题;
  • 这里有一段很巧妙的代码,现将_cxq保存在Tail中,正常情况下将ObjectWaiter赋值给Tail->_next就可以了,但是此时有可能其他线程正在_cxq的尾部追加数据了,所以此时Tail对象对应的记录就不是最后一条了,那么它的_next就非空了,一旦发生这种情况,就执行Tail = Tail->_next,这样就获得了最新的_cxq的尾部数据,如下图所示:

这里写图片描述

  • Policy等于其他值,立即唤醒ObjectWaiter对应的线程;
  • 小结一下,线程B执行notify时候做的事情:
  1. 执行过wait的线程都在队列_WaitSet中,此处从_WaitSet中取出第一个;
  2. 根据Policy的不同,将这个线程放入_EntryList或者_cxq队列中的起始或末尾位置;

线程B释放锁的时候做了什么

  • 接下来到了揭开问题的关键了,我们来看objectMonitor.cpp的ObjectMonitor::exit方法;

这里写图片描述

  • 如上图,方法一进来先做一些合法性判断,接下来如红框所示,是偏向锁逻辑,偏向次数减一后直接返回,显然线程B在此处不会返回,而是继续往下执行;
  • 根据QMode的不同,有不同的处理方式:
  1. QMode = 2,并且_cxq非空:取_cxq队列排头位置的ObjectWaiter对象,调用ExitEpilog方法,该方法会唤醒ObjectWaiter对象的线程,此处会立即返回,后面的代码不会执行了;
  2. QMode = 3,并且_cxq非空:把_cxq队列首元素放入_EntryList的尾部;
  3. QMode = 4,并且_cxq非空:把_cxq队列首元素放入_EntryList的头部;
  4. QMode = 0,不做什么,继续往下看;
  • 只有QMode=2的时候会提前返回,等于0、3、4的时候都会继续往下执行:
  • 如果_EntryList的首元素非空,就取出来调用ExitEpilog方法,该方法会唤醒ObjectWaiter对象的线程,然后立即返回;
  • 如果_EntryList的首元素为空,就取_cxq的首元素,放入_EntryList,然后再从_EntryList中取出来执行ExitEpilog方法,然后立即返回;
  • 以上操作,均是执行过ExitEpilog方法然后立即返回,如果取出的元素为空,就执行循环继续取;
  • 小结一下,线程B释放了锁之后,执行的操作如下:
  1. 偏向锁逻辑,此处未命中;
  2. 根据QMode的不同,将ObjectWaiter从_cxq或者_EntryList中取出后唤醒;
  3. 唤醒的元素会继续执行挂起前的代码,按照我们之前的分析,线程唤醒后,就会通过CAS去竞争锁,此时由于线程B已经释放了锁,那么此时应该能竞争成功;
  • 到了现在已经将之前的几个问题搞清了,汇总起来看看:
  1. 线程A在wait() 后被加入了_WaitSet队列中;
  2. 线程C被线程B启动后竞争锁失败,被加入到_cxq队列的首位;
  3. 线程B在notify()时,从_WaitSet中取出第一个,根据Policy的不同,将这个线程放入_EntryList或者_cxq队列中的起始或末尾位置;
  4. 根据QMode的不同,将ObjectWaiter从_cxq或者_EntryList中取出后唤醒;;
  • 所以,最初的问题已经清楚了,wait()的线程被唤醒后,会进入一个队列,然后JVM会根据Policy和QMode的不同对队列中的ObjectWaiter做不同的处理,被选中的ObjectWaiter会被唤醒,去竞争锁;
  • 至此,源码分析已结束,但是因为我们不知道Policy和QMode参数到底是多少,所以还不能对之前的问题有个明确的结果,这些还是留在下一章来解答吧,下一章里我们去修改JVM源码,把参数都打印出来;

欢迎关注阿里云开发者社区博客:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...
相关文章
|
2天前
|
设计模式 消息中间件 算法
【实习总结】Java学习最佳实践!
【实习总结】Java学习最佳实践!
22 3
|
2天前
|
数据采集 安全 Java
Java并发编程学习12-任务取消(上)
【5月更文挑战第6天】本篇介绍了取消策略、线程中断、中断策略 和 响应中断的内容
30 4
Java并发编程学习12-任务取消(上)
|
1天前
|
NoSQL 算法 Java
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
|
1天前
|
Java 数据库连接 Spring
K8S+Docker理论与实践深度集成java面试jvm原理
K8S+Docker理论与实践深度集成java面试jvm原理
|
2天前
|
存储 算法 搜索推荐
滚雪球学Java(27):从零开始学习数组:定义和初始化
【5月更文挑战第2天】🏆本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
10 3
|
2天前
|
Java
Java一分钟:线程协作:wait(), notify(), notifyAll()
【5月更文挑战第11天】本文介绍了Java多线程编程中的`wait()`, `notify()`, `notifyAll()`方法,它们用于线程间通信和同步。这些方法在`synchronized`代码块中使用,控制线程执行和资源访问。文章讨论了常见问题,如死锁、未捕获异常、同步使用错误及通知错误,并提供了生产者-消费者模型的示例代码,强调理解并正确使用这些方法对实现线程协作的重要性。
16 3
|
2天前
|
缓存 Java 数据库
Java并发编程学习11-任务执行演示
【5月更文挑战第4天】本篇将结合任务执行和 Executor 框架的基础知识,演示一些不同版本的任务执行Demo,并且每个版本都实现了不同程度的并发性。
34 4
Java并发编程学习11-任务执行演示
|
2天前
|
Java 索引
深入浅出JVM(五)之Java中方法调用
深入浅出JVM(五)之Java中方法调用
|
2天前
|
数据库连接
java+ssm+vue代码视频学习讲解
java+ssm+vue代码视频学习讲解
10 0
|
2天前
|
Java 编译器 对象存储
java一分钟之Java入门:认识JDK与JVM
【5月更文挑战第7天】本文介绍了Java编程的基础——JDK和JVM。JDK是包含编译器、运行时环境、类库等的开发工具包,而JVM是Java平台的核心,负责执行字节码并实现跨平台运行。常见问题包括版本不匹配、环境变量配置错误、内存溢出和线程死锁。解决办法包括选择合适JDK版本、正确配置环境变量、调整JVM内存参数和避免线程死锁。通过代码示例展示了JVM内存管理和基本Java程序结构,帮助初学者更好地理解JDK和JVM在Java编程中的作用。
22 0