Java线程通信的精髓:解析通知等待机制的工作原理

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: Java线程通信的精髓:解析通知等待机制的工作原理

通知/等待机制

存在这样一个场景,一个线程修改了一个对象的值,而另一个线程需要感知到变化后去做一些处理。这是一种典型的生产者和消费者模式,这种模式在功能层面可以实现解耦,体系结构上也具备良好的申缩性。

如何用多线程去实现这种呢?最简单的办法是让消费者线程不断地循环检查是否符合执行条件,例如下面的代码:

while (value != desire) {
    Thread.sleep(1000);
}
doSomething();

这种是非常 low 的写法,存在以下问题:

  1. 难以确保及时性:睡眠的时候基本不消化处理器资源,但是如果睡得太久,就不能及时发现变化。
  2. 难以降低开销:如果将睡眠时间降低,那么消费者可以更加迅速的感应到变化,但是却要消耗更多的处理器资源,造成浪费。

以上问题用这种方式似乎难以调和,不过 Java 通过内置的通知/等待机制就可以很好的解决这个问题。

等待和通知的相关方法

这些方法是任意 Java 对象都具备的,因为是被定义在最底层的 Object 方法上。

方法名称 描述
wait() 调用该方法后,线程进入 WAITING 状态,只有等待别的线程通知或被中断后才能返回,调用此方法后,会释放当前线程持有的对象锁。
wait(long) 超时等待一段时间,单位毫秒,如果过了这个时间还没有被通知,那么则会超时返回。
wait(long, int) 也是超时等待,不过可以自定义单位,最小可以达到纳秒。
notify() 通知一个在对象对象上等待的线程,使其从 wait 方法返回,而返回的前提是该线程获取到了对象的锁。
notifyAll() 通知所有等待在该对象的线程。

通知等待机制,是指一个线程 A 调用了对象 O 的 wait 方法后进入等待状态(相当于线程 A 在执行 wait 这行代码后卡了),而另一个线程 B 调用了对象 O 的 notify 或 notifyAll 后,线程 A 可以收到通知,从对象 O 的 wait 方法返回(从刚才卡住的位置继续执行),进而执行后续的操作。调用对象 O 的 wait、notify 方法就像是开关信号一样,用来完成等待方和通知方之间的交互工作。

实战案例

public class WaitNotifyDemo1 {
    public static Integer value;
    public static final Object lock = new Object();
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (lock) {
                if (value == null) {
                    try {
                        System.out.println("value 为空,开始等待...");
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("接收到通知,线程继续执行,value = " + value);
            }
        }, "WaitThread").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + " 获取到锁");
                value = 10086;
                System.out.println("value 赋值完成!通知等待线程处理...");
                lock.notify();
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().getName() + " 释放了锁");
            }
        }, "NotifyThread").start();
    }
}

上述的代码需要注意的细节:

  1. 使用 wait、notify、notifyAll 时需要先对调用的对象加锁。
  2. 调用 wait 方法后,WaitThread 的状态由 RUNNING 转为 WAITING,并将当前的线程放在等待队列。
  3. NotifyThread 调用 notify 后,等待的线程依旧不会从 wait 方法返回,而是需要 NotifyThread 释放锁之后,等待的线程才有机会从 wait 放回(还要去竞争锁,不一定抢到)。
  4. notify 方法是将一个线程从等待队列移到同步队列,线程状态由 WAITING 变为 BLOCKED。而 notifyAll 是将等待队列中的所有线程移到同步队列,线程状态由 WAITING 变为 BLOCKED。
  5. 从 wait 方法返回的前提是获取到了对象的锁。
  6. wait、notify、notifyAll 必须依赖于 synchronized,即必须在同步方法/代码块中调用。

等待通知机制依托于同步机制 synchronized ,其目的是确保等待线程从 wait 方法返回时能够感知到其他线程对变量做出的修改。

使用可重入锁也可以实现上面的效果,代码如下:

public class WaitNotifyDemo2 {
    public static Integer value;
    public static final Lock lock = new ReentrantLock();
    public static final Condition condition = lock.newCondition();
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            lock.lock();
            try {
                if (value == null) {
                    try {
                        System.out.println("value 为空,开始等待...");
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("接收到通知,线程继续执行,value = " + value);
            } finally {
                lock.unlock();
            }
        }, "WaitThread").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " 获取到锁");
                value = 10086;
                System.out.println("value 赋值完成!通知等待线程处理...");
                condition.signal();
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().getName() + " 释放了锁");
            } finally {
                lock.unlock();
            }
        }, "NotifyThread").start();
    }
}

