1 前言
上一篇文章:多线程编程之线程间通信机制:wait/notify机制重点讲了在java多线程 编程中协调线程间通信的wait/notify
机制,它有力的保证了线程间通信的安全性以及便利性。这篇文章就来说说如何使用前面说到的wait/notify
机制实现生产者/消费者模式。
什么是生产者消费者模型?
生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。生产消费者模式如下图。
生产者是一堆线程,消费者是另一堆线程,内存缓冲区可以使用List数组队列,数据类型只需要定义一个简单的类就好。关键是如何处理多线程之间的协作。这其实也是多线程通信的一个范例。
在这个模型中,最关键就是内存缓冲区为空的时候消费者必须等待,而内存缓冲区满的时候,生产者必须等待。其他时候可以是个动态平衡。值得注意的是多线程对临界区资源的操作时候必须保证在读写中只能存在一个线程,所以需要设计锁的策略。
这篇文章就通过前面说到的wait/notify
机制实现生产者/消费者模式。
2 正文
前面的文章提到了饭馆里厨师和服务员的例子。在一个饭馆里通常都有好多厨师以及好多服务员,我们可以把厨师称为生产者,把服务员称为消费者(厨师生产的食物被服务员拿走(消费)),我们知道厨师和服务员其实是不直接打交道的,而是在厨师做好菜之后放到窗口,服务员从窗口直接把菜端走给客人就好了,这样省去了生产者和消费者之间的沟通成本,从而极大的提升工作效率。从多线程编程代码的角度来看,每一个厨师就相当于一个生产者线程,每一个服务员都相当于一个消费者线程,而放菜的窗口就相当于一个缓冲队列,生产者线程不断把生产好的东西放到缓冲队列里,消费者线程不断从缓冲队列里取东西。模型图就如上所示。
那么生产者/消费者模式的实现有以下几种情况:
1、一个生产者与一个消费者
第一种情况就是一个生产者和一个消费者,就类似于饭店只有一个厨师炒菜和一个服务员上菜。那么代码模拟情况如下:
package com.jiangxia.chap4; //一个生产者与一个消费者 public class Demo01 { public static void main(String[] args) throws IllegalMonitorStateException { Object lock = new Object(); Thread t1 = new Demo01cook(lock); t1.start(); Thread t2 = new Demo01Waiter(lock); t2.start(); } } //食物:用来表示有没有菜 class Food { public static String food=""; } // 生产者(厨师) class Demo01cook extends Thread { //模拟送餐窗口队列 private Object lock; public Demo01cook(Object lock) { this.lock = lock; } @Override public void run() { try{ while(true){ //线程休眠模拟炒菜时间 Thread.sleep(3000); synchronized (lock){ //如果传菜窗口不为空,即有菜 if(!Food.food.equals("")){ lock.wait(); } System.out.println(System.currentTimeMillis() + "_" + System.nanoTime()+":厨师炒好菜了!等待服务员上菜。。。。"); lock.notify(); } } }catch (InterruptedException e){ e.printStackTrace(); } } } // 消费者(服务员) class Demo01Waiter extends Thread{ //模拟送餐窗口队列 private Object lock; public Demo01Waiter(Object lock) { this.lock = lock; } @Override public void run() { try{ while(true){ synchronized (lock){ if ("".equals(Food.food)){ lock.wait(); } System.out.println(System.currentTimeMillis() + "_" + System.nanoTime()+":服务员没有菜可以送了,等待厨师炒菜。。。。。"); Food.food = ""; lock.notify(); //线程休眠模拟送菜时间 Thread.sleep(2000); } } }catch (InterruptedException e){ e.printStackTrace(); } } } 复制代码
结果如下:
通过上面结果可以看出,生产者消费者有序生产消费。
二、一生产者多消费者
package com.jiangxia.chap4; import java.util.ArrayList; import java.util.List; /** * 一个生产者 多个消费者 */ public class Demo02 { public static void main(String[] args) { Food02 food02 = new Food02(); Thread cook = new Cook02(food02); cook.setName("厨师"); cook.start(); Thread[] waiters = new Thread[5]; for (int i = 0; i < 5; i++) { waiters[i] = new Waiter02(food02); waiters[i].setName("服务员"+(char)('A'+i)); waiters[i].start(); } } } class Food02{ private List<String> foodlist = new ArrayList<>(); synchronized public void addin(String food){ try{ if(foodlist.size()==1){ System.out.println("厨师:"+Thread.currentThread().getName()+"等待中"); this.wait(); } foodlist.add(food); System.out.println(Thread.currentThread().getName() + ":生产食物" + food); System.out.println(Thread.currentThread().getName() + ":还有" + foodlist.size() + "个食物"); this.notify(); }catch (InterruptedException e){ e.printStackTrace(); } } synchronized public String getout(){ String returnfood = null; try { /** * 当没有食物时 */ while (foodlist.size()==0){ System.out.println("服务员:"+Thread.currentThread().getName()+"等待中"); this.wait(); } returnfood = foodlist.get(0); foodlist.remove(0); System.out.println("服务员:"+Thread.currentThread().getName() + ":取走" + returnfood); System.out.println(Thread.currentThread().getName() + ":还有" + foodlist.size() + "个食物"); // this.notify(); this.notifyAll(); }catch (InterruptedException e){ e.printStackTrace(); } return returnfood; } } /** * 生产者 */ class Cook02 extends Thread{ private Food02 food02; public Cook02(Food02 food02) { this.food02 = food02; } @Override public void run() { while (true){ food02.addin(Math.random()+""); } } } /** * 消费者 */ class Waiter02 extends Thread{ private Food02 food02; public Waiter02(Food02 food02) { this.food02 = food02; } @Override public void run() { while (true){ food02.getout(); } } } 复制代码
部分结果如下:
在一个生产者多个消费者模型中:
- 消费者在判断集合是否有数据时不能使用if,因为有可能会重复唤醒的还是消费者,这里需要使用while语句进行判断,保证集合中一定是有数据的。
- 消费者唤醒线程不使用notify,因为notify是随机唤醒一个线程,万一重复唤醒消费者就没有办法重新生产新的数据,所以这里需要使用notifyAll方法。
三、多个生产者一个消费者
多个生产者与一个消费者的代码与上面一个生产者多个消费者代码类似:
package com.jiangxia.chap4; import java.util.ArrayList; import java.util.List; /** * 多生产者一消费者 */ public class Demo03 { public static void main(String[] args) { Food03 food03 = new Food03(); /** * 多个生产者 */ Thread[] cookies = new Thread[5]; for (int i = 0; i < cookies.length; i++) { cookies[i] = new Cook03(food03); cookies[i].setName("厨师" + (char)('A' + i)); cookies[i].start(); } Thread waiter = new Waiter03(food03); waiter.setName("服务员"); waiter.start(); } } class Food03{ private List<String> foodlist = new ArrayList<>(); synchronized public void addin(String food){ try{ if(foodlist.size()==1){ System.out.println("厨师:"+Thread.currentThread().getName()+"等待中"); this.wait(); } foodlist.add(food); System.out.println(Thread.currentThread().getName() + ":生产食物" + food); System.out.println(Thread.currentThread().getName() + ":还有" + foodlist.size() + "个食物"); this.notify(); }catch (InterruptedException e){ e.printStackTrace(); } } synchronized public String getout(){ String returnfood = null; try { /** * 当没有食物时 */ while (foodlist.size()==0){ System.out.println("服务员:"+Thread.currentThread().getName()+"等待中"); this.wait(); } returnfood = foodlist.get(0); foodlist.remove(0); System.out.println("服务员:"+Thread.currentThread().getName() + ":取走" + returnfood); System.out.println(Thread.currentThread().getName() + ":还有" + foodlist.size() + "个食物"); // this.notify(); this.notifyAll(); }catch (InterruptedException e){ e.printStackTrace(); } return returnfood; } } /** * 生产者 */ class Cook03 extends Thread{ private Food03 food03; public Cook03(Food03 food03) { this.food03 = food03; } @Override public void run() { while (true){ food03.addin(Math.random()+""); } } } /** * 消费者 */ class Waiter03 extends Thread{ private Food03 food03; public Waiter03(Food03 food03) { this.food03 = food03; } @Override public void run() { while (true){ food03.getout(); } } } 复制代码
部分结果如下:
多个生产者一个消费者的情况下:
1、需要不断的判断集合中是否有数据,如果有就重新等待,直到没有数据 时再做添加
2、生产者不能使用notify唤醒其它线程,有可能造成死锁,应该使用notifyAll方法把所有的线程都唤醒。
四、多个生产者多个消费者
最后一种就是多个生产者多个消费者,代码如下:
package com.jiangxia.chap4; import java.util.ArrayList; import java.util.List; /** * 多生产者 多消费者 */ public class Demo04 { public static void main(String[] args) { Food04 food04 = new Food04(); /** * 多个生产者 多个消费者 */ int size = 5; Thread[] cookies = new Thread[size]; Thread[] waiters = new Thread[size]; for (int i = 0; i < size; i++) { char c = (char)('A' + i); cookies[i] = new Cook04(food04); cookies[i].setName("厨师" + c); cookies[i].start(); waiters[i] = new Waiter04(food04); waiters[i].setName("服务员" + c); waiters[i].start(); } } } class Food04{ private List<String> foodlist = new ArrayList<>(); synchronized public void addin(String food){ try{ if(foodlist.size()==1){ System.out.println("厨师:"+Thread.currentThread().getName()+"等待中"); this.wait(); } foodlist.add(food); System.out.println(Thread.currentThread().getName() + ":生产食物" + food); System.out.println(Thread.currentThread().getName() + ":还有" + foodlist.size() + "个食物"); this.notify(); }catch (InterruptedException e){ e.printStackTrace(); } } synchronized public String getout(){ String returnfood = null; try { /** * 当没有食物时 */ while (foodlist.size()==0){ System.out.println("服务员:"+Thread.currentThread().getName()+"等待中"); this.wait(); } returnfood = foodlist.get(0); foodlist.remove(0); System.out.println("服务员:"+Thread.currentThread().getName() + ":取走" + returnfood); System.out.println(Thread.currentThread().getName() + ":还有" + foodlist.size() + "个食物"); // this.notify(); this.notifyAll(); }catch (InterruptedException e){ e.printStackTrace(); } return returnfood; } } /** * 生产者 */ class Cook04 extends Thread{ private Food04 food04; public Cook04(Food04 food04) { this.food04 = food04; } @Override public void run() { while (true){ food04.addin(Math.random()+""); } } } /** * 消费者 */ class Waiter04 extends Thread{ private Food04 food04; public Waiter04(Food04 food04) { this.food04 = food04; } @Override public void run() { while (true){ food04.getout(); } } } 复制代码
部分结果如下:
以上就是wait/notify
机制的一个现实应用:生产者-消费者
模式的一个简介。