一.CompletionService接口提供了可以操作异步任务的功能,其唯一实现的API为ExecutorCompletionService。此API只是可以获取异步任务执行的结果,它不是ExecutorService。
其有5个核心方法:
- Future<V> poll():同步操作,获取并移除第一已经完成的任务,否则返回null。
- Future<V> poll(timeout):同步操作,获取并移除第一个已经完成的任务,阻塞时间为timeout,否则返回null;支持InterruptedException。
- Future<V> submit(Callable<V> task):提交任务,并获取任务执行结果的句柄。
- Future<V> submit(Runnable,V result):提交任务,并获取任务执行结果的句柄。
- Future<V> take():获取并移除第一个执行完成的任务,阻塞,直到有任务返回。支持InterruptedException。
ExecutorCompletionService之说以能够提供此功能,原因就是其内部持有一个BockingQueue(此queue可以通过构造器传入指定)。
同时这还要借助Future/FutureTask的功能。
- public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue):需要指定一个现有的executor和用于存储Future的队列,此后通过submit提交的任务都将有executor来执行,并将"Future句柄"添加到队列中;这个API很像一个"修饰者".
二.Future:提供了可以查看异步执行的结果。此接口提供了多个方便的方法,以便检测和控制任务的操作。
- boolean cancel(boolean interruptIfRunning):试图取消任务的执行,如果任务已经完成或者取消,此操作将无效。如果任务尚未启动(start),那么任务将不会被执行,如果任务正在执行,则interruptIfRunning参数决定是否中断任务线程(线程需要相应“中断”)。此方法返回后,isDone将返回true;如果方法取消成功,则isCancelled()则返回true。
- V get():等待并获取执行结果。此方法会阻塞,知道结果返回。此方法会在线程中断时抛出InterruptException,如果任务被取消,将;抛出异常。
- V get(timeout):阻塞指定的时间。如果时间超时,仍未能执行完成,则抛出timeoutException。
RunnableFuture接口扩展了Future接口和Runnable,只提供(覆盖)run()方法,其作用非常简单,就是标示其子类具有可执行run方法,且获取Future结果。
三.FutureTask就是RunnableFuture的子类,具有Future接口的可取消任务的能力,以及获取异步计算结果的能力。FutureTask可以认为是一个runnable和callable任务的桥梁类,其构造函数可以接受这两种任务。
- FutureTask(Callable<V> callable)
- FutureTask(Runnable runnable, V result):当运行结束后,将返回指定的result。
此外,还有几个特殊的方法:
- protected void done():可重写的方法,当任务执行结束后,将会调用此方法执行额外的操作。
- protected void set(V v):会被run方法内部调用,用来设置执行结果,此结果可以通过get获取。
runnable类型的任务,会在FutureTask中转化成Callable(参见Executors.callable(runnable,result),原理很简单,创建一个Callable实例,即在调用call时间接的调用run(),
并在执行结束后,返回指定的result)。
四.ExecutorCompletionService:提交给ExecutorCompletionService的任务,会被封装成一个QueueingFuture(一个FutureTask子类),此类的唯一作用就是在done()方法中,增加了将执行的FutureTask加入了内部队列,此时外部调用者,就可以take到相应的执行结束的任务。(take就是从blockingQueue中依次获取)
- public class ExcutorComplementServiceTest {
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception{
- Executor executor = Executors.newFixedThreadPool(3);
- CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(executor);
- //List<Future<Integer>> result = new ArrayList<Future<Integer>>(10);
- for(int i=0; i< 10; i++){
- cs.submit(new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- Random r = new Random();
- int init = 0;
- for(int i = 0; i<100; i++){
- init += r.nextInt();
- Thread.sleep(100);
- }
- return Integer.valueOf(init);
- }
- });
- }
- for(int i=0; i<10; i++){
- Future<Integer> future = cs.take();
- if(future != null){
- System.out.println(future.get());
- }
- }
- }
- }