Java线程通信的精髓:解析通知等待机制的工作原理

简介: Java线程通信的精髓:解析通知等待机制的工作原理

通知/等待机制

存在这样一个场景,一个线程修改了一个对象的值,而另一个线程需要感知到变化后去做一些处理。这是一种典型的生产者和消费者模式,这种模式在功能层面可以实现解耦,体系结构上也具备良好的申缩性。

如何用多线程去实现这种呢?最简单的办法是让消费者线程不断地循环检查是否符合执行条件,例如下面的代码:

while (value != desire) {
    Thread.sleep(1000);
}
doSomething();

这种是非常 low 的写法,存在以下问题:

  1. 难以确保及时性:睡眠的时候基本不消化处理器资源,但是如果睡得太久,就不能及时发现变化。
  2. 难以降低开销:如果将睡眠时间降低,那么消费者可以更加迅速的感应到变化,但是却要消耗更多的处理器资源,造成浪费。

以上问题用这种方式似乎难以调和,不过 Java 通过内置的通知/等待机制就可以很好的解决这个问题。

等待和通知的相关方法

这些方法是任意 Java 对象都具备的,因为是被定义在最底层的 Object 方法上。

方法名称 描述
wait() 调用该方法后,线程进入 WAITING 状态,只有等待别的线程通知或被中断后才能返回,调用此方法后,会释放当前线程持有的对象锁。
wait(long) 超时等待一段时间,单位毫秒,如果过了这个时间还没有被通知,那么则会超时返回。
wait(long, int) 也是超时等待,不过可以自定义单位,最小可以达到纳秒。
notify() 通知一个在对象对象上等待的线程,使其从 wait 方法返回,而返回的前提是该线程获取到了对象的锁。
notifyAll() 通知所有等待在该对象的线程。

通知等待机制,是指一个线程 A 调用了对象 O 的 wait 方法后进入等待状态(相当于线程 A 在执行 wait 这行代码后卡了),而另一个线程 B 调用了对象 O 的 notify 或 notifyAll 后,线程 A 可以收到通知,从对象 O 的 wait 方法返回(从刚才卡住的位置继续执行),进而执行后续的操作。调用对象 O 的 wait、notify 方法就像是开关信号一样,用来完成等待方和通知方之间的交互工作。

实战案例

public class WaitNotifyDemo1 {
    public static Integer value;
    public static final Object lock = new Object();
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (lock) {
                if (value == null) {
                    try {
                        System.out.println("value 为空,开始等待...");
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("接收到通知,线程继续执行,value = " + value);
            }
        }, "WaitThread").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + " 获取到锁");
                value = 10086;
                System.out.println("value 赋值完成!通知等待线程处理...");
                lock.notify();
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().getName() + " 释放了锁");
            }
        }, "NotifyThread").start();
    }
}

上述的代码需要注意的细节:

  1. 使用 wait、notify、notifyAll 时需要先对调用的对象加锁。
  2. 调用 wait 方法后,WaitThread 的状态由 RUNNING 转为 WAITING,并将当前的线程放在等待队列。
  3. NotifyThread 调用 notify 后,等待的线程依旧不会从 wait 方法返回,而是需要 NotifyThread 释放锁之后,等待的线程才有机会从 wait 放回(还要去竞争锁,不一定抢到)。
  4. notify 方法是将一个线程从等待队列移到同步队列,线程状态由 WAITING 变为 BLOCKED。而 notifyAll 是将等待队列中的所有线程移到同步队列,线程状态由 WAITING 变为 BLOCKED。
  5. 从 wait 方法返回的前提是获取到了对象的锁。
  6. wait、notify、notifyAll 必须依赖于 synchronized,即必须在同步方法/代码块中调用。

等待通知机制依托于同步机制 synchronized ,其目的是确保等待线程从 wait 方法返回时能够感知到其他线程对变量做出的修改。

