CompletableFuture异步编排

简介:

CompletableFuture异步编排
什么是CompletableFuture#
CompletableFuture是JDK8提供的Future增强类。CompletableFuture异步任务执行线程池,默认是把异步任务都放在ForkJoinPool中执行。

在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。

Future存在的问题#
Future实际采用FutureTask实现,该对象相当于是消费者和生产者的桥梁,消费者通过 FutureTask 存储任务的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的 FutureTask 被转型为 Future 接口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。

使用#
runAsync 和 supplyAsync方法#
CompletableFuture 提供了四个静态方法来创建一个异步操作。

Copy
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
public static CompletableFuture supplyAsync(Supplier supplier)
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

runAsync方法不支持返回值。
supplyAsync可以支持返回值。
计算完成时回调方法#
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

Copy
public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);

public CompletableFuture exceptionally(Function fn);
whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。BiConsumer<? super T,? super Throwable>可以定义处理业务

whenComplete 和 whenCompleteAsync 的区别:
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

代码示例:

Copy
public class CompletableFutureDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {
        @Override
        public Object get() {
            System.out.println(Thread.currentThread().getName() + "\t completableFuture");
            int i = 10 / 0;
            return 1024;
        }
    }).whenComplete(new BiConsumer<Object, Throwable>() {
        @Override
        public void accept(Object o, Throwable throwable) {
            System.out.println("-------o=" + o.toString());
            System.out.println("-------throwable=" + throwable);
        }
    }).exceptionally(new Function<Throwable, Object>() {
        @Override
        public Object apply(Throwable throwable) {
            System.out.println("throwable=" + throwable);
            return 6666;
        }
    });
    System.out.println(future.get());
}

}
handle 方法#
handle 是执行任务完成时对结果的处理。
handle 是在任务完成后再执行,还可以处理异常的任务。

Copy
public CompletionStage handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
线程串行化方法#
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作

带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。

Copy
public CompletableFuture thenApply(Function<? super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public CompletionStage thenAccept(Consumer<? super T> action);
public CompletionStage thenAcceptAsync(Consumer<? super T> action);
public CompletionStage thenAcceptAsync(Consumer<? super T> action,Executor executor);

public CompletionStage thenRun(Runnable action);
public CompletionStage thenRunAsync(Runnable action);
public CompletionStage thenRunAsync(Runnable action,Executor executor);
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型

代码演示:

Copy
public static void main(String[] args) throws ExecutionException, InterruptedException {

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
    @Override
    public Integer get() {
        System.out.println(Thread.currentThread().getName() + "\t completableFuture");
        //int i = 10 / 0;
        return 1024;
    }
}).thenApply(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer o) {
        System.out.println("thenApply方法,上次返回结果:" + o);
        return  o * 2;
    }
}).whenComplete(new BiConsumer<Integer, Throwable>() {
    @Override
    public void accept(Integer o, Throwable throwable) {
        System.out.println("-------o=" + o);
        System.out.println("-------throwable=" + throwable);
    }
}).exceptionally(new Function<Throwable, Integer>() {
    @Override
    public Integer apply(Throwable throwable) {
        System.out.println("throwable=" + throwable);
        return 6666;
    }
}).handle(new BiFunction<Integer, Throwable, Integer>() {
    @Override
    public Integer apply(Integer integer, Throwable throwable) {
        System.out.println("handle o=" + integer);
        System.out.println("handle throwable=" + throwable);
        return 8888;
    }
});
System.out.println(future.get());

}
两任务组合 - 都要完成#
两个任务必须都完成,触发该任务。

thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值

thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。

runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。

Copy
public CompletableFuture thenCombine(

CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);

public CompletableFuture thenCombineAsync(

CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);

public CompletableFuture thenCombineAsync(

CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor);

public CompletableFuture thenAcceptBoth(

CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);

public CompletableFuture thenAcceptBothAsync(

CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);

public CompletableFuture thenAcceptBothAsync(

CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor);

public CompletableFuture runAfterBoth(CompletionStage<?> other,

                                        Runnable action);

public CompletableFuture runAfterBothAsync(CompletionStage<?> other,

                                             Runnable action);

public CompletableFuture runAfterBothAsync(CompletionStage<?> other,

                                             Runnable action,
                                             Executor executor);

测试案例:

Copy
public static void main(String[] args) {

CompletableFuture.supplyAsync(() -> {
    return "hello";
}).thenApplyAsync(t -> {
    return t + " world!";
}).thenCombineAsync(CompletableFuture.completedFuture(" CompletableFuture"), (t, u) -> {
    return t + u;
}).whenComplete((t, u) -> {
    System.out.println(t);
});

}
输出:hello world! CompletableFuture

