当使用ExecutorService启动了多个Callable后,每个Callable会产生一个Future,我们需要将多个Future存入一个线性表,用于之后处理数据。当然,还有更复杂的情况,有5个生产者线程,每个生产者线程都会创建任务,所有任务的Future都存放到同一个线性表中。另有一个消费者线程,从线性表中取出Future进行处理。
CompletionService正是为此而存在,它是一个更高级的ExecutorService,它本身自带一个线程安全的线性表,无需用户额外创建。它提供了2种方法从线性表中取出结果,poll()是非阻塞的,若目前无结果,返回一个null,线程继续运行不阻塞。take()是阻塞的,若当前无结果,则线程阻塞,直到产生一个结果,被取出返回,线程才继续运行。
public class Test { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newCachedThreadPool(); CompletionService<Integer> comp = new ExecutorCompletionService<>(executor); for(int i = 0; i<5; i++) { comp.submit(new Task()); } executor.shutdown(); int count = 0, index = 1; while(count<5) { Future<Integer> f = comp.poll(); if(f == null) { System.out.println(index + " 没发现有完成的任务"); }else { System.out.println(index + "产生了一个随机数: " + f.get()); count++; } index++; TimeUnit.MILLISECONDS.sleep(500); } } } class Task implements Callable<Integer> { @Override public Integer call() throws Exception { Random rand = new Random(); TimeUnit.SECONDS.sleep(rand.nextInt(7)); return rand.nextInt(); } }
实际运用小案例:
模拟页面渲染
package com.thread; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * CompletionService的运用:当Executor遇见BlockingQueue时使用。 * 使用场景: * 如果向Executor提交了一个批处理任务,并且希望在他们完成后获得结果。 * * CompletionService整合了Executor和BlockingQueue的功能,你可以将Callable任务提交给它执行, * 然后使用类似队列中的take和poll方法,在结果完整可用时(只是等待任意一个future的返回值),获得这个结果。就像一个大包的Future。 * * completionService.take()的说明是:检索并移除已完成的任务,如果没有任何一个任务完成的,则继续等待 * * 从案例的结果可以看出,每当图片下载完毕后,就会执行渲染操作。 * take方法只是检索completionService中所有future,看是否有执行完的任务,并获得结果。 * * @author hadoop * */ public class CompletionThread { static ExecutorService mExecutor = Executors.newFixedThreadPool(5); static int totalTimeDownPhoto =0; /** * 模拟页面渲染场景 */ static void renderPage(){ final List<String> info = new ArrayList<String>(); for (int i = 0; i < 20; i++) { info.add("图片" + i); } CompletionService<String> completionService = new ExecutorCompletionService<String>(mExecutor); /** * 开启多线程处理下载图片的任务 */ for(final String str : info){ completionService.submit(new Callable<String>() { @Override public String call() throws Exception { //下载图片download(str) int randomTime = new Random().nextInt(9) + 1;//限制耗时不会出现0s,不会大于10s Thread.sleep(randomTime*1000); System.out.println("下载" + str +"耗费了" + randomTime + "s"); computeTime(randomTime); return str; } }); } try { System.out.println("处理文字渲染的逻辑"); int randomTime = new Random().nextInt(9) + 1; Thread.sleep(1000*randomTime); computeTime(randomTime); System.out.println("处理文字渲染的逻辑耗费了" + randomTime + "s"); /** * 如果渲染图片耗时也比较久,也可以使用多线程。这里只是模拟,没有使用多线程处理渲染图片的过程 */ for (int i = 0; i < info.size(); i++) { //take检索并移除已完成的任务,如果没有任何一个任务完成的,则继续等待 Future<String> f = completionService.take(); //处理渲染图片的逻辑 randomTime = new Random().nextInt(3) + 1; Thread.sleep(1000*randomTime); computeTime(randomTime); System.out.println("渲染"+f.get() +"耗时"+randomTime+"s"); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } /** * 只有执行了shutdown方法,执行isTerminated才有效。否则isTerminated一直为ture */ mExecutor.shutdown(); while(true){ if(mExecutor.isTerminated()){ System.out.println("所有任务都执行完了,关闭线程池"); break; } } } /** * 统计下载图片所花费的总时间 * @param randomTime */ static void computeTime(int randomTime){ synchronized (mExecutor) { totalTimeDownPhoto += randomTime; } } public static void main(String[] args) { long start = System.currentTimeMillis(); renderPage(); long end = System.currentTimeMillis(); System.out.println("渲染页面总耗时:"+(end - start)); System.out.println("下载每张图片,渲染每张图片以及渲染文字的合计耗时是:"+ totalTimeDownPhoto); int saveTime = (int) (totalTimeDownPhoto - (end - start)/1000); System.out.println("总节约时间:"+ saveTime+"s"); } }