异步任务编排神器CompletableFuture

简介: 【10月更文挑战第10天】CompletableFuture是JDK8并发包中引入的强大工具,用于处理复杂的异步任务编排。它提供了丰富的API,支持任务的串行、并行、组合及异常处理,适用于需要高效管理和协调多个异步操作的场景。例如,网页加载时需从多个服务异步获取数据,CompletableFuture可以有效提升性能和响应速度。使用时应注意异常处理和合理选择线程池,以确保程序稳定性和效率。

异步任务编排神器CompletableFuture

当需要获取异步任务的结果时,通常可以通过Future接口的get方法来获取结果

但是当异步任务繁多并且复杂,任务间可能存在依赖关系时,Future接口变得不太好用

比如任务A完成后串行执行任务B,等到B、C任务都完成后执行D任务,等到D、E、F任务都完成后汇总结果返回

当遇到复杂的异步任务编排时,Future不太好用,但是在JDK8中并发包推出的CompletableFuture能够很方便的处理这种异步编排任务

image.png

比如在一个页面需要查询多个服务的数据,如果同步查询会导致性能太慢

异步查询多个服务的数据再汇总返回,则能提高更多的性能

API

这里的API只作简单说明,大概分下类,各个分类下具体API的功能可自行查看文档(或者用到时再自行查看文档)

CompletableFuture提供的API大概分为几个大类:

同步与异步、串行、AND、OR、

同步与异步

API携带Async则说明是异步,并且可以设置线程池

一般业务开发,CompletableFuture用于处理IO任务,最好使用异步,并且指定线程池

CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
   
            System.out.println("task a run");
            return "a";
});

串行

串行执行指的是任务需要同步执行,如图中的A、B任务,需要A任务执行完才能执行B任务

串行API通常以then开头,如:thenRunAsync、thenAccpetAsync、thenApplyAsync

CompletableFuture<String> taskB = taskA.thenApply((s) -> {
   
    System.out.println("task b run");
    return s + "b";
})

AND

AND指的是需要两个任务都完成,才能继续执行后续的任务,比如图中的B、C任务,要都完成才能执行D任务

AND相关API通常以Combine、Both有关,如:thenCombineAsync、thenAcceptBothAsync、runAfterBothAsync

CompletableFuture<String> taskD = taskB.thenCombineAsync(taskC, (b, c) -> {
   
    System.out.println("task d run");
    return b + c;
})

如果依赖多个任务同时完成,可以使用allOf(如图中的D、F、E任务)

CompletableFuture.allOf(taskF,taskE,taskD);

OR

OR指的是两个任务中其中一个完成,就可以继续执行后续任务

OR相关API通常以Either有关:applyToEitherAsync、acceptEitherAsync、runAfterEitherAsync

如果依赖多个任务的OR时使用:CompletableFuture.anyOf

异常处理

任务执行过程中可能出现异常,可以通过exceptionally 、whenComplete、handler等API对异常进行处理

CompletableFuture<String> taskF = CompletableFuture.supplyAsync(() -> {
   
    System.out.println("task f run");
    return "a";
}).exceptionally(e -> {
   
    System.out.println("出现异常");
    throw new RuntimeException("error");
});

注意事项

使用CompletableFuture时需要注意,如果不了解原理容易踩坑:

比如:任务出了异常怎么办?任务如何选择线程池的?线程又是如何执行的?

带着这一系列问题,我们往下看

出了异常怎么办?

使用CompletableFuture进行异步编排任务时,任务可能出现异常,因此必须使用API进行处理

CompletableFuture遇到异常时,可能会使用CompletionException或ExecutionException包装异常

public static void exception() {
   
    CompletableFuture<Void> taskException = CompletableFuture.supplyAsync(() -> {
   
        System.out.println("begin");
        return null;
    });

    taskException
            .thenApply(result -> {
   
                int i = 1 / 0;
                return i;
            })
            .exceptionally(err -> {
   
                //java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
                System.out.println(err);

                //java.lang.ArithmeticException: / by zero
                System.out.println(err.getCause());

                //java.lang.ArithmeticException: / by zero
                //使用工具处理异常
                System.out.println(getException(err));
                return 0;
            });

}

因为异常会被包装,因此处理异常时,最好使用工具类获取异常

public static Throwable getException(Throwable throwable) {
   
    //异常为CompletionException或ExecutionException,并且Cause不为空时解析
    if ((throwable instanceof CompletionException|| throwable instanceof ExecutionException)
            && Objects.nonNull(throwable.getCause())) {
   
        return throwable.getCause();
    }
    return throwable;
}

如何选择线程池?

