线程间的协作(2)——生产者与消费者模式

简介: 参考资料《Java并发编程的艺术》《Java编程思想》

1.何为生产者与消费者

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。


import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName:Restraurant
 * @Description:何为生产者与消费者
 * @author: 
 * @date:2018年5月3日
 */
public class Restraurant {
	Meal m=null;
	Chef chef=new Chef(this);
	WaitPerson wait=new WaitPerson(this);
	ExecutorService service=Executors.newCachedThreadPool();
	public Restraurant() {
		service.execute(chef);
		service.execute(wait);
	}
	public static void main(String[] args) {
		new Restraurant();
	}
}
/**
 * @ClassName:Meal
 * @Description:生产者生成的数据
 * @author: 
 * @date:2018年5月3日
 */
class Meal{
	private final int orderNum;//食物订单编号
	public Meal(int num){
		orderNum=num;
	}
	public String toString(){
		return "Meal"+orderNum;
	}
}
/**
 * @ClassName:Chef
 * @Description:厨师类,及生产者
 * @author: 
 * @date:2018年5月3日
 */
class Chef implements Runnable{
	Restraurant r;
	int count=0;
	public Chef(Restraurant r) {
		this.r=r;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				synchronized (this) {
					while(r.m!=null){
						System.out.println("厨师等待中");
						wait();//等待服务员取餐
					}
				}
				if(count++==10){
					System.out.println("今日已售完");
					r.service.shutdownNow();
				}
				System.out.println("订单完成,服务员取餐");
				synchronized (r.wait) {
					r.m=new Meal(count);
					r.wait.notifyAll();
					
				}
				TimeUnit.SECONDS.sleep(1);
			}
		}catch (InterruptedException e) {
			System.out.println("生产者线程强制中断");
		}
		
	}
}
/**
 * @ClassName:WaitPerson
 * @Description:服务员类,即消费者
 * @author: 
 * @date:2018年5月3日
 */
class WaitPerson implements Runnable{
	Restraurant r;
	public WaitPerson(Restraurant r) {
		this.r=r;
	}
	@Override
	public void run() {
		try {
			while (!Thread.interrupted()) {
				synchronized (this) {
					while (r.m == null) {
						System.out.println("服务员等待中");
						wait();// 等待厨师生成食物
					}
				}

				System.out.println("服务员以取餐" + r.m);
				synchronized (r.chef) {
					r.m = null;
					r.chef.notifyAll();
				}
			}
		} catch (InterruptedException e) {
			System.out.println("消费者线程强制中断");
		}
		
	}
	
}

2.生产者与消费者模式

    1)产生原因:在多线程开发 中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理 完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须 等待生产者。wait与notify方法以一种非常低级的方式解决了任务互相通知的问题,即每次交互都要进行一次握手,极大影响的效率以及性能,为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。

    2)原理:生产者和消费者模式是通过一个容器(比如同步阻塞队列)来解决生产者和消费者的强耦合问题。生产者和消 费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用 等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取, 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。 这个阻塞队列就是用来给生产者和消费者解耦的。java.util.concurrent.BlockingQueue接口提供了这个队列,通常使用其实现子类ArrayBlockingQueue,LinkedBlockingQueue。当消费者任务试图从同步队列中获取对象,如果队列为空时,那么队列则会挂起消费者任务,并且当拥有足够多的元素可用时才会恢复消费者任务。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class UseBlockingQueue {
	public static void main(String[] args) throws InterruptedException {
		LinkedBlockingQueue<Toast> dry=new LinkedBlockingQueue<Toast>(),
				butter=new LinkedBlockingQueue<Toast>(),
				jam=new LinkedBlockingQueue<Toast>(),
				con=new LinkedBlockingQueue<Toast>();
		ExecutorService exec=Executors.newCachedThreadPool();
		exec.execute(new MakeToast(dry));//制作初始吐司任务
		exec.execute(new Butter(dry,butter));//吐司抹黄油任务
		exec.execute(new Jam(butter,jam));//吐司抹果酱任务
		exec.execute(new Consumer(jam));//消费者任务,食用吐司
		TimeUnit.SECONDS.sleep(5);
		exec.shutdownNow();
	}
}
class Toast{
	private int status;//吐司状态:0代表制作吐司,1代表抹黄油,2代表向抹了黄油的吐司抹果酱
	private final int id;
	public Toast(int id1) {
		id=id1;
	}
	public void butter(){
		status=1;
	};
	public void jam(){
		status=2;
	}
	public int getStatus(){
		return status;
	}
	public int getId(){
		return id;
	}
	public String toString(){
		return "toast "+id+":"+status;
	}
}
/**
 * @Description:制作初始吐司
 */
class MakeToast implements Runnable{
	private LinkedBlockingQueue<Toast> queue=new LinkedBlockingQueue<Toast>();
	private int count=0;
	public MakeToast(LinkedBlockingQueue<Toast> q) {
		queue=q;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Thread.sleep(1000);//制作时间
				Toast t=new Toast(count);
				System.out.println(t);
				queue.put(t);//添加到同步队列
				count++;
			}
		}catch (InterruptedException e) {
			System.out.println("make process interrupted");
		}
		System.out.println("make process off");
	}
}
/**
 * @Description:涂抹黄油
 */
