Java并发(四)BlockingQueue的使用

简介:

    wait()和notifyAll()方法以一种非常低级的方式解决了任务互操作的问题,即每次交互时都需要握手。在许多情况下,你可以瞄准更高的抽象级别,使用同步队列来解决任务协作的问题。同步队列在任何时刻都只允许一个任务插入或移除元素。在java.util.concurrent.BlockingQueue接口中提供了这个队列,这个接口有大量的标准实现。你通常可以使用LinkedBlockingQueue,它是一个无届队列,你还可以使用ArrayBlockingQueue,它具有固定的尺寸,因此你可以在它被阻塞之前,向其中放置有限数量的元素。

    如果消费者任务试图从队列中获取对象,而该队列此时为空,那么这些队列还可以挂起消费者任务,并且当有更多的元素可用时回复消费者任务。阻塞队列可以解决非常大的问题,而其方式与wait()和notifyAll()相比,则要简单并可靠许多。

    考虑下面这个BlockingQueue的示例,有一台机器具有三个任务:一个制作吐司,一个给吐司抹黄油,还有一个给吐司涂果酱。我们可以通过各个处理过程之间的BlockingQueue来运行这个吐司制作程序:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import  java.util.Random;
import  java.util.concurrent.ExecutorService;
import  java.util.concurrent.Executors;
import  java.util.concurrent.LinkedBlockingQueue;
import  java.util.concurrent.TimeUnit;
 
class  Toast {
     /**
      * 吐司的状态:
      * DRY: 烘干的
      * BUTTERED: 涂了黄油的
      * JAMMED: 涂了果酱的
      * <p>吐司的状态只能由DRY->BUTTERED->JAMMED转变
      */
     public  enum  Status {DRY, BUTTERED, JAMMED}
     private  Status status = Status.DRY; //默认状态为DRY
     private  final  int  id;
     public  Toast( int  id) {  this .id =  id;}
     public  void  butter() {status = Status.BUTTERED;}
     public  void  jam() {status = Status.JAMMED;}
     public  Status getStatus() { return  status;}
     public  int  getId() { return  id;}
     public  String toString() {
         return  "Toast id: "  + id +  ", status: "  + status;
     }
}
 
@SuppressWarnings ( "serial" )
class  ToastQueue  extends  LinkedBlockingQueue<Toast> {}
 
/**
  * 生产吐司的任务。
  */
class  Toaster  implements  Runnable {
     private  ToastQueue toastQueue;
     private  int  count =  0 ;
     private  Random random =  new  Random( 47 );
     public  Toaster(ToastQueue queue) {
         this .toastQueue = queue;
     }
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 TimeUnit.MILLISECONDS.sleep( 300  + random.nextInt( 500 ));
                 //生产一片吐司,这些吐司是有序的
                 Toast toast =  new  Toast(count++);
                 System.out.println(toast);
                 //放到toastQueue中
                 toastQueue.put(toast);
             }
         catch  (InterruptedException e) {
             System.out.println( "Toaster interrupted." );
         }
         System.out.println( "Toaster off." );
     }
}
 
/**
  * 涂黄油的任务。
  */
class  Butterer  implements  Runnable {
     private  ToastQueue dryQueue;
     private  ToastQueue butteredQueue;
     public  Butterer(ToastQueue dryQueue, ToastQueue butteredQueue) {
         this .dryQueue = dryQueue;
         this .butteredQueue = butteredQueue;
     }
     
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 //在取得下一个吐司之前会一直阻塞
                 Toast toast = dryQueue.take();
                 toast.butter();
                 System.out.println(toast);
                 butteredQueue.put(toast);
             }
         catch  (InterruptedException e) {
             System.out.println( "Butterer interrupted." );
         }
         System.out.println( "Butterer off." );
         
     }
}
 
/**
  * 涂果酱的任务。
  */
class  Jammer  implements  Runnable {
     private  ToastQueue butteredQueue;
     private  ToastQueue finishedQueue;
     public  Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) {
         this .finishedQueue = finishedQueue;
         this .butteredQueue = butteredQueue;
     }
     
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 //在取得下一个吐司之前会一直阻塞
                 Toast toast = butteredQueue.take();
                 toast.jam();
                 System.out.println(toast);
                 finishedQueue.put(toast);
             }
         catch  (InterruptedException e) {
             System.out.println( "Jammer interrupted." );
         }
         System.out.println( "Jammer off." );
     }
}
 
/**
  * 吃吐司的人,消费者。
  */
class  Eater  implements  Runnable {
     private  ToastQueue finishedQueue;
     private  int  count =  0 ;
     public  Eater (ToastQueue finishedQueue) {
         this .finishedQueue = finishedQueue;
     }
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 //在取得下一个吐司之前会一直阻塞
                 Toast toast = finishedQueue.take();
                 //验证取得的吐司是有序的,而且状态是JAMMED的
                 if  (toast.getId() != count++ || 
                         toast.getStatus() != Toast.Status.JAMMED) {
                     System.out.println( "Error -> "  + toast);
                     System.exit(- 1 );
                 else  {
                     //吃掉吐司
                     System.out.println(toast +  "->Eaten" );
                 }
             }
         catch  (InterruptedException e) {
             System.out.println( "Eater interrupted." );
         }
         System.out.println( "Eater off." );
     }
}
 
