线程同步工具(五)运行阶段性并发任务

简介:

声明:本文是《 Java 7 Concurrency Cookbook 》的第三章, 作者: Javier Fernández González 译者:郑玉婷

运行阶段性并发任务

Java 并发 API 提供的一个非常复杂且强大的功能是,能够使用Phaser类运行阶段性的并发任务。当某些并发任务是分成多个步骤来执行时,那么此机制是非常有用的。Phaser类提供的机制是在每个步骤的结尾同步线程,所以除非全部线程完成第一个步骤,否则线程不能开始进行第二步。

相对于其他同步应用,我们必须初始化Phaser类与这次同步操作有关的任务数,我们可以通过增加或者减少来不断的改变这个数。

在这个指南,你将学习如果使用Phaser类来同步3个并发任务。这3个任务会在3个不同的文件夹和它们的子文件夹中搜索扩展名是.log并在24小时内修改过的文件。这个任务被分成3个步骤:

1. 在指定的文件夹和子文件夹中获得文件扩展名为.log的文件列表。
2. 过滤第一步的列表中修改超过24小时的文件。
3. 在操控台打印结果。

在步骤1和步骤2的结尾我们要检查列表是否为空。如果为空,那么线程直接结束运行并从phaser类中淘汰。

准备

指南中的例子是使用Eclipse IDE 来实现的。如果你使用Eclipse 或者其他的IDE,例如NetBeans, 打开并创建一个新的java任务。

怎么做呢

按照这些步骤来实现下面的例子:

