CompletableFuture 异步编排

简介: CompletableFuture 异步编排

业务场景

查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。

假如商品详情页的每个查询,需要如下标注的时间才能完成 那么,用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。 如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成响应。

需求产生

Future 是 Java 5 添加的类,用来描述一个异步计算的结果。你可以使用`isDone`方法检查计算是否完成,或者使用`get`阻塞住调用线程,直到计算完成返回结果,你也可以使用`cancel`方法停止任务的执行。

虽然`Future`以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不 方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的 初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为 什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

很多语言,比如 Node.js,采用回调的方式实现异步编程。Java 的一些框架,比如 Netty,自 己扩展了 Java 的 `Future`接口,提供了`addListener`等多个扩展方法;Google guava 也提供了 通用的扩展 Future;Scala 也提供了简单易用且功能强大的 Future/Promise 异步编程模式。

作为正统的 Java 类库,是不是应该做点什么,加强一下自身库的功能呢?

在 Java 8 中, 新增加了一个包含 50 个方法左右的类: CompletableFuture,提供了非常强大的Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以 通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。

CompletableFuture 类实现了 Future 接口,所以你还是可以像以前一样通过`get`方法阻塞或 者轮询的方式获得结果,但是这种方式不推荐使用。

CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果。

创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

1、runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的

2、可以传入自定义的线程池,否则就用默认的线程池;

public static  ExecutorService executor = Executors.newFixedThreadPool(10);

1.         CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
2.             System.out.println("当前线程:" + Thread.currentThread().getId());
3. int i = 10 / 2;
4.             System.out.println("运行结果:" + i);
5.         }, executor);
1.         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
2.             System.out.println("当前线程:" + Thread.currentThread().getId());
3. int i = 10 / 2;
4.             System.out.println("运行结果:" + i);
5. return i;
6.         }, executor);
7. Integer integer = future.get();

计算完成时回调方法

whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。

whenComplete 和 whenCompleteAsync 的区别:

whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。

whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池 来进行执行。

方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程 执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

1.         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
2.             System.out.println("当前线程:" + Thread.currentThread().getId());
3. int i = 10 / 2;
4.             System.out.println("运行结果:" + i);
5. return i;
6.         }, executor).whenComplete((res,exception)->{
7. //虽然能得到异常信息,但不能修改返回结果
8.             System.out.println("异步任务完成...结果是"+res+";异常是:"+exception);
9.         }).exceptionally(throwable -> {
10. //可以感知异常,同时返回结果
11. return  10;
12.         });

handle 方法

和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。

1.         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
2.             System.out.println("当前线程:" + Thread.currentThread().getId());
3. int i = 10 / 2;
4.             System.out.println("运行结果:" + i);
5. return i;
6.         }).handle((res,thr)->{
7. if(res!=null){
8. return  res*2;
9.             }else {
10. return  0;
11.             }
12.         });

线程串行化方法

带有 Async 默认是异步执行的。同之前。

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前 任务的返回值。

1. 
2.         CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
3.             System.out.println("当前线程:" + Thread.currentThread().getId());
4. int i = 10 / 2;
5.             System.out.println("运行结果:" + i);
6. return i;
7.         }, executor).thenApplyAsync((res) -> {
8. return res + "hello";
9.         }, executor);
10. String s = f2.get();

       /**
        * thenApplyAsync:能接收上一步的结果,有返回值
        * .thenApplyAsync((res) -> {
        *             return res + "hello";
        *         }, executor);
        */

thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

1. 
2.         CompletableFuture.supplyAsync(() -> {
3.             System.out.println("当前线程:" + Thread.currentThread().getId());
4. int i = 10 / 2;
5.             System.out.println("运行结果:" + i);
6. return i;
7.         },executor).thenAcceptAsync((res)->{
8.             System.out.println("任务2开始启动");
9.             System.out.println(res);
10.         });

       /**
        * thenAcceptAsync:能接收上一步的结果,但是无返回值
        * thenAcceptAsync((res)->{
        *             System.out.println("任务2开始启动");
        *             System.out.println(res);
        *         })
        */

thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行thenRun 的后续操作

1. 
2.         CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
3.             System.out.println("当前线程:" + Thread.currentThread().getId());
4. int i = 10 / 2;
5.             System.out.println("运行结果:" + i);
6. return i;
7.         },executor).thenRunAsync(()->{
8.             System.out.println("任务2开始执行了...");
9.         });

       /**
        *      thenRun:不能获取到上一步的执行结果 ,无返回值
        *          .thenRunAsync(()->{
        *             System.out.println("任务2开始执行了...");
        *         });
        */

以上都要前置任务成功完成。

Function<? super T,? extends U>