使用可重入锁也可以实现上面的效果,代码如下:

public class WaitNotifyDemo2 {
    public static Integer value;
    public static final Lock lock = new ReentrantLock();
    public static final Condition condition = lock.newCondition();
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            lock.lock();
            try {
                if (value == null) {
                    try {
                        System.out.println("value 为空,开始等待...");
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("接收到通知,线程继续执行,value = " + value);
            } finally {
                lock.unlock();
            }
        }, "WaitThread").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " 获取到锁");
                value = 10086;
                System.out.println("value 赋值完成!通知等待线程处理...");
                condition.signal();
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().getName() + " 释放了锁");
            } finally {
                lock.unlock();
            }
        }, "NotifyThread").start();
    }
}

为什么 wait、notify 必须搭配 synchronized

首先,要明白,每个对象都可以被认为是一个监视器 Monitor,这个监视器由三部分组成(一个独占锁,一个同步队列,一个等待队列)。

注意是一个对象只能有一个独占锁,但是任意线程线程都可以拥有这个独占锁。

  1. 对于对象的非同步方法而言,任意时刻可以有任意个线程调用该方法,即普通方法同一时刻可以有多个线程调用。
  2. 对于对象的同步方法而言,只有拥有这个对象的独占锁才能调用这个同步方法。如果这个独占锁被其他线程占用,那么另外一个调用该同步方法的线程就会处于阻塞状态,此线程进入同步队列。

若一个拥有该独占锁的线程调用该对象同步方法的 wait 方法,则该线程会释放独占锁,并加入对象的等待队列。

某个线程调用 notify、notifyAll 方法是将等待队列的线程转移到同步队列,然后让他们竞争锁,所以这个调用线程本身必须拥有锁。

通知等待的范式

等待方

等待方遵循以下原则:

  1. 获取对象锁。
  2. 如果条件不满足,则调用对象锁的 wait 方法,被通知后仍要检查条件。
  3. 条件满足则执行对应的逻辑。
synchronized(对象) {
    while(条件不满足) {
        对象.wait();
    }
    条件满足对应的处理逻辑
}

通知方

通知方遵循以下原则:

  1. 获取对象锁。
  2. 改变条件。
  3. 通知一个/所有等待在此对象锁上的线程。
synchronized(对象) {
  改变条件
    对象.notify(); // 对象.notifyAll();
}

Thread#join

如果线程 A 执行了线程 ThreadB.join(); 语句,其含义是:当线程 A 等待 ThreadB 执行完毕后才从 ThreadB.join() 返回继续执行。

线程 Thread 除了提供 join 方法之外, 还提供了 join(long)、join(long, int) 两个具备超时属性的方法。表示如果超过了指定的时间,对应的线程还是没有终止,则会直接从超市方法中返回。

下面的程序将会利用 join 的特性,控制三个线程按照顺序执行依次输出 Main、A、B、C。

