Java并发(二)生产者与消费者

简介:

    考虑这样一个饭店,它有一个厨师(Chef)和一个服务员(Waiter)。这个服务员必须等待厨师准备好菜品。当厨师准备好时,他会通知服务员,之后服务员上菜,然后返回继续等待。这是一个任务协作的示例:厨师代表生产者,而服务员代表消费者。两个任务必须在菜品被生产和消费时进行握手,而系统必须以有序的方式关闭。下面是对这个叙述建模的代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import  java.util.concurrent.ExecutorService;
import  java.util.concurrent.Executors;
import  java.util.concurrent.TimeUnit;
class  Meal {
     private  final  int  orderNum;
     public  Meal( int  orderNum) {
         this .orderNum = orderNum;
     }
     @Override
     public  String toString() {
         return  "Meal "  + orderNum;
     }
}
class  Waiter  implements  Runnable {
     private  Restaurant r;
     public  Waiter(Restaurant r) {
         this .r = r;
     }
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 synchronized  ( this ) {
                     while (r.meal ==  null ) {
                         wait(); //等待厨师做菜
                     }
                 }
                 System.out.println( "Waiter got "  + r.meal);
                 synchronized  (r.chef) {
                     r.meal =  null ; //上菜
                     r.chef.notifyAll(); //通知厨师继续做菜
                 }
             }
         catch  (InterruptedException e) {
             System.out.println( "Waiter task is over." );
         }
     }
}
class  Chef  implements  Runnable {
     private  Restaurant r;
     private  int  count =  0 ; //厨师做的菜品数量
     public  Chef(Restaurant r) {
         this .r = r;
     }
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 synchronized  ( this ) {
                     while (r.meal !=  null ) {
                         wait(); //等待服务员上菜
                     }
                 }
                 if  (++count >  10 ) {
                     System.out.println( "Meal is enough, stop." );
                     r.exec.shutdownNow();
                 }
                 System.out.print( "Order up! " );
                 synchronized  (r.waiter) {
                     r.meal =  new  Meal(count); //做菜
                     r.waiter.notifyAll(); //通知服务员上菜
                 }
                 TimeUnit.MILLISECONDS.sleep( 100 );
             }
         catch  (InterruptedException e) {
             System.out.println( "Chef task is over." );
         }
     }
}
public  class  Restaurant {
     Meal meal;
     ExecutorService exec = Executors.newCachedThreadPool();
     //厨师和服务员都服务于同一个饭店
     Waiter waiter =  new  Waiter( this );
     Chef chef =  new  Chef( this );
     public  Restaurant() {
         exec.execute(waiter);
         exec.execute(chef);
     }
     public  static  void  main(String[] args) {
         new  Restaurant();
     }
}

执行结果:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
Order up! Waiter got Meal  1
Order up! Waiter got Meal  2
Order up! Waiter got Meal  3
Order up! Waiter got Meal  4
Order up! Waiter got Meal  5
Order up! Waiter got Meal  6
Order up! Waiter got Meal  7
Order up! Waiter got Meal  8
Order up! Waiter got Meal  9
Order up! Waiter got Meal  10
Meal is enough, stop.
Order up! Waiter task is over.
Chef task is over.

    Restaurant是Waiter和Chef的焦点,它们都必须知道在为哪个饭店工作,因为他们必须和这家饭店的窗口打交道,一边放置或拿取菜品r.meal。在run()中,waiter进入wait()模式,停止其任务,直至被Chef的notifyAll()唤醒。由于这是一个非常简单的程序,因此我们知道只有一个任务将在Waiter的锁上等待:即Waiter任务自身。出于这个原因,理论上可以调用notify()而不是notifyAll()。但是,在更复杂的情况下,可能会有多个任务在某个特定对象锁上等待,因此你不知道哪个任务应该被唤醒。因此调用notifyAll()要更安全一些,这样可以唤醒等待这个锁的所有任务,而每个任务都必须决定这个通知是否与自己相关。

    一旦Chef送上Meal并通知Waiter,这个Chef就将等待,知道Waiter收集到订单并通知Chef,之后Chef就可以做下一份菜品了。

    注意,wait()被包装在一个while()字句中,这个语句在不断的测试正在等待的事物。乍一看有点怪——如果在等待一个订单,一单你被唤醒,这个订单就必定是可获得的,对吗?正如前面注意到的,在更复杂的并发应用中,某个其他的任务可能在Waiter被唤醒时突然插足并拿走订单。因此唯一安全的方式是使用下面这种wait()的惯用法:

