​Java并发新构件之Exchanger

简介:

    Exchanger是在两个任务之间交换对象的栅栏。当两个任务进入栅栏时,它们各自拥有一个对象,当它们离开时,它们都拥有对方的对象。Exchanger的典型应用场景是:一个任务在创建对象,而这些对象的生产代价很高,另一个任务在消费这些对象。通过这种方式,可以有更多的对象在被创建的同时被消费。

    为了演示Exchanger类,我们将创建生产者和消费者任务。ExchangerProducer和ExchangerConsumer使用一个List<Fat>作为要求交换的对象,它们都包含一个用于这个List<Fat>的Exchanger。当你调用Exchanger.exchange()方法时,它将阻塞直至对方任务调用它自己的exchange()方法,那时,这两个exchange()方法将同时完成,而List<Fat>被交换:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import  java.util.List;
import  java.util.concurrent.CopyOnWriteArrayList;
import  java.util.concurrent.Exchanger;
import  java.util.concurrent.ExecutorService;
import  java.util.concurrent.Executors;
import  java.util.concurrent.TimeUnit;
 
class  ExchangerProducer  implements  Runnable {
     private  List<Fat> holder;
     private  Exchanger<List<Fat>> exchanger;
     public  ExchangerProducer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {
         this .exchanger = exchanger;
         this .holder = holder;
     }
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 //填充列表
                 for  ( int  i =  0 ;i < ExchangerDemo.size; i++) {
                     holder.add( new  Fat());
                 }
                 //等待交换
                 holder = exchanger.exchange(holder);
             }
         catch  (InterruptedException e) {
         }
         System.out.println( "Producer stopped." );
     }
}
 
class  ExchangerConsumer  implements  Runnable {
     private  List<Fat> holder;
     private  Exchanger<List<Fat>> exchanger;
     private  volatile  Fat value;
     private  static  int  num =  0 ;
     public  ExchangerConsumer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {
         this .exchanger = exchanger;
         this .holder = holder;
     }
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 //等待交换
                 holder = exchanger.exchange(holder);
                 //读取列表并移除元素
                 for  (Fat x : holder) {
                     num++;
                     value = x;
                     //在循环内删除元素,这对于CopyOnWriteArrayList是没有问题的
                     holder.remove(x);
                 }
                 if  (num %  10000  ==  0 ) {
                     System.out.println( "Exchanged count="  + num);
                 }
             }
         catch  (InterruptedException e) {
             
         }
         System.out.println( "Consumer stopped. Final value: "  + value);
     }
}
 
public  class  ExchangerDemo {
     static  int  size =  10 ;
     static  int  delay =  5 //秒
     public  static  void  main(String[] args)  throws  Exception {
         ExecutorService exec = Executors.newCachedThreadPool();
         List<Fat> producerList =  new  CopyOnWriteArrayList<>();
         List<Fat> consumerList =  new  CopyOnWriteArrayList<>();
         Exchanger<List<Fat>> exchanger =  new  Exchanger<>();
         exec.execute( new  ExchangerProducer(exchanger, producerList));
         exec.execute( new  ExchangerConsumer(exchanger, consumerList));
         TimeUnit.SECONDS.sleep(delay);
         exec.shutdownNow();
     }
}
 
class  Fat {
     private  volatile  double  d;
     private  static  int  counter =  1 ;
     private  final  int  id = counter++;
     public  Fat() {
         //执行一段耗时的操作
         for  ( int  i =  1 ; i< 10000 ; i++) {
             d += (Math.PI + Math.E) / ( double )i;
         }
     }
     public  void  print() {System.out.println( this );}
     public  String toString() { return  "Fat id="  + id;}
}

执行结果(可能的结果):

?
1
2
3
4
5
6
7
8
9
10
Exchanged count= 10000
Exchanged count= 20000
Exchanged count= 30000
Exchanged count= 40000
Exchanged count= 50000
Exchanged count= 60000
Exchanged count= 70000
Exchanged count= 80000
Consumer stopped. Final value: Fat id= 88300
Producer stopped.

    在main()中,创建了用于两个任务的单一的Exchanger,以及两个用于互换的CopyOnWriteArrayList。这个特定的List变体允许列表在被遍历的时候调用remove()方法,而不会抛出ConcurrentModifiedException异常。ExchangerProducer将填充这个List,然后将这个满列表跟ExchangerConsumer的空列表交换。交换之后,ExchangerProducer可以继续的生产Fat对象,而ExchangerConsumer则开始使用满列表中的对象。因为有了Exchanger,填充一个列表和消费另一个列表便同时发生了。

目录
相关文章
|
1月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
1月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
1月前
|
Java 数据库连接 数据库
如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面
本文介绍了如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面。通过合理配置初始连接数、最大连接数和空闲连接超时时间,确保系统性能和稳定性。文章还探讨了同步阻塞、异步回调和信号量等并发控制策略,并提供了异常处理的最佳实践。最后,给出了一个简单的连接池示例代码,并推荐使用成熟的连接池框架(如HikariCP、C3P0)以简化开发。
56 2
|
2月前
|
Java
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
36 1
|
3月前
|
Java API 容器
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
|
4月前
|
存储 Java
Java 中 ConcurrentHashMap 的并发级别
【8月更文挑战第22天】
66 5
|
4月前
|
缓存 Java 调度
【Java 并发秘籍】线程池大作战:揭秘 JDK 中的线程池家族!
【8月更文挑战第24天】Java的并发库提供多种线程池以应对不同的多线程编程需求。本文通过实例介绍了四种主要线程池:固定大小线程池、可缓存线程池、单一线程线程池及定时任务线程池。固定大小线程池通过预设线程数管理任务队列;可缓存线程池能根据需要动态调整线程数量;单一线程线程池确保任务顺序执行;定时任务线程池支持周期性或延时任务调度。了解并正确选用这些线程池有助于提高程序效率和资源利用率。
63 2
|
7月前
|
数据可视化 Java 测试技术
Java 编程问题:十一、并发-深入探索1
Java 编程问题:十一、并发-深入探索
80 0
|
4月前
|
安全 Java 调度
解锁Java并发编程高阶技能:深入剖析无锁CAS机制、揭秘魔法类Unsafe、精通原子包Atomic,打造高效并发应用
【8月更文挑战第4天】在Java并发编程中,无锁编程以高性能和低延迟应对高并发挑战。核心在于无锁CAS(Compare-And-Swap)机制,它基于硬件支持,确保原子性更新;Unsafe类提供底层内存操作,实现CAS;原子包java.util.concurrent.atomic封装了CAS操作,简化并发编程。通过`AtomicInteger`示例,展现了线程安全的自增操作,突显了这些技术在构建高效并发程序中的关键作用。
77 1
|
5月前
|
安全 Java 开发者
Java并发编程:理解并发安全与性能优化
在当今软件开发中,Java作为一种广泛使用的编程语言,其并发编程能力显得尤为重要。本文深入探讨了Java中的并发编程,包括如何确保并发安全性以及优化并发程序的性能。通过分析常见的并发问题和解决方案,读者将能够更好地理解如何利用Java的并发工具包来构建可靠和高效的多线程应用程序。 【7月更文挑战第10天】
66 3