声明:本文是《 Java 7 Concurrency Cookbook 》的第三章, 作者: Javier Fernández González 译者:郑玉婷
在并发任务间交换数据
Java 并发 API 提供了一种允许2个并发任务间相互交换数据的同步应用。更具体的说,Exchanger 类允许在2个线程间定义同步点,当2个线程到达这个点,他们相互交换数据类型,使用第一个线程的数据类型变成第二个的,然后第二个线程的数据类型变成第一个的。
这个类在遇到类似生产者和消费者问题时,是非常有用的。来一个非常经典的并发问题:你有相同的数据buffer,一个或多个数据生产者,和一个或多个数据消费者。只是Exchange类只能同步2个线程,所以你只能在你的生产者和消费者问题中只有一个生产者和一个消费者时使用这个类。
在这个指南,你将学习如何使用 Exchanger 类来解决只有一个生产者和一个消费者的生产者和消费者问题。
准备
这个指南的例子使用Eclipse IDE实现。如果你使用Eclipse或其他IDE,如NetBeans,打开它并创建一个新的Java项目。
怎么做呢…
按照这些步骤来实现下面的例子:
01 |
package tool; |
02 |
import java.util.List; |
03 |
import java.util.concurrent.Exchanger; |
04 |
05 |
//1. 首先,从实现producer开始吧。创建一个类名为Producer并一定实现 Runnable 接口。 |
06 |
public class Producer implements Runnable { |
07 |
08 |
// 2. 声明 List<String>对象,名为 buffer。这是等等要被相互交换的数据类型。 |
09 |
private List<String> buffer; |
10 |
11 |
// 3. 声明 Exchanger<List<String>>; 对象,名为exchanger。这个 exchanger 对象是用来同步producer和consumer的。 |
12 |
private final Exchanger<List<String>> exchanger; |
13 |
14 |
// 4. 实现类的构造函数,初始化这2个属性。 |
15 |
public Producer(List<String> buffer, Exchanger<List<String>> exchanger) { |
16 |
this .buffer = buffer; |
17 |
this .exchanger = exchanger; |
18 |
} |
19 |
20 |
// 5. 实现 run() 方法. 在方法内,实现10次交换。 |
21 |
@Override |
22 |
public void run() { |
23 |
int cycle = 1 ; |
24 |
for ( int i = 0 ; i < 10 ; i++) { System.out.printf( "Producer: Cycle %d\n" , cycle); |
25 |
26 |
// 6. 在每次循环中,加10个字符串到buffer。 |
27 |
for ( int j = 0 ; j < 10 ; j++) { |
28 |
String message = "Event " + ((i * 10 ) + j); |
29 |
System.out.printf( "Producer: %s\n" , message); |
30 |
buffer.add(message); |
31 |
} |
32 |
33 |
// 7. 调用 exchange() 方法来与consumer交换数据。此方法可能会抛出InterruptedException 异常, 加上处理代码。 |
34 |
try { |
35 |
buffer = exchanger.exchange(buffer); |
36 |
} catch (InterruptedException e) { |
37 |
e.printStackTrace(); |
38 |
} |
39 |
System.out.println( "Producer: " + buffer.size()); |
40 |
cycle++; |
41 |
} |
42 |
} |
43 |
} |
01 |
//8. 现在, 来实现consumer。创建一个类名为Consumer并一定实现 Runnable 接口。 |
02 |
package tool; |
03 |
import java.util.List; |
04 |
import java.util.concurrent.Exchanger; |
05 |
public class Consumer implements Runnable { |
06 |
07 |
// 9. 声明名为buffer的 List<String>对象。这个对象类型是用来相互交换的。 |
08 |
private List<String> buffer; |
09 |
10 |
// 10. 声明一个名为exchanger的 Exchanger<List<String>> 对象。用来同步 producer和consumer。 |
11 |
private final Exchanger<List<String>> exchanger; |
12 |
13 |
// 11. 实现类的构造函数,并初始化2个属性。 |
14 |
public Consumer(List<String>buffer, Exchanger<List<String>> exchanger) { |
15 |
this .buffer = buffer; |
16 |
this .exchanger = exchanger; |
17 |
} |
18 |
19 |
// 12. 实现 run() 方法。在方法内,实现10次交换。 |
20 |
@Override |
21 |
public void run() { |
22 |
int cycle = 1 ; |
23 |
for ( int i = 0 ; i < 10 ; i++) { |
24 |
System.out.printf( "Consumer: Cycle %d\n" , cycle); |
25 |
26 |
// 13. 在每次循环,首先调用exchange()方法来与producer同步。Consumer需要消耗数据。此方法可能会抛出InterruptedException异常, 加上处理代码。 |
27 |
try { |
28 |
buffer = exchanger.exchange(buffer); |
29 |
} catch (InterruptedException e) { e.printStackTrace(); |
30 |
} |
31 |
32 |
// 14. 把producer发来的在buffer里的10字符串写到操控台并从buffer内删除,留空。System.out.println("Consumer: " + buffer.size()); |
33 |
for ( int j = 0 ; j < 10 ; j++) { |
34 |
String message = buffer.get( 0 ); |
35 |
System.out.println( "Consumer: " + message); |
36 |
buffer.remove( 0 ); |
37 |
} |
38 |
cycle++; |
39 |
} |
01 |
//15.现在,实现例子的主类通过创建一个类,名为Core并加入 main() 方法。 |
02 |
package tool; |
03 |
import java.util.ArrayList; |
04 |
mport java.util.List; |
05 |
import java.util.concurrent.Exchanger; |
06 |
07 |
public class Core { |
08 |
public static void main(String[] args) { |
09 |
10 |
// 16. 创建2个buffers。分别给producer和consumer使用. |
11 |
List<String> buffer1 = new ArrayList<String>(); |
12 |
List<String> buffer2 = new ArrayList<String>(); |
13 |
14 |
// 17. 创建Exchanger对象,用来同步producer和consumer。 |
15 |
Exchanger<List<String>> exchanger = new Exchanger<List<String>>(); |
16 |
17 |
// 18. 创建Producer对象和Consumer对象。 |
18 |
Producer producer = new Producer(buffer1, exchanger); |
19 |
Consumer consumer = new Consumer(buffer2, exchanger); |
20 |
21 |
// 19. 创建线程来执行producer和consumer并开始线程。 |
22 |
Thread threadProducer = new Thread(producer); |
23 |
Thread threadConsumer = new Thread(consumer); threadProducer.start(); |
24 |
threadConsumer.start(); |
25 |
} |
它是怎么工作的…
消费者开始时是空白的buffer,然后调用Exchanger来与生产者同步。因为它需要数据来消耗。生产者也是从空白的buffer开始,然后创建10个字符串,保存到buffer,并使用exchanger与消费者同步。
在这儿,2个线程(生产者和消费者线程)都是在Exchanger里并交换了数据类型,所以当消费者从exchange() 方法返回时,它有10个字符串在buffer内。当生产者从 exchange() 方法返回时,它有空白的buffer来重新写入。这样的操作会重复10遍。
如你执行例子,你会发现生产者和消费者是如何并发的执行任务和在每个步骤它们是如何交换buffers的。与其他同步工具一样会发生这种情况,第一个调用 exchange()方法会进入休眠直到其他线程的达到。
更多…
Exchanger 类有另外一个版本的exchange方法:
- exchange(V data, long time, TimeUnit unit):V是声明Phaser的参数种类(例子里是 List)。 此线程会休眠直到另一个线程到达并中断它,或者特定的时间过去了。TimeUnit类有多种常量:DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, 和 SECONDS。
- 文章转自 并发编程网-ifeve.com