T:上一个任务返回结果的类型

U:当前任务的返回值类型

两任务组合 - 都要完成

两个任务必须都完成,触发该任务。

thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值

thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有 返回值。

runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后, 处理该任务。

1.         CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
2.             System.out.println("当前线程:" + Thread.currentThread().getId());
3. int i = 10 / 2;
4.             System.out.println("运行结果:" + i);
5. return i+"task01";
6.         }, executor);
7. 
8.        CompletableFuture.supplyAsync(() -> {
9.             System.out.println("当前线程:" + Thread.currentThread().getId());
10. int i = 10 / 2;
11.             System.out.println("运行结果:" + i);
12. return i+"task02";
13.         }, executor).runAfterBoth(f1,()->{
14.             System.out.println("123");
15.         });

两任务组合 - 一个完成

当两个任务中,任意一个 future 任务完成的时候,执行任务。

applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。

runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返 回值。

1.         CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
2.             System.out.println("当前线程:" + Thread.currentThread().getId());
3. int i = 10 / 2;
4.             System.out.println("运行结果:" + i);
5. return i+"task01";
6.         }, executor);
7. 
8.        CompletableFuture.supplyAsync(() -> {
9.             System.out.println("当前线程:" + Thread.currentThread().getId());
10. int i = 10 / 2;
11.             System.out.println("运行结果:" + i);
12. return i+"task02";
13.         }, executor).runAfterEither(f1,()->{
14.            System.out.println("123");
15.        });

多任务组合

allOf:等待所有任务完成

anyOf:只要有一个任务完成

1.         CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
2.             System.out.println("当前线程:" + Thread.currentThread().getId());
3. int i = 10 / 2;
4.             System.out.println("运行结果:" + i);
5. return i+"task01";
6.         }, executor);
7. 
8.         CompletableFuture<Void> f2 = CompletableFuture.supplyAsync(() -> {
9.             System.out.println("当前线程:" + Thread.currentThread().getId());
10. int i = 10 / 2;
11.             System.out.println("运行结果:" + i);
12. return i + "task02";
13.         }, executor).runAfterEither(f1, () -> {
14.             System.out.println("123");
15.         });
16.         CompletableFuture.allOf(f1,f2);


相关文章
|
1月前
|
Java
异步技巧之CompletableFuture
异步技巧之CompletableFuture
43 2
|
22天前
|
Java
JAVA线程&线程池&异步编排
JAVA线程&线程池&异步编排
25 0
|
4月前
|
Java
CompletableFuture 异步编排、案例及应用小案例2
CompletableFuture 异步编排、案例及应用小案例
31 0
|
4月前
|
Java
CompletableFuture 异步编排、案例及应用小案例1
CompletableFuture 异步编排、案例及应用小案例
55 0
|
6月前
|
Java 数据库 数据安全/隐私保护
【CompletableFuture事件驱动异步回调】
【CompletableFuture事件驱动异步回调】
|
安全 Java
任务编排:CompletableFuture从入门到精通
最近遇到了一个业务场景,涉及到多数据源之间的请求的流程编排,正好看到了一篇某团介绍CompletableFuture原理和使用的技术文章,主要还是涉及使用层面。网上很多文章涉及原理的部分讲的不是特别详细且比较抽象。因为涉及到多线程的工具必须要理解原理,不然一旦遇到问题排查起来就只能凭玄学,正好借此梳理一下CompletableFuture的工作原理
286 0
|
9月前
|
存储 SpringCloudAlibaba Java
Java新特性:异步编排CompletableFuture
CompletableFuture由Java 8提供,是实现异步化的工具类,上手难度较低,且功能强大,支持通过函数式编程的方式对各类操作进行组合编排。 CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步[回调](https://so.csdn.net/so/search?q=回调&spm=1001.2101.3001.7020)、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
172 1
Java新特性:异步编排CompletableFuture
|
9月前
|
JavaScript Java UED
CompletableFuture 异步处理
CompletableFuture 异步处理
80 0
|
Java API
CompletableFuture实现异步编排
场景:电商系统中获取一个完整的商品信息可能分为以下几步:①获取商品基本信息 ②获取商品图片信息 ③获取商品促销活动信息 ④获取商品各种类的基本信息 等操作,如果使用串行方式去执行这些操作,假设每个操作执行1s,那么用户看到完整的商品详情就需要4s的时间,如果使用并行方式执行这些操作,可能只需要1s就可以完成。所以这就是异步执行的好处。
132 0
CompletableFuture实现异步编排
|
Java
简述CompletableFuture异步任务编排(上)
简述CompletableFuture异步任务编排
308 0
简述CompletableFuture异步任务编排(上)

热门文章

最新文章