两任务组合 - 一个完成#
当两个任务中,任意一个future任务完成的时候,执行任务。

applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。

runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。

Copy
public CompletableFuture applyToEither(

CompletionStage<? extends T> other, Function<? super T, U> fn);

public CompletableFuture applyToEitherAsync(

CompletionStage<? extends T> other, Function<? super T, U> fn);

public CompletableFuture applyToEitherAsync(

CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor);

public CompletableFuture acceptEither(

CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture acceptEitherAsync(

CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture acceptEitherAsync(

CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor);

public CompletableFuture runAfterEither(CompletionStage<?> other,

                                          Runnable action);

public CompletableFuture runAfterEitherAsync(CompletionStage<?> other,

                                               Runnable action);

public CompletableFuture runAfterEitherAsync(CompletionStage<?> other,

                                               Runnable action,
                                               Executor executor);

多任务组合#
Copy
public static CompletableFuture allOf(CompletableFuture<?>... cfs);

public static CompletableFuture

anyOf:只要有一个任务完成

Copy
public static void main(String[] args) {

List<CompletableFuture> futures = Arrays.asList(CompletableFuture.completedFuture("hello"),
                                                CompletableFuture.completedFuture(" world!"),
                                                CompletableFuture.completedFuture(" hello"),
                                                CompletableFuture.completedFuture("java!"));
final CompletableFuture<Void> allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
allCompleted.thenRun(() -> {
    futures.stream().forEach(future -> {
        try {
            System.out.println("get future at:"+System.currentTimeMillis()+", result:"+future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    });
});

}
作者: ingxx

出处:https://www.cnblogs.com/ingxx/p/12598414.html

相关文章
|
6月前
|
Java
异步技巧之CompletableFuture
异步技巧之CompletableFuture
74 2
|
1月前
|
Java API
异步任务编排神器CompletableFuture
【10月更文挑战第10天】CompletableFuture是JDK8并发包中引入的强大工具,用于处理复杂的异步任务编排。它提供了丰富的API,支持任务的串行、并行、组合及异常处理,适用于需要高效管理和协调多个异步操作的场景。例如,网页加载时需从多个服务异步获取数据,CompletableFuture可以有效提升性能和响应速度。使用时应注意异常处理和合理选择线程池,以确保程序稳定性和效率。
异步任务编排神器CompletableFuture
|
2月前
|
数据采集 JavaScript Java
CompletableFuture异步编排,你还不会?
本文介绍了同步与异步编程的概念,探讨了在复杂业务场景中使用异步编排的重要性。通过对比 `Future` 与 `CompletableFuture`,详细讲解了 `CompletableFuture` 的多种方法,如 `runAsync`、`supplyAsync`、`whenComplete`、`exceptionally` 等,并展示了如何通过 `CompletableFuture` 实现异步任务的组合与异常处理。最后,通过实战案例演示了如何利用线程池与 `CompletableFuture` 优化商品详情页的查询效率,显著减少响应时间。
CompletableFuture异步编排,你还不会?
|
3月前
|
Java 数据库
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
这篇文章通过一个电商商品详情页的实战案例,展示了如何使用`CompletableFuture`进行异步编排,以解决在不同数据库表中查询商品信息的问题,并提供了详细的代码实现和遇到问题(如图片未显示)的解决方案。
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
|
3月前
|
Java
异步&线程池 CompletableFuture 异步编排 【下篇】
这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。
异步&线程池 CompletableFuture 异步编排 【下篇】
|
安全 Java
任务编排:CompletableFuture从入门到精通
最近遇到了一个业务场景,涉及到多数据源之间的请求的流程编排,正好看到了一篇某团介绍CompletableFuture原理和使用的技术文章,主要还是涉及使用层面。网上很多文章涉及原理的部分讲的不是特别详细且比较抽象。因为涉及到多线程的工具必须要理解原理,不然一旦遇到问题排查起来就只能凭玄学,正好借此梳理一下CompletableFuture的工作原理
349 0
|
6月前
|
Java
CompletableFuture 异步编排、案例及应用小案例1
CompletableFuture 异步编排、案例及应用小案例
164 0
|
6月前
|
Java
CompletableFuture 异步编排、案例及应用小案例2
CompletableFuture 异步编排、案例及应用小案例
78 0
|
Java 数据库 数据安全/隐私保护
【CompletableFuture事件驱动异步回调】
【CompletableFuture事件驱动异步回调】
|
设计模式 JavaScript 前端开发
CompletableFuture 异步编排
CompletableFuture 异步编排