线程同步工具(七)在并发任务间交换数据

简介:

声明:本文是《 Java 7 Concurrency Cookbook 》的第三章, 作者: Javier Fernández González 译者:郑玉婷

在并发任务间交换数据

Java 并发 API 提供了一种允许2个并发任务间相互交换数据的同步应用。更具体的说,Exchanger 类允许在2个线程间定义同步点,当2个线程到达这个点,他们相互交换数据类型,使用第一个线程的数据类型变成第二个的,然后第二个线程的数据类型变成第一个的。

这个类在遇到类似生产者和消费者问题时,是非常有用的。来一个非常经典的并发问题:你有相同的数据buffer,一个或多个数据生产者,和一个或多个数据消费者。只是Exchange类只能同步2个线程,所以你只能在你的生产者和消费者问题中只有一个生产者和一个消费者时使用这个类。

在这个指南,你将学习如何使用 Exchanger 类来解决只有一个生产者和一个消费者的生产者和消费者问题。

准备

这个指南的例子使用Eclipse IDE实现。如果你使用Eclipse或其他IDE,如NetBeans,打开它并创建一个新的Java项目。

怎么做呢

按照这些步骤来实现下面的例子:

01 package tool;
02 import java.util.List;
03 import java.util.concurrent.Exchanger;
04  
05 //1. 首先,从实现producer开始吧。创建一个类名为Producer并一定实现 Runnable 接口。
06 public class Producer implements Runnable {
07  
08 // 2. 声明 List<String>对象,名为 buffer。这是等等要被相互交换的数据类型。
09 private List<String> buffer;
10  
11 // 3. 声明 Exchanger<List<String>>; 对象,名为exchanger。这个 exchanger 对象是用来同步producer和consumer的。
12 private final Exchanger<List<String>> exchanger;
13  
14 // 4. 实现类的构造函数,初始化这2个属性。
15 public Producer(List<String> buffer, Exchanger<List<String>> exchanger) {
16 this.buffer = buffer;
17 this.exchanger = exchanger;
18 }
19  
20 // 5. 实现 run() 方法. 在方法内,实现10次交换。
21 @Override
22 public void run() {
23 int cycle = 1;
24 for (int i = 0; i < 10; i++) {           System.out.printf("Producer: Cycle %d\n", cycle);
25  
26 // 6. 在每次循环中,加10个字符串到buffer。
27 for (int j = 0; j <10; j++) {
28 String message = "Event " + ((i * 10) + j);
29 System.out.printf("Producer: %s\n", message);
30 buffer.add(message);
31 }
32  
33 // 7. 调用 exchange() 方法来与consumer交换数据。此方法可能会抛出InterruptedException 异常, 加上处理代码。
34 try {
35 buffer = exchanger.exchange(buffer);
36 catch (InterruptedException e) {
37 e.printStackTrace();
38 }
39 System.out.println("Producer: " + buffer.size());
40 cycle++;
41 }
42 }
43 }
01 //8. 现在, 来实现consumer。创建一个类名为Consumer并一定实现 Runnable 接口。
02 package tool;
03 import java.util.List;
04 import java.util.concurrent.Exchanger;
05 public class Consumer implements Runnable {
06  
07 // 9. 声明名为buffer的 List<String>对象。这个对象类型是用来相互交换的。
08 private List<String> buffer;
09  
10 // 10. 声明一个名为exchanger的 Exchanger<List<String>> 对象。用来同步 producer和consumer。
11 private final Exchanger<List<String>> exchanger;
12  
13 // 11. 实现类的构造函数,并初始化2个属性。
14 public Consumer(List<String>buffer, Exchanger<List<String>> exchanger) {
15 this.buffer = buffer;
16 this.exchanger = exchanger;
17 }
18  
19 // 12. 实现 run() 方法。在方法内,实现10次交换。
20 @Override
21 public void run() {
22 int cycle = 1;
23 for (int i = 0; i < 10; i++) {
24 System.out.printf("Consumer: Cycle %d\n", cycle);
25  
26 // 13. 在每次循环,首先调用exchange()方法来与producer同步。Consumer需要消耗数据。此方法可能会抛出InterruptedException异常, 加上处理代码。
27 try {
28 buffer = exchanger.exchange(buffer);
29 catch (InterruptedException e) {              e.printStackTrace();
30 }
31  
32 // 14. 把producer发来的在buffer里的10字符串写到操控台并从buffer内删除,留空。System.out.println("Consumer: " + buffer.size());
33 for (int j = 0; j <10; j++) {
34 String message = buffer.get(0);
35 System.out.println("Consumer: " + message);
36 buffer.remove(0);
37 }
38 cycle++;
39 }

 

01 //15.现在,实现例子的主类通过创建一个类,名为Core并加入 main() 方法。
02 package tool;
03 import java.util.ArrayList;
04 mport java.util.List;
05 import java.util.concurrent.Exchanger;
06  
07 public class Core {
08 public static void main(String[] args) {
09  
10 // 16. 创建2个buffers。分别给producer和consumer使用.
11 List<String> buffer1 = new ArrayList<String>();
12 List<String> buffer2 = new ArrayList<String>();
13  
14 // 17. 创建Exchanger对象,用来同步producer和consumer。
15 Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
16  
17 // 18. 创建Producer对象和Consumer对象。
18 Producer producer = new Producer(buffer1, exchanger);
19 Consumer consumer = new Consumer(buffer2, exchanger);
20  
21 // 19. 创建线程来执行producer和consumer并开始线程。
22 Thread threadProducer = new Thread(producer);
23 Thread threadConsumer = new Thread(consumer); threadProducer.start();
24 threadConsumer.start();
25 }

它是怎么工作的…

消费者开始时是空白的buffer,然后调用Exchanger来与生产者同步。因为它需要数据来消耗。生产者也是从空白的buffer开始,然后创建10个字符串,保存到buffer,并使用exchanger与消费者同步。

在这儿,2个线程(生产者和消费者线程)都是在Exchanger里并交换了数据类型,所以当消费者从exchange() 方法返回时,它有10个字符串在buffer内。当生产者从 exchange() 方法返回时,它有空白的buffer来重新写入。这样的操作会重复10遍。

如你执行例子,你会发现生产者和消费者是如何并发的执行任务和在每个步骤它们是如何交换buffers的。与其他同步工具一样会发生这种情况,第一个调用 exchange()方法会进入休眠直到其他线程的达到。

更多…

Exchanger 类有另外一个版本的exchange方法:

  • exchange(V data, long time, TimeUnit unit):V是声明Phaser的参数种类(例子里是 List)。 此线程会休眠直到另一个线程到达并中断它,或者特定的时间过去了。TimeUnit类有多种常量:DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, 和 SECONDS。
  • 文章转自 并发编程网-ifeve.com
目录
相关文章
|
1月前
|
存储 Java 数据库
如何处理线程池关闭时未完成的任务?
总之,处理线程池关闭时未完成的任务需要综合考虑多种因素,并根据实际情况选择合适的处理方式。通过合理的处理,可以最大程度地减少任务丢失和数据不一致等问题,确保系统的稳定运行和业务的顺利开展。
119 64
|
1月前
|
消息中间件 监控 Java
线程池关闭时未完成的任务如何保证数据的一致性?
保证线程池关闭时未完成任务的数据一致性需要综合运用多种方法和机制。通过备份与恢复、事务管理、任务状态记录与恢复、数据同步与协调、错误处理与补偿、监控与预警等手段的结合,以及结合具体业务场景进行分析和制定策略,能够最大程度地确保数据的一致性,保障系统的稳定运行和业务的顺利开展。同时,不断地优化和改进这些方法和机制,也是提高系统性能和可靠性的重要途径。
119 62
|
25天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
53 12
|
1月前
|
安全 Java
线程安全的艺术:确保并发程序的正确性
在多线程环境中,确保线程安全是编程中的一个核心挑战。线程安全问题可能导致数据不一致、程序崩溃甚至安全漏洞。本文将分享如何确保线程安全,探讨不同的技术策略和最佳实践。
41 6
|
1月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
58 6
|
1月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
3天前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
12 1
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
60 1
|
2月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
32 3
|
2月前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
25 2