【并发编程】线程间的通信

简介: 【并发编程】线程间的通信

1.wait、notify、notifyAll

  • 在多线程环境下,有时候一个线程的执行,依赖于另一个线程的某种状态的改变,这时就可以使用wait和notify或者notifyAll。
  • wait和sleep的区别:wait会释放持有的锁,但是sleep不会,sleep知识让线程在指定的时间内,不去抢占cpu的资源。
  • wait notify在使用的时候必须放在同步代码块里,必须拥有当前对象的锁,不能获取A对象的锁,去唤醒B对象。
  • notify随机唤醒一个等待的线程,notifyAll唤醒所有在该对象上等待的线程。
public class WaitDemo {
    private static boolean flag = false;
    private static Object object = new Object();
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (object) {
                if (!flag) {
                    try {
                        System.out.println("flag is false");
                        System.out.println(object+"进入等待状态");
                        object.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println("flag is true");
        }).start();
        Thread.sleep(2000L);
        new Thread(() -> {
            synchronized (object) {
                flag = true;
                object.notify();
                System.out.println(object+"被唤醒");
            }
        }).start();
    }
}

4a140df19445429c94bd77998847289a.jpg

2.生产者消费者模型

(1)生产者消费者模型图

6c0f00dad5ad43d9913b3dacd07ddfe5.jpg

(2)编码实战

  • 中间商Broker代码
