线程执行者(十一)执行者分离任务的启动和结果的处理

简介:
声明:本文是《 Java 7 Concurrency Cookbook 》的第四章,作者: Javier Fernández González     译者:许巧辉     校对:方腾飞,叶磊

执行者分离任务的启动和结果的处理

通常,当你使用执行者执行并发任务时,你将会提交 Runnable或Callable任务给这个执行者,并获取Future对象控制这个方法。你可以发现这种情况,你需要提交任务给执行者在一个对象中,而处理结果在另一个对象中。基于这种情况,Java并发API提供CompletionService类。

CompletionService 类有一个方法来提交任务给执行者和另一个方法来获取已完成执行的下个任务的Future对象。在内部实现中,它使用Executor对象执行任务。这种行为的优点是共享一个CompletionService对象,并提交任务给执行者,这样其他(对象)可以处理结果。其局限性是,第二个对象只能获取那些已经完成它们的执行的任务的Future对象,所以,这些Future对象只能获取任务的结果。

在这个指南中,你将学习如何使用CompletionService类把执行者启动任务和处理它们的结果分开。

准备工作…

这个指南的例子使用Eclipse IDE实现。如果你使用Eclipse或其他IDE,如NetBeans,打开它并创建一个新的Java项目。

如何做…

按以下步骤来实现的这个例子:

1.创建ReportGenerator类,并指定其实现Callable接口,参数化为String类型。

