使用wait/notify实现生产者/消费者模式

简介: 上一篇文章:多线程编程之线程间通信机制:wait/notify机制重点讲了在java多线程 编程中协调线程间通信的wait/notify机制,它有力的保证了线程间通信的安全性以及便利性。这篇文章就来说说如何使用前面说到的wait/notify机制实现生产者/消费者模式。

1 前言


上一篇文章多线程编程之线程间通信机制:wait/notify机制重点讲了在java多线程 编程中协调线程间通信的wait/notify机制,它有力的保证了线程间通信的安全性以及便利性。这篇文章就来说说如何使用前面说到的wait/notify机制实现生产者/消费者模式。


什么是生产者消费者模型?


生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。生产消费者模式如下图。


cac900dd72d94207aa83afd272b48f5c~tplv-k3u1fbpfcp-zoom-in-crop-mark_1304_0_0_0.webp.jpg


生产者是一堆线程,消费者是另一堆线程,内存缓冲区可以使用List数组队列,数据类型只需要定义一个简单的类就好。关键是如何处理多线程之间的协作。这其实也是多线程通信的一个范例。


在这个模型中,最关键就是内存缓冲区为空的时候消费者必须等待,而内存缓冲区满的时候,生产者必须等待。其他时候可以是个动态平衡。值得注意的是多线程对临界区资源的操作时候必须保证在读写中只能存在一个线程,所以需要设计锁的策略。

这篇文章就通过前面说到的wait/notify机制实现生产者/消费者模式。


2 正文


前面的文章提到了饭馆里厨师和服务员的例子。在一个饭馆里通常都有好多厨师以及好多服务员,我们可以把厨师称为生产者,把服务员称为消费者(厨师生产的食物被服务员拿走(消费)),我们知道厨师和服务员其实是不直接打交道的,而是在厨师做好菜之后放到窗口,服务员从窗口直接把菜端走给客人就好了,这样省去了生产者和消费者之间的沟通成本,从而极大的提升工作效率。从多线程编程代码的角度来看,每一个厨师就相当于一个生产者线程,每一个服务员都相当于一个消费者线程,而放菜的窗口就相当于一个缓冲队列,生产者线程不断把生产好的东西放到缓冲队列里,消费者线程不断从缓冲队列里取东西。模型图就如上所示。


那么生产者/消费者模式的实现有以下几种情况:


1、一个生产者与一个消费者


第一种情况就是一个生产者和一个消费者,就类似于饭店只有一个厨师炒菜和一个服务员上菜。那么代码模拟情况如下:


