使用wait/notify实现生产者/消费者模式

简介: 上一篇文章:多线程编程之线程间通信机制:wait/notify机制重点讲了在java多线程 编程中协调线程间通信的wait/notify机制,它有力的保证了线程间通信的安全性以及便利性。这篇文章就来说说如何使用前面说到的wait/notify机制实现生产者/消费者模式。

1 前言


上一篇文章多线程编程之线程间通信机制:wait/notify机制重点讲了在java多线程 编程中协调线程间通信的wait/notify机制,它有力的保证了线程间通信的安全性以及便利性。这篇文章就来说说如何使用前面说到的wait/notify机制实现生产者/消费者模式。


什么是生产者消费者模型?


生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。生产消费者模式如下图。


cac900dd72d94207aa83afd272b48f5c~tplv-k3u1fbpfcp-zoom-in-crop-mark_1304_0_0_0.webp.jpg


生产者是一堆线程,消费者是另一堆线程,内存缓冲区可以使用List数组队列,数据类型只需要定义一个简单的类就好。关键是如何处理多线程之间的协作。这其实也是多线程通信的一个范例。


在这个模型中,最关键就是内存缓冲区为空的时候消费者必须等待,而内存缓冲区满的时候,生产者必须等待。其他时候可以是个动态平衡。值得注意的是多线程对临界区资源的操作时候必须保证在读写中只能存在一个线程,所以需要设计锁的策略。

这篇文章就通过前面说到的wait/notify机制实现生产者/消费者模式。


2 正文


前面的文章提到了饭馆里厨师和服务员的例子。在一个饭馆里通常都有好多厨师以及好多服务员,我们可以把厨师称为生产者,把服务员称为消费者(厨师生产的食物被服务员拿走(消费)),我们知道厨师和服务员其实是不直接打交道的,而是在厨师做好菜之后放到窗口,服务员从窗口直接把菜端走给客人就好了,这样省去了生产者和消费者之间的沟通成本,从而极大的提升工作效率。从多线程编程代码的角度来看,每一个厨师就相当于一个生产者线程,每一个服务员都相当于一个消费者线程,而放菜的窗口就相当于一个缓冲队列,生产者线程不断把生产好的东西放到缓冲队列里,消费者线程不断从缓冲队列里取东西。模型图就如上所示。


那么生产者/消费者模式的实现有以下几种情况:


1、一个生产者与一个消费者


第一种情况就是一个生产者和一个消费者,就类似于饭店只有一个厨师炒菜和一个服务员上菜。那么代码模拟情况如下:


