通知/等待机制
存在这样一个场景,一个线程修改了一个对象的值,而另一个线程需要感知到变化后去做一些处理。这是一种典型的生产者和消费者模式,这种模式在功能层面可以实现解耦,体系结构上也具备良好的申缩性。
如何用多线程去实现这种呢?最简单的办法是让消费者线程不断地循环检查是否符合执行条件,例如下面的代码:
while (value != desire) { Thread.sleep(1000); } doSomething();
这种是非常 low 的写法,存在以下问题:
- 难以确保及时性:睡眠的时候基本不消化处理器资源,但是如果睡得太久,就不能及时发现变化。
- 难以降低开销:如果将睡眠时间降低,那么消费者可以更加迅速的感应到变化,但是却要消耗更多的处理器资源,造成浪费。
以上问题用这种方式似乎难以调和,不过 Java 通过内置的通知/等待机制就可以很好的解决这个问题。
等待和通知的相关方法
这些方法是任意 Java 对象都具备的,因为是被定义在最底层的 Object 方法上。
方法名称 | 描述 |
wait() | 调用该方法后,线程进入 WAITING 状态,只有等待别的线程通知或被中断后才能返回,调用此方法后,会释放当前线程持有的对象锁。 |
wait(long) | 超时等待一段时间,单位毫秒,如果过了这个时间还没有被通知,那么则会超时返回。 |
wait(long, int) | 也是超时等待,不过可以自定义单位,最小可以达到纳秒。 |
notify() | 通知一个在对象对象上等待的线程,使其从 wait 方法返回,而返回的前提是该线程获取到了对象的锁。 |
notifyAll() | 通知所有等待在该对象的线程。 |
通知等待机制,是指一个线程 A 调用了对象 O 的 wait 方法后进入等待状态(相当于线程 A 在执行 wait 这行代码后卡了),而另一个线程 B 调用了对象 O 的 notify 或 notifyAll 后,线程 A 可以收到通知,从对象 O 的 wait 方法返回(从刚才卡住的位置继续执行),进而执行后续的操作。调用对象 O 的 wait、notify 方法就像是开关信号一样,用来完成等待方和通知方之间的交互工作。
实战案例
public class WaitNotifyDemo1 { public static Integer value; public static final Object lock = new Object(); public static void main(String[] args) throws InterruptedException { new Thread(() -> { synchronized (lock) { if (value == null) { try { System.out.println("value 为空,开始等待..."); lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("接收到通知,线程继续执行,value = " + value); } }, "WaitThread").start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " 获取到锁"); value = 10086; System.out.println("value 赋值完成!通知等待线程处理..."); lock.notify(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + " 释放了锁"); } }, "NotifyThread").start(); } }
上述的代码需要注意的细节:
- 使用 wait、notify、notifyAll 时需要先对调用的对象加锁。
- 调用 wait 方法后,WaitThread 的状态由 RUNNING 转为 WAITING,并将当前的线程放在等待队列。
- NotifyThread 调用 notify 后,等待的线程依旧不会从 wait 方法返回,而是需要 NotifyThread 释放锁之后,等待的线程才有机会从 wait 放回(还要去竞争锁,不一定抢到)。
- notify 方法是将一个线程从等待队列移到同步队列,线程状态由 WAITING 变为 BLOCKED。而 notifyAll 是将等待队列中的所有线程移到同步队列,线程状态由 WAITING 变为 BLOCKED。
- 从 wait 方法返回的前提是获取到了对象的锁。
- wait、notify、notifyAll 必须依赖于 synchronized,即必须在同步方法/代码块中调用。
等待通知机制依托于同步机制 synchronized ,其目的是确保等待线程从 wait 方法返回时能够感知到其他线程对变量做出的修改。
使用可重入锁也可以实现上面的效果,代码如下:
public class WaitNotifyDemo2 { public static Integer value; public static final Lock lock = new ReentrantLock(); public static final Condition condition = lock.newCondition(); public static void main(String[] args) throws InterruptedException { new Thread(() -> { lock.lock(); try { if (value == null) { try { System.out.println("value 为空,开始等待..."); condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("接收到通知,线程继续执行,value = " + value); } finally { lock.unlock(); } }, "WaitThread").start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " 获取到锁"); value = 10086; System.out.println("value 赋值完成!通知等待线程处理..."); condition.signal(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + " 释放了锁"); } finally { lock.unlock(); } }, "NotifyThread").start(); } }
为什么 wait、notify 必须搭配 synchronized
首先,要明白,每个对象都可以被认为是一个监视器 Monitor,这个监视器由三部分组成(一个独占锁,一个同步队列,一个等待队列)。
注意是一个对象只能有一个独占锁,但是任意线程线程都可以拥有这个独占锁。
- 对于对象的非同步方法而言,任意时刻可以有任意个线程调用该方法,即普通方法同一时刻可以有多个线程调用。
- 对于对象的同步方法而言,只有拥有这个对象的独占锁才能调用这个同步方法。如果这个独占锁被其他线程占用,那么另外一个调用该同步方法的线程就会处于阻塞状态,此线程进入同步队列。
若一个拥有该独占锁的线程调用该对象同步方法的 wait 方法,则该线程会释放独占锁,并加入对象的等待队列。
某个线程调用 notify、notifyAll 方法是将等待队列的线程转移到同步队列,然后让他们竞争锁,所以这个调用线程本身必须拥有锁。
通知等待的范式
等待方
等待方遵循以下原则:
- 获取对象锁。
- 如果条件不满足,则调用对象锁的 wait 方法,被通知后仍要检查条件。
- 条件满足则执行对应的逻辑。
synchronized(对象) { while(条件不满足) { 对象.wait(); } 条件满足对应的处理逻辑 }
通知方
通知方遵循以下原则:
- 获取对象锁。
- 改变条件。
- 通知一个/所有等待在此对象锁上的线程。
synchronized(对象) { 改变条件 对象.notify(); // 对象.notifyAll(); }
Thread#join
如果线程 A 执行了线程 ThreadB.join();
语句,其含义是:当线程 A 等待 ThreadB 执行完毕后才从 ThreadB.join() 返回继续执行。
线程 Thread 除了提供 join 方法之外, 还提供了 join(long)、join(long, int) 两个具备超时属性的方法。表示如果超过了指定的时间,对应的线程还是没有终止,则会直接从超市方法中返回。
下面的程序将会利用 join 的特性,控制三个线程按照顺序执行依次输出 Main、A、B、C。
public class JoinDemo { public static void main(String[] args) { Thread main = Thread.currentThread(); Thread a = new Thread(() -> { try { main.join(); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("A"); }, "A"); Thread b = new Thread(() -> { try { a.join(); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("B"); }, "B"); Thread c = new Thread(() -> { try { b.join(); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("C"); }, "C"); a.start(); b.start(); c.start(); System.out.println("Main"); } }
每个线程都有自己的前驱线程:Main -> A -> B -> C,当前线程终止的前提是前驱线程终止,等待前驱线程终止之后才能从 join 语句返回并继续执行自身逻辑,这里面也是涉及到了通知等待机制,查看 Thread join 方法的源码即可看到:
// 加锁当前线程对象 public final synchronized void join() throws InterruptedException { // 条件不满足,继续等待 while (isAlive()) { wait(0); } // 条件符合,方法返回 }
当线程终止的时候,会隐式调用线程自身的 notifyAll 方法,会通知所有等待在该线程上的线程,可以看到 join 的实现和刚刚我们总结的范式是一致的,即加锁、循环、处理逻辑。
具体是怎么隐式调用的?Java 代码中并没有找到对应的位置,其实这个逻辑在固化在了 JDK 底层,主要源码都在 src/hotspot/share/runtime/thread.cpp 文件中:
void Thread::call_run() { ... this->pre_run(); this->run(); this->post_run(); ... } void JavaThread::post_run() { // 线程将要退出 this->exit(false); ... } void JavaThread::exit(bool destroy_vm, ExitType exit_type) { ... // 线程将要退出 ensure_join(this); ... } static void ensure_join(JavaThread* thread) { ... // 调用自身的 notify_all 方法 lock.notify_all(thread); ... }
保证多个线程的执行顺序
刚刚我们介绍的 join 就可以控制多个线程的执行顺序,再回顾一下代码:
public class JoinDemo { public static void main(String[] args) { Thread main = Thread.currentThread(); Thread a = new Thread(() -> { try { main.join(); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("A"); }, "A"); Thread b = new Thread(() -> { try { a.join(); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("B"); }, "B"); a.start(); b.start(); System.out.println("Main"); } }
join 的本质其实还是内置的通知等待机制,所以我们使用原生的方式也是可以实现的。
public class JoinDemo2 { public static void main(String[] args) { Thread main = Thread.currentThread(); Thread a = new Thread(() -> { synchronized (main) { while (main.getState() != Thread.State.TERMINATED) { try { main.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } System.out.println("A"); }, "A"); Thread b = new Thread(() -> { synchronized (a) { while (a.getState() != Thread.State.TERMINATED) { try { a.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } System.out.println("B"); }, "B"); a.start(); b.start(); System.out.println("Main"); } }
每个线程执行完后会默认调用自身线程的 notifyAll 方法,所以我这个代码就没有明确调用了。