CompletionService

简介: 如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务(Completion service)。如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存

如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务(Completion service)。如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务(Completion service)。


CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。

它是一个更高级的ExecutorService,它本身自带一个线程安全的线性表,无需用户额外创建。它提供了2种方法从线性表中取出结果,poll()是非阻塞的,若目前无结果,返回一个null,线程继续运行不阻塞。take()是阻塞的,若当前无结果,则线程阻塞,直到产生一个结果,被取出返回,线程才继续运行。

此类将安排那些完成时提交的任务,把它们结果放置在可使用 take 访问的队列上, 外部可以通过take(),poll(),poll(long timeout,TimeUnit unit)来取得。该类非常轻便,适合于在执行几组任务时临时使用。


主要构造函数
public ExecutorCompletionService(Executor executor)
    使用为执行基本任务而提供的执行程序创建一个 ExecutorCompletionService,并将 LinkedBlockingQueue 作为完成队列。
    参数:
        executor - 要使用的执行程序 
    抛出:
        NullPointerException - 如果执行程序为 null
public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)
    使用为执行基本任务而提供的执行程序创建一个 ExecutorCompletionService,并将所提供的队列作为其完成队列。
    参数:
        executor - 要使用的执行程序
        completionQueue - 用作完成队列的队列,通常是专供此服务使用的队列 
    抛出:
        NullPointerException - 如果执行程序或 completionQueue 为 null

主要成员函数
public Future<V>  submit(Callable<V> task)
    从接口 CompletionService 复制的描述
    提交要执行的值返回任务,并返回表示挂起的任务结果的 Future。在完成时,可能会提取或轮询此任务。
    指定者:
        接口 CompletionService<V> 中的 submit
    参数:
        task - 要提交的任务 
    返回:
        一个表示挂起的任务完成的 Future
public Future<V>  submit(Runnable task,V result)
    从接口 CompletionService 复制的描述
    提交要执行的 Runnable 任务,并返回一个表示任务完成的 Future,可以提取或轮询此任务。
    指定者:
        接口 CompletionService<V> 中的 submit
    参数:
        task - 要提交的任务
        result - 要返回的已成功完成任务的结果 
    返回:
        一个表示挂起的任务完成的 Future,其 get() 方法将返回完成时给出的结果值
public Future<V>  take()
               throws InterruptedException
    从接口 CompletionService 复制的描述
    获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。
    指定者:
        接口 CompletionService<V> 中的 take
    返回:
        表示下一个已完成任务的 Future 
    抛出:
        InterruptedException - 如果在等待时被中断
public Future<V>  poll()
    从接口 CompletionService 复制的描述
    获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null。
    指定者:
        接口 CompletionService<V> 中的 poll
    返回:
        表示下一个已完成任务的 Future;如果不存在这样的任务,则返回 null
public Future<V>  poll(long timeout,
                      TimeUnit unit)
               throws InterruptedException
    从接口 CompletionService 复制的描述
    获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则将等待指定的时间(如果有必要)。
    指定者:
        接口 CompletionService<V> 中的 poll
    参数:
        timeout - 放弃之前需要等待的时间长度,以 unit 为时间单位
        unit - 确定如何解释 timeout 参数的 TimeUnit 
    返回:
        表示下一个已完成任务的 Future;如果等待了指定时间仍然不存在这样的任务,则返回 null 
    抛出:
        InterruptedException - 如果在等待时被中断