1
public class ReportGenerator implements Callable<String> {
2.声明两个私有的、String类型的属性,sender和title,用来表示报告的数据。

1
private String sender;
2
private String title;
3.实现这个类的构造器,初始化这两个属性。

1
public ReportGenerator(String sender, String title){
2
this.sender=sender;
3
this.title=title;
4
}
4.实现call()方法。首先,让线程睡眠一段随机时间。

1
@Override
2
public String call() throws Exception {
3
try {
4
Long duration=(long)(Math.random()*10);
5
System.out.printf("%s_%s: ReportGenerator: Generating a report during %d seconds\n",this.sender,this.title,duration);
6
TimeUnit.SECONDS.sleep(duration);
7
} catch (InterruptedException e) {
8
e.printStackTrace();
9
}
5.然后,生成一个有sender和title属性的字符串的报告,返回这个字符串。

1
String ret=sender+": "+title;
2
return ret;
3
}
6.创建ReportRequest类,实现Runnable接口。这个类将模拟一些报告请求。

1
public class ReportRequest implements Runnable {
7.声明私有的、String类型的属性name,用来存储ReportRequest的名称。

1
private String name;
8.声明私有的、CompletionService类型的属性service。CompletionService接口是个参数化接口,使用String类型参数化它。

1
private CompletionService<String> service;
9.实现这个类的构造器,初始化这两个属性。

1
public ReportRequest(String name, CompletionService<String> service){
2
this.name=name;
3
this.service=service;
4
}
10.实现run()方法。创建1个ReportGenerator对象,并使用submit()方法把它提交给CompletionService对象。

1
@Override
2
public void run() {
3
ReportGenerator reportGenerator=new ReportGenerator(name,"Report");
4
service.submit(reportGenerator);
5
}
11.创建ReportProcessor类。这个类将获取ReportGenerator任务的结果,指定它实现Runnable接口。

1
public class ReportProcessor implements Runnable {
12.声明一个私有的、CompletionService类型的属性service。由于CompletionService接口是个参数化接口,使用String类作为这个CompletionService接口的参数。

1
private CompletionService<String> service;
13.声明一个私有的、boolean类型的属性end。

1
private boolean end;
14.实现这个类的构造器,初始化这两个属性。

1
public ReportProcessor (CompletionService<String> service){
2
this.service=service;
3
end=false;
4
}
15.实现run()方法。当属性end值为false,调用CompletionService接口的poll()方法,获取CompletionService执行的下个已完成任务的Future对象。

1
@Override
2
public void run() {
3
while (!end){
4
try {
5
Future<String> result=service.poll(20, TimeUnit.SECONDS);
16.然后,使用Future对象的get()方法获取任务的结果,并且将这些结果写入到控制台。

01
if (result!=null) {
02
String report=result.get();
03
System.out.printf("ReportReceiver: Report Received:%s\n",report);
04
}
05
} catch (InterruptedException | ExecutionException e) {
06
e.printStackTrace();
07
}
08
}
09
System.out.printf("ReportSender: End\n");
10
}
17.实现setEnd()方法,用来修改属性end的值。

1
public void setEnd(boolean end) {
2
this.end = end;
3
}
18.实现这个示例的主类,通过创建Main类,并实现main()方法。

1
public class Main {
2
public static void main(String[] args) {
19.使用Executors类的newCachedThreadPool()方法创建ThreadPoolExecutor。

1
ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool();
20.创建CompletionService,使用前面创建的执行者作为构造器的参数。

1
CompletionService<String> service=new ExecutorCompletionService<>(executor);
21.创建两个ReportRequest对象,并用线程执行它们。

1
ReportRequest faceRequest=new ReportRequest("Face", service);
2
ReportRequest onlineRequest=new ReportRequest("Online";,service);
3
Thread faceThread=new Thread(faceRequest);
4
Thread onlineThread=new Thread(onlineRequest);
22.创建一个ReportProcessor对象,并用线程执行它。

1
ReportProcessor processor=new ReportProcessor(service);
2
Thread senderThread=new Thread(processor);
23.启动这3个线程。

1
System.out.printf("Main: Starting the Threads\n");
2
faceThread.start();
3
onlineThread.start();
4
senderThread.start();
24.等待ReportRequest线程的结束。

1
try {
2
System.out.printf("Main: Waiting for the report
3
generators.\n");
4
faceThread.join();
5
onlineThread.join();
6
} catch (InterruptedException e) {
7
e.printStackTrace();
8
}
25.使用shutdown()方法关闭执行者,使用awaitTermination()方法等待任务的结果。

1
System.out.printf("Main: Shutting down the executor.\n");
2
executor.shutdown();
3
try {
4
executor.awaitTermination(1, TimeUnit.DAYS);
5
} catch (InterruptedException e) {
6
e.printStackTrace();
7
}
26.设置ReportSender对象的end属性值为true,结束它的执行。

1
processor.setEnd(true);
2
System.out.println("Main: Ends");
这是如何工作的…

在示例的主类中,你使用Executors类的newCachedThreadPool()方法创建ThreadPoolExecutor。然后,使用这个对象初始化一个CompletionService对象,因为CompletionService需要使用一个执行者来执行任务。利用CompletionService执行一个任务,你需要使用submit()方法,如在ReportRequest类中。

当其中一个任务被执行,CompletionService完成这个任务的执行时,这个CompletionService在一个队列中存储Future对象来控制它的执行。poll()方法用来查看这个列队,如果有任何任务执行完成,那么返回列队的第一个元素,它是一个已完成任务的Future对象。当poll()方法返回一个Future对象时,它将这个Future对象从队列中删除。这种情况下,你可以传两个属性给那个方法,表明你想要等任务结果的时间,以防队列中的已完成任务的结果是空的。

一旦CompletionService对象被创建,你创建2个ReportRequest对象,用来执行3个ReportGenerator任务,每个都在CompletionService中,和一个ReportSender任务,它将会处理已提交给2个ReportRequest对象的任务所产生的结果。

不止这些…

CompletionService类可以执行Callable和Runnable任务。在这个示例中,你已经使用Callable,但你同样可以提交Runnable对象。由于Runnable对象不会产生结果,CompletionService类的理念不适用于这些情况。

这个类同样提供其他两个方法,用来获取已完成任务的Future对象。这两个方法如下:

poll():不带参数版本的poll()方法,检查是否有任何Future对象在队列中。如果列队是空的,它立即返回null。否则,它返回第一个元素,并从列队中删除它。
take():这个方法,不带参数。检查是否有任何Future对象在队列中。如果队列是空的,它阻塞线程直到队列有一个元素。当队列有元素,它返回第一元素,并从列队中删除它。
参见

在第4章,线程执行者中的执行者执行返回结果的任务指南
文章转自  并发编程网-ifeve.com
目录
相关文章
|
15天前
|
Java 数据库 Android开发
【专栏】Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理
【4月更文挑战第27天】本文探讨了Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理。通过案例分析展示了网络请求、图像处理和数据库操作的优化实践。同时,文章指出并发编程的挑战,如性能评估、调试及兼容性问题,并强调了多线程优化对提升应用性能的重要性。开发者应持续学习和探索新的优化策略,以适应移动应用市场的竞争需求。
|
2月前
|
算法 调度 索引
什么是多任务和线程?用线程写的一个udp同步聊天器
什么是多任务和线程?用线程写的一个udp同步聊天器
30 0
|
2月前
|
数据采集 存储 Java
「多线程大杀器」Python并发编程利器:ThreadPoolExecutor,让你一次性轻松开启多个线程,秒杀大量任务!
「多线程大杀器」Python并发编程利器:ThreadPoolExecutor,让你一次性轻松开启多个线程,秒杀大量任务!
|
2月前
|
存储 算法 Java
【C/C++ 线程池设计思路】 深入探索线程池设计:任务历史记录的高效管理策略
【C/C++ 线程池设计思路】 深入探索线程池设计:任务历史记录的高效管理策略
89 0
|
5天前
|
存储 安全 Linux
【探索Linux】P.19(多线程 | 线程的概念 | 线程控制 | 分离线程)
【探索Linux】P.19(多线程 | 线程的概念 | 线程控制 | 分离线程)
7 0
|
13天前
|
存储 安全 Java
【亮剑】`ConcurrentHashMap`是Java中线程安全的哈希表,采用锁定分离技术提高并发性能
【4月更文挑战第30天】`ConcurrentHashMap`是Java中线程安全的哈希表,采用锁定分离技术提高并发性能。数据被分割成多个Segment,每个拥有独立锁,允许多线程并发访问不同Segment。当写操作发生时,计算键的哈希值定位Segment并获取其锁;读操作通常无需锁定。内部会根据负载动态调整Segment,减少锁竞争。虽然使用不公平锁,但Java 8及以上版本提供了公平锁选项。理解其工作原理对开发高性能并发应用至关重要。
|
17天前
|
存储 安全 Java
Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列
Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列
|
1月前
|
Java Spring
定时任务里面的任务多线程操作
该内容是关于Spring Boot中配置异步任务和定时任务的代码示例。首先通过`@Configuration`和`@EnableAsync`开启异步支持,然后定义线程池,如使用`ThreadPoolExecutor`并设置核心线程数、最大线程数等参数。接着,在需要异步执行的方法上添加`@Async`注解。此外,通过`@EnableScheduling`开启定时任务,并使用`@Scheduled`定义具体任务和执行周期。若需指定多个线程池,可以创建不同的`Executor` bean,并在`@Async`中指定线程池名称。
21 2
|
2月前
|
安全 Java 调度
【C/C++ 线程池设计思路 】设计与实现支持优先级任务的C++线程池 简要介绍
【C/C++ 线程池设计思路 】设计与实现支持优先级任务的C++线程池 简要介绍
56 2
|
2月前
|
Java
多线程------Future异步任务
多线程------Future异步任务