声明:本文是《 Java 7 Concurrency Cookbook 》的第三章, 作者: Javier Fernández González 译者:郑玉婷
控制并发阶段性任务的改变
Phaser 类提供每次phaser改变阶段都会执行的方法。它是 onAdvance() 方法。它接收2个参数:当前阶段数和注册的参与者数;它返回 Boolean 值,如果phaser继续它的执行,则为 false;否则为真,即phaser结束运行并进入 termination 状态。
如果注册参与者为0,此方法的默认的实现值为真,要不然就是false。如果你扩展Phaser类并覆盖此方法,那么你可以修改它的行为。通常,当你要从一个phase到另一个,来执行一些行动时,你会对这么做感兴趣的。
在这个指南,你将学习如何控制phaser的 phase的改变,通过实现自定义版本的 Phaser类并覆盖 onAdvance() 方法来执行一些每个phase 都会改变的行动。你将要实现一个模拟测验,有些学生要完成他们的练习。全部的学生都必须完成同一个练习才能继续下一个练习。
准备
指南中的例子是使用Eclipse IDE 来实现的。如果你使用Eclipse 或者其他的IDE,例如NetBeans, 打开并创建一个新的java项目。
怎么做呢…
按照这些步骤来实现下面的例子::
003 |
import java.util.Date;
|
004 |
import java.util.concurrent.Phaser;
|
005 |
import java.util.concurrent.TimeUnit;
|
008 |
public class MyPhaser extends Phaser {
|
015 |
protected boolean onAdvance( int phase, int registeredParties) {
|
018 |
return studentsArrived();
|
020 |
return finishFirstExercise();
|
022 |
return finishSecondExercise();
|
031 |
private boolean studentsArrived() {
|
032 |
System.out.printf( "Phaser: The exam are going to start. The students are ready.\n" );
|
033 |
System.out.printf( "Phaser: We have %d students.\n" ,
|
034 |
getRegisteredParties());
|
039 |
private boolean finishFirstExercise() {
|
040 |
System.out.printf( "Phaser: All the students have finished the first exercise.\n" );
|
041 |
System.out.printf( "Phaser: It's time for the second one.\n" );
|
046 |
private boolean finishSecondExercise() {
|
047 |
System.out.printf( "Phaser: All the students have finished the second exercise.\n" );
|
048 |
System.out.printf( "Phaser: It's time for the third one.\n" );
|
053 |
private boolean finishExam() {
|
054 |
System.out.printf( "Phaser: All the students have finished the exam.\n" );
|
055 |
System.out.printf( "Phaser: Thank you for your time.\n" );
|
060 |
public class Student implements Runnable {
|
063 |
private Phaser phaser;
|
066 |
public Student(Phaser phaser) {
|
067 |
this .phaser = phaser;
|
076 |
System.out.printf( "%s: Has arrived to do the exam. %s\n" , Thread
|
077 |
.currentThread().getName(), new Date());
|
078 |
phaser.arriveAndAwaitAdvance();
|
082 |
System.out.printf( "%s: Is going to do the first exercise. %s\n" ,
|
083 |
Thread.currentThread().getName(), new Date());
|
085 |
System.out.printf( "%s: Has done the first exercise. %s\n" , Thread
|
086 |
.currentThread().getName(), new Date());
|
087 |
phaser.arriveAndAwaitAdvance();
|
090 |
System.out.printf( "%s: Is going to do the second exercise.%s\n" ,
|
091 |
Thread.currentThread().getName(), new Date());
|
093 |
System.out.printf( "%s: Has done the second exercise. %s\n" , Thread
|
094 |
.currentThread().getName(), new Date());
|
095 |
phaser.arriveAndAwaitAdvance();
|
096 |
System.out.printf( "%s: Is going to do the third exercise. %s\n" ,
|
097 |
Thread.currentThread().getName(), new Date());
|
099 |
System.out.printf( "%s: Has finished the exam. %s\n" , Thread
|
100 |
.currentThread().getName(), new Date());
|
101 |
phaser.arriveAndAwaitAdvance();
|
105 |
private void doExercise1() {
|
107 |
long duration = ( long ) (Math.random() * 10 );
|
108 |
TimeUnit.SECONDS.sleep(duration);
|
109 |
} catch (InterruptedException e) {
|
115 |
private void doExercise2() {
|
117 |
long duration = ( long ) (Math.random() * 10 );
|
118 |
TimeUnit.SECONDS.sleep(duration);
|
119 |
} catch (InterruptedException e) {
|
125 |
private void doExercise3() {
|
127 |
long duration = ( long ) (Math.random() * 10 );
|
128 |
TimeUnit.SECONDS.sleep(duration);
|
129 |
} catch (InterruptedException e) {
|
实现例子的main类,创建名为Main的类并添加main() 方法。
03 |
import tool.MyPhaser.Student;
|
08 |
public static void main(String[] args) {
|
11 |
MyPhaser phaser = new MyPhaser();
|
14 |
MyPhaser.Student students[] = new Student[ 5 ];
|
15 |
for ( int i = 0 ; i < students.length; i++) {
|
16 |
students[i] = phaser. new Student(phaser);
|
21 |
Thread threads[] = new Thread[students.length];
|
22 |
for ( int i = 0 ; i < students.length; i++) {
|
23 |
threads[i] = new Thread(students[i], "Student " + i);
|
28 |
for ( int i = 0 ; i < threads.length; i++) {
|
31 |
} catch (InterruptedException e) {
|
37 |
System.out.printf( "Main: The phaser has finished: %s.\n" ,
|
38 |
phaser.isTerminated());
|
它是怎么工作的…
这个练习模拟了有3个测验的真实测试。全部的学生必须都完成同一个测试才能开始下一个测试。为了实现这个必须使用同步,我们使用了Phaser类,但是你实现了你自己的phaser通过扩展原来的类,并覆盖onAdvance() 方法.
在阶段改变之前和在唤醒 arriveAndAwaitAdvance() 方法中休眠的全部线程们之前,此方法被 phaser 调用。这个方法接收当前阶段数作为参数,0是第一个phase ,还有注册的参与者数。最有用的参数是actual phase。如果你要基于不同的当前阶段执行不同的操作,那么你必须使用选择性结构(if/else 或 switch)来选择你想执行的操作。例子里,我们使用了 switch 结构来为每个phase的改变选择不同的方法。
onAdvance() 方法返回 Boolean 值表明 phaser 终结与否。如果返回 false 值,表示它还没有终结,那么线程将继续执行其他phases。如果phaser 返回真值,那么phaser将叫醒全部待定的线程们,并且转移phaser到terminated 状态,所以之后的任何对phaser的方法的调用都会被立刻返回,还有isTerminated() 方法将返回真值。
在核心类,当你创建 MyPhaser 对象,在phaser中你不用表示参与者的数量。你为每个 Student 对象调用了 register() 方法创建了phaser的参与者的注册。这个调用不会在Student 对象或者执行它的线程与phaser之间这个建立任何关系。 说真的,phaser的参与者数就是个数字而已。phaser与参与者之间没有任何关系。
下面的裁图展示了例子的执行结果:

你可以发现学生们结束第一个练习的时间是不同的。当全部都结束练习时,phaser 调用onAdvance() 方法写信息到操控台,接着全部的学生在同一时间开始第二场测试。
参见
第三章,线程同步应用:运行并发阶段性任务
第八章,测试并发应用:监控 Phaser
文章转自 并发编程网-ifeve.com