为什么 wait、notify 必须搭配 synchronized

首先,要明白,每个对象都可以被认为是一个监视器 Monitor,这个监视器由三部分组成(一个独占锁,一个同步队列,一个等待队列)。

注意是一个对象只能有一个独占锁,但是任意线程线程都可以拥有这个独占锁。

  1. 对于对象的非同步方法而言,任意时刻可以有任意个线程调用该方法,即普通方法同一时刻可以有多个线程调用。
  2. 对于对象的同步方法而言,只有拥有这个对象的独占锁才能调用这个同步方法。如果这个独占锁被其他线程占用,那么另外一个调用该同步方法的线程就会处于阻塞状态,此线程进入同步队列。

若一个拥有该独占锁的线程调用该对象同步方法的 wait 方法,则该线程会释放独占锁,并加入对象的等待队列。

某个线程调用 notify、notifyAll 方法是将等待队列的线程转移到同步队列,然后让他们竞争锁,所以这个调用线程本身必须拥有锁。

通知等待的范式

等待方

等待方遵循以下原则:

  1. 获取对象锁。
  2. 如果条件不满足,则调用对象锁的 wait 方法,被通知后仍要检查条件。
  3. 条件满足则执行对应的逻辑。
synchronized(对象) {
    while(条件不满足) {
        对象.wait();
    }
    条件满足对应的处理逻辑
}

通知方

通知方遵循以下原则:

  1. 获取对象锁。
  2. 改变条件。
  3. 通知一个/所有等待在此对象锁上的线程。
synchronized(对象) {
  改变条件
    对象.notify(); // 对象.notifyAll();
}

Thread#join

如果线程 A 执行了线程 ThreadB.join(); 语句,其含义是:当线程 A 等待 ThreadB 执行完毕后才从 ThreadB.join() 返回继续执行。

线程 Thread 除了提供 join 方法之外, 还提供了 join(long)、join(long, int) 两个具备超时属性的方法。表示如果超过了指定的时间,对应的线程还是没有终止,则会直接从超市方法中返回。

下面的程序将会利用 join 的特性,控制三个线程按照顺序执行依次输出 Main、A、B、C。

