- 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
- If you can NOT explain it simply, you do NOT understand it well enough
前言
上一篇文章 不会用Java Future,我怀疑你泡茶没我快 全面分析了 Future,通过它我们可以获取线程的执行结果,它虽然解决了 Runnable 的 “三无” 短板,但是它自身还是有短板:
不能手动完成计算
假设你使用 Future 运行子线程调用远程 API 来获取某款产品的最新价格,服务器由于洪灾宕机了,此时如果你想手动结束计算,而是想返回上次缓存中的价格,这是 Future 做不到的
调用 get() 方法会阻塞程序
Future 不会通知你它的完成,它提供了一个get()方法,程序调用该方法会阻塞直到结果可用为止,没有办法利用回调函数附加到Future,并在Future的结果可用时自动调用它
不能链式执行
烧水泡茶中,通过构造函数传参做到多个任务的链式执行,万一有更多的任务,或是任务链的执行顺序有变,对原有程序的影响都是非常大的
整合多个 Future 执行结果方式笨重
假设有多个 Future 并行执行,需要在这些任务全部执行完成之后做后续操作,Future 本身是做不到的,需要借助工具类 Executors
的方法
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) <T> T invokeAny(Collection<? extends Callable<T>> tasks)
没有异常处理
Future 同样没有提供很好的异常处理方案
上一篇文章看 Future 觉得是发现了新天地,这么一说有感觉回到了解放前
对于 Java 后端的同学,在 Java1.8 之前想实现异步编程,还想避开上述这些烦恼,ReactiveX 应该是一个常见解决方案(做Android 的应该会有了解)。如果熟悉前端同学, ES6 Promise(男朋友的承诺)也解决了异步编程的烦恼
天下语言都在彼此借鉴相应优点,Java 作为老牌劲旅自然也要解决上述问题。又是那个男人,并发大师 Doug Lea 忧天下程序员之忧,解天下程序员之困扰,在 Java1.8 版本(Lambda 横空出世)中,新增了一个并发工具类 CompletableFuture,它的出现,让人在泡茶过程中,品尝到了不一样的味道......
几个重要 Lambda 函数
CompletableFuture 在 Java1.8 的版本中出现,自然也得搭上 Lambda 的顺风车,为了更好的理解 CompletableFuture,这里我需要先介绍一下几个 Lambda 函数,我们只需要关注它们的以下几点就可以:
- 参数接受形式
- 返回值形式
- 函数名称
Runnable
Runnable 我们已经说过无数次了,无参数,无返回值
@FunctionalInterface public interface Runnable { public abstract void run(); }
Function
Function 接受一个参数,并且有返回值
@FunctionalInterface public interface Function<T, R> { R apply(T t); }
Consumer
Consumer 接受一个参数,没有返回值
@FunctionalInterface public interface Consumer<T> { void accept(T t); }
Supplier
Supplier 没有参数,有一个返回值
@FunctionalInterface public interface Supplier<T> { T get(); }
BiConsumer
BiConsumer 接受两个参数(Bi, 英文单词词根,代表两个的意思),没有返回值
@FunctionalInterface public interface BiConsumer<T, U> { void accept(T t, U u);
好了,我们做个小汇总
有些同学可能有疑问,为什么要关注这几个函数式接口,因为 CompletableFuture 的函数命名以及其作用都是和这几个函数式接口高度相关的,一会你就会发现了
前戏做足,终于可以进入正题了 CompletableFuture
CompletableFuture
类结构
老规矩,先从类结构看起:
实现了 Future 接口
实现了 Future 接口,那就具有 Future 接口的相关特性,请脑补 Future 那少的可怜的 5 个方法,这里不再赘述,具体请查看 不会用Java Future,我怀疑你泡茶没我快
实现了 CompletionStage 接口
CompletionStage 这个接口还是挺陌生的,中文直译过来是【竣工阶段】,如果将烧水泡茶比喻成一项大工程,他们的竣工阶段体现是不一样的
- 单看线程1 或单看线程 2 就是一种串行关系,做完一步之后做下一步
- 一起看线程1 和 线程 2,它们彼此就是并行关系,两个线程做的事彼此独立互补干扰
- 泡茶就是线程1 和 线程 2 的汇总/组合,也就是线程 1 和 线程 2 都完成之后才能到这个阶段(当然也存在线程1 或 线程 2 任意一个线程竣工就可以开启下一阶段的场景)
所以,CompletionStage 接口的作用就做了这点事,所有函数都用于描述任务的时序关系,总结起来就是这个样子:
CompletableFuture 既然实现了两个接口,自然也就会实现相应的方法充分利用其接口特性,我们走进它的方法来看一看
CompletableFuture 大约有50种不同处理串行,并行,组合以及处理错误的方法。小弟屏幕不争气,方法之多,一个屏幕装不下,看到这么多方法,是不是瞬间要直接 收藏——>吃灰
2连走人?别担心,我们按照相应的命名和作用进行分类,分分钟搞定50多种方法
串行关系
then
直译【然后】,也就是表示下一步,所以通常是一种串行关系体现, then 后面的单词(比如 run /apply/accept)就是上面说的函数式接口中的抽象方法名称了,它的作用和那几个函数式接口的作用是一样一样滴
CompletableFuture<Void> thenRun(Runnable action) CompletableFuture<Void> thenRunAsync(Runnable action) CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) CompletableFuture<Void> thenAccept(Consumer<? super T> action) CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
聚合 And 关系
combine... with...
和 both...and...
都是要求两者都满足,也就是 and 的关系了
<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) <U> CompletableFuture<Void> thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor) CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
聚合 Or 关系
Either...or...
表示两者中的一个,自然也就是 Or 的体现了
<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) <U> CompletableFuture<U> applyToEitherAsync(、CompletionStage<? extends T> other, Function<? super T, U> fn) <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)
异常处理
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn) CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor) CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
这个异常处理看着还挺吓人的,拿传统的 try/catch/finally 做个对比也就瞬间秒懂了
whenComplete 和 handle 的区别如果你看接受的参数函数式接口名称你也就能看出差别了,前者使用Comsumer, 自然也就不会有返回值;后者使用 Function,自然也就会有返回值
这里并没有全部列举,不过相信很多同学已经发现了规律:
CompletableFuture 提供的所有回调方法都有两个异步(Async)变体,都像这样
// thenApply() 的变体 <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
另外,方法的名称也都与前戏中说的函数式接口完全匹配,按照这中规律分类之后,这 50 多个方法看起来是不是很轻松了呢?
基本方法已经罗列的差不多了,接下来我们通过一些例子来实际演示一下:
案例演示
创建一个 CompletableFuture 对象
创建一个 CompletableFuture 对象并没有什么稀奇的,依旧是通过构造函数构建
CompletableFuture<String> completableFuture = new CompletableFuture<String>();
这是最简单的 CompletableFuture 对象创建方式,由于它实现了 Future 接口,所以自然就可以通过 get() 方法获取结果
String result = completableFuture.get();
文章开头已经说过,get()方法在任务结束之前将一直处在阻塞状态,由于上面创建的 Future 没有返回,所以在这里调用 get() 将会永久性的堵塞
这时就需要我们调用 complete() 方法手动的结束一个 Future
completableFuture.complete("Future's Result Here Manually");
这时,所有等待这个 Future 的 client 都会返回手动结束的指定结果
runAsync
使用 runAsync
进行异步计算
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new IllegalStateException(e); } System.out.println("运行在一个单独的线程当中"); }); future.get();
由于使用的是 Runnable 函数式表达式,自然也不会获取到结果
supplyAsync
使用 runAsync
是没有返回结果的,我们想获取异步计算的返回结果需要使用 supplyAsync()
方法
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new IllegalStateException(e); } log.info("运行在一个单独的线程当中"); return "我有返回值"; }); log.info(future.get());
由于使用的是 Supplier 函数式表达式,自然可以获得返回结果
我们已经多次说过,get() 方法在Future 计算完成之前会一直处在 blocking 状态下,对于真正的异步处理,我们希望的是可以通过传入回调函数,在Future 结束时自动调用该回调函数,这样,我们就不用等待结果