package com.jiangxia.chap4;
//一个生产者与一个消费者
public class Demo01 {
    public static void main(String[] args) throws IllegalMonitorStateException {
        Object lock = new Object();
        Thread t1 = new Demo01cook(lock);
        t1.start();
        Thread t2 = new Demo01Waiter(lock);
        t2.start();
    }
}
//食物:用来表示有没有菜
class Food {
    public static String food="";
}
// 生产者(厨师)
class Demo01cook extends Thread {
    //模拟送餐窗口队列
    private Object lock;
    public Demo01cook(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        try{
            while(true){
                //线程休眠模拟炒菜时间
                Thread.sleep(3000);
                synchronized (lock){
                    //如果传菜窗口不为空,即有菜
                    if(!Food.food.equals("")){
                        lock.wait();
                    }
                    System.out.println(System.currentTimeMillis() + "_" + System.nanoTime()+":厨师炒好菜了!等待服务员上菜。。。。");
                    lock.notify();
                }
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}
// 消费者(服务员)
class Demo01Waiter extends Thread{
    //模拟送餐窗口队列
    private Object lock;
    public Demo01Waiter(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        try{
            while(true){
                synchronized (lock){
                    if ("".equals(Food.food)){
                        lock.wait();
                    }
                    System.out.println(System.currentTimeMillis() + "_" + System.nanoTime()+":服务员没有菜可以送了,等待厨师炒菜。。。。。");
                    Food.food = "";
                    lock.notify();
                    //线程休眠模拟送菜时间
                    Thread.sleep(2000);
                }
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}
复制代码


结果如下:


541ab1f8d8544c66bc47f764faa2ac54~tplv-k3u1fbpfcp-zoom-in-crop-mark_1304_0_0_0.webp.jpg


通过上面结果可以看出,生产者消费者有序生产消费。


二、一生产者多消费者


package com.jiangxia.chap4;
import java.util.ArrayList;
import java.util.List;
/**
 * 一个生产者 多个消费者
 */
public class Demo02 {
    public static void main(String[] args) {
        Food02 food02 = new Food02();
        Thread cook = new Cook02(food02);
        cook.setName("厨师");
        cook.start();
        Thread[] waiters = new Thread[5];
        for (int i = 0; i < 5; i++) {
            waiters[i] = new Waiter02(food02);
            waiters[i].setName("服务员"+(char)('A'+i));
            waiters[i].start();
        }
    }
}
class Food02{
    private List<String> foodlist = new ArrayList<>();
    synchronized public void addin(String food){
        try{
            if(foodlist.size()==1){
                System.out.println("厨师:"+Thread.currentThread().getName()+"等待中");
                this.wait();
            }
            foodlist.add(food);
            System.out.println(Thread.currentThread().getName() + ":生产食物" + food);
            System.out.println(Thread.currentThread().getName() + ":还有" +  foodlist.size() + "个食物");
            this.notify();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
    synchronized public String getout(){
        String returnfood = null;
        try {
            /**
             * 当没有食物时
             */
            while (foodlist.size()==0){
                System.out.println("服务员:"+Thread.currentThread().getName()+"等待中");
                this.wait();
            }
            returnfood = foodlist.get(0);
            foodlist.remove(0);
            System.out.println("服务员:"+Thread.currentThread().getName() + ":取走" + returnfood);
            System.out.println(Thread.currentThread().getName() + ":还有" + foodlist.size() + "个食物");
//            this.notify();
            this.notifyAll();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return returnfood;
    }
}
/**
 * 生产者
 */
class Cook02 extends Thread{
    private Food02 food02;
    public Cook02(Food02 food02) {
        this.food02 = food02;
    }
    @Override
    public void run() {
        while (true){
            food02.addin(Math.random()+"");
        }
    }
}
/**
 * 消费者
 */
class Waiter02 extends Thread{
    private Food02 food02;
    public Waiter02(Food02 food02) {
        this.food02 = food02;
    }
    @Override
    public void run() {
        while (true){
            food02.getout();
        }
    }
}
复制代码


部分结果如下:


04788f9e756c4d088b0957b108d09f97~tplv-k3u1fbpfcp-zoom-in-crop-mark_1304_0_0_0.webp.jpg


在一个生产者多个消费者模型中:


  1. 消费者在判断集合是否有数据时不能使用if,因为有可能会重复唤醒的还是消费者,这里需要使用while语句进行判断,保证集合中一定是有数据的。


  1. 消费者唤醒线程不使用notify,因为notify是随机唤醒一个线程,万一重复唤醒消费者就没有办法重新生产新的数据,所以这里需要使用notifyAll方法。


三、多个生产者一个消费者


多个生产者与一个消费者的代码与上面一个生产者多个消费者代码类似:


package com.jiangxia.chap4;
import java.util.ArrayList;
import java.util.List;
/**
 * 多生产者一消费者
 */
public class Demo03 {
    public static void main(String[] args) {
        Food03 food03 = new Food03();
        /**
         * 多个生产者
         */
        Thread[] cookies = new Thread[5];
        for (int i = 0; i < cookies.length; i++) {
            cookies[i] = new Cook03(food03);
            cookies[i].setName("厨师" + (char)('A' + i));
            cookies[i].start();
        }
        Thread waiter = new Waiter03(food03);
        waiter.setName("服务员");
        waiter.start();
    }
}
class Food03{
    private List<String> foodlist = new ArrayList<>();
    synchronized public void addin(String food){
        try{
            if(foodlist.size()==1){
                System.out.println("厨师:"+Thread.currentThread().getName()+"等待中");
                this.wait();
            }
            foodlist.add(food);
            System.out.println(Thread.currentThread().getName() + ":生产食物" + food);
            System.out.println(Thread.currentThread().getName() + ":还有" +  foodlist.size() + "个食物");
            this.notify();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
    synchronized public String getout(){
        String returnfood = null;
        try {
            /**
             * 当没有食物时
             */
            while (foodlist.size()==0){
                System.out.println("服务员:"+Thread.currentThread().getName()+"等待中");
                this.wait();
            }
            returnfood = foodlist.get(0);
            foodlist.remove(0);
            System.out.println("服务员:"+Thread.currentThread().getName() + ":取走" + returnfood);
            System.out.println(Thread.currentThread().getName() + ":还有" + foodlist.size() + "个食物");
//            this.notify();
            this.notifyAll();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return returnfood;
    }
}
/**
 * 生产者
 */
class Cook03 extends Thread{
    private Food03 food03;
    public Cook03(Food03 food03) {
        this.food03 = food03;
    }
    @Override
    public void run() {
        while (true){
            food03.addin(Math.random()+"");
        }
    }
}
/**
 * 消费者
 */
class Waiter03 extends Thread{
    private Food03 food03;
    public Waiter03(Food03 food03) {
        this.food03 = food03;
    }
    @Override
    public void run() {
        while (true){
            food03.getout();
        }
    }
}
复制代码


部分结果如下:


409ce0976f864f70beeba6905f30fb6d~tplv-k3u1fbpfcp-zoom-in-crop-mark_1304_0_0_0.webp.jpg


多个生产者一个消费者的情况下:


1、需要不断的判断集合中是否有数据,如果有就重新等待,直到没有数据 时再做添加


2、生产者不能使用notify唤醒其它线程,有可能造成死锁,应该使用notifyAll方法把所有的线程都唤醒。


四、多个生产者多个消费者


最后一种就是多个生产者多个消费者,代码如下:


package com.jiangxia.chap4;
import java.util.ArrayList;
import java.util.List;
/**
 * 多生产者 多消费者
 */
public class Demo04 {
    public static void main(String[] args) {
        Food04 food04 = new Food04();
        /**
         * 多个生产者 多个消费者
         */
        int size = 5;
        Thread[] cookies = new Thread[size];
        Thread[] waiters = new Thread[size];
        for (int i = 0; i < size; i++) {
            char c = (char)('A' + i);
            cookies[i] = new Cook04(food04);
            cookies[i].setName("厨师" + c);
            cookies[i].start();
            waiters[i] = new Waiter04(food04);
            waiters[i].setName("服务员" + c);
            waiters[i].start();
        }
    }
}
class Food04{
    private List<String> foodlist = new ArrayList<>();
    synchronized public void addin(String food){
        try{
            if(foodlist.size()==1){
                System.out.println("厨师:"+Thread.currentThread().getName()+"等待中");
                this.wait();
            }
            foodlist.add(food);
            System.out.println(Thread.currentThread().getName() + ":生产食物" + food);
            System.out.println(Thread.currentThread().getName() + ":还有" +  foodlist.size() + "个食物");
            this.notify();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
    synchronized public String getout(){
        String returnfood = null;
        try {
            /**
             * 当没有食物时
             */
            while (foodlist.size()==0){
                System.out.println("服务员:"+Thread.currentThread().getName()+"等待中");
                this.wait();
            }
            returnfood = foodlist.get(0);
            foodlist.remove(0);
            System.out.println("服务员:"+Thread.currentThread().getName() + ":取走" + returnfood);
            System.out.println(Thread.currentThread().getName() + ":还有" + foodlist.size() + "个食物");
//            this.notify();
            this.notifyAll();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return returnfood;
    }
}
/**
 * 生产者
 */
class Cook04 extends Thread{
    private Food04 food04;
    public Cook04(Food04 food04) {
        this.food04 = food04;
    }
    @Override
    public void run() {
        while (true){
            food04.addin(Math.random()+"");
        }
    }
}
/**
 * 消费者
 */
class Waiter04 extends Thread{
    private Food04 food04;
    public Waiter04(Food04 food04) {
        this.food04 = food04;
    }
    @Override
    public void run() {
        while (true){
            food04.getout();
        }
    }
}
复制代码


部分结果如下:


8432b433497c424aa1b17891fe2c22bb~tplv-k3u1fbpfcp-zoom-in-crop-mark_1304_0_0_0.webp.jpg


以上就是wait/notify机制的一个现实应用:生产者-消费者模式的一个简介。

目录
相关文章
|
存储 缓存 负载均衡
【2022持续更新】大数据最全知识点整理-HBase篇
【2022持续更新】大数据最全知识点整理-HBase篇
1493 0
【2022持续更新】大数据最全知识点整理-HBase篇
|
canal 缓存 NoSQL
面试官,如何保证缓存与数据库的数据一致性
面试官,如何保证缓存与数据库的数据一致性
|
3月前
|
消息中间件 canal 存储
如何解决并发环境下双写不一致的问题?
在并发环境下,“双写不一致”指数据库与缓存因操作顺序或执行时机差异导致数据不匹配。解决核心是保证操作的原子性、顺序性或最终一致性。常见方案包括延迟双删、加锁机制、binlog同步、版本号机制和读写锁分离,分别适用于不同一致性要求和并发场景,需根据业务需求综合选择。
255 0
|
SQL 分布式计算 DataWorks
MaxCompute产品使用合集之DataWorks体验案例绑定如何绑定到正确的maxcomputer引擎上
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
存储 搜索推荐 大数据
大数据在医疗领域的应用
大数据在医疗领域有广泛应用,包括电子病历的数字化管理和共享,提升医疗服务效率与协同性;通过数据分析支持医疗决策,制定个性化治疗方案;预测疾病风险并提供预防措施;在精准医疗中深度分析患者基因组信息,实现高效治疗;在药物研发中,加速疗效和副作用发现,提高临床试验效率。此外,在金融领域,大数据的“4V”特性助力业务决策前瞻性,被广泛应用于银行、证券和保险的风险评估、市场分析及个性化服务中,提升运营效率和客户满意度。
1349 6
|
Java 数据库连接 数据库
Spring基础3——AOP,事务管理
AOP简介、入门案例、工作流程、切入点表达式、环绕通知、通知获取参数或返回值或异常、事务管理
Spring基础3——AOP,事务管理
|
监控 算法 自动驾驶
目标检测算法:从理论到实践的深度探索
【7月更文第18天】目标检测,作为计算机视觉领域的核心任务之一,旨在识别图像或视频中特定对象的位置及其类别。这一技术在自动驾驶、视频监控、医疗影像分析等多个领域发挥着至关重要的作用。本文将深入浅出地介绍目标检测的基本概念、主流算法,并通过一个实际的代码示例,带您领略YOLOv5这一高效目标检测模型的魅力。
1249 11
|
容器
OOP 中的组合、聚合和关联
【8月更文挑战第22天】
219 0
|
存储 缓存 安全
|
存储 数据库
zookeeper 集群环境搭建及集群选举及数据同步机制
zookeeper 集群环境搭建及集群选举及数据同步机制
370 2