定制并发类(七)实现ThreadFactory接口生成自定义的线程给Fork/Join框架

简介:

声明:本文是《 Java 7 Concurrency Cookbook 》的第七章,作者: Javier Fernández González     译者:许巧辉

实现ThreadFactory接口生成自定义的线程给Fork/Join框架

Fork/Join框架是Java7中最有趣的特征之一。它是Executor和ExecutorService接口的一个实现,允许你执行Callable和Runnable任务而不用管理这些执行线程。

这个执行者面向执行能被拆分成更小部分的任务。主要组件如下:

  • 一个特殊任务,实现ForkJoinTask类
  • 两种操作,将任务划分成子任务的fork操作和等待这些子任务结束的join操作
  • 一个算法,优化池中线程的使用的work-stealing算法。当一个任务正在等待它的子任务(结束)时,它的执行线程将执行其他任务(等待执行的任务)。

ForkJoinPool类是Fork/Join的主要类。在它的内部实现,有如下两种元素:

  • 一个存储等待执行任务的列队。
  • 一个执行任务的线程池

在这个指南中,你将学习如何实现一个在ForkJoinPool类中使用的自定义的工作者线程,及如何使用一个工厂来使用它。

准备工作…

这个指南的例子使用Eclipse IDE实现。如果你使用Eclipse或其他IDE,如NetBeans,打开它并创建一个新的Java项目。

如何做…

按以下步骤来实现的这个例子:

1.创建一个继承ForkJoinWorkerThread类的MyWorkerThread类。

