Java线程通信的精髓:解析通知等待机制的工作原理

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: Java线程通信的精髓:解析通知等待机制的工作原理

通知/等待机制

存在这样一个场景,一个线程修改了一个对象的值,而另一个线程需要感知到变化后去做一些处理。这是一种典型的生产者和消费者模式,这种模式在功能层面可以实现解耦,体系结构上也具备良好的申缩性。

如何用多线程去实现这种呢?最简单的办法是让消费者线程不断地循环检查是否符合执行条件,例如下面的代码:

while (value != desire) {
    Thread.sleep(1000);
}
doSomething();

这种是非常 low 的写法,存在以下问题:

  1. 难以确保及时性:睡眠的时候基本不消化处理器资源,但是如果睡得太久,就不能及时发现变化。
  2. 难以降低开销:如果将睡眠时间降低,那么消费者可以更加迅速的感应到变化,但是却要消耗更多的处理器资源,造成浪费。

以上问题用这种方式似乎难以调和,不过 Java 通过内置的通知/等待机制就可以很好的解决这个问题。

等待和通知的相关方法

这些方法是任意 Java 对象都具备的,因为是被定义在最底层的 Object 方法上。

方法名称 描述
wait() 调用该方法后,线程进入 WAITING 状态,只有等待别的线程通知或被中断后才能返回,调用此方法后,会释放当前线程持有的对象锁。
wait(long) 超时等待一段时间,单位毫秒,如果过了这个时间还没有被通知,那么则会超时返回。
wait(long, int) 也是超时等待,不过可以自定义单位,最小可以达到纳秒。
notify() 通知一个在对象对象上等待的线程,使其从 wait 方法返回,而返回的前提是该线程获取到了对象的锁。
notifyAll() 通知所有等待在该对象的线程。

通知等待机制,是指一个线程 A 调用了对象 O 的 wait 方法后进入等待状态(相当于线程 A 在执行 wait 这行代码后卡了),而另一个线程 B 调用了对象 O 的 notify 或 notifyAll 后,线程 A 可以收到通知,从对象 O 的 wait 方法返回(从刚才卡住的位置继续执行),进而执行后续的操作。调用对象 O 的 wait、notify 方法就像是开关信号一样,用来完成等待方和通知方之间的交互工作。

实战案例

public class WaitNotifyDemo1 {
    public static Integer value;
    public static final Object lock = new Object();
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (lock) {
                if (value == null) {
                    try {
                        System.out.println("value 为空,开始等待...");
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("接收到通知,线程继续执行,value = " + value);
            }
        }, "WaitThread").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + " 获取到锁");
                value = 10086;
                System.out.println("value 赋值完成!通知等待线程处理...");
                lock.notify();
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().getName() + " 释放了锁");
            }
        }, "NotifyThread").start();
    }
}

上述的代码需要注意的细节:

  1. 使用 wait、notify、notifyAll 时需要先对调用的对象加锁。
  2. 调用 wait 方法后,WaitThread 的状态由 RUNNING 转为 WAITING,并将当前的线程放在等待队列。
  3. NotifyThread 调用 notify 后,等待的线程依旧不会从 wait 方法返回,而是需要 NotifyThread 释放锁之后,等待的线程才有机会从 wait 放回(还要去竞争锁,不一定抢到)。
  4. notify 方法是将一个线程从等待队列移到同步队列,线程状态由 WAITING 变为 BLOCKED。而 notifyAll 是将等待队列中的所有线程移到同步队列,线程状态由 WAITING 变为 BLOCKED。
  5. 从 wait 方法返回的前提是获取到了对象的锁。
  6. wait、notify、notifyAll 必须依赖于 synchronized,即必须在同步方法/代码块中调用。

等待通知机制依托于同步机制 synchronized ,其目的是确保等待线程从 wait 方法返回时能够感知到其他线程对变量做出的修改。

使用可重入锁也可以实现上面的效果,代码如下:

public class WaitNotifyDemo2 {
    public static Integer value;
    public static final Lock lock = new ReentrantLock();
    public static final Condition condition = lock.newCondition();
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            lock.lock();
            try {
                if (value == null) {
                    try {
                        System.out.println("value 为空,开始等待...");
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("接收到通知,线程继续执行,value = " + value);
            } finally {
                lock.unlock();
            }
        }, "WaitThread").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " 获取到锁");
                value = 10086;
                System.out.println("value 赋值完成!通知等待线程处理...");
                condition.signal();
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().getName() + " 释放了锁");
            } finally {
                lock.unlock();
            }
        }, "NotifyThread").start();
    }
}

为什么 wait、notify 必须搭配 synchronized

