wait()和notifyAll()方法以一种非常低级的方式解决了任务互操作的问题,即每次交互时都需要握手。在许多情况下,你可以瞄准更高的抽象级别,使用同步队列来解决任务协作的问题。同步队列在任何时刻都只允许一个任务插入或移除元素。在java.util.concurrent.BlockingQueue接口中提供了这个队列,这个接口有大量的标准实现。你通常可以使用LinkedBlockingQueue,它是一个无届队列,你还可以使用ArrayBlockingQueue,它具有固定的尺寸,因此你可以在它被阻塞之前,向其中放置有限数量的元素。
如果消费者任务试图从队列中获取对象,而该队列此时为空,那么这些队列还可以挂起消费者任务,并且当有更多的元素可用时回复消费者任务。阻塞队列可以解决非常大的问题,而其方式与wait()和notifyAll()相比,则要简单并可靠许多。
考虑下面这个BlockingQueue的示例,有一台机器具有三个任务:一个制作吐司,一个给吐司抹黄油,还有一个给吐司涂果酱。我们可以通过各个处理过程之间的BlockingQueue来运行这个吐司制作程序:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
|
import
java.util.Random;
import
java.util.concurrent.ExecutorService;
import
java.util.concurrent.Executors;
import
java.util.concurrent.LinkedBlockingQueue;
import
java.util.concurrent.TimeUnit;
class
Toast {
/**
* 吐司的状态:
* DRY: 烘干的
* BUTTERED: 涂了黄油的
* JAMMED: 涂了果酱的
* <p>吐司的状态只能由DRY->BUTTERED->JAMMED转变
*/
public
enum
Status {DRY, BUTTERED, JAMMED}
private
Status status = Status.DRY;
//默认状态为DRY
private
final
int
id;
public
Toast(
int
id) {
this
.id = id;}
public
void
butter() {status = Status.BUTTERED;}
public
void
jam() {status = Status.JAMMED;}
public
Status getStatus() {
return
status;}
public
int
getId() {
return
id;}
public
String toString() {
return
"Toast id: "
+ id +
", status: "
+ status;
}
}
@SuppressWarnings
(
"serial"
)
class
ToastQueue
extends
LinkedBlockingQueue<Toast> {}
/**
* 生产吐司的任务。
*/
class
Toaster
implements
Runnable {
private
ToastQueue toastQueue;
private
int
count =
0
;
private
Random random =
new
Random(
47
);
public
Toaster(ToastQueue queue) {
this
.toastQueue = queue;
}
@Override
public
void
run() {
try
{
while
(!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(
300
+ random.nextInt(
500
));
//生产一片吐司,这些吐司是有序的
Toast toast =
new
Toast(count++);
System.out.println(toast);
//放到toastQueue中
toastQueue.put(toast);
}
}
catch
(InterruptedException e) {
System.out.println(
"Toaster interrupted."
);
}
System.out.println(
"Toaster off."
);
}
}
/**
* 涂黄油的任务。
*/
class
Butterer
implements
Runnable {
private
ToastQueue dryQueue;
private
ToastQueue butteredQueue;
public
Butterer(ToastQueue dryQueue, ToastQueue butteredQueue) {
this
.dryQueue = dryQueue;
this
.butteredQueue = butteredQueue;
}
@Override
public
void
run() {
try
{
while
(!Thread.interrupted()) {
//在取得下一个吐司之前会一直阻塞
Toast toast = dryQueue.take();
toast.butter();
System.out.println(toast);
butteredQueue.put(toast);
}
}
catch
(InterruptedException e) {
System.out.println(
"Butterer interrupted."
);
}
System.out.println(
"Butterer off."
);
}
}
/**
* 涂果酱的任务。
*/
class
Jammer
implements
Runnable {
private
ToastQueue butteredQueue;
private
ToastQueue finishedQueue;
public
Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) {
this
.finishedQueue = finishedQueue;
this
.butteredQueue = butteredQueue;
}
@Override
public
void
run() {
try
{
while
(!Thread.interrupted()) {
//在取得下一个吐司之前会一直阻塞
Toast toast = butteredQueue.take();
toast.jam();
System.out.println(toast);
finishedQueue.put(toast);
}
}
catch
(InterruptedException e) {
System.out.println(
"Jammer interrupted."
);
}
System.out.println(
"Jammer off."
);
}
}
/**
* 吃吐司的人,消费者。
*/
class
Eater
implements
Runnable {
private
ToastQueue finishedQueue;
private
int
count =
0
;
public
Eater (ToastQueue finishedQueue) {
this
.finishedQueue = finishedQueue;
}
@Override
public
void
run() {
try
{
while
(!Thread.interrupted()) {
//在取得下一个吐司之前会一直阻塞
Toast toast = finishedQueue.take();
//验证取得的吐司是有序的,而且状态是JAMMED的
if
(toast.getId() != count++ ||
toast.getStatus() != Toast.Status.JAMMED) {
System.out.println(
"Error -> "
+ toast);
System.exit(-
1
);
}
else
{
//吃掉吐司
System.out.println(toast +
"->Eaten"
);
}
}
}
catch
(InterruptedException e) {
System.out.println(
"Eater interrupted."
);
}
System.out.println(
"Eater off."
);
}
}
public
class
ToastOMatic {
public
static
void
main(String[] args)
throws
Exception {
ToastQueue dryQueue =
new
ToastQueue();
ToastQueue butteredQueue =
new
ToastQueue();
ToastQueue finishedQueue =
new
ToastQueue();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(
new
Toaster(dryQueue));
exec.execute(
new
Butterer(dryQueue, butteredQueue));
exec.execute(
new
Jammer(butteredQueue, finishedQueue));
exec.execute(
new
Eater(finishedQueue));
TimeUnit.SECONDS.sleep(
5
);
exec.shutdownNow();
}
}
|
执行结果(可能的结果):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
Toast id:
0
, status: DRY
Toast id:
0
, status: BUTTERED
Toast id:
0
, status: JAMMED
Toast id:
0
, status: JAMMED->Eaten
Toast id:
1
, status: DRY
Toast id:
1
, status: BUTTERED
Toast id:
1
, status: JAMMED
Toast id:
1
, status: JAMMED->Eaten
Toast id:
2
, status: DRY
Toast id:
2
, status: BUTTERED
Toast id:
2
, status: JAMMED
Toast id:
2
, status: JAMMED->Eaten
Toast id:
3
, status: DRY
Toast id:
3
, status: BUTTERED
Toast id:
3
, status: JAMMED
Toast id:
3
, status: JAMMED->Eaten
Toast id:
4
, status: DRY
Toast id:
4
, status: BUTTERED
Toast id:
4
, status: JAMMED
Toast id:
4
, status: JAMMED->Eaten
Toast id:
5
, status: DRY
Toast id:
5
, status: BUTTERED
Toast id:
5
, status: JAMMED
Toast id:
5
, status: JAMMED->Eaten
Toast id:
6
, status: DRY
Toast id:
6
, status: BUTTERED
Toast id:
6
, status: JAMMED
Toast id:
6
, status: JAMMED->Eaten
Toast id:
7
, status: DRY
Toast id:
7
, status: BUTTERED
Toast id:
7
, status: JAMMED
Toast id:
7
, status: JAMMED->Eaten
Eater interrupted.
Eater off.
Butterer interrupted.
Toaster interrupted.
Toaster off.
Jammer interrupted.
Jammer off.
Butterer off.
|
Toast是一个使用enum值的优秀示例。注意,这个示例中没有任何显式的同步(即使用Lock对象或者synchronized关键字的同步),因为同步已经由队列和系统的设计隐式的管理了——每片Toast在任何时刻都只由一个任务在操作。因为队列的阻塞,使得处理过程将被自动的挂起和恢复。你可以看到由BlockingQueue产生的简化十分明显。在使用显式的wait()和notifyAll()时存在的类和类之间的耦合被消除了,因为每个类都只和它的BlockingQueue通信。