3、CompletionService
如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果,怎么办呢?
Java8之前的做法是让返回Futrue,然后调用其get阻塞方法即可。这样做固然可以,但却相当乏味。幸运的是,Java8提供了一个更好的方法:完成服务 (CompletionService)。
CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个使用BlockingQueue打包的Future。
CompletionService是Java8的新增接口,JDK为其提供了一个实现类ExecutorCompletionService。
原理:内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序。
/** * CompletionService多线程并发任务结果归集 * * @author fangshixiang@vipkid.com.cn * @description // * @date 2018/10/31 11:29 */ public class CompletionServiceDemo { public static void main(String[] args) { Long start = System.currentTimeMillis(); //开启10个线程 ExecutorService exs = Executors.newFixedThreadPool(10); //结果集 List<Integer> list = new ArrayList<>(); List<Future<Integer>> futureList = new ArrayList<>(); try { int taskCount = 10; //1.定义CompletionService ExecutorCompletionService是此接口的唯一实现类 需要把线程池传进去 CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs); //2.添加任务(向CompletionService添加任务 然后把返回的futrue添加到futureList即可) for (int i = 0; i < taskCount; i++) { futureList.add(completionService.submit(new Task(i + 1))); } //==================结果归集=================== //方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果 (若是按照提交顺序,那和Futrue的Demo结果将一样,没啥优势可言) // for (Future<Integer> future : futureList) { // System.out.println("===================="); // Integer result = future.get();//线程在这里阻塞等待该任务执行完毕,按照 // System.out.println("任务result="+result+"获取到结果!"+new Date()); // list.add(result); // } // //方法2.使用内部阻塞队列的take():内部维护阻塞队列,任务先完成的先获取到 for (int i = 0; i < taskCount; i++) { Integer result = completionService.take().get();//采用completionService.take(), System.out.println("任务i==" + result + "完成!" + new Date()); list.add(result); } System.out.println("list=" + list); System.out.println("总耗时=" + (System.currentTimeMillis() - start)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown();//关闭线程池 } } static class Task implements Callable<Integer> { Integer i; public Task(Integer i) { super(); this.i = i; } @Override public Integer call() throws Exception { if (i == 5) { Thread.sleep(5000); } else { Thread.sleep(1000); } System.out.println("线程:" + Thread.currentThread().getName() + "任务i=" + i + ",执行完成!"); return i; } } }
输出:
线程:pool-1-thread-4任务i=4,执行完成! 任务i==4完成!Wed Oct 31 11:33:35 CST 2018 线程:pool-1-thread-3任务i=3,执行完成! 任务i==3完成!Wed Oct 31 11:33:35 CST 2018 线程:pool-1-thread-2任务i=2,执行完成! 任务i==2完成!Wed Oct 31 11:33:35 CST 2018 线程:pool-1-thread-1任务i=1,执行完成! 任务i==1完成!Wed Oct 31 11:33:35 CST 2018 线程:pool-1-thread-8任务i=8,执行完成! 任务i==8完成!Wed Oct 31 11:33:35 CST 2018 线程:pool-1-thread-7任务i=7,执行完成! 任务i==7完成!Wed Oct 31 11:33:35 CST 2018 线程:pool-1-thread-6任务i=6,执行完成! 任务i==6完成!Wed Oct 31 11:33:35 CST 2018 线程:pool-1-thread-10任务i=10,执行完成! 任务i==10完成!Wed Oct 31 11:33:35 CST 2018 线程:pool-1-thread-9任务i=9,执行完成! 任务i==9完成!Wed Oct 31 11:33:35 CST 2018 线程:pool-1-thread-5任务i=5,执行完成! 任务i==5完成!Wed Oct 31 11:33:39 CST 2018 list=[4, 3, 2, 1, 8, 7, 6, 10, 9, 5] ---》这里证实了确实按照执行完成顺序排序 总耗时=5019 ---》符合逻辑,10个任务,定长5线程池执行,取最长时间。
建议:使用率也挺高,而且能按照完成先后排序,建议如果有排序需求的优先使用。只是多线程并发执行任务结果归集,也可以使用。
优势:能够按照任务完成时间排序,所有有排序需求的,可以考虑使用它。这也是JDK8以前最佳选择
4、CompletableFuture
JDK1.8才新加入的一个实现类,实现了Future, CompletionStage2个接口(CompletionStage接口也是1.8才提供的)
CompletableFuture的简单介绍::
当一个Future可能需要显示地完成时,使用CompletionStage接口去支持完成时触发的函数和操作。当2个以上线程同时尝试完成、异常完成、取消一个CompletableFuture时,只有一个能成功。
CompletableFuture实现了CompletionStage接口的如下策略:
1.为了完成当前的CompletableFuture接口或者其他完成方法的回调函数的线程,提供了非异步的完成操作
2.没有显式入参Executor的所有async方法都使用ForkJoinPool.commonPool()为了简化监视、调试和跟踪,所有生成的异步任务都是标记接口AsynchronousCompletionTask的实例
3.所有的CompletionStage方法都是独立于其他共有方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖
CompletableFuture实现了Futurre接口的如下策略:
CompletableFuture无法直接控制完成,所以cancel操作被视为是另一种异常完成形式。方法isCompletedExceptionally可以用来确定一个CompletableFuture是否以任何异常的方式完成。
以一个CompletionException为例,方法get()和get(long,TimeUnit)抛出一个ExecutionException,对应CompletionException。为了在大多数上下文中简化用法,这个类还定义了方法join()和getNow,而不是直接在这些情况中直接抛出CompletionException
CompletionStage接口实现流式编程:
JDK8新增接口,此接口包含38个方法…是的,你没看错,就是38个方法。这些方法主要是为了支持函数式编程中流式处理。
CompletableFuture中4个异步执行任务静态方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
如上图,其中supplyAsync用于有返回值的任务,runAsync则用于没有返回值的任务。Executor参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()系统级公共线程池,注意:这些线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM关闭时,生命周期终止。
组合CompletableFuture:
thenCombine(): 先完成当前CompletionStage和other 2个CompletionStage任务,然后把结果传参给BiFunction进行结果合并操作
三个重载方法如下:
public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { return biApplyStage(null, other, fn); } public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { return biApplyStage(asyncPool, other, fn); } public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) { return biApplyStage(screenExecutor(executor), other, fn); }
thenCompose():第一个CompletableFuture执行完毕后,传递给下一个CompletionStage作为入参进行操作。
三个重载方法如下:
public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(null, fn); } public <U> CompletableFuture<U> thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(asyncPool, fn); } public <U> CompletableFuture<U> thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) { return uniComposeStage(screenExecutor(executor), fn); }
…]… …等等类似的重载方法有很多,后续会专门开博文讲述它的使用方式,请持续关注我的博客
【小家java】Java8新特性之—CompletableFuture的系统讲解和实例演示(使用CompletableFuture构建异步应用)