CompletableFuture 使用介绍

简介: CompletableFuture 使用介绍

本文安利一个 Java8 的工具 CompletableFuture,这是 Java8 带来的一个非常好用的用于异步编程的类。

还没使用过的小伙伴,赶紧用起来吧。

本文不介绍它的实现源码,仅介绍它的接口使用,本文也不做它和 RxJava 等其他异步编程框架的对比。

 

一、实例化

首先,不管我们要做什么,我们第一步是需要构造出 CompletableFuture 实例。

最简单的,我们可以通过构造函数来进行实例化:

CompletableFuture<String> cf = new CompletableFuture<String>();

这个实例此时还没有什么用,因为它没有实际的任务,我们选择结束这个任务:

1. // 可以选择在当前线程结束,也可以在其他线程结束
2. cf.complete("coding...");

因为 CompletableFuture 是一个 Future,我们用 String result = cf.get() 就能获取到结果了。

CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一样的,都是阻塞获取值,它们的区别在于 join() 抛出的是 unchecked Exception。

上面的代码确实没什么用,下面介绍几个 static 方法,它们使用任务来实例化一个 CompletableFuture 实例。

1. CompletableFuture.runAsync(Runnable runnable);
2. CompletableFuture.runAsync(Runnable runnable, Executor executor);
3. 
4. CompletableFuture.supplyAsync(Supplier<U> supplier);
5. CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
  • runAsync 方法接收的是 Runnable 的实例,意味着它没有返回值
  • supplyAsync 方法对应的是有返回值的情况
  • 这两个方法的带 executor 的变种,表示让任务在指定的线程池中执行,不指定的话,通常任务是在 ForkJoinPool.commonPool() 线程池中执行的。

好的,现在我们已经有了第一个 CompletableFuture 实例了,我们来看接下来的内容。

二、任务之间的顺序执行

我们先来看执行两个任务的情况,首先执行任务 A,然后将任务 A 的结果传递给任务 B。

其实这里有很多种情况,任务 A 是否有返回值,任务 B 是否需要任务 A 的返回值,任务 B 是否有返回值,等等。有个明确的就是,肯定是任务 A 执行完后再执行任务 B。

 

我们用下面的 6 行代码来说:

1. CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); 
2. CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); 
3. CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");
4. 
5. CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
6. CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
7. CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");

前面 3 行代码演示的是,任务 A 无返回值,所以对应的,第 2 行和第 3 行代码中,resultA 其实是 null

第 4 行用的是 thenRun(Runnable runnable),任务 A 执行完执行 B,并且 B 不需要 A 的结果。

第 5 行用的是 thenAccept(Consumer action),任务 A 执行完执行 B,B 需要 A 的结果,但是任务 B 不返回值。

第 6 行用的是 thenApply(Function fn),任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值。

这一小节说完了,如果任务 B 后面还有任务 C,往下继续调用 .thenXxx() 即可。

 

三、异常处理

说到这里,我们顺便来说下 CompletableFuture 的异常处理。这里我们要介绍两个方法:

1. public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
2. public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

看下面的代码:

1. CompletableFuture.supplyAsync(() -> "resultA")
2.     .thenApply(resultA -> resultA + " resultB")
3.     .thenApply(resultB -> resultB + " resultC")
4.     .thenApply(resultC -> resultC + " resultD");

上面的代码中,任务 A、B、C、D 依次执行,如果任务 A 抛出异常(当然上面的代码不会抛出异常),那么后面的任务都得不到执行。如果任务 C 抛出异常,那么任务 D 得不到执行。

那么我们怎么处理异常呢?看下面的代码,我们在任务 A 中抛出异常,并对其进行处理:

1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
2. throw new RuntimeException();
3. })
4.         .exceptionally(ex -> "errorResultA")
5.         .thenApply(resultA -> resultA + " resultB")
6.         .thenApply(resultB -> resultB + " resultC")
7.         .thenApply(resultC -> resultC + " resultD");
8. 
9. System.out.println(future.join());

上面的代码中,任务 A 抛出异常,然后通过 .exceptionally() 方法处理了异常,并返回新的结果,这个新的结果将传递给任务 B。所以最终的输出结果是:

errorResultA resultB resultC resultD

再看下面的代码,我们来看下另一种处理方式,使用 handle(BiFunction fn) 来处理异常:

