高并发编程-线程通信_使用wait和notify进行线程间的通信

简介: 高并发编程-线程通信_使用wait和notify进行线程间的通信

20191031000606569.png


概述

Java中线程通信协作的最常见的两种方式:

  • syncrhoized加锁的线程的Object类的wait()/notify()/notifyAll()
  • ReentrantLock类加锁的线程的Condition类的await()/signal()/signalAll()

线程间直接的数据交换:

  • 通过管道进行线程间通信:1)字节流;2)字符流

可参考: Java多线程编程核心技术


场景


场景假设:

一个工作台,两个工人: Worker A 和 Workder B .

约定,Worker A 生产货物到工作台上, Workder B 从工作台 取走(消费)货物。

  • 当 工作台上没有货物时,Worker A 才生产货物,否则等待Worker B 取走(消费)货物。
  • 当 工作台上有货物时, Woker B 才从工作台取走(消费)货物,否则等待Worker A 生产货物


引子


我们先来看下线程之间不通信的情况 (错误示例)

package com.artisan.test;
public class ProduceConsumeWrongDemo {
    // 锁
    private final Object LOCK = new Object();
    // 模拟多线程间需要通信的数据  i
    private int i = 0 ;
    public void produce() throws InterruptedException {
        // 加锁
        synchronized (LOCK){
            System.out.println("produce:" + i++);
            Thread.sleep(1_000);
        }
    }
    public void consume() throws InterruptedException{
        // 加锁
        synchronized (LOCK){
            System.out.println("consume:" + i);
            Thread.sleep(1_000);
        }
    }
    public static void main(String[] args) throws InterruptedException{
        ProduceConsumeWrongDemo pc = new ProduceConsumeWrongDemo();
        // 生产线程
        new Thread(()->{
                while (true){
                    try {
                        pc.produce();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
        }).start();
        // 消费线程
        new Thread(()->{
            while (true){
                try {
                    pc.consume();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

运行结果:

"E:\Program Files\Java\jdk1.8.0_161\bin\java" "-javaagent:E:\Program Files\JetBrains\IntelliJ IDEA 2017.2.4\lib\idea_rt.jar=52137:E:\Program Files\JetBrains\IntelliJ IDEA 2017.2.4\bin" -Dfile.encoding=UTF-8 -classpath "E:\Program Files\Java\jdk1.8.0_161\jre\lib\charsets.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\deploy.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\access-bridge-64.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\cldrdata.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\dnsns.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jaccess.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jfxrt.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\localedata.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\nashorn.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunec.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunjce_provider.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunmscapi.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunpkcs11.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\zipfs.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\javaws.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jce.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jfr.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jfxswt.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jsse.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\management-agent.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\plugin.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\resources.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\rt.jar;D:\IdeaProjects\mvc\target\classes" com.artisan.test.ProduceConsumeWrongDemo
produce:0
produce:1
consume:2
consume:2
consume:2
produce:2
consume:3
consume:3
consume:3
produce:3
produce:4
produce:5
consume:6
....
....
....
....
....
....
....


很明显的可以看到,数据都是错乱的,因为没有线程间的通信,全凭CPU调度,生产线程和消费线程都很随意,数据一团糟糕,那该如何改进呢?


synchronized wait/notify机制


wait()——让当前线程 (Thread.concurrentThread()

方法所返回的线程) 释放对象锁并进入等待(阻塞)状态。

notify()——唤醒一个正在等待相应对象锁的线程,使其进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行。

notifyAll()——唤醒所有正在等待相应对象锁的线程,使它们进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行。

为了解决上面的问题,我们先来了解下synchronized wait/notify .


wait()、notify()和notifyAll()方法是本地方法,并且为final方法,无法被重写。


调用某个对象的wait()方法能让当前线程阻塞,并且当前线程必须拥有此对象的monitor(即锁). 因此调用wait()方法必须在同步块或者同步方法中进行(synchronized块或者synchronized方法)。如果当前线程没有这个对象的锁就调用wait()方法,则会抛出IllegalMonitorStateException.


调用某个对象的wait()方法,相当于让当前线程交出(释放)此对象的monitor,然后进入等待状态,等待后续再次获得此对象的锁


调用某个对象的notify()方法能够唤醒一个正在等待这个对象的monitor的线程,如果有多个线程都在等待这个对象的monitor,则只能唤醒其中一个线程. 同样的,调用某个对象的notify()方法,当前线程也必须拥有这个对象的monitor,因此调用notify()方法必须在同步块或者同步方法中进行(synchronized块或者synchronized方法)。


调用notifyAll()方法能够唤醒所有正在等待这个对象的monitor的线程


notify()和notifyAll()方法只是唤醒等待该对象的monitor的线程,并不决定哪个线程能够获取到monitor。


举个例子: 假如有三个线程Thread1、Thread2和Thread3都在等待对象objectA的monitor,此时Thread4拥有对象objectA的monitor,当在Thread4中调用objectA.notify()方法之后,Thread1、Thread2和Thread3只有一个能被唤醒。


注意,被唤醒不等于立刻就获取了objectA的monitor。


假若在Thread4中调用objectA.notifyAll()方法,则Thread1、Thread2和Thread3三个线程都会被唤醒,至于哪个线程接下来能够获取到objectA的monitor就具体依赖于操作系统的调度了。


一个线程被唤醒不代表立即获取了对象的monitor,只有等调用完notify()或者notifyAll()并退出synchronized块,释放对象锁后,其余线程才可获得锁执行。


synchronized wait/notify 改造

package com.artisan.test;
public class ProduceConsumerDemo {
    // 对象监视器-锁
    private final Object LOCK = new Object();
    // 是否生产出数据的标识
    private boolean isProduced = false;
    // volatile 确保可见性, 假设 i 就是生产者生产的数据
    private volatile int i = 0 ;
    public  void produce(){
        // 加锁
        synchronized (LOCK){
            if (isProduced){
                try {
                    // 让当前线程 (Thread.concurrentThread() 方法所返回的线程) 释放对象锁并进入等待(阻塞)状态
                    // 如果已经生产,则等待
                    LOCK.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else{
                // 生产数据
                i++;
                System.out.println("Produce:" + i);
                // 唤醒一个正在等待相应对象锁的线程,使其进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行
                // 通知等待的Worker B 来消费数据
                LOCK.notify();
                // 将生产标识置为true
                isProduced = true;
            }
        }
    }
    public void consume(){
        // 加锁
        synchronized (LOCK){
            if (isProduced){
                // 消费数据
                System.out.println("Consume:" + i);
                // 唤醒一个正在等待相应对象锁的线程,使其进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行
                // 通知 等待的Wokrer A 生产数据
                LOCK.notify();
                // 已经消费完了,将生产标识置为false
                isProduced = false;
            }else{
                try {
                    // 让当前线程 (Thread.concurrentThread() 方法所返回的线程) 释放对象锁并进入等待(阻塞)状态
                    // 未生产,Worker B等待
                    LOCK.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) {
        ProduceConsumerDemo produceConsumerDemo = new ProduceConsumerDemo();
        new Thread(){
            @Override
            public void run() {
                while(true) produceConsumerDemo.produce();
            }
        }.start();
        new Thread(){
            @Override
            public void run() {
                while(true) produceConsumerDemo.consume();
            }
        }.start();
    }
}


20191001000746920.png


当然了并不是绝对的上面的对应关系(这里只是为了演示),因为notify唤醒后,线程只是进入Runnable状态,至于哪个线程能进入到running状态,就看哪个线程能抢到CPU的资源了。 JVM规范并没有规定哪个线程优先得到执行权,每个JVM的实现都是不同的


单个生产者 单个消费者,运行OK

.....
.....
.....
Produce:1171
Consume:1171
Produce:1172
Consume:1172
Produce:1173
Consume:1173
Produce:1174
Consume:1174
Produce:1175
Consume:1175
Produce:1176
Consume:1176
.....
.....
.....


问题


单个生产者 单个消费者 上面的代码是没有问题的,加入有多个生产者 和多个消费者呢?

我们来复用上面的代码来演示下 ,其他代码保持不变,仅在main方法中改造下,两个生产者,两个消费者

  Stream.of("P1","P2").forEach(n-> new Thread(){
            @Override
            public void run() {
                while(true) produceConsumerDemo.produce();
            }
        }.start());
        Stream.of("C1","C2").forEach(n->new Thread(){
            @Override
            public void run() {
                while(true) produceConsumerDemo.consume();
            }
        }.start());


2019100100203845.png


下篇博客,我们来分析下原因,并给出解决办法

相关文章
|
3天前
|
Java 程序员 开发者
深入理解Java并发编程:线程同步与锁机制
【4月更文挑战第30天】 在多线程的世界中,确保数据的一致性和线程间的有效通信是至关重要的。本文将深入探讨Java并发编程中的核心概念——线程同步与锁机制。我们将从基本的synchronized关键字开始,逐步过渡到更复杂的ReentrantLock类,并探讨它们如何帮助我们在多线程环境中保持数据完整性和避免常见的并发问题。文章还将通过示例代码,展示这些同步工具在实际开发中的应用,帮助读者构建对Java并发编程深层次的理解。
|
3天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第30天】本文将深入探讨Java并发编程中的一个重要主题——线程池。我们将从线程池的基本概念入手,了解其工作原理和优势,然后详细介绍如何使用Java的Executor框架创建和管理线程池。最后,我们将讨论一些高级主题,如自定义线程工厂和拒绝策略。通过本文的学习,你将能够更好地理解和使用Java的线程池,提高你的并发编程能力。
|
3天前
|
安全 Java 调度
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第30天】本文将深入探讨Java并发编程的核心概念,包括线程安全、同步机制、锁优化以及性能调优。我们将通过实例分析如何确保多线程环境下的数据一致性,同时介绍一些常见的并发模式和最佳实践,旨在帮助开发者在保证线程安全的同时,提升系统的性能和响应能力。
|
2天前
|
存储 安全 Java
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第1天】本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化。我们将详细分析线程安全问题的根源,以及如何通过合理的设计和编码实践来避免常见的并发问题。同时,我们还将探讨如何在保证线程安全的前提下,提高程序的并发性能,包括使用高效的同步机制、减少锁的竞争以及利用现代硬件的并行能力等技术手段。
|
2天前
|
缓存 Java 调度
Java并发编程:深入理解线程池
【4月更文挑战第30天】 在Java并发编程中,线程池是一种重要的工具,它可以帮助我们有效地管理线程,提高系统性能。本文将深入探讨Java线程池的工作原理,如何使用它,以及如何根据实际需求选择合适的线程池策略。
|
3天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第30天】 本文将深入探讨Java中的线程池,解析其原理、使用场景以及如何合理地利用线程池提高程序性能。我们将从线程池的基本概念出发,介绍其内部工作机制,然后通过实例演示如何创建和使用线程池。最后,我们将讨论线程池的优缺点以及在实际应用中需要注意的问题。
|
3天前
|
存储 安全 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第30天】在Java开发中,并发编程是一个复杂而又关键的领域。它允许多个线程同时执行,从而提高程序性能和资源利用率。然而,并发编程也带来了许多挑战,如数据不一致、死锁和线程安全问题。本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化策略。我们将通过实例分析如何在保证线程安全的同时提高程序性能,为Java开发者提供实用的指导。
|
3天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第29天】在Java中,线程池是一种管理线程的强大工具,它可以提高系统性能,减少资源消耗。本文将深入探讨Java线程池的工作原理,如何使用它,以及在使用线程池时需要注意的问题。
|
4天前
|
安全 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第29天】本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化。我们将首先介绍线程安全的基本概念,然后讨论如何在Java中实现线程安全,包括使用synchronized关键字和Lock接口。接下来,我们将探讨Java并发编程的性能优化策略,如减少锁的粒度、使用读写锁和无锁数据结构。最后,我们将通过实例演示如何在实际项目中应用这些技术。
|
4天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第29天】 在Java并发编程中,线程池是一种重要的工具,它可以帮助我们管理线程资源,提高系统性能。本文将深入探讨线程池的工作原理、使用方法以及如何根据实际需求选择合适的线程池参数。通过阅读本文,你将能够更好地理解和使用Java线程池,提高你的并发编程能力。