①. CompletableFuture概述
①. 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法
②. 它可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作
③. 它实现了Future和CompletionStage接口
④. CompletionStage接口说明
CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成后可能会触发另外一个阶段
一个阶段的计算执行可以是一个Funcation、Consumer、Runnable。比
如:stage.thenApply (x->square(x)).thenAccept(x->System.out.println(x)).thenRun(()->{System.out.println()});
一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发.有些类似Linux系统的管道分隔符传参数
public class CompletableFutureTest2 { public static void main(String[] args)throws Exception { /** 1.当一个线程依赖另一个线程时,可以使用thenApply()方法来把这两个线程串行化(第二个任务依赖第一个任务的结果) public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) 2.它可以处理正常的计算结果,或者异常情况 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) 3.异常的处理操作 public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn) */ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {e.printStackTrace();} return 1; }).thenApply(result -> { return result+3; }).whenComplete((v,e)->{ if(e==null){ System.out.println(Thread.currentThread().getName()+"\t"+"result = " + v); } }).exceptionally(e->{ e.printStackTrace(); return null; }); System.out.println(Thread.currentThread().getName()+"\t"+"over..."); //主线程不要立即结束,否则CompletableFuture默认使用的线程池会立即关闭,暂停几秒 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) {e.printStackTrace();} } }
②. CompletableFuture创建方式
①. CompletableFuture 提供了四个静态方法来创建一个异步操作
runAsync方法不支持返回值
supplyAsync可以支持返回值
//runAsync方法不支持返回值 public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) //supplyAsync可以支持返回值 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
②. 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同
public class CompletableFutureTest { public static void main(String[] args) throws Exception{ ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 2L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3)); //(1). CompletableFuture.runAsync(Runnable runnable); CompletableFuture future1=CompletableFuture.runAsync(()->{ System.out.println(Thread.currentThread().getName()+"*********future1 coming in"); }); //这里获取到的值是null System.out.println(future1.get()); //(2). CompletableFuture.runAsync(Runnable runnable,Executor executor); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { //ForkJoinPool.commonPool-worker-9 System.out.println(Thread.currentThread().getName() + "\t" + "*********future2 coming in"); }, executor); //(3).public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> { //pool-1-thread-1 System.out.println(Thread.currentThread().getName() + "\t" + "future3带有返回值"); return 1024; }); System.out.println(future3.get()); //(4).public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) CompletableFuture<Integer> future4 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "future4带有返回值"); return 1025; }, executor); System.out.println(future4.get()); //关闭线程池 executor.shutdown(); } }