package com.jiangxia.chap4;
//一个生产者与一个消费者
public class Demo01 {
    public static void main(String[] args) throws IllegalMonitorStateException {
        Object lock = new Object();
        Thread t1 = new Demo01cook(lock);
        t1.start();
        Thread t2 = new Demo01Waiter(lock);
        t2.start();
    }
}
//食物:用来表示有没有菜
class Food {
    public static String food="";
}
// 生产者(厨师)
class Demo01cook extends Thread {
    //模拟送餐窗口队列
    private Object lock;
    public Demo01cook(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        try{
            while(true){
                //线程休眠模拟炒菜时间
                Thread.sleep(3000);
                synchronized (lock){
                    //如果传菜窗口不为空,即有菜
                    if(!Food.food.equals("")){
                        lock.wait();
                    }
                    System.out.println(System.currentTimeMillis() + "_" + System.nanoTime()+":厨师炒好菜了!等待服务员上菜。。。。");
                    lock.notify();
                }
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}
// 消费者(服务员)
class Demo01Waiter extends Thread{
    //模拟送餐窗口队列
    private Object lock;
    public Demo01Waiter(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        try{
            while(true){
                synchronized (lock){
                    if ("".equals(Food.food)){
                        lock.wait();
                    }
                    System.out.println(System.currentTimeMillis() + "_" + System.nanoTime()+":服务员没有菜可以送了,等待厨师炒菜。。。。。");
                    Food.food = "";
                    lock.notify();
                    //线程休眠模拟送菜时间
                    Thread.sleep(2000);
                }
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}
复制代码


结果如下:


541ab1f8d8544c66bc47f764faa2ac54~tplv-k3u1fbpfcp-zoom-in-crop-mark_1304_0_0_0.webp.jpg


通过上面结果可以看出,生产者消费者有序生产消费。


二、一生产者多消费者


package com.jiangxia.chap4;
import java.util.ArrayList;
import java.util.List;
/**
 * 一个生产者 多个消费者
 */
public class Demo02 {
    public static void main(String[] args) {
        Food02 food02 = new Food02();
        Thread cook = new Cook02(food02);
        cook.setName("厨师");
        cook.start();
        Thread[] waiters = new Thread[5];
        for (int i = 0; i < 5; i++) {
            waiters[i] = new Waiter02(food02);
            waiters[i].setName("服务员"+(char)('A'+i));
            waiters[i].start();
        }
    }
}
class Food02{
    private List<String> foodlist = new ArrayList<>();
    synchronized public void addin(String food){
        try{
            if(foodlist.size()==1){
                System.out.println("厨师:"+Thread.currentThread().getName()+"等待中");
                this.wait();
            }
            foodlist.add(food);
            System.out.println(Thread.currentThread().getName() + ":生产食物" + food);
            System.out.println(Thread.currentThread().getName() + ":还有" +  foodlist.size() + "个食物");
            this.notify();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
    synchronized public String getout(){
        String returnfood = null;
        try {
            /**
             * 当没有食物时
             */
            while (foodlist.size()==0){
                System.out.println("服务员:"+Thread.currentThread().getName()+"等待中");
                this.wait();
            }
            returnfood = foodlist.get(0);
            foodlist.remove(0);
            System.out.println("服务员:"+Thread.currentThread().getName() + ":取走" + returnfood);
            System.out.println(Thread.currentThread().getName() + ":还有" + foodlist.size() + "个食物");
//            this.notify();
            this.notifyAll();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return returnfood;
    }
}
/**
 * 生产者
 */
class Cook02 extends Thread{
    private Food02 food02;
    public Cook02(Food02 food02) {
        this.food02 = food02;
    }
    @Override
    public void run() {
        while (true){
            food02.addin(Math.random()+"");
        }
    }
}
/**
 * 消费者
 */
class Waiter02 extends Thread{
    private Food02 food02;
    public Waiter02(Food02 food02) {
        this.food02 = food02;
    }
    @Override
    public void run() {
        while (true){
            food02.getout();
        }
    }
}
复制代码


部分结果如下:


04788f9e756c4d088b0957b108d09f97~tplv-k3u1fbpfcp-zoom-in-crop-mark_1304_0_0_0.webp.jpg


在一个生产者多个消费者模型中:


  1. 消费者在判断集合是否有数据时不能使用if,因为有可能会重复唤醒的还是消费者,这里需要使用while语句进行判断,保证集合中一定是有数据的。


  1. 消费者唤醒线程不使用notify,因为notify是随机唤醒一个线程,万一重复唤醒消费者就没有办法重新生产新的数据,所以这里需要使用notifyAll方法。


三、多个生产者一个消费者


多个生产者与一个消费者的代码与上面一个生产者多个消费者代码类似:


package com.jiangxia.chap4;
import java.util.ArrayList;
import java.util.List;
/**
 * 多生产者一消费者
 */
public class Demo03 {
    public static void main(String[] args) {
        Food03 food03 = new Food03();
        /**
         * 多个生产者
         */
        Thread[] cookies = new Thread[5];
        for (int i = 0; i < cookies.length; i++) {
            cookies[i] = new Cook03(food03);
            cookies[i].setName("厨师" + (char)('A' + i));
            cookies[i].start();
        }
        Thread waiter = new Waiter03(food03);
        waiter.setName("服务员");
        waiter.start();
    }
}
class Food03{
    private List<String> foodlist = new ArrayList<>();
    synchronized public void addin(String food){
        try{
            if(foodlist.size()==1){
                System.out.println("厨师:"+Thread.currentThread().getName()+"等待中");
                this.wait();
            }
            foodlist.add(food);
            System.out.println(Thread.currentThread().getName() + ":生产食物" + food);
            System.out.println(Thread.currentThread().getName() + ":还有" +  foodlist.size() + "个食物");
            this.notify();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
    synchronized public String getout(){
        String returnfood = null;
        try {
            /**
             * 当没有食物时
             */
            while (foodlist.size()==0){
                System.out.println("服务员:"+Thread.currentThread().getName()+"等待中");
                this.wait();
            }
            returnfood = foodlist.get(0);
            foodlist.remove(0);
            System.out.println("服务员:"+Thread.currentThread().getName() + ":取走" + returnfood);
            System.out.println(Thread.currentThread().getName() + ":还有" + foodlist.size() + "个食物");
//            this.notify();
            this.notifyAll();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return returnfood;
    }
}
/**
 * 生产者
 */
class Cook03 extends Thread{
    private Food03 food03;
    public Cook03(Food03 food03) {
        this.food03 = food03;
    }
    @Override
    public void run() {
        while (true){
            food03.addin(Math.random()+"");
        }
    }
}
/**
 * 消费者
 */
class Waiter03 extends Thread{
    private Food03 food03;
    public Waiter03(Food03 food03) {
        this.food03 = food03;
    }
    @Override
    public void run() {
        while (true){
            food03.getout();
        }
    }
}
复制代码


部分结果如下:


409ce0976f864f70beeba6905f30fb6d~tplv-k3u1fbpfcp-zoom-in-crop-mark_1304_0_0_0.webp.jpg


多个生产者一个消费者的情况下:


1、需要不断的判断集合中是否有数据,如果有就重新等待,直到没有数据 时再做添加


2、生产者不能使用notify唤醒其它线程,有可能造成死锁,应该使用notifyAll方法把所有的线程都唤醒。


四、多个生产者多个消费者


最后一种就是多个生产者多个消费者,代码如下:


package com.jiangxia.chap4;
import java.util.ArrayList;
import java.util.List;
/**
 * 多生产者 多消费者
 */
public class Demo04 {
    public static void main(String[] args) {
        Food04 food04 = new Food04();
        /**
         * 多个生产者 多个消费者
         */
        int size = 5;
        Thread[] cookies = new Thread[size];
        Thread[] waiters = new Thread[size];
        for (int i = 0; i < size; i++) {
            char c = (char)('A' + i);
            cookies[i] = new Cook04(food04);
            cookies[i].setName("厨师" + c);
            cookies[i].start();
            waiters[i] = new Waiter04(food04);
            waiters[i].setName("服务员" + c);
            waiters[i].start();
        }
    }
}
class Food04{
    private List<String> foodlist = new ArrayList<>();
    synchronized public void addin(String food){
        try{
            if(foodlist.size()==1){
                System.out.println("厨师:"+Thread.currentThread().getName()+"等待中");
                this.wait();
            }
            foodlist.add(food);
            System.out.println(Thread.currentThread().getName() + ":生产食物" + food);
            System.out.println(Thread.currentThread().getName() + ":还有" +  foodlist.size() + "个食物");
            this.notify();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
    synchronized public String getout(){
        String returnfood = null;
        try {
            /**
             * 当没有食物时
             */
            while (foodlist.size()==0){
                System.out.println("服务员:"+Thread.currentThread().getName()+"等待中");
                this.wait();
            }
            returnfood = foodlist.get(0);
            foodlist.remove(0);
            System.out.println("服务员:"+Thread.currentThread().getName() + ":取走" + returnfood);
            System.out.println(Thread.currentThread().getName() + ":还有" + foodlist.size() + "个食物");
//            this.notify();
            this.notifyAll();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return returnfood;
    }
}
/**
 * 生产者
 */
class Cook04 extends Thread{
    private Food04 food04;
    public Cook04(Food04 food04) {
        this.food04 = food04;
    }
    @Override
    public void run() {
        while (true){
            food04.addin(Math.random()+"");
        }
    }
}
/**
 * 消费者
 */
class Waiter04 extends Thread{
    private Food04 food04;
    public Waiter04(Food04 food04) {
        this.food04 = food04;
    }
    @Override
    public void run() {
        while (true){
            food04.getout();
        }
    }
}
复制代码


部分结果如下:


8432b433497c424aa1b17891fe2c22bb~tplv-k3u1fbpfcp-zoom-in-crop-mark_1304_0_0_0.webp.jpg


以上就是wait/notify机制的一个现实应用:生产者-消费者模式的一个简介。

目录
相关文章
|
消息中间件 存储 负载均衡
Rabbitmq direct模式保证一个队列只对应一个消费者
Rabbitmq direct模式保证一个队列只对应一个消费者
351 0
|
7月前
|
安全 Java 容器
线程池,定时器以及阻塞队列(生产者/消费者模型)
线程池,定时器以及阻塞队列(生产者/消费者模型)
55 0
|
安全 API C++
c++生产者和消费者线程循环
线程安全-生产者消费者模型
103 1
生产者消费者问题(生产者和消费者分别阻塞于不同的锁)
生产者消费者问题(生产者和消费者分别阻塞于不同的锁)
|
消息中间件 JSON 运维
生产RabbitMQ队列阻塞该如何处理?
那天我和同事一起吃完晚饭回公司加班,然后就群里就有人@我说xxx商户说收不到推送,一开始觉得没啥。我第一反应是不是极光没注册上,就让客服通知商户,重新登录下试试。这边打开极光推送的后台进行检查。后面反应收不到推送的越来越多,我就知道这事情不简单。
|
数据可视化
高并发编程-线程通信_使用wait和notify进行线程间的通信2_多生产者多消费者导致程序假死原因分析
高并发编程-线程通信_使用wait和notify进行线程间的通信2_多生产者多消费者导致程序假死原因分析
57 0
|
消息中间件 Java Spring
【RabbitMQ】Basic Queue 简单队列模型与WorkQueue
【RabbitMQ】Basic Queue 简单队列模型与WorkQueue
121 0
【RabbitMQ】Basic Queue 简单队列模型与WorkQueue
|
安全 数据处理
线程中的生产者和消费者模式
线程中的生产者和消费者模式
132 0
线程中的生产者和消费者模式
|
消息中间件 缓存 负载均衡
RocketMQ中生产者发消息前为啥一定要调用start()方法?
RocketMQ中生产者发消息前为啥一定要调用start()方法?
217 0
RocketMQ中生产者发消息前为啥一定要调用start()方法?
|
消息中间件 存储 监控
Kafka Producer 异步发送消息居然也会阻塞?
Kafka 一直以来都以高吞吐量的特性而家喻户晓,就在上周,在一个性能监控项目中,需要使用到 Kafka 传输海量消息,在这过程中遇到了一个 Kafka Producer 异步发送消息会被阻塞的问题,导致生产端发送耗时很大。 是的,你没听错,Kafka Producer 异步发送消息也会发生阻塞现象,那究竟是怎么回事呢?
1028 0
Kafka Producer 异步发送消息居然也会阻塞?