001 import java.io.File;
002 import java.util.ArrayList;
003 import java.util.Date;
004 import java.util.List;
005 import java.util.concurrent.Phaser;
006 import java.util.concurrent.TimeUnit;
007  
008 //1.   创建一个类名为FileSearch并一定实现Runnable 接口。这个类实现的操作是在文件夹和其子文件夹中搜索确定的扩展名并在24小时内修改的文件。
009 public class FileSearch implements Runnable {
010  
011     // 2. 声明一个私有 String 属性来储存搜索开始的时候的文件夹。
012     private String initPath;
013  
014     // 3. 声明另一个私有 String 属性来储存我们要寻找的文件的扩展名。
015     private String end;
016  
017     // 4. 声明一个私有 List 属性来储存我们找到的符合条件的文件的路径。
018     private List<String> results;
019  
020     // 5. 最后,声明一个私有 Phaser 属性来控制任务的不同phaser的同步。
021     private Phaser phaser;
022  
023     // 6. 实现类的构造函数,初始化类的属性们。它接收初始文件夹的路径,文件的扩展名,和phaser 作为参数。
024     public FileSearch(String initPath, String end, Phaser phaser) {
025         this.initPath = initPath;
026         this.end = end;
027         this.phaser = phaser;
028         results = new ArrayList<String>();
029     }
030  
031     // 7. 现在,你必须实现一些要给run() 方法用的辅助方法。第一个是 directoryProcess()
032     // 方法。它接收File对象作为参数并处理全部的文件和子文件夹。对于每个文件夹,此方法会递归调用并传递文件夹作为参数。对于每个文件,此方法会调用fileProcess()
033     // 方法。
034     private void directoryProcess(File file) {
035  
036         File list[] = file.listFiles();
037         if (list != null) {
038             for (int i = 0; i < list.length; i++) {
039  
040                 if (list[i].isDirectory()) {
041                     directoryProcess(list[i]);
042                 } else {
043                     fileProcess(list[i]);
044                 }
045             }
046         }
047     }
048  
049     // 8. 现在,实现 fileProcess() 方法。它接收 File
050     // 对象作为参数并检查它的扩展名是否是我们正在查找的。如果是,此方法会把文件的绝对路径写入结果列表内。
051     private void fileProcess(File file) {
052         if (file.getName().endsWith(end)) {
053             results.add(file.getAbsolutePath());
054         }
055     }
056  
057     // 9. 现在,实现 filterResults()
058     // 方法。不接收任何参数。它过滤在第一阶段获得的文件列表,并删除修改超过24小时的文件。首先,创建一个新的空list并获得当前时间。
059     private void filterResults() {
060         List<String> newResults = new ArrayList<String>();
061         long actualDate = new Date().getTime();
062  
063         // 10. 然后,浏览结果list里的所有元素。对于每个路径,为文件创建File对象 go through all the elements
064         // of the results list. For each path in the list of results, create a
065         // File object for that file and get the last modified date for it.
066         for (int i = 0; i < results.size(); i++) {
067             File file = new File(results.get(i));
068             long fileDate = file.lastModified();
069  
070             // 11. 然后, 对比与真实日期对比,如果相差小于一天,把文件的路径加入到新的结果列表。
071             if (actualDate - fileDate < TimeUnit.MILLISECONDS.convert(1,
072                     TimeUnit.DAYS)) {
073                 newResults.add(results.get(i));
074             }
075         }
076  
077         // 12. 最后,把旧的结果改为新的。
078         results = newResults;
079     }
080  
081     // 13. 现在,实现 checkResults() 方法。此方法在第一个和第二个phase的结尾被调用,并检查结果是否为空。此方法不接收任何参数。
082     private boolean checkResults() {
083  
084         // 14. 首先,检查结果List的大小。如果为0,对象写信息到操控台表明情况,然后调用Phaser对象的
085         // arriveAndDeregister() 方法通知此线程已经结束actual phase,并离开phased操作。
086         if (results.isEmpty()) {
087             System.out.printf("%s: Phase %d: 0 results.\n", Thread
088                     .currentThread().getName(), phaser.getPhase());
089             System.out.printf("%s: Phase %d: End.\n", Thread.currentThread()
090                     .getName(), phaser.getPhase());
091             phaser.arriveAndDeregister();
092             return false;
093  
094             // 15. 另一种情况,如果结果list有元素,那么对象写信息到操控台表明情况,调用 Phaser对象懂得
095             // arriveAndAwaitAdvance() 方法并通知 actual phase,它会被阻塞直到phased
096             // 操作的全部参与线程结束actual phase。
097  
098         } else {
099             System.out.printf("%s: Phase %d: %d results.\n", Thread
100                     .currentThread().getName(), phaser.getPhase(), results
101                     .size());
102             phaser.arriveAndAwaitAdvance();
103             return true;
104         }
105     }
106  
107     // 16. 最好一个辅助方法是 showInfo() 方法,打印results list 的元素到操控台。
108     private void showInfo() {
109         for (int i = 0; i < results.size(); i++) {
110             File file = new File(results.get(i));
111             System.out.printf("%s: %s\n", Thread.currentThread().getName(),
112                     file.getAbsolutePath());
113         }
114         phaser.arriveAndAwaitAdvance();
115     }
116  
117     // 17. 现在,来实现 run() 方法,使用之前描述的辅助方法来执行,并使用Phaser对象控制phases间的改变。首先,调用phaser对象的
118     // arriveAndAwaitAdvance() 方法。直到使用线程被创建完成,搜索行为才会开始。
119     @Override
120     public void run() {
121  
122         phaser.arriveAndAwaitAdvance();
123  
124         // 18. 然后,写信息到操控台表明搜索任务开始。
125  
126         System.out.printf("%s: Starting.\n", Thread.currentThread().getName());
127  
128         // 19. 查看 initPath 属性储存的文件夹名字并使用 directoryProcess()
129         // 方法在文件夹和其子文件夹内查找带特殊扩展名的文件。
130         File file = new File(initPath);
131         if (file.isDirectory()) {
132             directoryProcess(file);
133         }
134  
135         // 20. 使用 checkResults() 方法检查是否有结果。如果没有任何结果,结束线程的执行并返回keyword。
136         if (!checkResults()) {
137             return;
138         }
139  
140         // 21. 使用filterResults() 方法过滤结果list。
141         filterResults();
142  
143         // 22. 再次使用checkResults() 方法检查是否有结果。如果没有,结束线程的执行并返回keyword。
144         if (!checkResults()) {
145             return;
146         }
147  
148         // 23. 使用 showInfo() 方法打印最终的结果list到操控台,撤销线程的登记,并打印信息表明线程的终结。
149         showInfo();
150         phaser.arriveAndDeregister();
151         System.out.printf("%s: Work completed.\n", Thread.currentThread()
152                 .getName());
153  
154     }
155 }
156  
157 // 24. 现在,实现例子的main 类通过创建类名为 Main 并为其添加 main() 方法。
158  
159 class Main {
160  
161     public static void main(String[] args) {
162  
163         // 25. 创建 含3个参与者的 Phaser 对象。
164         Phaser phaser = new Phaser(3);
165  
166         // 26. 创建3个 FileSearch 对象,每个在不同的初始文件夹里搜索.log扩展名的文件。
167         FileSearch system = new FileSearch("C:\\Windows", "log", phaser);
168         FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);
169         FileSearch documents = new FileSearch("C:\\Documents And Settings",
170                 "log", phaser);
171  
172         // 27. 创建并开始一个线程来执行第一个 FileSearch 对象。
173         Thread systemThread = new Thread(system, "System");
174         systemThread.start();
175  
176         // 28. 创建并开始一个线程来执行第二个 FileSearch 对象。
177         Thread appsThread = new Thread(apps, "Apps");
178         appsThread.start();
179  
180         // 29. 创建并开始一个线程来执行第三个 FileSearch 对象。
181         Thread documentsThread = new Thread(documents, "Documents");
182         documentsThread.start();
183  
184         // 30. 等待3个线程们的终结。
185  
186         try {
187             systemThread.join();
188             appsThread.join();
189             documentsThread.join();
190         } catch (InterruptedException e) {
191             e.printStackTrace();
192         }
193  
194         // 31. 使用isFinalized()方法把Phaser对象的结束标志值写入操控台。
195         System.out.println("Terminated: " + phaser.isTerminated());
196     }
197 }

