CompletableFuture 异步编排

简介: CompletableFuture 异步编排

业务场景

查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。

假如商品详情页的每个查询,需要如下标注的时间才能完成 那么,用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。 如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成响应。

需求产生

Future 是 Java 5 添加的类,用来描述一个异步计算的结果。你可以使用`isDone`方法检查计算是否完成,或者使用`get`阻塞住调用线程,直到计算完成返回结果,你也可以使用`cancel`方法停止任务的执行。

虽然`Future`以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不 方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的 初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为 什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

很多语言,比如 Node.js,采用回调的方式实现异步编程。Java 的一些框架,比如 Netty,自 己扩展了 Java 的 `Future`接口,提供了`addListener`等多个扩展方法;Google guava 也提供了 通用的扩展 Future;Scala 也提供了简单易用且功能强大的 Future/Promise 异步编程模式。

作为正统的 Java 类库,是不是应该做点什么,加强一下自身库的功能呢?

在 Java 8 中, 新增加了一个包含 50 个方法左右的类: CompletableFuture,提供了非常强大的Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以 通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。

CompletableFuture 类实现了 Future 接口,所以你还是可以像以前一样通过`get`方法阻塞或 者轮询的方式获得结果,但是这种方式不推荐使用。

CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果。

创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

1、runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的

2、可以传入自定义的线程池,否则就用默认的线程池;

public static  ExecutorService executor = Executors.newFixedThreadPool(10);

1.         CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
2.             System.out.println("当前线程:" + Thread.currentThread().getId());
3. int i = 10 / 2;
4.             System.out.println("运行结果:" + i);
5.         }, executor);
1.         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
2.             System.out.println("当前线程:" + Thread.currentThread().getId());
3. int i = 10 / 2;
4.             System.out.println("运行结果:" + i);
5. return i;
6.         }, executor);
7. Integer integer = future.get();

计算完成时回调方法

whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。

whenComplete 和 whenCompleteAsync 的区别:

whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。

whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池 来进行执行。

方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程 执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

1.         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
2.             System.out.println("当前线程:" + Thread.currentThread().getId());
3. int i = 10 / 2;
4.             System.out.println("运行结果:" + i);
5. return i;
6.         }, executor).whenComplete((res,exception)->{
7. //虽然能得到异常信息,但不能修改返回结果
8.             System.out.println("异步任务完成...结果是"+res+";异常是:"+exception);
9.         }).exceptionally(throwable -> {
10. //可以感知异常,同时返回结果
11. return  10;
12.         });

handle 方法

和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。

1.         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
2.             System.out.println("当前线程:" + Thread.currentThread().getId());
3. int i = 10 / 2;
4.             System.out.println("运行结果:" + i);
5. return i;
6.         }).handle((res,thr)->{
7. if(res!=null){
8. return  res*2;
9.             }else {
10. return  0;
11.             }
12.         });

线程串行化方法

带有 Async 默认是异步执行的。同之前。

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前 任务的返回值。

1. 
2.         CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
3.             System.out.println("当前线程:" + Thread.currentThread().getId());
4. int i = 10 / 2;
5.             System.out.println("运行结果:" + i);
6. return i;
7.         }, executor).thenApplyAsync((res) -> {
8. return res + "hello";
9.         }, executor);
10. String s = f2.get();

       /**
        * thenApplyAsync:能接收上一步的结果,有返回值
        * .thenApplyAsync((res) -> {
        *             return res + "hello";
        *         }, executor);
        */

thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

1. 
2.         CompletableFuture.supplyAsync(() -> {
3.             System.out.println("当前线程:" + Thread.currentThread().getId());
4. int i = 10 / 2;
5.             System.out.println("运行结果:" + i);
6. return i;
7.         },executor).thenAcceptAsync((res)->{
8.             System.out.println("任务2开始启动");
9.             System.out.println(res);
10.         });

       /**
        * thenAcceptAsync:能接收上一步的结果,但是无返回值
        * thenAcceptAsync((res)->{
        *             System.out.println("任务2开始启动");
        *             System.out.println(res);
        *         })
        */

thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行thenRun 的后续操作

1. 
2.         CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
3.             System.out.println("当前线程:" + Thread.currentThread().getId());
4. int i = 10 / 2;
5.             System.out.println("运行结果:" + i);
6. return i;
7.         },executor).thenRunAsync(()->{
8.             System.out.println("任务2开始执行了...");
9.         });

       /**
        *      thenRun:不能获取到上一步的执行结果 ,无返回值
        *          .thenRunAsync(()->{
        *             System.out.println("任务2开始执行了...");
        *         });
        */

以上都要前置任务成功完成。

Function<? super T,? extends U>

T:上一个任务返回结果的类型

U:当前任务的返回值类型

两任务组合 - 都要完成

两个任务必须都完成,触发该任务。

thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值

thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有 返回值。

runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后, 处理该任务。