1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "resultA")
2.         .thenApply(resultA -> resultA + " resultB")
3. // 任务 C 抛出异常
4.         .thenApply(resultB -> {throw new RuntimeException();})
5. // 处理任务 C 的返回值或异常
6.         .handle(new BiFunction<Object, Throwable, Object>() {
7. @Override
8. public Object apply(Object re, Throwable throwable) {
9. if (throwable != null) {
10. return "errorResultC";
11.                 }
12. return re;
13.             }
14.         })
15.         .thenApply(resultC -> resultC + " resultD");
16. 
17. System.out.println(future.join());

上面的代码使用了 handle 方法来处理任务 C 的执行结果,上面的代码中,rethrowable 必然有一个是 null,它们分别代表正常的执行结果和异常的情况。

当然,它们也可以都为 null,因为如果它作用的那个 CompletableFuture 实例没有返回值的时候,re 就是 null。

四、取两个任务的结果

上面一节,我们说的是,任务 A 执行完 -> 任务 B 执行完 -> 执行任务 C,它们之间有先后执行关系,因为后面的任务依赖于前面的任务的结果。

 

这节我们来看怎么让任务 A 和任务 B 同时执行,然后取它们的结果进行后续操作。这里强调的是任务之间的并行工作,没有先后执行顺序。

 

如果使用 Future 的话,我们通常是这么写的:

1. ExecutorService executorService = Executors.newCachedThreadPool();
2. 
3. Future<String> futureA = executorService.submit(() -> "resultA");
4. Future<String> futureB = executorService.submit(() -> "resultB");
5. 
6. String resultA = futureA.get();
7. String resultB = futureB.get();

接下来,我们看看 CompletableFuture 中是怎么写的,看下面的几行代码:

1. CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
2. CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");
3. 
4. cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});
5. cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");
6. cfA.runAfterBoth(cfB, () -> {});

第 3 行代码和第 4 行代码演示了怎么使用两个任务的结果 resultA 和 resultB,它们的区别在于,thenAcceptBoth 表示后续的处理不需要返回值,而 thenCombine 表示需要返回值。

如果你不需要 resultA 和 resultB,那么还可以使用第 5 行描述的 runAfterBoth 方法。

注意,上面的写法和下面的写法是没有区别的:

1. CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
2. 
3. cfA.thenAcceptBoth(CompletableFuture.supplyAsync(() -> "resultB"),
4.         (resultA, resultB) -> {});

千万不要以为这种写法任务 A 执行完了以后再执行任务 B。

五、取多个任务的结果

接下来,我们将介绍两个非常简单的静态方法:allOf() 和 anyOf() 方法。

1. public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...}
2. public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {...}

这两个方法都非常简单,简单介绍一下。

1. CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA");
2. CompletableFuture cfB = CompletableFuture.supplyAsync(() -> 123);
3. CompletableFuture cfC = CompletableFuture.supplyAsync(() -> "resultC");
4. 
5. CompletableFuture<Void> future = CompletableFuture.allOf(cfA, cfB, cfC);
6. // 所以这里的 join() 将阻塞,直到所有的任务执行结束
7. future.join();

由于 allOf 聚合了多个 CompletableFuture 实例,所以它是没有返回值的。这也是它的一个缺点。

anyOf 也非常容易理解,就是只要有任意一个 CompletableFuture 实例执行完成就可以了,看下面的例子:

1. CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA");
2. CompletableFuture cfB = CompletableFuture.supplyAsync(() -> 123);
3. CompletableFuture cfC = CompletableFuture.supplyAsync(() -> "resultC");
4. 
5. CompletableFuture<Object> future = CompletableFuture.anyOf(cfA, cfB, cfC);
6. Object result = future.join();

最后一行的 join() 方法会返回最先完成的任务的结果,所以它的泛型用的是 Object,因为每个任务可能返回的类型不同。

五、either 方法

如果你的 anyOf(...) 只需要处理两个 CompletableFuture 实例,那么也可以使用 xxxEither() 来处理,

1. cfA.acceptEither(cfB, result -> {});
2. cfA.acceptEitherAsync(cfB, result -> {});
3. cfA.acceptEitherAsync(cfB, result -> {}, executorService);
4. 
5. cfA.applyToEither(cfB, result -> {return result;});
6. cfA.applyToEitherAsync(cfB, result -> {return result;});
7. cfA.applyToEitherAsync(cfB, result -> {return result;}, executorService);
8. 
9. cfA.runAfterEither(cfA, () -> {});
10. cfA.runAfterEitherAsync(cfB, () -> {});
11. cfA.runAfterEitherAsync(cfB, () -> {}, executorService);

上面的各个带 either 的方法,表达的都是一个意思,指的是两个任务中的其中一个执行完成,就执行指定的操作

它们几组的区别也很明显,分别用于表达是否需要任务 A 和任务 B 的执行结果,是否需要返回值。