代码:
  1. import java.util.Random;  
  2. import java.util.concurrent.BlockingQueue;  
  3. import java.util.concurrent.Callable;  
  4. import java.util.concurrent.CompletionService;  
  5. import java.util.concurrent.ExecutionException;  
  6. import java.util.concurrent.ExecutorCompletionService;  
  7. import java.util.concurrent.ExecutorService;  
  8. import java.util.concurrent.Executors;  
  9. import java.util.concurrent.Future;  
  10. import java.util.concurrent.LinkedBlockingQueue;  
  11.   
  12. public class Test17 {  
  13.     public static void main(String[] args) throws Exception {  
  14.         Test17 t = new Test17();  
  15.         t.count1();  
  16.         t.count2();  
  17.     }  
  18. //使用阻塞容器保存每次Executor处理的结果,在后面进行统一处理  
  19.     public void count1() throws Exception{  
  20.         ExecutorService exec = Executors.newCachedThreadPool();  
  21.         BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();  
  22.         for(int i=0; i<10; i++){  
  23.             Future<Integer> future =exec.submit(getTask());  
  24.             queue.add(future);  
  25.         }  
  26.         int sum = 0;  
  27.         int queueSize = queue.size();  
  28.         for(int i=0; i<queueSize; i++){  
  29.             sum += queue.take().get();  
  30.         }  
  31.         System.out.println("总数为:"+sum);  
  32.         exec.shutdown();  
  33.     }  
  34. //使用CompletionService(完成服务)保持Executor处理的结果  
  35.     public void count2() throws InterruptedException, ExecutionException{  
  36.         ExecutorService exec = Executors.newCachedThreadPool();  
  37.         CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);  
  38.         for(int i=0; i<10; i++){  
  39.             execcomp.submit(getTask());  
  40.         }  
  41.         int sum = 0;  
  42.         for(int i=0; i<10; i++){  
  43. //检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。  
  44.             Future<Integer> future = execcomp.take();  
  45.             sum += future.get();  
  46.         }  
  47.         System.out.println("总数为:"+sum);  
  48.         exec.shutdown();  
  49.     }  
  50.     //得到一个任务  
  51.     public Callable<Integer> getTask(){  
  52.         final Random rand = new Random();  
  53.         Callable<Integer> task = new Callable<Integer>(){  
  54.             @Override  
  55.             public Integer call() throws Exception {  
  56.                 int i = rand.nextInt(10);  
  57.                 int j = rand.nextInt(10);  
  58.                 int sum = i*j;  
  59.                 System.out.print(sum+"\t");  
  60.                 return sum;  
  61.             }  
  62.         };  
  63.         return task;  
  64.     }  

结果:

49 6 48 28 312 0 0 6 总数为:152
56  5425  35 10 12  854 20  总数为:282

目录
相关文章
|
5月前
|
Java
CompletionService 使用小结
CompletionService 使用小结
34 1
|
5月前
|
存储 缓存 安全
(八)深入并发之Runnable、Callable、FutureTask及CompletableFuture原理分析
关于Runnable、Callable接口大家可能在最开始学习Java多线程编程时,都曾学习过一个概念:在Java中创建多线程的方式有三种:继承Thread类、实现Runnable接口以及实现Callable接口。但是实则不然,真正创建多线程的方式只有一种:继承Thread类,因为只有`new Thread().start()`这种方式才能真正的映射一条OS的内核线程执行,而关于实现Runnable接口以及实现Callable接口创建出的Runnable、Callable对象在我看来只能姑且被称为“多线程任务”,因为无论是Runnable对象还是Callable对象,最终执行都要交由Threa
110 1
|
7月前
|
Java API
java多线程之FutureTask、Future、CompletableFuture
java多线程之FutureTask、Future、CompletableFuture
315 0
JavaThread、Runnable、Callable、线程池的使用
JavaThread、Runnable、Callable、线程池的使用
104 0
JavaThread、Runnable、Callable、线程池的使用
|
消息中间件 设计模式 Kafka
CompletionService学习
前面已经说到Future的默认实现是FutureTask,因此你可以看到其在jdk1.5的时候采用的是AQS去实现的,因此具有阻塞性,但jdk1.6之后,可以看到其基于CAS实现的。之所以学习Future,除了其具备异步功能,同时其采用的思想也是在设计模式中有体现的,也即Future模式,而且可以在kafka源码中看到基于Future构建的异步编程。前面说到其基于AQS具有阻塞性,但从源码中,可以看到在jdk1.6之后采用的是CAS
105 0
CompletionService学习
|
Java
Java多线程 CompletionService和ExecutorCompletionService
Java多线程 CompletionService和ExecutorCompletionService
151 0
Java多线程 CompletionService和ExecutorCompletionService
|
安全
线程池中CompletionService的应用
线程池中CompletionService的应用
117 0
|
存储
多线程 - Callable、Future 和 FutureTask 简单应用(二)
多线程 - Callable、Future 和 FutureTask 简单应用(二)
122 0
多线程 - Callable、Future 和 FutureTask 简单应用(二)
|
消息中间件 Dubbo Java
“既生 ExecutorService, 何生 CompletionService?”
“既生 ExecutorService, 何生 CompletionService?”
“既生 ExecutorService, 何生 CompletionService?”
|
Java API
【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比(下)
【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比(下)
【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比(下)