它是怎么工作的…

这程序开始创建的 Phaser 对象是用来在每个phase的末端控制线程的同步。Phaser的构造函数接收参与者的数量作为参数。在这里,Phaser有3个参与者。这个数向Phaser表示 Phaser改变phase之前执行 arriveAndAwaitAdvance() 方法的线程数,并叫醒正在休眠的线程。

一旦Phaser被创建,我们运行3个线程分别执行3个不同的FileSearch对象。

在例子里,我们使用 Windows operating system 的路径。如果你使用的是其他操作系统,那么修改成适应你的环境的路径。

FileSearch对象的 run() 方法中的第一个指令是调用Phaser对象的 arriveAndAwaitAdvance() 方法。像之前提到的,Phaser知道我们要同步的线程的数量。当某个线程调用此方法,Phaser减少终结actual phase的线程数,并让这个线程进入休眠 直到全部其余线程结束phase。在run() 方法前面调用此方法,没有任何 FileSearch 线程可以开始他们的工作,直到全部线程被创建。

在phase 1 和 phase 2 的末端,我们检查phase 是否生成有元素的结果list,或者它没有生成结果且list为空。在第一个情况,checkResults() 方法之前提的调用 arriveAndAwaitAdvance()。在第二个情况,如果list为空,那就没有必要让线程继续了,就直接返回吧。但是你必须通知phaser,将会少一个参与者。为了这个,我们使用arriveAndDeregister()。它通知phaser线程结束了actual phase, 但是它将不会继续参见后面的phases,所以请phaser不要再等待它了。

在phase3的结尾实现了 showInfo() 方法, 调用了 phaser 的 arriveAndAwaitAdvance() 方法。这个调用,保证了全部线程在同一时间结束。当此方法结束执行,有一个调用phaser的arriveAndDeregister() 方法。这个调用,我们撤销了对phaser线程的注册,所以当全部线程结束时,phaser 有0个参与者。

最后,main() 方法等待3个线程的完成并调用phaser的 isTerminated() 方法。当phaser 有0个参与者时,它进入termination状态,此状态与此调用将会打印true到操控台。

Phaser 对象可能是在这2中状态:

  1. Active: 当 Phaser 接受新的参与者注册,它进入这个状态,并且在每个phase的末端同步。 在此状态,Phaser像在这个指南里解释的那样工作。此状态不在Java 并发 API中。
  2. Termination: 默认状态,当Phaser里全部的参与者都取消注册,它进入这个状态,所以这时 Phaser 有0个参与者。更具体的说,当onAdvance() 方法返回真值时,Phaser 是在这个状态里。如果你覆盖那个方法,你可以改变它的默认行为。当 Phaser 在这个状态,同步方法 arriveAndAwaitAdvance()会 立刻返回,不会做任何同步。

Phaser 类的一个显著特点是你不需要控制任何与phaser相关的方法的异常。不像其他同步应用,线程们在phaser休眠不会响应任何中断也不会抛出 InterruptedException 异常。只有一个异常会在下面的‘更多’里解释。

下面的裁图是例子的运行结果:

它展示了前2个phases的执行。你可以发现Apps线程在phase 2 结束它的运行由于list 为空。当你执行例子,你会发现一些线程比其他的线程更快结束phase,但是他们必须等待其他全部结束然后才能继续。