?
1
2
3
while (conditionIsNotMet) {
     wait();
}

    这可以保证在你退出等待循环之前,条件将得到满足,并且如果你收到了关于某事物的通知,而它与这个条件并无关系,或者在你完全退出等待循环之前,这个条件发生了变化,都可以确保你重返等待状态。

    请注意观察,对notifyAll()的调用必须首先捕获Waiter上的锁,而在Waiter.run()中的对wait()的调用会自动的释放这个所,因此这是由可能实现的。因为调用notifyAll()必然拥有这个锁,所以这可以保证两个试图在同一个对象上调用notifyAll()的任务不会互相冲突。

    通过把整个run()方法体放到一个try语句块中,可以使得这两个run()方法都被设计为可以有序的关闭。catch子句将紧挨着run()方法的括号之前结束,因此,如果这个任务收到了InterruptedException,它将在捕获到异常后立即结束。

    注意,在Chef中,在调用shutdownNow()之后,你应该直接从run()返回,并且通常这就是你应该做的。但是,以这种方式执行还有一些更有趣的东西。记住,shutdownNow()将向所有由ExecutorService启动的任务发送interrupt(),但是在Chef中,任务并没有在获得该interrupt()立即结束,因为当任务试图进入一个(可中断的)阻塞操作时,这个中断只能抛出InterruptedException。因此你将首先看到“Order up!”,然后Chef试图调用sleep()方法时,抛出了InterruptedException。如果你移除对sleep()的调用,那么这个任务将回到run()循环的顶部,并由于Thread.interrupted()测试而退出,同时并不抛异常。

    在这两个示例中,对于一个任务而言,只有一个单一的地方用于存放对象,从而使得另一个任务稍后可以使用这个对象。但是,在典型的生产者-消费者实现中,应使用先进先出队列来存储被生产和消费的对象。


目录
相关文章
|
1月前
|
存储 安全 算法
解读 Java 并发队列 BlockingQueue
解读 Java 并发队列 BlockingQueue
20 0
|
2月前
|
监控 安全 算法
Java并发基础:LinkedTransferQueue全面解析!
LinkedTransferQueue类实现了高效的线程间数据传递,支持等待匹配的生产者-消费者模式,基于链表的无界设计使其在高并发场景下表现卓越,且无需担心队列溢出,丰富的方法和良好的可扩展性满足了各种复杂应用场景的需求。
Java并发基础:LinkedTransferQueue全面解析!
|
2月前
|
Java 程序员 API
Java并发基础:concurrent Flow API全面解析
java.util.concurrent.Flow定义了响应式编程的核心接口,促进了Java在异步数据处理和背压机制方面的标准化,这使得第三方库如Reactor和RxJava能够基于这些接口提供丰富的实现和功能,同时简化了响应式编程在Java中的使用,Flow API增强了Java在并发编程领域的灵活性,使得处理异步数据流变得更加自然和高效。
109 0
Java并发基础:concurrent Flow API全面解析
|
1月前
|
存储 缓存 算法
Java并发基础:原子类之AtomicMarkableReference全面解析
AtomicMarkableReference类能够确保引用和布尔标记的原子性更新,有效避免了多线程环境下的竞态条件,其提供的方法可以轻松地实现基于条件的原子性操作,提高了程序的并发安全性和可靠性。
Java并发基础:原子类之AtomicMarkableReference全面解析
|
5天前
|
安全 Java
深入理解 Java 多线程和并发工具类
【4月更文挑战第19天】本文探讨了Java多线程和并发工具类在实现高性能应用程序中的关键作用。通过继承`Thread`或实现`Runnable`创建线程,利用`Executors`管理线程池,以及使用`Semaphore`、`CountDownLatch`和`CyclicBarrier`进行线程同步。保证线程安全、实现线程协作和性能调优(如设置线程池大小、避免不必要同步)是重要环节。理解并恰当运用这些工具能提升程序效率和可靠性。
|
7天前
|
Java 开发者
Java中多线程并发控制的实现与优化
【4月更文挑战第17天】 在现代软件开发中,多线程编程已成为提升应用性能和响应能力的关键手段。特别是在Java语言中,由于其平台无关性和强大的运行时环境,多线程技术的应用尤为广泛。本文将深入探讨Java多线程的并发控制机制,包括基本的同步方法、死锁问题以及高级并发工具如java.util.concurrent包的使用。通过分析多线程环境下的竞态条件、资源争夺和线程协调问题,我们提出了一系列实现和优化策略,旨在帮助开发者构建更加健壮、高效的多线程应用。
7 0
|
7天前
|
存储 缓存 安全
Java并发基础之互斥同步、非阻塞同步、指令重排与volatile
在Java中,多线程编程常常涉及到共享数据的访问,这时候就需要考虑线程安全问题。Java提供了多种机制来实现线程安全,其中包括互斥同步(Mutex Synchronization)、非阻塞同步(Non-blocking Synchronization)、以及volatile关键字等。 互斥同步(Mutex Synchronization) 互斥同步是一种基本的同步手段,它要求在任何时刻,只有一个线程可以执行某个方法或某个代码块,其他线程必须等待。Java中的synchronized关键字就是实现互斥同步的常用手段。当一个线程进入一个synchronized方法或代码块时,它需要先获得锁,如果
24 0
|
16天前
|
存储 缓存 安全
【企业级理解】高效并发之Java内存模型
【企业级理解】高效并发之Java内存模型
|
23天前
|
安全 Java
Java中的多线程并发控制
在Java中,多线程是实现并发执行任务的一种重要方式。然而,随着多个线程同时访问共享资源,可能会导致数据不一致和其他并发问题。因此,了解并掌握Java中的多线程并发控制机制显得尤为重要。本文将深入探讨Java的多线程并发控制,包括synchronized关键字、Lock接口、Semaphore类以及CountDownLatch类等,并通过实例代码演示其使用方法和注意事项。
12 2
|
27天前
|
缓存 NoSQL Java
Java项目:支持并发的秒杀项目(基于Redis)
Java项目:支持并发的秒杀项目(基于Redis)
26 0