public class JoinDemo {
    public static void main(String[] args) {
        Thread main = Thread.currentThread();
        Thread a = new Thread(() -> {
            try {
                main.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("A");
        }, "A");
        Thread b = new Thread(() -> {
            try {
                a.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("B");
        }, "B");
        Thread c = new Thread(() -> {
            try {
                b.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("C");
        }, "C");
        a.start();
        b.start();
        c.start();
        System.out.println("Main");
    }
}

每个线程都有自己的前驱线程:Main -> A -> B -> C,当前线程终止的前提是前驱线程终止,等待前驱线程终止之后才能从 join 语句返回并继续执行自身逻辑,这里面也是涉及到了通知等待机制,查看 Thread join 方法的源码即可看到:

// 加锁当前线程对象
public final synchronized void join() throws InterruptedException {
    // 条件不满足,继续等待
    while (isAlive()) {
        wait(0);
    }
    // 条件符合,方法返回
}

当线程终止的时候,会隐式调用线程自身的 notifyAll 方法,会通知所有等待在该线程上的线程,可以看到 join 的实现和刚刚我们总结的范式是一致的,即加锁、循环、处理逻辑。

具体是怎么隐式调用的?Java 代码中并没有找到对应的位置,其实这个逻辑在固化在了 JDK 底层,主要源码都在 src/hotspot/share/runtime/thread.cpp 文件中:

void Thread::call_run() {
  ...
  this->pre_run();
  this->run();
  this->post_run();
  ...
}
void JavaThread::post_run() {
  // 线程将要退出
  this->exit(false);
  ...
}
void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
  ...
  // 线程将要退出
  ensure_join(this);
  ... 
}
static void ensure_join(JavaThread* thread) {
  ...
  // 调用自身的 notify_all 方法
  lock.notify_all(thread);
  ...
}

保证多个线程的执行顺序

刚刚我们介绍的 join 就可以控制多个线程的执行顺序,再回顾一下代码:

public class JoinDemo {
    public static void main(String[] args) {
        Thread main = Thread.currentThread();
        Thread a = new Thread(() -> {
            try {
                main.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("A");
        }, "A");
        Thread b = new Thread(() -> {
            try {
                a.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("B");
        }, "B");
        a.start();
        b.start();
        System.out.println("Main");
    }
}

join 的本质其实还是内置的通知等待机制,所以我们使用原生的方式也是可以实现的。

public class JoinDemo2 {
    public static void main(String[] args) {
        Thread main = Thread.currentThread();
        Thread a = new Thread(() -> {
            synchronized (main) {
                while (main.getState() != Thread.State.TERMINATED) {
                    try {
                        main.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println("A");
        }, "A");
        Thread b = new Thread(() -> {
            synchronized (a) {
                while (a.getState() != Thread.State.TERMINATED) {
                    try {
                        a.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println("B");
        }, "B");
        a.start();
        b.start();
        System.out.println("Main");
    }
}

每个线程执行完后会默认调用自身线程的 notifyAll 方法,所以我这个代码就没有明确调用了。

相关文章
|
2月前
|
人工智能 Java 关系型数据库
Java——SPI机制详解
SPI(Service Provider Interface)是JDK内置的服务提供发现机制,主要用于框架扩展和组件替换。通过在`META-INF/services/`目录下定义接口实现类文件,Java程序可利用`ServiceLoader`动态加载服务实现。SPI核心思想是解耦,允许不同厂商为同一接口提供多种实现,如`java.sql.Driver`的MySQL与PostgreSQL实现。然而,SPI存在缺陷:需遍历所有实现并实例化,可能造成资源浪费;获取实现类方式不够灵活;多线程使用时存在安全问题。尽管如此,SPI仍是Java生态系统中实现插件化和模块化设计的重要工具。
|
9天前
|
人工智能 前端开发 安全
Java开发不可不知的秘密:类加载器实现机制
类加载器是Java中负责动态加载类到JVM的组件,理解其工作原理对开发复杂应用至关重要。本文详解类加载过程、双亲委派模型及常见类加载器,并介绍自定义类加载器的实现与应用场景。
|
2月前
|
设计模式 人工智能 安全
AQS:Java 中悲观锁的底层实现机制
AQS(AbstractQueuedSynchronizer)是Java并发包中实现同步组件的基础工具,支持锁(如ReentrantLock、ReadWriteLock)和线程同步工具类(如CountDownLatch、Semaphore)等。Doug Lea设计AQS旨在抽象基础同步操作,简化同步组件构建。 使用AQS需实现`tryAcquire(int arg)`和`tryRelease(int arg)`方法以获取和释放资源,共享模式还需实现`tryAcquireShared(int arg)`和`tryReleaseShared(int arg)`。
107 32
AQS:Java 中悲观锁的底层实现机制
|
2月前
|
Java 区块链 网络架构
酷阿鲸森林农场:Java 区块链系统中的 P2P 区块同步与节点自动加入机制
本文介绍了基于 Java 的去中心化区块链电商系统设计与实现,重点探讨了 P2P 网络在酷阿鲸森林农场项目中的应用。通过节点自动发现、区块广播同步及链校验功能,系统实现了无需中心服务器的点对点网络架构。文章详细解析了核心代码逻辑,包括 P2P 服务端监听、客户端广播新区块及节点列表自动获取等环节,并提出了消息签名验证、WebSocket 替代 Socket 等优化方向。该系统不仅适用于农业电商,还可扩展至教育、物流等领域,构建可信数据链条。
|
2月前
|
人工智能 JavaScript Java
Java反射机制及原理
本文介绍了Java反射机制的基本概念、使用方法及其原理。反射在实际项目中比代理更常用,掌握它可以提升编程能力并理解框架设计原理。文章详细讲解了获取Class对象的四种方式:对象.getClass()、类.class、Class.forName()和类加载器.loadClass(),并分析了Class.forName()与ClassLoader的区别。此外,还探讨了通过Class对象进行实例化、获取方法和字段等操作的具体实现。最后从JVM类加载机制角度解析了Class对象的本质及其与类和实例的关系,帮助读者深入理解Java反射的工作原理。
|
4月前
|
缓存 Dubbo Java
理解的Java中SPI机制
本文深入解析了JDK提供的Java SPI(Service Provider Interface)机制,这是一种基于接口编程、策略模式与配置文件组合实现的动态加载机制,核心在于解耦。文章通过具体示例介绍了SPI的使用方法,包括定义接口、创建配置文件及加载实现类的过程,并分析了其原理与优缺点。SPI适用于框架扩展或替换场景,如JDBC驱动加载、SLF4J日志实现等,但存在加载效率低和线程安全问题。
151 7
理解的Java中SPI机制
|
3月前
|
存储 Java 编译器
Java 中 .length 的使用方法:深入理解 Java 数据结构中的长度获取机制
本文深入解析了 Java 中 `.length` 的使用方法及其在不同数据结构中的应用。对于数组,通过 `.length` 属性获取元素数量;字符串则使用 `.length()` 方法计算字符数;集合类如 `ArrayList` 采用 `.size()` 方法统计元素个数。此外,基本数据类型和包装类不支持长度属性。掌握这些区别,有助于开发者避免常见错误,提升代码质量。
209 1
|
4月前
|
传感器 人工智能 监控
反向寻车系统怎么做?基本原理与系统组成解析
本文通过反向寻车系统的核心组成部分与技术分析,阐述反向寻车系统的工作原理,适用于适用于商场停车场、医院停车场及火车站停车场等。如需获取智慧停车场反向寻车技术方案前往文章最下方获取,如有项目合作及技术交流欢迎私信作者。
288 2
|
4月前
|
存储 设计模式 Java
重学Java基础篇—ThreadLocal深度解析与最佳实践
ThreadLocal 是一种实现线程隔离的机制,为每个线程创建独立变量副本,适用于数据库连接管理、用户会话信息存储等场景。
146 5
|
4月前
|
存储 监控 安全
重学Java基础篇—类的生命周期深度解析
本文全面解析了Java类的生命周期,涵盖加载、验证、准备、解析、初始化、使用及卸载七个关键阶段。通过分阶段执行机制详解(如加载阶段的触发条件与技术实现),结合方法调用机制、内存回收保护等使用阶段特性,以及卸载条件和特殊场景处理,帮助开发者深入理解JVM运作原理。同时,文章探讨了性能优化建议、典型异常处理及新一代JVM特性(如元空间与模块化系统)。总结中强调安全优先、延迟加载与动态扩展的设计思想,并提供开发建议与进阶方向,助力解决性能调优、内存泄漏排查及框架设计等问题。
163 5

推荐镜像

更多
  • DNS