​Java并发新构件之Exchanger

简介:

    Exchanger是在两个任务之间交换对象的栅栏。当两个任务进入栅栏时,它们各自拥有一个对象,当它们离开时,它们都拥有对方的对象。Exchanger的典型应用场景是:一个任务在创建对象,而这些对象的生产代价很高,另一个任务在消费这些对象。通过这种方式,可以有更多的对象在被创建的同时被消费。

    为了演示Exchanger类,我们将创建生产者和消费者任务。ExchangerProducer和ExchangerConsumer使用一个List<Fat>作为要求交换的对象,它们都包含一个用于这个List<Fat>的Exchanger。当你调用Exchanger.exchange()方法时,它将阻塞直至对方任务调用它自己的exchange()方法,那时,这两个exchange()方法将同时完成,而List<Fat>被交换:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import  java.util.List;
import  java.util.concurrent.CopyOnWriteArrayList;
import  java.util.concurrent.Exchanger;
import  java.util.concurrent.ExecutorService;
import  java.util.concurrent.Executors;
import  java.util.concurrent.TimeUnit;
 
class  ExchangerProducer  implements  Runnable {
     private  List<Fat> holder;
     private  Exchanger<List<Fat>> exchanger;
     public  ExchangerProducer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {
         this .exchanger = exchanger;
         this .holder = holder;
     }
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 //填充列表
                 for  ( int  i =  0 ;i < ExchangerDemo.size; i++) {
                     holder.add( new  Fat());
                 }
                 //等待交换
                 holder = exchanger.exchange(holder);
             }
         catch  (InterruptedException e) {
         }
         System.out.println( "Producer stopped." );
     }
}
 
class  ExchangerConsumer  implements  Runnable {
     private  List<Fat> holder;
     private  Exchanger<List<Fat>> exchanger;
     private  volatile  Fat value;
     private  static  int  num =  0 ;
     public  ExchangerConsumer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {
         this .exchanger = exchanger;
         this .holder = holder;
     }
     @Override
     public  void  run() {
         try  {
             while (!Thread.interrupted()) {
                 //等待交换
                 holder = exchanger.exchange(holder);
                 //读取列表并移除元素
                 for  (Fat x : holder) {
                     num++;
                     value = x;
                     //在循环内删除元素,这对于CopyOnWriteArrayList是没有问题的
                     holder.remove(x);
                 }
                 if  (num %  10000  ==  0 ) {
                     System.out.println( "Exchanged count="  + num);
                 }
             }
         catch  (InterruptedException e) {
             
         }
         System.out.println( "Consumer stopped. Final value: "  + value);
     }
}
 
public  class  ExchangerDemo {
     static  int  size =  10 ;
     static  int  delay =  5 //秒
     public  static  void  main(String[] args)  throws  Exception {
         ExecutorService exec = Executors.newCachedThreadPool();
         List<Fat> producerList =  new  CopyOnWriteArrayList<>();
         List<Fat> consumerList =  new  CopyOnWriteArrayList<>();
         Exchanger<List<Fat>> exchanger =  new  Exchanger<>();
         exec.execute( new  ExchangerProducer(exchanger, producerList));
         exec.execute( new  ExchangerConsumer(exchanger, consumerList));
         TimeUnit.SECONDS.sleep(delay);
         exec.shutdownNow();
     }
}
 
class  Fat {
     private  volatile  double  d;
     private  static  int  counter =  1 ;
     private  final  int  id = counter++;
     public  Fat() {
         //执行一段耗时的操作
         for  ( int  i =  1 ; i< 10000 ; i++) {
             d += (Math.PI + Math.E) / ( double )i;
         }
     }
     public  void  print() {System.out.println( this );}
     public  String toString() { return  "Fat id="  + id;}
}

执行结果(可能的结果):

?
1
2
3
4
5
6
7
8
9
10
Exchanged count= 10000
Exchanged count= 20000
Exchanged count= 30000
Exchanged count= 40000
Exchanged count= 50000
Exchanged count= 60000
Exchanged count= 70000
Exchanged count= 80000
Consumer stopped. Final value: Fat id= 88300
Producer stopped.

    在main()中,创建了用于两个任务的单一的Exchanger,以及两个用于互换的CopyOnWriteArrayList。这个特定的List变体允许列表在被遍历的时候调用remove()方法,而不会抛出ConcurrentModifiedException异常。ExchangerProducer将填充这个List,然后将这个满列表跟ExchangerConsumer的空列表交换。交换之后,ExchangerProducer可以继续的生产Fat对象,而ExchangerConsumer则开始使用满列表中的对象。因为有了Exchanger,填充一个列表和消费另一个列表便同时发生了。

目录
相关文章
|
5月前
|
Java 大数据 Go
从混沌到秩序:Java共享内存模型如何通过显式约束驯服并发?
并发编程旨在混乱中建立秩序。本文对比Java共享内存模型与Golang消息传递模型,剖析显式同步与隐式因果的哲学差异,揭示happens-before等机制如何保障内存可见性与数据一致性,展现两大范式的深层分野。(238字)
168 4
|
5月前
|
缓存 安全 Java
如何理解Java中的并发?
Java并发指多任务交替执行,提升资源利用率与响应速度。通过线程实现,涉及线程安全、可见性、原子性等问题,需用synchronized、volatile、线程池及并发工具类解决,是高并发系统开发的关键基础。(238字)
326 5
|
8月前
|
SQL 缓存 安全
深度理解 Java 内存模型:从并发基石到实践应用
本文深入解析 Java 内存模型(JMM),涵盖其在并发编程中的核心作用与实践应用。内容包括 JMM 解决的可见性、原子性和有序性问题,线程与内存的交互机制,volatile、synchronized 和 happens-before 等关键机制的使用,以及在单例模式、线程通信等场景中的实战案例。同时,还介绍了常见并发 Bug 的排查与解决方案,帮助开发者写出高效、线程安全的 Java 程序。
450 0
|
8月前
|
Java API 调度
从阻塞到畅通:Java虚拟线程开启并发新纪元
从阻塞到畅通:Java虚拟线程开启并发新纪元
429 83
|
8月前
|
存储 Java 调度
Java虚拟线程:轻量级并发的革命性突破
Java虚拟线程:轻量级并发的革命性突破
467 83
|
9月前
|
Java 物联网 数据处理
Java Solon v3.2.0 史上最强性能优化版本发布 并发能力提升 700% 内存占用节省 50%
Java Solon v3.2.0 是一款性能卓越的后端开发框架,新版本并发性能提升700%,内存占用节省50%。本文将从核心特性(如事件驱动模型与内存优化)、技术方案示例(Web应用搭建与数据库集成)到实际应用案例(电商平台与物联网平台)全面解析其优势与使用方法。通过简单代码示例和真实场景展示,帮助开发者快速掌握并应用于项目中,大幅提升系统性能与资源利用率。
259 6
Java Solon v3.2.0 史上最强性能优化版本发布 并发能力提升 700% 内存占用节省 50%
|
5月前
|
JSON 网络协议 安全
【Java】(10)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
277 1
|
5月前
|
JSON 网络协议 安全
【Java基础】(1)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
296 1
|
6月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案