public  class  ToastOMatic {
     public  static  void  main(String[] args)  throws  Exception {
         ToastQueue dryQueue =  new  ToastQueue();
         ToastQueue butteredQueue =  new  ToastQueue();
         ToastQueue finishedQueue =  new  ToastQueue();
         ExecutorService exec = Executors.newCachedThreadPool();
         exec.execute( new  Toaster(dryQueue));
         exec.execute( new  Butterer(dryQueue, butteredQueue));
         exec.execute( new  Jammer(butteredQueue, finishedQueue));
         exec.execute( new  Eater(finishedQueue));
         TimeUnit.SECONDS.sleep( 5 );
         exec.shutdownNow();
     }
}

执行结果(可能的结果):

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Toast id:  0 , status: DRY
Toast id:  0 , status: BUTTERED
Toast id:  0 , status: JAMMED
Toast id:  0 , status: JAMMED->Eaten
Toast id:  1 , status: DRY
Toast id:  1 , status: BUTTERED
Toast id:  1 , status: JAMMED
Toast id:  1 , status: JAMMED->Eaten
Toast id:  2 , status: DRY
Toast id:  2 , status: BUTTERED
Toast id:  2 , status: JAMMED
Toast id:  2 , status: JAMMED->Eaten
Toast id:  3 , status: DRY
Toast id:  3 , status: BUTTERED
Toast id:  3 , status: JAMMED
Toast id:  3 , status: JAMMED->Eaten
Toast id:  4 , status: DRY
Toast id:  4 , status: BUTTERED
Toast id:  4 , status: JAMMED
Toast id:  4 , status: JAMMED->Eaten
Toast id:  5 , status: DRY
Toast id:  5 , status: BUTTERED
Toast id:  5 , status: JAMMED
Toast id:  5 , status: JAMMED->Eaten
Toast id:  6 , status: DRY
Toast id:  6 , status: BUTTERED
Toast id:  6 , status: JAMMED
Toast id:  6 , status: JAMMED->Eaten
Toast id:  7 , status: DRY
Toast id:  7 , status: BUTTERED
Toast id:  7 , status: JAMMED
Toast id:  7 , status: JAMMED->Eaten
Eater interrupted.
Eater off.
Butterer interrupted.
Toaster interrupted.
Toaster off.
Jammer interrupted.
Jammer off.
Butterer off.

    Toast是一个使用enum值的优秀示例。注意,这个示例中没有任何显式的同步(即使用Lock对象或者synchronized关键字的同步),因为同步已经由队列和系统的设计隐式的管理了——每片Toast在任何时刻都只由一个任务在操作。因为队列的阻塞,使得处理过程将被自动的挂起和恢复。你可以看到由BlockingQueue产生的简化十分明显。在使用显式的wait()和notifyAll()时存在的类和类之间的耦合被消除了,因为每个类都只和它的BlockingQueue通信。

目录
相关文章
|
4月前
|
安全 Java 编译器
揭秘JAVA深渊:那些让你头大的最晦涩知识点,从泛型迷思到并发陷阱,你敢挑战吗?
【8月更文挑战第22天】Java中的难点常隐藏在其高级特性中,如泛型与类型擦除、并发编程中的内存可见性及指令重排,以及反射与动态代理等。这些特性虽强大却也晦涩,要求开发者深入理解JVM运作机制及计算机底层细节。例如,泛型在编译时检查类型以增强安全性,但在运行时因类型擦除而丢失类型信息,可能导致类型安全问题。并发编程中,内存可见性和指令重排对同步机制提出更高要求,不当处理会导致数据不一致。反射与动态代理虽提供运行时行为定制能力,但也增加了复杂度和性能开销。掌握这些知识需深厚的技术底蕴和实践经验。
100 2
|
4月前
|
安全 Java 调度
解锁Java并发编程高阶技能:深入剖析无锁CAS机制、揭秘魔法类Unsafe、精通原子包Atomic,打造高效并发应用
【8月更文挑战第4天】在Java并发编程中,无锁编程以高性能和低延迟应对高并发挑战。核心在于无锁CAS(Compare-And-Swap)机制,它基于硬件支持,确保原子性更新;Unsafe类提供底层内存操作,实现CAS;原子包java.util.concurrent.atomic封装了CAS操作,简化并发编程。通过`AtomicInteger`示例,展现了线程安全的自增操作,突显了这些技术在构建高效并发程序中的关键作用。
77 1
|
1月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
1月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
1月前
|
Java 数据库连接 数据库
如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面
本文介绍了如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面。通过合理配置初始连接数、最大连接数和空闲连接超时时间,确保系统性能和稳定性。文章还探讨了同步阻塞、异步回调和信号量等并发控制策略,并提供了异常处理的最佳实践。最后,给出了一个简单的连接池示例代码,并推荐使用成熟的连接池框架(如HikariCP、C3P0)以简化开发。
56 2
|
2月前
|
Java
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
36 1
|
3月前
|
Java API 容器
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
|
4月前
|
存储 Java
Java 中 ConcurrentHashMap 的并发级别
【8月更文挑战第22天】
66 5
|
4月前
|
存储 算法 Java
Java 中的同步集合和并发集合
【8月更文挑战第22天】
53 5
|
4月前
|
缓存 Java 调度
【Java 并发秘籍】线程池大作战:揭秘 JDK 中的线程池家族!
【8月更文挑战第24天】Java的并发库提供多种线程池以应对不同的多线程编程需求。本文通过实例介绍了四种主要线程池:固定大小线程池、可缓存线程池、单一线程线程池及定时任务线程池。固定大小线程池通过预设线程数管理任务队列;可缓存线程池能根据需要动态调整线程数量;单一线程线程池确保任务顺序执行;定时任务线程池支持周期性或延时任务调度。了解并正确选用这些线程池有助于提高程序效率和资源利用率。
63 2