public class JoinDemo {
    public static void main(String[] args) {
        Thread main = Thread.currentThread();
        Thread a = new Thread(() -> {
            try {
                main.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("A");
        }, "A");
        Thread b = new Thread(() -> {
            try {
                a.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("B");
        }, "B");
        Thread c = new Thread(() -> {
            try {
                b.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("C");
        }, "C");
        a.start();
        b.start();
        c.start();
        System.out.println("Main");
    }
}

每个线程都有自己的前驱线程:Main -> A -> B -> C,当前线程终止的前提是前驱线程终止,等待前驱线程终止之后才能从 join 语句返回并继续执行自身逻辑,这里面也是涉及到了通知等待机制,查看 Thread join 方法的源码即可看到:

// 加锁当前线程对象
public final synchronized void join() throws InterruptedException {
    // 条件不满足,继续等待
    while (isAlive()) {
        wait(0);
    }
    // 条件符合,方法返回
}

当线程终止的时候,会隐式调用线程自身的 notifyAll 方法,会通知所有等待在该线程上的线程,可以看到 join 的实现和刚刚我们总结的范式是一致的,即加锁、循环、处理逻辑。

具体是怎么隐式调用的?Java 代码中并没有找到对应的位置,其实这个逻辑在固化在了 JDK 底层,主要源码都在 src/hotspot/share/runtime/thread.cpp 文件中:

void Thread::call_run() {
  ...
  this->pre_run();
  this->run();
  this->post_run();
  ...
}
void JavaThread::post_run() {
  // 线程将要退出
  this->exit(false);
  ...
}
void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
  ...
  // 线程将要退出
  ensure_join(this);
  ... 
}
static void ensure_join(JavaThread* thread) {
  ...
  // 调用自身的 notify_all 方法
  lock.notify_all(thread);
  ...
}

保证多个线程的执行顺序

刚刚我们介绍的 join 就可以控制多个线程的执行顺序,再回顾一下代码:

public class JoinDemo {
    public static void main(String[] args) {
        Thread main = Thread.currentThread();
        Thread a = new Thread(() -> {
            try {
                main.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("A");
        }, "A");
        Thread b = new Thread(() -> {
            try {
                a.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("B");
        }, "B");
        a.start();
        b.start();
        System.out.println("Main");
    }
}

join 的本质其实还是内置的通知等待机制,所以我们使用原生的方式也是可以实现的。

public class JoinDemo2 {
    public static void main(String[] args) {
        Thread main = Thread.currentThread();
        Thread a = new Thread(() -> {
            synchronized (main) {
                while (main.getState() != Thread.State.TERMINATED) {
                    try {
                        main.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println("A");
        }, "A");
        Thread b = new Thread(() -> {
            synchronized (a) {
                while (a.getState() != Thread.State.TERMINATED) {
                    try {
                        a.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println("B");
        }, "B");
        a.start();
        b.start();
        System.out.println("Main");
    }
}

每个线程执行完后会默认调用自身线程的 notifyAll 方法,所以我这个代码就没有明确调用了。

相关文章
|
3天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
14 2
|
18天前
|
存储 Java 关系型数据库
高效连接之道:Java连接池原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。频繁创建和关闭连接会消耗大量资源,导致性能瓶颈。为此,Java连接池技术通过复用连接,实现高效、稳定的数据库连接管理。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接池的基本操作、配置和使用方法,以及在电商应用中的具体应用示例。
37 5
|
7天前
|
存储 算法 Java
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
本文详解自旋锁的概念、优缺点、使用场景及Java实现。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
|
8天前
|
Java
Java之CountDownLatch原理浅析
本文介绍了Java并发工具类`CountDownLatch`的使用方法、原理及其与`Thread.join()`的区别。`CountDownLatch`通过构造函数接收一个整数参数作为计数器,调用`countDown`方法减少计数,`await`方法会阻塞当前线程,直到计数为零。文章还详细解析了其内部机制,包括初始化、`countDown`和`await`方法的工作原理,并给出了一个游戏加载场景的示例代码。
Java之CountDownLatch原理浅析
|
10天前
|
Java 索引 容器
Java ArrayList扩容的原理
Java 的 `ArrayList` 是基于数组实现的动态集合。初始时,`ArrayList` 底层创建一个空数组 `elementData`,并设置 `size` 为 0。当首次添加元素时,会调用 `grow` 方法将数组扩容至默认容量 10。之后每次添加元素时,如果当前数组已满,则会再次调用 `grow` 方法进行扩容。扩容规则为:首次扩容至 10,后续扩容至原数组长度的 1.5 倍或根据实际需求扩容。例如,当需要一次性添加 100 个元素时,会直接扩容至 110 而不是 15。
Java ArrayList扩容的原理
|
5天前
|
存储 消息中间件 算法
深入探索操作系统的心脏——内核机制解析
本文旨在揭示操作系统核心——内核的工作原理,通过剖析其关键组件与机制,为读者提供一个清晰的内核结构图景。不同于常规摘要的概述性内容,本文摘要将直接聚焦于内核的核心概念、主要功能以及其在系统管理中扮演的角色,旨在激发读者对操作系统深层次运作原理的兴趣与理解。
|
16天前
|
存储 Java 关系型数据库
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接创建、分配、复用和释放等操作,并通过电商应用实例展示了如何选择合适的连接池库(如HikariCP)和配置参数,实现高效、稳定的数据库连接管理。
33 2
|
17天前
|
存储 缓存 安全
🌟Java零基础:深入解析Java序列化机制
【10月更文挑战第20天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
21 3
|
16天前
|
算法 Java 数据库连接
Java连接池技术,从基础概念出发,解析了连接池的工作原理及其重要性
本文详细介绍了Java连接池技术,从基础概念出发,解析了连接池的工作原理及其重要性。连接池通过复用数据库连接,显著提升了应用的性能和稳定性。文章还展示了使用HikariCP连接池的示例代码,帮助读者更好地理解和应用这一技术。
31 1
|
3天前
|
存储 供应链 物联网
深入解析区块链技术的核心原理与应用前景
深入解析区块链技术的核心原理与应用前景

推荐镜像

更多