首先,要明白,每个对象都可以被认为是一个监视器 Monitor,这个监视器由三部分组成(一个独占锁,一个同步队列,一个等待队列)。

注意是一个对象只能有一个独占锁,但是任意线程线程都可以拥有这个独占锁。

  1. 对于对象的非同步方法而言,任意时刻可以有任意个线程调用该方法,即普通方法同一时刻可以有多个线程调用。
  2. 对于对象的同步方法而言,只有拥有这个对象的独占锁才能调用这个同步方法。如果这个独占锁被其他线程占用,那么另外一个调用该同步方法的线程就会处于阻塞状态,此线程进入同步队列。

若一个拥有该独占锁的线程调用该对象同步方法的 wait 方法,则该线程会释放独占锁,并加入对象的等待队列。

某个线程调用 notify、notifyAll 方法是将等待队列的线程转移到同步队列,然后让他们竞争锁,所以这个调用线程本身必须拥有锁。

通知等待的范式

等待方

等待方遵循以下原则:

  1. 获取对象锁。
  2. 如果条件不满足,则调用对象锁的 wait 方法,被通知后仍要检查条件。
  3. 条件满足则执行对应的逻辑。
synchronized(对象) {
    while(条件不满足) {
        对象.wait();
    }
    条件满足对应的处理逻辑
}

通知方

通知方遵循以下原则:

  1. 获取对象锁。
  2. 改变条件。
  3. 通知一个/所有等待在此对象锁上的线程。
synchronized(对象) {
  改变条件
    对象.notify(); // 对象.notifyAll();
}

Thread#join

如果线程 A 执行了线程 ThreadB.join(); 语句,其含义是:当线程 A 等待 ThreadB 执行完毕后才从 ThreadB.join() 返回继续执行。

线程 Thread 除了提供 join 方法之外, 还提供了 join(long)、join(long, int) 两个具备超时属性的方法。表示如果超过了指定的时间,对应的线程还是没有终止,则会直接从超市方法中返回。

下面的程序将会利用 join 的特性,控制三个线程按照顺序执行依次输出 Main、A、B、C。