大家可能会对这里的几个变种有盲区,这里顺便说几句。

1、cfA.acceptEither(cfB, result -> {});cfB.acceptEither(cfA, result -> {}); 是一个意思;

2、第二个变种,加了 Async 后缀的方法,代表将需要执行的任务放到 ForkJoinPool.commonPool() 中执行(非完全严谨);第三个变种很好理解,将任务放到指定线程池中执行;

3、难道第一个变种是同步的?不是的,而是说,它由任务 A 或任务 B 所在的执行线程来执行,取决于哪个任务先结束。

 

转载自:https://javadoop.com/post/completable-future

------------------------------------------------------------------------------------------------------------------------------

上面的文章内容还不错,但是并不太全面,建议结合源码结合下面推荐的书一起学习效果会更好

相关参考:

1、《Java 8函数式编程》  9.6 CompletableFuture

2、《Java8实战》 第11章  CompletableFuture 组合式异步编程

3、《Java教程》廖雪峰视频 https://www.bilibili.com/video/av54953654/?p=17

相关文章
|
4月前
|
存储 Java API
MinIO Java SDK 7.1.4 升级到 8.5.17 需要注意什么
现在我需要你帮我分析对比这个两个sdk在对外的接口设计上是否有不兼容的变更
305 5
|
7月前
|
SQL 存储 数据库
【赵渝强老师】达梦数据库的归档模式
本文介绍了达梦数据库备份与恢复中重做日志文件的作用,重点讲解了归档模式的必要性及其配置方法。文章分析了非归档模式可能导致的数据丢失问题,并推荐使用归档模式以保障数据一致性和完整性。归档模式分为本地归档和远程归档:本地归档将重做日志存储在本地,而远程归档适用于集群环境,确保所有节点拥有完整日志。文中还详细展示了如何通过SQL命令开启归档模式,包括切换状态、设置路径及验证配置等步骤,并附有视频教程辅助理解。
358 1
|
存储 缓存 负载均衡
带你认识DM 共享存储数据库集群
带你认识DM 共享存储数据库集群
383 3
|
缓存 前端开发 JavaScript
Rails应用慢如蜗牛?揭开数据库到前端的全方位性能优化秘籍,从此告别龟速加载!
【8月更文挑战第31天】本文探讨了Ruby on Rails应用的性能优化方法,涵盖数据库查询与前端渲染。通过具体代码示例,介绍了如何使用`includes`避免N+1查询问题,利用缓存机制提高效率,以及通过合并和压缩CSS及JavaScript文件优化前端渲染。这些技巧有助于全面提升应用性能和用户体验。
167 1
|
存储 关系型数据库 MySQL
(十九)MySQL之表分区篇:涨知识了!携手共探鲜为人知的表分区!
分库分表相信大家都听说过,但(partitioning)表分区这个概念却鲜为人知,MySQL在5.1版本中开始支持了表分区技术,同时在MySQL5.5中进行了优化,自从MySQL支持的绝大部分引擎都开启了表分区功能。
1193 2
|
SQL 关系型数据库 MySQL
MySQL----配置双主双从
本文档详细介绍了如何在四台服务器上配置MySQL的双主双从架构。首先,通过关闭防火墙和SELinux确保网络通信畅通无阻。接着,设置各服务器的主机名和本地Host,确保名称解析正确。然后,通过YUM安装MySQL并修改初始密码。接下来,逐步配置四个节点(master01、master02、slave01、slave02),包括修改配置文件、创建用户和授权等步骤,实现主从复制。最后,通过SQL命令验证主从同步是否成功。
|
人工智能 前端开发 数据可视化
手猫助手Agent技术探索总结(2)
手猫助手Agent技术探索总结
282 8
|
存储 Java API
SpringBoot + MinIO 实现文件切片极速上传技术
【8月更文挑战第19天】在现代互联网应用中,文件上传是一个常见且重要的功能。然而,随着文件体积的增大,传统的文件上传方式往往面临效率低下、耗时过长等问题。为了提升大文件上传的速度和效率,我们可以采用文件切片上传技术,并结合SpringBoot和MinIO来实现这一功能。
525 0
|
数据采集 SQL DataWorks
DataWorks产品使用合集之如何配置数据质量监控
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
167 0
|
Java fastjson Maven
写给大忙人看的 - Java中上传文件MinIO服务器(2)
上一篇 写给大忙人看的 - 搭建文件服务器 MinIO(一),我们已经成功地搭建了 MinIO 文件服务器,这一篇讲解在 Java 中如何上传文件至 MinIO
815 0
写给大忙人看的 - Java中上传文件MinIO服务器(2)