生产者消费者模式

简介: 生产者消费者模式

单生产和消费模式

在Java中,负责产生数据的模块的是生产者,负责使用数据的模块是消费者,生产者消费者解决数据的平衡问题,即先有数据才能使用,没有数据时消费者需要等待。

例如:有一个饭店,它有一个厨师和一个服务员,服务员必须等厨师把菜做好了,通知到服务员才能上菜,然后返回继续等待,厨师代表生产者,服务员代表消费者,两个任务在被消费和生产同时运行。

public class Text17 {
    public static void main(String[] args) {
        ValueOP valueOP=new ValueOP();
        //测试生产-消费
        ProductThread productThread=new ProductThread(valueOP);
        ConsumerThread consumerThread=new ConsumerThread(valueOP);
        productThread.start();
        consumerThread.start();
    }
}
//定义线程类模拟生产者
class  ProductThread extends Thread
{
    private ValueOP obj;
    public  ProductThread(ValueOP obj)
    {
        this.obj=obj;
    }
    @Override
    public void run() {
       while (true)
       {
           try {
               obj.SetValue();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
    }
}
class  ConsumerThread extends Thread
{
    private ValueOP obj;
    public  ConsumerThread(ValueOP obj)
    {
        this.obj=obj;
    }
    @Override
    public void run() {
        while (true)
        {
            try {
                obj.GetValue();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
//SetValue用来设置值,如果不为空就不设置值,如果GetValue为空,就等待不读取,这样前一个设置后一个读取
class  ValueOP
{
    public  String value="";
    //修改值方法
    public  void  SetValue() throws InterruptedException {
        synchronized (this)
        {
            //如果不是空字符串就等待
            if(!value.equalsIgnoreCase(""))
            {
                this.wait();
            }
            //如果是空串就设置value值
            String value=System.currentTimeMillis()+"-"+System.nanoTime();
            System.out.println("set设置的是"+value);
            this.value=value;
            this.notify();
        }
    }
    //读取字段
    public  void GetValue() throws InterruptedException {
        synchronized (this)
        {
            //如果是空字符串就等待
            if(value.equalsIgnoreCase(""))
            {
                this.wait();
            }
            //不是空串就读取,并赋值为空
            System.out.println("get的值是:"+value);
            this.value="";
            this.notify();
        }
    }
}

这样生产与消费交替运行

多生产和消费模式

一个饭店有多个厨师和服务员,当厨师们做菜过快了,导致服务员上菜速度跟不上,导致菜堆积在窗口,这时候要让厨师停止生产,等待服务员把菜上完,再继续做菜。如果服务员们上菜速度太快了,厨师没有做完,多名服务员又想上菜,这时候要等待厨师做菜。

public class Text17 {
    public static void main(String[] args) {
        ValueOP valueOP=new ValueOP();
        //测试生产-消费
        ProductThread productThread=new ProductThread(valueOP);
        ProductThread productThread2=new ProductThread(valueOP);
        ConsumerThread consumerThread=new ConsumerThread(valueOP);
        ConsumerThread consumerThread2=new ConsumerThread(valueOP);
        productThread.start();
        productThread2.start();;
        consumerThread.start();
        consumerThread2.start();
    }
}
//定义线程类模拟生产者
class  ProductThread extends Thread
{
    private ValueOP obj;
    public  ProductThread(ValueOP obj)
    {
        this.obj=obj;
    }
    @Override
    public void run() {
       while (true)
       {
           try {
               obj.SetValue();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
    }
}
class  ConsumerThread extends Thread
{
    private ValueOP obj;
    public  ConsumerThread(ValueOP obj)
    {
        this.obj=obj;
    }
    @Override
    public void run() {
        while (true)
        {
            try {
                obj.GetValue();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
//SetValue用来设置值,如果不为空就不设置值,如果GetValue为空,就等待不读取,这样前一个设置后一个读取
class  ValueOP
{
    public  String value="";
    //修改值方法
    public  void  SetValue() throws InterruptedException {
        synchronized (this)
        {
            //如果不是空字符串就等待
            while (!value.equalsIgnoreCase(""))
            {
                this.wait();
            }
            //如果是空串就设置value值
            String value=System.currentTimeMillis()+"-"+System.nanoTime();
            System.out.println("set设置的是"+value);
            this.value=value;
            this.notifyAll();
        }
    }
    //读取字段
    public  void GetValue() throws InterruptedException {
        synchronized (this)
        {
            //如果是空字符串就等待
            while (value.equalsIgnoreCase(""))
            {
                this.wait();
            }
            //不是空串就读取,并赋值为空
            System.out.println("get的值是:"+value);
            this.value="";
            this.notifyAll();
        }
    }
}

在多生产消费的环境notify不能保证是唤醒消费者,如果生产者唤醒生产者就会出现假死情况。

通过管道实现线程通信

在java.io包中的pipeStream管道流用于在线程之间传送数据,一个线程发送数据到输出管道,另一个线程从输入管道读取数据,相关的类包括:pidedInputStream和pipedoutStream,pipedReader和pepedWriter

import java.awt.print.PrinterIOException;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class TextWrite {
    public static void main(String[] args) throws IOException {
        PipedInputStream inputStream=new PipedInputStream();
        PipedOutputStream outputStream=new PipedOutputStream();
        inputStream.connect(outputStream);
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    WriteData(outputStream);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    ReadData(inputStream);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
    //定义方法向管道流中写入数据
public  static  void  WriteData(PipedOutputStream outputStream) throws IOException {
        for (int i = 0; i <100 ; i++) {
            String data=""+i;
            outputStream.write(data.getBytes());//把字节数组写入到输出管道中
        }
        outputStream.close();
}
//从管道流中读取数据
    public  static  void  ReadData(PipedInputStream inputStream) throws IOException {
        byte[] bytes=new byte[1024];
        int len=inputStream.read(bytes);//返回读到的字节数,没有读到返回-1
        while (len != -1) {
            System.out.println(new String(bytes, 0, len));
            len=inputStream.read(bytes);//继续从管道读取数据
        }
        inputStream.close();
    }
}


相关文章
|
5月前
|
存储 设计模式 安全
使用BlockingQueue实现生产者-消费者模式
使用BlockingQueue实现生产者-消费者模式
|
7月前
|
C++
C++11实现生产者消费者
C++11实现生产者消费者
80 1
|
安全 Linux 数据安全/隐私保护
【Linux线程同步】生产者消费者模型
【Linux线程同步】生产者消费者模型
133 0
多线程实践-生产者消费者
多线程实践-生产者消费者
79 0
|
消息中间件 Java
《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式(一)
《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式
《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式(一)
|
缓存 监控 安全
《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式(二)
《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式
《JUC并发编程 - 模式篇》保护性暂停模式 | 顺序控制模式 | 生产者消费者模式 | 两阶段终止模式 | Balking模式 | 享元模式(二)
|
缓存 Java 调度
线程协作 生产者消费者模式
线程协作 生产者消费者模式
112 0
线程协作  生产者消费者模式
|
Java 开发者
解决生产者-消费者同步问题|学习笔记
快速学习 解决生产者-消费者同步问题
157 0
解决生产者-消费者同步问题|学习笔记
基于阻塞队列实现的简单生产者-消费者模式
基于阻塞队列实现的简单生产者-消费者模式
115 0