public class JoinDemo {
    public static void main(String[] args) {
        Thread main = Thread.currentThread();
        Thread a = new Thread(() -> {
            try {
                main.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("A");
        }, "A");
        Thread b = new Thread(() -> {
            try {
                a.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("B");
        }, "B");
        Thread c = new Thread(() -> {
            try {
                b.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("C");
        }, "C");
        a.start();
        b.start();
        c.start();
        System.out.println("Main");
    }
}

每个线程都有自己的前驱线程:Main -> A -> B -> C,当前线程终止的前提是前驱线程终止,等待前驱线程终止之后才能从 join 语句返回并继续执行自身逻辑,这里面也是涉及到了通知等待机制,查看 Thread join 方法的源码即可看到:

// 加锁当前线程对象
public final synchronized void join() throws InterruptedException {
    // 条件不满足,继续等待
    while (isAlive()) {
        wait(0);
    }
    // 条件符合,方法返回
}

当线程终止的时候,会隐式调用线程自身的 notifyAll 方法,会通知所有等待在该线程上的线程,可以看到 join 的实现和刚刚我们总结的范式是一致的,即加锁、循环、处理逻辑。

具体是怎么隐式调用的?Java 代码中并没有找到对应的位置,其实这个逻辑在固化在了 JDK 底层,主要源码都在 src/hotspot/share/runtime/thread.cpp 文件中:

void Thread::call_run() {
  ...
  this->pre_run();
  this->run();
  this->post_run();
  ...
}
void JavaThread::post_run() {
  // 线程将要退出
  this->exit(false);
  ...
}
void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
  ...
  // 线程将要退出
  ensure_join(this);
  ... 
}
static void ensure_join(JavaThread* thread) {
  ...
  // 调用自身的 notify_all 方法
  lock.notify_all(thread);
  ...
}

保证多个线程的执行顺序

刚刚我们介绍的 join 就可以控制多个线程的执行顺序,再回顾一下代码:

public class JoinDemo {
    public static void main(String[] args) {
        Thread main = Thread.currentThread();
        Thread a = new Thread(() -> {
            try {
                main.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("A");
        }, "A");
        Thread b = new Thread(() -> {
            try {
                a.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("B");
        }, "B");
        a.start();
        b.start();
        System.out.println("Main");
    }
}

join 的本质其实还是内置的通知等待机制,所以我们使用原生的方式也是可以实现的。

public class JoinDemo2 {
    public static void main(String[] args) {
        Thread main = Thread.currentThread();
        Thread a = new Thread(() -> {
            synchronized (main) {
                while (main.getState() != Thread.State.TERMINATED) {
                    try {
                        main.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println("A");
        }, "A");
        Thread b = new Thread(() -> {
            synchronized (a) {
                while (a.getState() != Thread.State.TERMINATED) {
                    try {
                        a.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println("B");
        }, "B");
        a.start();
        b.start();
        System.out.println("Main");
    }
}

每个线程执行完后会默认调用自身线程的 notifyAll 方法,所以我这个代码就没有明确调用了。

相关文章
|
1天前
|
Java 程序员
深入理解Java中的异常处理机制
本文将深入探讨Java中的异常处理机制,通过详细的示例代码和解释,帮助读者更好地理解和应用Java异常处理。我们将从基本概念入手,逐步深入到高级特性,如自定义异常、异常链以及最佳实践。
|
6天前
|
Java
深入理解Java中的异常处理机制
【9月更文挑战第34天】在Java的世界里,异常处理是代码健壮性的守护神。本文将带你探索Java异常处理的奥秘,从基础语法到高级技巧,我们一步步揭开异常处理的面纱。你将学会如何捕获、声明和处理异常,以及如何自定义异常类型。让我们开始这段旅程,让代码更加稳健和可靠吧!
|
11天前
|
移动开发 Android开发 数据安全/隐私保护
移动应用与系统的技术演进:从开发到操作系统的全景解析随着智能手机和平板电脑的普及,移动应用(App)已成为人们日常生活中不可或缺的一部分。无论是社交、娱乐、购物还是办公,移动应用都扮演着重要的角色。而支撑这些应用运行的,正是功能强大且复杂的移动操作系统。本文将深入探讨移动应用的开发过程及其背后的操作系统机制,揭示这一领域的技术演进。
本文旨在提供关于移动应用与系统技术的全面概述,涵盖移动应用的开发生命周期、主要移动操作系统的特点以及它们之间的竞争关系。我们将探讨如何高效地开发移动应用,并分析iOS和Android两大主流操作系统的技术优势与局限。同时,本文还将讨论跨平台解决方案的兴起及其对移动开发领域的影响。通过这篇技术性文章,读者将获得对移动应用开发及操作系统深层理解的钥匙。
|
3天前
|
Java
深入浅出Java异常处理机制
【9月更文挑战第37天】在Java编程的世界里,异常处理是一项基础而重要的技能。它就像是我们生活中的急救箱,遇到意外时能及时救治,避免程序崩溃。本文将通过生动的例子和易于理解的语言,带你了解Java中的异常处理机制,从基本的try-catch语句到自定义异常的创建,让你的程序更加健壮和可靠。准备好了吗?让我们一起走进Java异常处理的大门,探索它的奥秘吧!
|
9天前
|
Java 程序员 开发者
深入理解Java中的异常处理机制
【9月更文挑战第31天】在Java编程中,异常处理是维护程序健壮性的关键。本文将通过浅显易懂的语言和生动的例子,带你了解Java异常处理的基本概念、分类以及如何优雅地处理它们。从初学者到资深开发者,每个人都能从中获得新的洞见和技巧,让你的代码更加健壮和易于维护。
12 4
|
8天前
|
Java 编译器 开发者
Java中的异常处理机制:从基础到进阶
本文深入探讨Java编程语言中的异常处理机制,从基础知识出发,逐步解析异常的分类、捕获和处理方法。通过实际案例分析,展示如何在开发过程中有效利用异常处理提高代码的稳定性和可维护性。进一步探讨了自定义异常的创建和使用场景,以及在Java中进行异常处理的最佳实践。文章旨在为Java开发者提供一个全面而详细的异常处理指南,帮助开发者更好地理解和运用Java的异常处理机制。
|
12天前
|
Java 数据库连接
深入理解Java异常处理机制
【9月更文挑战第28天】在Java编程中,异常处理是确保程序健壮性的关键。本文通过浅显易懂的语言和生动的例子,带你一步步了解Java的异常处理机制。从try-catch-finally的基本用法,到自定义异常类,再到异常处理的最佳实践,我们将一起探索如何在代码中优雅地处理那些不期而遇的小插曲。
16 4
|
10天前
|
存储 关系型数据库 MySQL
深入解析MySQL数据存储机制:从表结构到物理存储
深入解析MySQL数据存储机制:从表结构到物理存储
23 1
|
2天前
|
安全 Java 数据库连接
Python多线程编程:竞争问题的解析与应对策略
Python多线程编程:竞争问题的解析与应对策略
4 0
|
2天前
|
安全 Java 数据库连接
Python多线程编程:竞争问题的解析与应对策略【2】
Python多线程编程:竞争问题的解析与应对策略【2】
6 0

推荐镜像

更多