public class MyWorkerThread extends ForkJoinWorkerThread {

2.声明和创建一个参数化为Integer类的ThreadLocal属性,名为taskCounter。

private static ThreadLocal<Integer> taskCounter=new ThreadLocal<Integer>();

3.实现这个类的构造器。

protected MyWorkerThread(ForkJoinPool pool) {
super(pool);
}

4.重写onStart()方法。调用父类的这个方法,写入一条信息到控制台。设置当前线程的taskCounter属性值为0。

@Override
protected void onStart() {
super.onStart();
System.out.printf("MyWorkerThread %d: Initializing task
counter.\n",getId());
taskCounter.set(0);
}

5.重写onTermination()方法。写入当前线程的taskCounter属性值到控制台。

@Override
protected void onTermination(Throwable exception) {
System.out.printf("MyWorkerThread %d:
%d\n",getId(),taskCounter.get());
super.onTermination(exception);
}

6.实现addTask()方法。递增taskCounter属性值。

public void addTask(){
int counter=taskCounter.get().intValue();
counter++;
taskCounter.set(counter);
}

7.创建一个实现ForkJoinWorkerThreadFactory接口的MyWorkerThreadFactory类。实现newThread()方法,创建和返回一个MyWorkerThread对象。

@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new MyWorkerThread(pool);
}
}

8.创建MyRecursiveTask类,它继承一个参数化为Integer类的RecursiveTask类。

public class MyRecursiveTask extends RecursiveTask<Integer> {

9.声明一个私有的、int类型的属性array。

private int array[];

10.声明两个私有的、int类型的属性start和end。

private int start, end;

11.实现这个类的构造器,初始化它的属性。

public MyRecursiveTask(int array[],int start, int end) {
this.array=array;
this.start=start;
this.end=end;
}

12.实现compute()方法,用来合计数组中在start和end位置之间的所有元素。首先,将执行这个任务的线程转换成一个MyWorkerThread对象,然后使用addTask()方法来增长这个线程的任务计数器。

@Override
protected Integer compute() {
Integer ret;
MyWorkerThread thread=(MyWorkerThread)Thread.currentThread();
thread.addTask();
}

13.实现addResults()方法。计算和返回两个任务(接收参数)的结果的总和。

private Integer addResults(Task task1, Task task2) {
int value;
try {
value = task1.get().intValue()+task2.get().intValue();
} catch (InterruptedException e) {
e.printStackTrace();
value=0;
} catch (ExecutionException e) {
e.printStackTrace();
value=0;
}

14.令这个线程睡眠10毫秒,然后返回任务的结果。

try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return value;
}

15.实现这个例子的主类,通过创建Main类,并实现main()方法。

public class Main {
public static void main(String[] args) throws Exception {

16.创建一个名为factory的MyWorkerThreadFactory对象。

MyWorkerThreadFactory factory=new MyWorkerThreadFactory();

17.创建一个名为pool的ForkJoinPool对象,将前面创建的factory对象作为参数传给它的构造器。

ForkJoinPool pool=new ForkJoinPool(4, factory, null, false);

18.创建一个大小为100000的整数数组,将所有元素初始化为值1。

int array[]=new int[100000];
for (int i=0; i<array.length; i++){
array[i]=1;
}

19.创建一个新的Task对象,用来合计数组中的所有元素。

MyRecursiveTask task=new MyRecursiveTask(array,0,array.length);

20.使用execute()方法,将这个任务提交给池。

pool.execute(task);

21.使用join()方法,等待这个任务的结束。

task.join();

22.使用shutdown()方法,关闭这个池。

pool.shutdown();

23.使用awaitTermination()方法,等待这个执行者的结束。

pool.awaitTermination(1, TimeUnit.DAYS);

24.使用get()方法,将任务的结束写入到控制台。

System.out.printf("Main: Result: %d\n",task.get());

25.写入一条信息到控制台,表明程序的结束。

System.out.printf("Main: End of the program\n");

它是如何工作的…

Fork/Join框架使用的线程叫工作者线程。Java包含继承Thread类的ForkJoinWorkerThread类和使用Fork/Join框架实现工作者线程。

在这个指南中,你已实现了继承ForkJoinWorkerThread类的MyWorkerThread类,并重写这个类的两个方法。你的目标是实现每个工作者线程的任务计数器,以至于你可以知道每个工作者线程执行多少个任务。你已经通过一个ThreadLocal属性实现计数器。这样,每个线程都拥有它自己的计数器,对于来你说是透明的。

你已重写ForkJoinWorkerThread类的onStart()方法来实现任务的计数器。当工作者线程开始它的执行时,这个方法将被调用。你也重写了onTermination()方法,将任务计数器的值写入到控制台。当工作者线程结束它的执行时,这个方法将被调用。你也在MyWorkerThread类中实现addTask()方法,用来增加每个线程的任务计数器。

对于ForkJoinPool类,与Java并发API中的所有执行者一样,使用工厂来创建它。所以,如果你想在ForkJoinPool类中使用MyWorkerThread线程,你必须实现自己的线程工厂。对于Fork/Join框架,这个工厂必须实现ForkJoinPool.ForkJoinWorkerThreadFactory类。为此,你已实现MyWorkerThreadFactory类。这个类只有一个用来创建一个新的MyWorkerThread对象的方法。

最后,你只要使用已创建的工厂来初始化ForkJoinPool类。你已在Main类中通过使用ForkJoinPool的构造器实现了。

以下截图显示了这个程序的部分输出:

4

你可以看出ForkJoinPool对象如何执行4个工作者线程及每个工作者线程执行多少个任务。

不止这些…

考虑一下,当一个线程正常结束或抛出一个Exception异常时,调用的ForkJoinWorkerThread提供的onTermination()方法。这个方法接收一个Throwable对象作为参数。如果这个参数值为null时,表明这个工作者线程正常结束。但是,如果这个参数的值不为null,表明这个线程抛出一个异常。你必须包含必要的代码来处理这种情况。

参见

目录
相关文章
|
11天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
42 6
|
1月前
|
安全 Java
线程安全的艺术:确保并发程序的正确性
在多线程环境中,确保线程安全是编程中的一个核心挑战。线程安全问题可能导致数据不一致、程序崩溃甚至安全漏洞。本文将分享如何确保线程安全,探讨不同的技术策略和最佳实践。
41 6
|
1月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
57 6
|
1月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
3天前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
12 1
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
60 1
|
2月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
32 3
|
2月前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
25 2
|
2月前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
41 2
|
2月前
|
Java 开发者
Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点
【10月更文挑战第20天】Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点,重点解析为何实现Runnable接口更具灵活性、资源共享及易于管理的优势。
47 1