1.         CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
2.             System.out.println("当前线程:" + Thread.currentThread().getId());
3. int i = 10 / 2;
4.             System.out.println("运行结果:" + i);
5. return i+"task01";
6.         }, executor);
7. 
8.        CompletableFuture.supplyAsync(() -> {
9.             System.out.println("当前线程:" + Thread.currentThread().getId());
10. int i = 10 / 2;
11.             System.out.println("运行结果:" + i);
12. return i+"task02";
13.         }, executor).runAfterBoth(f1,()->{
14.             System.out.println("123");
15.         });

两任务组合 - 一个完成

当两个任务中,任意一个 future 任务完成的时候,执行任务。

applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。

runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返 回值。

1.         CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
2.             System.out.println("当前线程:" + Thread.currentThread().getId());
3. int i = 10 / 2;
4.             System.out.println("运行结果:" + i);
5. return i+"task01";
6.         }, executor);
7. 
8.        CompletableFuture.supplyAsync(() -> {
9.             System.out.println("当前线程:" + Thread.currentThread().getId());
10. int i = 10 / 2;
11.             System.out.println("运行结果:" + i);
12. return i+"task02";
13.         }, executor).runAfterEither(f1,()->{
14.            System.out.println("123");
15.        });

多任务组合

allOf:等待所有任务完成

anyOf:只要有一个任务完成

1.         CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
2.             System.out.println("当前线程:" + Thread.currentThread().getId());
3. int i = 10 / 2;
4.             System.out.println("运行结果:" + i);
5. return i+"task01";
6.         }, executor);
7. 
8.         CompletableFuture<Void> f2 = CompletableFuture.supplyAsync(() -> {
9.             System.out.println("当前线程:" + Thread.currentThread().getId());
10. int i = 10 / 2;
11.             System.out.println("运行结果:" + i);
12. return i + "task02";
13.         }, executor).runAfterEither(f1, () -> {
14.             System.out.println("123");
15.         });
16.         CompletableFuture.allOf(f1,f2);


相关文章
|
7月前
|
Java
异步技巧之CompletableFuture
异步技巧之CompletableFuture
76 2
|
2月前
|
Java API
异步任务编排神器CompletableFuture
【10月更文挑战第10天】CompletableFuture是JDK8并发包中引入的强大工具,用于处理复杂的异步任务编排。它提供了丰富的API,支持任务的串行、并行、组合及异常处理,适用于需要高效管理和协调多个异步操作的场景。例如,网页加载时需从多个服务异步获取数据,CompletableFuture可以有效提升性能和响应速度。使用时应注意异常处理和合理选择线程池,以确保程序稳定性和效率。
异步任务编排神器CompletableFuture
|
3月前
|
数据采集 JavaScript Java
CompletableFuture异步编排,你还不会?
本文介绍了同步与异步编程的概念,探讨了在复杂业务场景中使用异步编排的重要性。通过对比 `Future` 与 `CompletableFuture`,详细讲解了 `CompletableFuture` 的多种方法,如 `runAsync`、`supplyAsync`、`whenComplete`、`exceptionally` 等,并展示了如何通过 `CompletableFuture` 实现异步任务的组合与异常处理。最后,通过实战案例演示了如何利用线程池与 `CompletableFuture` 优化商品详情页的查询效率,显著减少响应时间。
CompletableFuture异步编排,你还不会?
|
4月前
|
Java 数据库
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
这篇文章通过一个电商商品详情页的实战案例,展示了如何使用`CompletableFuture`进行异步编排,以解决在不同数据库表中查询商品信息的问题,并提供了详细的代码实现和遇到问题(如图片未显示)的解决方案。
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
|
4月前
|
Java
异步&线程池 CompletableFuture 异步编排 【下篇】
这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。
异步&线程池 CompletableFuture 异步编排 【下篇】
|
7月前
|
Java
JAVA线程&线程池&异步编排
JAVA线程&线程池&异步编排
55 0
|
安全 Java
任务编排:CompletableFuture从入门到精通
最近遇到了一个业务场景,涉及到多数据源之间的请求的流程编排,正好看到了一篇某团介绍CompletableFuture原理和使用的技术文章,主要还是涉及使用层面。网上很多文章涉及原理的部分讲的不是特别详细且比较抽象。因为涉及到多线程的工具必须要理解原理,不然一旦遇到问题排查起来就只能凭玄学,正好借此梳理一下CompletableFuture的工作原理
355 0
|
7月前
|
Java
CompletableFuture 异步编排、案例及应用小案例2
CompletableFuture 异步编排、案例及应用小案例
80 0
|
7月前
|
Java
CompletableFuture 异步编排、案例及应用小案例1
CompletableFuture 异步编排、案例及应用小案例
172 0
|
存储 SpringCloudAlibaba Java
Java新特性:异步编排CompletableFuture
CompletableFuture由Java 8提供,是实现异步化的工具类,上手难度较低,且功能强大,支持通过函数式编程的方式对各类操作进行组合编排。 CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步[回调](https://so.csdn.net/so/search?q=回调&spm=1001.2101.3001.7020)、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
372 1
Java新特性:异步编排CompletableFuture