CompletableFuture中选择线程池有三种情况:

  1. 使用方法时指定线程池
  2. 未指定线程池时,使用ForkJoin的公共线程池 ForkJoinPool.commonPool() (适合CPU任务,最大线程数量 = CPU - 1)
  3. 未指定线程池时,使用 ThreadPerTaskExecutor 每次执行任务时创建一个线程执行 (适合周期长的任务,创建/销毁线程开销大)

当未指定线程池时,可能使用ForkJoin的线程池也可能使用ThreadPerTaskExecutor,在没有查看源码的情况下会容易踩坑

并且 ThreadPerTaskExecutorForkJoinPool.commonPool() 都不适合IO任务

接下来一步步查看源码,分析CompletableFuture什么情况下会选择哪种线程池

CompletableFuture.supplyAsync

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
   
    return asyncSupplyStage(asyncPool, supplier);
}

当我们使用未指定线程池的方法时,会直接使用asyncPool作为线程池

private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

asyncPool根据useCommonPool来判断是使用 ForkJoinPool.commonPool() 还是使用 ThreadPerTaskExecutor

那么useCommonPool是如何确定的呢?我们继续往下查看

private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);

能否使用useCommonPool,由 ForkJoinPool.getCommonPoolParallelism() 决定,当它大于1时则使用 ForkJoinPool.commonPool() 否则使用 ThreadPerTaskExecutor

ForkJoinPool.getCommonPoolParallelism() 返回字段 commonParallelism

static final int commonParallelism;

commonParallelism 用于表示ForkJoinPool的并行粒度,在ForkJoinPool静态代码块中赋值初始化

ForkJoinPool.static

static {
   
    //其他略...

    //创建公共池
    common = java.security.AccessController.doPrivileged
        (new java.security.PrivilegedAction<ForkJoinPool>() {
   
            public ForkJoinPool run() {
    return makeCommonPool(); }});

    //计算并行粒度
    int par = common.config & SMASK; // report 1 even if threads disabled
    commonParallelism = par > 0 ? par : 1;
}

commonParallelism 并发粒度的字段由par决定,而par = common.config & SMASK

其中SMASK为65535(十进制),其二进制为全1,因此由 common 的字段 config 决定

(在创建公共池的过程会设置config字段)

ForkJoinPool.makeCommonPool

在创建公共池的代码中主要观察变量 parallelism 它为并发粒度

如果不携带参数,默认情况下并发粒度为CPU核数-1

private static ForkJoinPool makeCommonPool() {
   

    final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =
            new CommonPoolForkJoinWorkerThreadFactory();
    //初始化并发粒度为-1
    int parallelism = -1;
    ForkJoinWorkerThreadFactory factory = null;
    UncaughtExceptionHandler handler = null;
    try {
     // ignore exceptions in accessing/parsing properties
        String pp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.parallelism");
        String fp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.threadFactory");
        String hp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
        if (pp != null)
            //如果携带启动参数则设置为对应的并发粒度
            parallelism = Integer.parseInt(pp);
        if (fp != null)
            factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                       getSystemClassLoader().loadClass(fp).newInstance());
        if (hp != null)
            handler = ((UncaughtExceptionHandler)ClassLoader.
                       getSystemClassLoader().loadClass(hp).newInstance());
    } catch (Exception ignore) {
   
    }
    if (factory == null) {
   
        if (System.getSecurityManager() == null)
            factory = commonPoolForkJoinWorkerThreadFactory;
        else // use security-managed default
            factory = new InnocuousForkJoinWorkerThreadFactory();
    }
    if (parallelism < 0 && // default 1 less than #cores
        //默认情况下并发粒度 = CPU核数 - 1
        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                            "ForkJoinPool.commonPool-worker-");
}

在构建对象时,config字段 this.config = (parallelism & SMASK) | mode

其中SMASK为全1,mode为0,得到的结果是不变的,因此config的值就是parallelism并发粒度

至此我们可以得出结论:默认情况下,如果不指定线程池,当CPU核数-1超过1则会使用ForkJoin公共池(最大线程数量 = CPU核数 - 1),否则使用ThreadPerTaskExecutor(每次执行都创建线程执行)

static final class ThreadPerTaskExecutor implements Executor {
   
    public void execute(Runnable r) {
    new Thread(r).start(); }
}

ThreadPerTaskExecutor只适合执行周期长的任务,如果任务周期短,并且多的情况下,创建线程也会是很大一笔开销

使用CompletableFuture时务必指定线程池,线程池最好根据业务做好隔离

如果不指定线程池会根据CPU核数选择ForkJoinCommonPool或ThreadPerTaskExecutor,它们并不适合IO任务

线程如何执行?

在同步与异步的API中线程如何执行?

在异步的API中,如果指定线程池则交给线程池中的工作线程执行,否则选择Common Pool或ThreadPerTaskExecutor

在同步的API中,通常是当前线程进行执行任务,但如果任务B依赖的任务A未完成则由任务A的回调线程执行,任务A如果是异步则由线程池来执行