class Butter implements Runnable{
	private LinkedBlockingQueue<Toast> queue1,queue2;//未加料吐司队列,抹黄油后吐司队列
	public Butter(LinkedBlockingQueue<Toast> q1,LinkedBlockingQueue<Toast>q2) {
		queue1=q1;
		queue2=q2;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Toast t=queue1.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加
				t.butter();
				System.out.println(t);
				queue2.put(t);
			}
		}catch (InterruptedException e) {
			System.out.println("butter process interrupted");
		}
		System.out.println("butter process off");
	}
}
/**
 * @Description:涂抹果酱
 */
class Jam implements Runnable{
	private LinkedBlockingQueue<Toast> queue1,queue2;//抹黄油后吐司队列,抹果酱吐司队列
	public Jam(LinkedBlockingQueue<Toast> q1,LinkedBlockingQueue<Toast>q2) {
		queue1=q1;
		queue2=q2;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Toast t=queue1.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加
				t.jam();
				System.out.println(t);
				queue2.put(t);
			}
		}catch (InterruptedException e) {
			System.out.println("jam process interrupted");
		}
		System.out.println("jam process off");
	}
}
/**
 * @Description:被食用
 */
class Consumer implements Runnable{
	private LinkedBlockingQueue<Toast> finished;//抹黄油后吐司队列,抹果酱吐司队列
	int count=0;
	public Consumer(LinkedBlockingQueue<Toast> q) {
		finished=q;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Toast t=finished.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加
				if(t.getId()!=count++||t.getStatus()!=2){
					System.out.println("过程出现错误");
					return;
				}else{
					System.out.println("所有过程正确实现"+"toast "+t.getId()+"被食用");
				}
			}
		}catch (InterruptedException e) {
			System.out.println("eat process interrupted");
		}
		System.out.println("eat process off");
	}
}

 

相关文章
|
2月前
|
负载均衡 算法 安全
基于Reactor模式的高性能网络库之线程池组件设计篇
EventLoopThreadPool 是 Reactor 模式中实现“一个主线程 + 多个工作线程”的关键组件,用于高效管理多个 EventLoop 并在多核 CPU 上分担高并发 I/O 压力。通过封装 Thread 类和 EventLoopThread,实现线程创建、管理和事件循环的调度,形成线程池结构。每个 EventLoopThread 管理一个子线程与对应的 EventLoop(subloop),主线程(base loop)通过负载均衡算法将任务派发至各 subloop,从而提升系统性能与并发处理能力。
90 3
|
3月前
|
机器学习/深度学习 监控 算法
局域网行为监控软件 C# 多线程数据包捕获算法:基于 KMP 模式匹配的内容分析优化方案探索
本文探讨了一种结合KMP算法的多线程数据包捕获与分析方案,用于局域网行为监控。通过C#实现,该系统可高效检测敏感内容、管理URL访问、分析协议及审计日志。实验表明,相较于传统算法,KMP在处理大规模网络流量时效率显著提升。未来可在算法优化、多模式匹配及机器学习等领域进一步研究。
77 0
|
7月前
|
SQL 数据建模 BI
【YashanDB 知识库】用 yasldr 配置 Bulkload 模式作单线程迁移 300G 的业务数据到分布式数据库,迁移任务频繁出错
问题描述 详细版本:YashanDB Server Enterprise Edition Release 23.2.4.100 x86_64 6db1237 影响范围: 离线数据迁移场景,影响业务数据入库。 外场将部分 NewCIS 的报表业务放到分布式数据库,验证 SQL 性能水平。 操作系统环境配置: 125G 内存 32C CPU 2T 的 HDD 磁盘 问题出现的步骤/操作: 1、部署崖山分布式数据库 1mm 1cn 3dn 单线启动 yasldr 数据迁移任务,设置 32 线程的 bulk load 模式 2、观察 yasldr.log 是否出现如下错
|
9月前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
166 1
Java—多线程实现生产消费者
|
9月前
|
缓存 安全 Java
【JavaEE】——单例模式引起的多线程安全问题:“饿汉/懒汉”模式,及解决思路和方法(面试高频)
单例模式下,“饿汉模式”,“懒汉模式”,单例模式下引起的线程安全问题,解锁思路和解决方法
|
11月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
130 1
|
4月前
|
机器学习/深度学习 消息中间件 存储
【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
168 0
|
7月前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
通过本文,您可以了解如何在业务线程中注册和处理Linux信号。正确处理信号可以提高程序的健壮性和稳定性。希望这些内容能帮助您更好地理解和应用Linux信号处理机制。
118 26
|
7月前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
本文详细介绍了如何在Linux中通过在业务线程中注册和处理信号。我们讨论了信号的基本概念,并通过完整的代码示例展示了在业务线程中注册和处理信号的方法。通过正确地使用信号处理机制,可以提高程序的健壮性和响应能力。希望本文能帮助您更好地理解和应用Linux信号处理,提高开发效率和代码质量。
126 17