public class Broker {
    //当前库存数
    private static int num;
    //规定最大库存数量
    private static final int TOTAL = 20;
    /**
     * 生产者生产产品存入库存
     */
    public synchronized void put(){
        //先判断库存有没有满
        if(num < TOTAL){
            //库存没有满时,生产者生产
            System.out.println("---库存新增一个,当前库存为:"+ ++num);
            //唤醒消费者消费
            notifyAll();
        }else{
            try {
                //库存满时,生产这进入等待状态
                System.out.println("***库存已满,生产者等待生产");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 消费者消费库存
     */
    public synchronized void take(){
        //先判断是否有库存
        if(num>0){
            System.out.println("---库存减少1个,当前库存为:"+ --num);
            //唤醒生产者
            notifyAll();
        }else{
            try {
                System.out.println("***暂无库存,消费者等待消费");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 生产者Producer代码
public class Producer implements Runnable {
    private Broker broker;
    public Producer(Broker broker) {
        this.broker = broker;
    }
    @Override
    public void run() {
        while (true) {
            System.out.println("###生产者生产一件商品");
            broker.put();
        }
    }
}
  • 消费者Consumer代码
public class Consumer implements Runnable {
    private Broker broker;
    public Consumer(Broker broker) {
        this.broker = broker;
    }
    @Override
    public void run() {
        while (true) {
            System.out.println("###消费者消费一件商品");
            broker.take();
        }
    }
}
  • 测试代码
public static void main(String[] args) {
    //创建中间商
    Broker broker = new Broker();
    //生产者线程
    for (int i = 0; i < 5; i++) {
        new Thread(new Producer(broker)).start();
    }
    //消费者线程
    for (int i = 0; i < 5; i++) {
        new Thread(new Consumer(broker)).start();
    }
}


f98fa397176346a9b3a01cc8c94f48ce.jpg

3.管道流进行线程间的通信

  • 管道流进行通信其实就是以内存为媒介,一个线程去往里面存数据,一个线程去里面取数据,用于线程间的通信。
  • 主要有两类
  • 面向字节:【PipedOutputStream、PipedInputStream】
  • 面向字符:【PipedReader、PipedWriter】

(1)字节管道流

  • 编写线程ByteStreamReader类
public class ByteStreamReader implements Runnable {
    private PipedInputStream pipedInputStream;
    public ByteStreamReader(PipedInputStream pipedInputStream) {
        this.pipedInputStream = pipedInputStream;
    }
    @Override
    public void run() {
        try {
            if(pipedInputStream != null){
                //读取内存中中的数据
                String str = new BufferedReader(new InputStreamReader(pipedInputStream)).lines().collect(Collectors.joining("\n"));
                System.out.println("当前线程:"+Thread.currentThread().getName()+"读取内存中的数据:"+str);
            }
            pipedInputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
  • 测试代码
public static void main(String[] args) throws IOException {
    //创建管道输入流
    PipedInputStream pipedInputStream = new PipedInputStream();
  //创建管道输出流
    PipedOutputStream pipedOutputStream = new PipedOutputStream();
  //输入流与输出流建立连接
    pipedOutputStream.connect(pipedInputStream);
  //启动线程,将输入流作为参数传输进去
    new Thread(new ByteStreamReader(pipedInputStream)).start();
  //创建字符输入流
    BufferedReader bufferedReader = null;
    System.out.print("当前线程:"+Thread.currentThread().getName()+"向内存中写入数据:");
  //将控制台输入的内容转化成流
    bufferedReader = new BufferedReader(new InputStreamReader(System.in));
  //写入内存
    pipedOutputStream.write(bufferedReader.readLine().getBytes());
    pipedOutputStream.close();
    if(bufferedReader != null){
        bufferedReader.close();
     }
}

171142d85bdc4200b804e4dda8e53bd2.jpg

  • 注意:不要在同一个线程中使用PipInputStream和PipOutputStream,会造成死锁。

4.Thread.join()方法

(1)join()方法简介

  • join()方法一共三个方法重载
public final void join() throws InterruptedException;
public final synchronized void join(long millis) throws InterruptedException;
public final synchronized void join(long millis, int nanos) throws InterruptedException;
  • 三个重载最终都掉用一个参数的版本。
  • join()和join(0)是等价的,表示会一直等下去,join(非0)表示等待一段时间。
  • 使用场景:线程A执行到一半,需要一个数据,这个数据需要线程B去执行修改,只有B修改完成之后,A才能继续操作。

(2)join的使用

public class JoinDemo {
    public static int num = 0;
    public void add() {
        num++;
    }
    public static void main(String[] args) {
        JoinDemo joinDemo = new JoinDemo();
        Thread thread = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + ":开始执行");
            System.out.println(Thread.currentThread().getName() + ":执行num+1");
            joinDemo.add();
            System.out.println(Thread.currentThread().getName() + ":结束执行");
        }, "线程1");
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + ":开始执行");
                thread.start();
                /**join方法控制让线程2中的线程1先执行完成以后在执行线程2后面的操作*/
                thread.join();
                if (num == 1) {
                    System.out.println(Thread.currentThread().getName() + ":拿到的num为:" + num);
                }
                System.out.println(Thread.currentThread().getName() + ":结束执行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程2").start();
    }
}
  • 没有加上join()方法的运行结果

3f684aedfa864518b2af6b3f76b1ddf3.jpg

  • 加上join()方法的运行结果

d571399a0f474606b795f2d82097debd.jpg

5.Condition详解

(1)Condition简介

  • 在线程Thread类中线程之间通信是通过object类的wait()和notify()方式实现的。而ReentrantLock也有类似于wait()和notify()功能。前者是java底层级别后者是语言级别的具有更高的可控制性和扩展性。
  • 二者的区别:
  • Condition能够支持不响应式中断,而通过使用Object方式不支持。
  • Condition能偶支持多个等待队列(new多个Condition对像),而Object方式只能支持一个。
  • Condition能够支持超时时间的设置,而Object不支持。

(2)案例实战

  • 简单案例
public class ConditionDemo implements Runnable{
    private static Lock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    @Override
    public void run() {
        try{
            lock.lock();
            condition.await();
            System.out.println("Thread is going on");
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new ConditionDemo());
    //启动线程
        thread.start();
    //睡眠2s
        Thread.sleep(2000);
    //加锁,因为condition在调用await()方法时,会释放锁资源,所以要重新加锁
        lock.lock();
    //唤醒
        condition.signal();
    //解锁
        lock.unlock();
    }
}

1736b5d1459740229704bcaf47067842.jpg

新建的线程thread调用start()方法后执行run()方法,此时掉用lock.lock()方法进行加锁,此时线程获得锁,继续执行condition.await()方法,这个时候线程会释放刚才获得的锁资源,将线程加入到condition维护的等待队列中,

等调用condition.signal()方法后,会唤醒condition等待对类中的一个线程加入到AQS对列中去,直至唤醒的线程重新获取所资源后才能继续向下执行。

  • 生产者消费者模型
public class ConditionDemo {
    private int queueSize=10;
    //定义优先队列,大小初始化为10
    private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
    //定义ReentrantLock,Condition要配合锁使用
    private Lock lock = new ReentrantLock();
    //定义生产者的Condition对象
    private Condition producer = lock.newCondition();
    //定义消费者的Condition对象
    private Condition consumer = lock.newCondition();
    class Consumer extends Thread{
        volatile boolean flag = true;
        private void consume(){
            //循环调用
            while(flag){
                //加锁
                lock.lock();
                try{
                    /**
                     * 如果队列是空就让消费者停止消费,进入等待状态,循环等待,
                     * 保证不会在有消费者线程去执行await()方法
                     */
                    while(queue.isEmpty()){
                        try{
                            System.out.println("队列空,等待数据");
                            consumer.await();
                        } catch (InterruptedException e) {
                            //发生异常结束方法执行
                            flag=false;
                        }
                    }
                    //队列弹出一个元素
                    queue.poll();
                    //唤醒生产者
                    producer.signal();
                    System.out.println("从队列中取走一个元素,队列剩余"+queue.size()+"个元素");
                }finally {
                    //最后一定要进行解锁操作
                    lock.unlock();
                }
            }
        }
        @Override
        public void run() {
            consume();
        }
    }
    class Producer extends Thread{
        volatile boolean flag = true;
        private void produce(){
            //循环调用
            while(flag){
                //加锁
                lock.lock();
                try{
                    /**
                     * 判断队列是否已满,如果队列的大小等于规定好的队列长度
                     * 就让生产者进行等待
                     */
                    while(queue.size() == queueSize){
                        try {
                            System.out.println("队列满,等待有空余空间");
                            producer.await();
                        }catch (InterruptedException e){
                            //发生异常结束方法执行
                            flag=false;
                        }
                    }
                    //生产一个元素
                    queue.offer(1); //每次插入一个元素
                    //唤醒消费者
                    consumer.signal();
                    System.out.println("向队列中插入一个元素,队列剩余"+queue.size()+"个元素");
                }finally {
                    lock.unlock();
                }
            }
        }
        @Override
        public void run() {
            produce();
        }
    }
    public static void main(String[] args) {
        ConditionDemo conditionDemo = new ConditionDemo();
        Producer producer = conditionDemo.new Producer();
        Consumer consumer = conditionDemo.new Consumer();
        producer.start();
        consumer.start();
        producer.interrupt();
        consumer.interrupt();
    }
}


498367bb7f0f4e458e59622b0e0eee14.jpg

相关文章
|
24天前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
98 6
|
2月前
|
Java 调度
[Java]线程生命周期与线程通信
本文详细探讨了线程生命周期与线程通信。文章首先分析了线程的五个基本状态及其转换过程,结合JDK1.8版本的特点进行了深入讲解。接着,通过多个实例介绍了线程通信的几种实现方式,包括使用`volatile`关键字、`Object`类的`wait()`和`notify()`方法、`CountDownLatch`、`ReentrantLock`结合`Condition`以及`LockSupport`等工具。全文旨在帮助读者理解线程管理的核心概念和技术细节。
41 1
[Java]线程生命周期与线程通信
|
1月前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
1月前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
39 3
|
25天前
|
设计模式 安全 Java
Java 多线程并发编程
Java多线程并发编程是指在Java程序中使用多个线程同时执行,以提高程序的运行效率和响应速度。通过合理管理和调度线程,可以充分利用多核处理器资源,实现高效的任务处理。本内容将介绍Java多线程的基础概念、实现方式及常见问题解决方法。
52 0
|
2月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
25 1
|
2月前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
53 1
|
2月前
|
Java
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件成立时被唤醒,从而有效解决数据一致性和同步问题。本文通过对比其他通信机制,展示了 `wait()` 和 `notify()` 的优势,并通过生产者-消费者模型的示例代码,详细说明了其使用方法和重要性。
36 1
|
2月前
|
Java
|
2月前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
38 3