更多…

The Phaser类还提供了其他相关方法来改变phase。他们是:

  • arrive(): 此方法示意phaser某个参与者已经结束actual phase了,但是他应该等待其他的参与者才能继续执行。小心使用此法,因为它并不能与其他线程同步。
  • awaitAdvance(int phase): 如果我们传递的参数值等于phaser的actual phase,此方法让当前线程进入睡眠直到phaser的全部参与者结束当前的phase。如果参数值与phaser 的 actual phase不等,那么立刻返回。
  • awaitAdvanceInterruptibly(int phaser): 此方法等同与之前的方法,只是在线程正在此方法中休眠而被中断时候,它会抛出InterruptedException 异常。

Phaser的参与者的注册
当你创建一个 Phaser 对象,你表明了参与者的数量。但是Phaser类还有2种方法来增加参与者的数量。他们是:

  • register(): 此方法为Phaser添加一个新的参与者。这个新加入者会被认为是还未到达 actual phase.
  • bulkRegister(int Parties): 此方法为Phaser添加一个特定数量的参与者。这些新加入的参与都会被认为是还未到达 actual phase.

Phaser类提供的唯一一个减少参与者数量的方法是arriveAndDeregister() 方法,它通知phaser线程已经结束了actual phase,而且他不想继续phased的操作了。

强制终止 Phaser
当phaser有0个参与者,它进入一个称为Termination的状态。Phaser 类提供 forceTermination() 来改变phaser的状态,让它直接进入Termination 状态,不在乎已经在phaser中注册的参与者的数量。此机制可能会很有用在一个参与者出现异常的情况下来强制结束phaser.

当phaser在 Termination 状态, awaitAdvance() 和 arriveAndAwaitAdvance() 方法立刻返回一个负值,而不是一般情况下的正值如果你知道你的phaser可能终止了,那么你可以用这些方法来确认他是否真的终止了。

参见

第八章,测试并发应用:检测Phaser 

目录
相关文章
|
7月前
|
安全 Java
【JavaSE专栏76】三态和五态,线程的不同状态:新建、运行、状态、阻塞、等待、计时等待状态
【JavaSE专栏76】三态和五态,线程的不同状态:新建、运行、状态、阻塞、等待、计时等待状态
|
8月前
|
安全 算法 Java
去某东面试遇到并发编程问题:如何安全地中断一个正在运行的线程
一个位5年的小伙伴去某东面试被一道并发编程的面试题给Pass了,说”如何中断一个正在运行中的线程?,这个问题很多工作2年的都知道,实在是有些遗憾。 今天,我给大家来分享一下我的回答。
65 0
|
17天前
|
监控 安全
线程死循环是多线程应用程序开发过程中一个难以忽视的问题,它源于线程在执行过程中因逻辑错误或不可预见的竞争状态而陷入永久运行的状态,严重影响系统的稳定性和资源利用率。那么,如何精准定位并妥善处理线程死循环现象,并在编码阶段就规避潜在风险呢?谈谈你的看法~
避免线程死循环的关键策略包括使用同步机制(如锁和信号量)、减少共享可变状态、设置超时、利用监控工具、定期代码审查和测试、异常处理及设计简洁线程逻辑。通过这些方法,可降低竞态条件、死锁风险,提升程序稳定性和可靠性。
17 0
|
2月前
|
Go 调度
|
7月前
|
缓存 Java p3c
【Java用法】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
【Java用法】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
125 0
|
9月前
|
Web App开发 消息中间件 Prometheus
Spring Boot 服务监控,健康检查,线程信息,JVM堆信息,指标收集,运行情况监控等!(一)
Spring Boot 服务监控,健康检查,线程信息,JVM堆信息,指标收集,运行情况监控等!
线程池内运行的线程抛异常,线程池会怎么办
线程池内运行的线程抛异常,线程池会怎么办
|
9月前
|
存储 缓存 Dubbo
|
9月前
如何停止一个正在运行的线程
如何停止一个正在运行的线程
49 1
|
9月前
|
JSON 监控 安全
Spring Boot 服务监控,健康检查,线程信息,JVM堆信息,指标收集,运行情况监控等!(二)
Spring Boot 服务监控,健康检查,线程信息,JVM堆信息,指标收集,运行情况监控等!

相关实验场景

更多