public static void testSync() {
   
        CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
   
//            try {
   
//                Thread.sleep(5000);
//            } catch (InterruptedException e) {
   
//                throw new RuntimeException(e);
//            }
            return "ok";
        }, threadPool);


        CompletableFuture<Void> taskB = taskA.thenAccept(s -> {
   
            //任务A执行完(不睡时)由当前线程执行
            //任务A未执行完(睡眠时)由线程池的工作线程执行
            System.out.println(s);
            System.out.println(s);
        });

        taskB.join();
}

总结

CompletableFuture提供串行、AND、OR、异常捕获、结果聚合等多种API,通过这些API能够更方便、快捷的实现异步任务的编排

使用CompletableFuture时务必对任务进行异常处理,并且它会使用CompletionException或ExecutionException包装异常,再打印异常时记得使用工具类处理,避免打印到包装的异常

CompletableFuture异步任务中如果指定线程池则直接使用指定的线程池

如果未指定线程池,当前服务器CPU数量小于等于2(并发粒度低)时使用ThreadPerTaskExecutor,其他情况(并发粒度高)使用ForkJoin框架的common pool(并发粒度 = CPU数量 - 1)

未指定线程池时使用的线程池适合CPU任务,并不适合IO任务,使用异步时务必指定线程池

当使用异步API时,由线程池的工作线程执行;使用同步API时,如果当前任务依赖的任务未完成,则有依赖、未完成的任务的线程来执行

🌠最后(一键三连求求拉~)

本篇文章被收入专栏 由点到线,由线到面,深入浅出构建Java并发编程知识体系,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

相关文章
|
4月前
|
数据采集 JavaScript Java
CompletableFuture异步编排,你还不会?
本文介绍了同步与异步编程的概念,探讨了在复杂业务场景中使用异步编排的重要性。通过对比 `Future` 与 `CompletableFuture`,详细讲解了 `CompletableFuture` 的多种方法,如 `runAsync`、`supplyAsync`、`whenComplete`、`exceptionally` 等,并展示了如何通过 `CompletableFuture` 实现异步任务的组合与异常处理。最后,通过实战案例演示了如何利用线程池与 `CompletableFuture` 优化商品详情页的查询效率,显著减少响应时间。
100 3
CompletableFuture异步编排,你还不会?
|
5月前
|
Java 数据库
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
这篇文章通过一个电商商品详情页的实战案例,展示了如何使用`CompletableFuture`进行异步编排,以解决在不同数据库表中查询商品信息的问题,并提供了详细的代码实现和遇到问题(如图片未显示)的解决方案。
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
|
5月前
|
Java
异步&线程池 CompletableFuture 异步编排 【下篇】
这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。
异步&线程池 CompletableFuture 异步编排 【下篇】
|
安全 Java
任务编排:CompletableFuture从入门到精通
最近遇到了一个业务场景,涉及到多数据源之间的请求的流程编排,正好看到了一篇某团介绍CompletableFuture原理和使用的技术文章,主要还是涉及使用层面。网上很多文章涉及原理的部分讲的不是特别详细且比较抽象。因为涉及到多线程的工具必须要理解原理,不然一旦遇到问题排查起来就只能凭玄学,正好借此梳理一下CompletableFuture的工作原理
361 0
|
8月前
|
Java
CompletableFuture 异步编排、案例及应用小案例1
CompletableFuture 异步编排、案例及应用小案例
174 0
|
8月前
|
Java
CompletableFuture 异步编排、案例及应用小案例2
CompletableFuture 异步编排、案例及应用小案例
87 0
|
消息中间件 存储 Java
一网打尽异步神器CompletableFuture
最近一直畅游在RocketMQ的源码中,发现在RocketMQ中很多地方都使用到了CompletableFuture,所以今天就跟大家来聊一聊JDK1.8提供的异步神器CompletableFuture,并且最后会结合RocketMQ源码分析一下CompletableFuture的使用。
|
设计模式 JavaScript 前端开发
CompletableFuture 异步编排
CompletableFuture 异步编排
|
Java API
CompletableFuture实现异步编排
场景:电商系统中获取一个完整的商品信息可能分为以下几步:①获取商品基本信息 ②获取商品图片信息 ③获取商品促销活动信息 ④获取商品各种类的基本信息 等操作,如果使用串行方式去执行这些操作,假设每个操作执行1s,那么用户看到完整的商品详情就需要4s的时间,如果使用并行方式执行这些操作,可能只需要1s就可以完成。所以这就是异步执行的好处。
174 0
CompletableFuture实现异步编排
|
Java
简述CompletableFuture异步任务编排(上)
简述CompletableFuture异步任务编排
384 0
简述CompletableFuture异步任务编排(上)