CompletionService

简介:

一.CompletionService接口提供了可以操作异步任务的功能,其唯一实现的API为ExecutorCompletionService。此API只是可以获取异步任务执行的结果,它不是ExecutorService。

其有5个核心方法:

  • Future<V> poll():同步操作,获取并移除第一已经完成的任务,否则返回null。
  • Future<V> poll(timeout):同步操作,获取并移除第一个已经完成的任务,阻塞时间为timeout,否则返回null;支持InterruptedException。
  • Future<V> submit(Callable<V> task):提交任务,并获取任务执行结果的句柄。
  • Future<V> submit(Runnable,V result):提交任务,并获取任务执行结果的句柄。
  • Future<V> take():获取并移除第一个执行完成的任务,阻塞,直到有任务返回。支持InterruptedException。

 ExecutorCompletionService之说以能够提供此功能,原因就是其内部持有一个BockingQueue(此queue可以通过构造器传入指定)。

同时这还要借助Future/FutureTask的功能。

  • public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue):需要指定一个现有的executor和用于存储Future的队列,此后通过submit提交的任务都将有executor来执行,并将"Future句柄"添加到队列中;这个API很像一个"修饰者".

二.Future:提供了可以查看异步执行的结果。此接口提供了多个方便的方法,以便检测和控制任务的操作。

  • boolean cancel(boolean interruptIfRunning):试图取消任务的执行,如果任务已经完成或者取消,此操作将无效。如果任务尚未启动(start),那么任务将不会被执行,如果任务正在执行,则interruptIfRunning参数决定是否中断任务线程(线程需要相应“中断”)。此方法返回后,isDone将返回true;如果方法取消成功,则isCancelled()则返回true。
  • V get():等待并获取执行结果。此方法会阻塞,知道结果返回。此方法会在线程中断时抛出InterruptException,如果任务被取消,将;抛出异常。
  • V get(timeout):阻塞指定的时间。如果时间超时,仍未能执行完成,则抛出timeoutException。

RunnableFuture接口扩展了Future接口和Runnable,只提供(覆盖)run()方法,其作用非常简单,就是标示其子类具有可执行run方法,且获取Future结果。

三.FutureTask就是RunnableFuture的子类,具有Future接口的可取消任务的能力,以及获取异步计算结果的能力。FutureTask可以认为是一个runnable和callable任务的桥梁类,其构造函数可以接受这两种任务。

  • FutureTask(Callable<V> callable) 
  • FutureTask(Runnable runnable, V result):当运行结束后,将返回指定的result。

 此外,还有几个特殊的方法:

  • protected void done():可重写的方法,当任务执行结束后,将会调用此方法执行额外的操作。
  • protected void set(V v):会被run方法内部调用,用来设置执行结果,此结果可以通过get获取。

runnable类型的任务,会在FutureTask中转化成Callable(参见Executors.callable(runnable,result),原理很简单,创建一个Callable实例,即在调用call时间接的调用run(),

并在执行结束后,返回指定的result)。

 

四.ExecutorCompletionService:提交给ExecutorCompletionService的任务,会被封装成一个QueueingFuture(一个FutureTask子类),此类的唯一作用就是在done()方法中,增加了将执行的FutureTask加入了内部队列,此时外部调用者,就可以take到相应的执行结束的任务。(take就是从blockingQueue中依次获取)

 

 

Java代码   收藏代码
  1. public class ExcutorComplementServiceTest {  
  2.   
  3.     /** 
  4.  
  5.     * @param args 
  6.  
  7.     */  
  8.   
  9.     public static void main(String[] args) throws Exception{  
  10.   
  11.         Executor executor = Executors.newFixedThreadPool(3);  
  12.         CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(executor);  
  13.         //List<Future<Integer>> result = new ArrayList<Future<Integer>>(10);  
  14.         for(int i=0; i< 10; i++){  
  15.             cs.submit(new Callable<Integer>() {          
  16.                 @Override  
  17.                 public Integer call() throws Exception {  
  18.                     Random r = new Random();  
  19.                     int init = 0;  
  20.                     for(int i = 0; i<100; i++){  
  21.                         init += r.nextInt();  
  22.                         Thread.sleep(100);  
  23.                     }  
  24.                     return Integer.valueOf(init);  
  25.                 }  
  26.             });  
  27.         }  
  28.         for(int i=0; i<10; i++){  
  29.             Future<Integer> future = cs.take();  
  30.             if(future != null){  
  31.                 System.out.println(future.get());  
  32.             }  
  33.         }   
  34.   
  35.     }  
  36.   
  37.    
  38.   
  39. }  
目录
相关文章
|
4月前
|
Java
CompletionService 使用小结
CompletionService 使用小结
26 1
|
4月前
|
存储 缓存 安全
(八)深入并发之Runnable、Callable、FutureTask及CompletableFuture原理分析
关于Runnable、Callable接口大家可能在最开始学习Java多线程编程时,都曾学习过一个概念:在Java中创建多线程的方式有三种:继承Thread类、实现Runnable接口以及实现Callable接口。但是实则不然,真正创建多线程的方式只有一种:继承Thread类,因为只有`new Thread().start()`这种方式才能真正的映射一条OS的内核线程执行,而关于实现Runnable接口以及实现Callable接口创建出的Runnable、Callable对象在我看来只能姑且被称为“多线程任务”,因为无论是Runnable对象还是Callable对象,最终执行都要交由Threa
|
6月前
|
Java API
java多线程之FutureTask、Future、CompletableFuture
java多线程之FutureTask、Future、CompletableFuture
231 0
JavaThread、Runnable、Callable、线程池的使用
JavaThread、Runnable、Callable、线程池的使用
JavaThread、Runnable、Callable、线程池的使用
ThreadPoolExecutor的中的submit和FutureTask || 通过Executors 创建线程池的一些实例(Callable和Runnable的在其中的体现)
ThreadPoolExecutor的中的submit和FutureTask || 通过Executors 创建线程池的一些实例(Callable和Runnable的在其中的体现)
197 0
|
消息中间件 设计模式 Kafka
CompletionService学习
前面已经说到Future的默认实现是FutureTask,因此你可以看到其在jdk1.5的时候采用的是AQS去实现的,因此具有阻塞性,但jdk1.6之后,可以看到其基于CAS实现的。之所以学习Future,除了其具备异步功能,同时其采用的思想也是在设计模式中有体现的,也即Future模式,而且可以在kafka源码中看到基于Future构建的异步编程。前面说到其基于AQS具有阻塞性,但从源码中,可以看到在jdk1.6之后采用的是CAS
94 0
CompletionService学习
|
安全
线程池中CompletionService的应用
线程池中CompletionService的应用
114 0
|
存储
多线程 - Callable、Future 和 FutureTask 简单应用(二)
多线程 - Callable、Future 和 FutureTask 简单应用(二)
119 0
多线程 - Callable、Future 和 FutureTask 简单应用(二)
|
消息中间件 Dubbo Java
“既生 ExecutorService, 何生 CompletionService?”
“既生 ExecutorService, 何生 CompletionService?”
“既生 ExecutorService, 何生 CompletionService?”
|
存储 Java
JUC系列(六) | Callable和Future接口详解&使用、FutureTask应用 获取异步线程返回值
JUC系列(六) | Callable和Future接口详解&使用、FutureTask应用 获取异步线程返回值
403 0
JUC系列(六) | Callable和Future接口详解&使用、FutureTask应用 获取异步线程返回值