Exchanger是在两个任务之间交换对象的栅栏。当两个任务进入栅栏时,它们各自拥有一个对象,当它们离开时,它们都拥有对方的对象。Exchanger的典型应用场景是:一个任务在创建对象,而这些对象的生产代价很高,另一个任务在消费这些对象。通过这种方式,可以有更多的对象在被创建的同时被消费。
为了演示Exchanger类,我们将创建生产者和消费者任务。ExchangerProducer和ExchangerConsumer使用一个List<Fat>作为要求交换的对象,它们都包含一个用于这个List<Fat>的Exchanger。当你调用Exchanger.exchange()方法时,它将阻塞直至对方任务调用它自己的exchange()方法,那时,这两个exchange()方法将同时完成,而List<Fat>被交换:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
import
java.util.List;
import
java.util.concurrent.CopyOnWriteArrayList;
import
java.util.concurrent.Exchanger;
import
java.util.concurrent.ExecutorService;
import
java.util.concurrent.Executors;
import
java.util.concurrent.TimeUnit;
class
ExchangerProducer
implements
Runnable {
private
List<Fat> holder;
private
Exchanger<List<Fat>> exchanger;
public
ExchangerProducer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {
this
.exchanger = exchanger;
this
.holder = holder;
}
@Override
public
void
run() {
try
{
while
(!Thread.interrupted()) {
//填充列表
for
(
int
i =
0
;i < ExchangerDemo.size; i++) {
holder.add(
new
Fat());
}
//等待交换
holder = exchanger.exchange(holder);
}
}
catch
(InterruptedException e) {
}
System.out.println(
"Producer stopped."
);
}
}
class
ExchangerConsumer
implements
Runnable {
private
List<Fat> holder;
private
Exchanger<List<Fat>> exchanger;
private
volatile
Fat value;
private
static
int
num =
0
;
public
ExchangerConsumer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {
this
.exchanger = exchanger;
this
.holder = holder;
}
@Override
public
void
run() {
try
{
while
(!Thread.interrupted()) {
//等待交换
holder = exchanger.exchange(holder);
//读取列表并移除元素
for
(Fat x : holder) {
num++;
value = x;
//在循环内删除元素,这对于CopyOnWriteArrayList是没有问题的
holder.remove(x);
}
if
(num %
10000
==
0
) {
System.out.println(
"Exchanged count="
+ num);
}
}
}
catch
(InterruptedException e) {
}
System.out.println(
"Consumer stopped. Final value: "
+ value);
}
}
public
class
ExchangerDemo {
static
int
size =
10
;
static
int
delay =
5
;
//秒
public
static
void
main(String[] args)
throws
Exception {
ExecutorService exec = Executors.newCachedThreadPool();
List<Fat> producerList =
new
CopyOnWriteArrayList<>();
List<Fat> consumerList =
new
CopyOnWriteArrayList<>();
Exchanger<List<Fat>> exchanger =
new
Exchanger<>();
exec.execute(
new
ExchangerProducer(exchanger, producerList));
exec.execute(
new
ExchangerConsumer(exchanger, consumerList));
TimeUnit.SECONDS.sleep(delay);
exec.shutdownNow();
}
}
class
Fat {
private
volatile
double
d;
private
static
int
counter =
1
;
private
final
int
id = counter++;
public
Fat() {
//执行一段耗时的操作
for
(
int
i =
1
; i<
10000
; i++) {
d += (Math.PI + Math.E) / (
double
)i;
}
}
public
void
print() {System.out.println(
this
);}
public
String toString() {
return
"Fat id="
+ id;}
}
|
执行结果(可能的结果):
1
2
3
4
5
6
7
8
9
10
|
Exchanged count=
10000
Exchanged count=
20000
Exchanged count=
30000
Exchanged count=
40000
Exchanged count=
50000
Exchanged count=
60000
Exchanged count=
70000
Exchanged count=
80000
Consumer stopped. Final value: Fat id=
88300
Producer stopped.
|
在main()中,创建了用于两个任务的单一的Exchanger,以及两个用于互换的CopyOnWriteArrayList。这个特定的List变体允许列表在被遍历的时候调用remove()方法,而不会抛出ConcurrentModifiedException异常。ExchangerProducer将填充这个List,然后将这个满列表跟ExchangerConsumer的空列表交换。交换之后,ExchangerProducer可以继续的生产Fat对象,而ExchangerConsumer则开始使用满列表中的对象。因为有了Exchanger,填充一个列表和消费另一个列表便同时发生了。