异步任务编排神器CompletableFuture

简介: 【10月更文挑战第10天】CompletableFuture是JDK8并发包中引入的强大工具,用于处理复杂的异步任务编排。它提供了丰富的API,支持任务的串行、并行、组合及异常处理,适用于需要高效管理和协调多个异步操作的场景。例如,网页加载时需从多个服务异步获取数据,CompletableFuture可以有效提升性能和响应速度。使用时应注意异常处理和合理选择线程池,以确保程序稳定性和效率。

异步任务编排神器CompletableFuture

当需要获取异步任务的结果时,通常可以通过Future接口的get方法来获取结果

但是当异步任务繁多并且复杂,任务间可能存在依赖关系时,Future接口变得不太好用

比如任务A完成后串行执行任务B,等到B、C任务都完成后执行D任务,等到D、E、F任务都完成后汇总结果返回

当遇到复杂的异步任务编排时,Future不太好用,但是在JDK8中并发包推出的CompletableFuture能够很方便的处理这种异步编排任务

image.png

比如在一个页面需要查询多个服务的数据,如果同步查询会导致性能太慢

异步查询多个服务的数据再汇总返回,则能提高更多的性能

API

这里的API只作简单说明,大概分下类,各个分类下具体API的功能可自行查看文档(或者用到时再自行查看文档)

CompletableFuture提供的API大概分为几个大类:

同步与异步、串行、AND、OR、

同步与异步

API携带Async则说明是异步,并且可以设置线程池

一般业务开发,CompletableFuture用于处理IO任务,最好使用异步,并且指定线程池

CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
   
            System.out.println("task a run");
            return "a";
});

串行

串行执行指的是任务需要同步执行,如图中的A、B任务,需要A任务执行完才能执行B任务

串行API通常以then开头,如:thenRunAsync、thenAccpetAsync、thenApplyAsync

CompletableFuture<String> taskB = taskA.thenApply((s) -> {
   
    System.out.println("task b run");
    return s + "b";
})

AND

AND指的是需要两个任务都完成,才能继续执行后续的任务,比如图中的B、C任务,要都完成才能执行D任务

AND相关API通常以Combine、Both有关,如:thenCombineAsync、thenAcceptBothAsync、runAfterBothAsync

CompletableFuture<String> taskD = taskB.thenCombineAsync(taskC, (b, c) -> {
   
    System.out.println("task d run");
    return b + c;
})

如果依赖多个任务同时完成,可以使用allOf(如图中的D、F、E任务)

CompletableFuture.allOf(taskF,taskE,taskD);

OR

OR指的是两个任务中其中一个完成,就可以继续执行后续任务

OR相关API通常以Either有关:applyToEitherAsync、acceptEitherAsync、runAfterEitherAsync

如果依赖多个任务的OR时使用:CompletableFuture.anyOf

异常处理

任务执行过程中可能出现异常,可以通过exceptionally 、whenComplete、handler等API对异常进行处理

CompletableFuture<String> taskF = CompletableFuture.supplyAsync(() -> {
   
    System.out.println("task f run");
    return "a";
}).exceptionally(e -> {
   
    System.out.println("出现异常");
    throw new RuntimeException("error");
});

注意事项

使用CompletableFuture时需要注意,如果不了解原理容易踩坑:

比如:任务出了异常怎么办?任务如何选择线程池的?线程又是如何执行的?

带着这一系列问题,我们往下看

出了异常怎么办?

使用CompletableFuture进行异步编排任务时,任务可能出现异常,因此必须使用API进行处理

CompletableFuture遇到异常时,可能会使用CompletionException或ExecutionException包装异常

public static void exception() {
   
    CompletableFuture<Void> taskException = CompletableFuture.supplyAsync(() -> {
   
        System.out.println("begin");
        return null;
    });

    taskException
            .thenApply(result -> {
   
                int i = 1 / 0;
                return i;
            })
            .exceptionally(err -> {
   
                //java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
                System.out.println(err);

                //java.lang.ArithmeticException: / by zero
                System.out.println(err.getCause());

                //java.lang.ArithmeticException: / by zero
                //使用工具处理异常
                System.out.println(getException(err));
                return 0;
            });

}

因为异常会被包装,因此处理异常时,最好使用工具类获取异常

public static Throwable getException(Throwable throwable) {
   
    //异常为CompletionException或ExecutionException,并且Cause不为空时解析
    if ((throwable instanceof CompletionException|| throwable instanceof ExecutionException)
            && Objects.nonNull(throwable.getCause())) {
   
        return throwable.getCause();
    }
    return throwable;
}

如何选择线程池?

CompletableFuture中选择线程池有三种情况:

  1. 使用方法时指定线程池
  2. 未指定线程池时,使用ForkJoin的公共线程池 ForkJoinPool.commonPool() (适合CPU任务,最大线程数量 = CPU - 1)
  3. 未指定线程池时,使用 ThreadPerTaskExecutor 每次执行任务时创建一个线程执行 (适合周期长的任务,创建/销毁线程开销大)

当未指定线程池时,可能使用ForkJoin的线程池也可能使用ThreadPerTaskExecutor,在没有查看源码的情况下会容易踩坑

并且 ThreadPerTaskExecutorForkJoinPool.commonPool() 都不适合IO任务

接下来一步步查看源码,分析CompletableFuture什么情况下会选择哪种线程池

CompletableFuture.supplyAsync

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
   
    return asyncSupplyStage(asyncPool, supplier);
}

当我们使用未指定线程池的方法时,会直接使用asyncPool作为线程池

private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

asyncPool根据useCommonPool来判断是使用 ForkJoinPool.commonPool() 还是使用 ThreadPerTaskExecutor

那么useCommonPool是如何确定的呢?我们继续往下查看

private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);

能否使用useCommonPool,由 ForkJoinPool.getCommonPoolParallelism() 决定,当它大于1时则使用 ForkJoinPool.commonPool() 否则使用 ThreadPerTaskExecutor

ForkJoinPool.getCommonPoolParallelism() 返回字段 commonParallelism

static final int commonParallelism;

commonParallelism 用于表示ForkJoinPool的并行粒度,在ForkJoinPool静态代码块中赋值初始化

ForkJoinPool.static

static {
   
    //其他略...

    //创建公共池
    common = java.security.AccessController.doPrivileged
        (new java.security.PrivilegedAction<ForkJoinPool>() {
   
            public ForkJoinPool run() {
    return makeCommonPool(); }});

    //计算并行粒度
    int par = common.config & SMASK; // report 1 even if threads disabled
    commonParallelism = par > 0 ? par : 1;
}

commonParallelism 并发粒度的字段由par决定,而par = common.config & SMASK

其中SMASK为65535(十进制),其二进制为全1,因此由 common 的字段 config 决定

(在创建公共池的过程会设置config字段)

ForkJoinPool.makeCommonPool

在创建公共池的代码中主要观察变量 parallelism 它为并发粒度

如果不携带参数,默认情况下并发粒度为CPU核数-1

private static ForkJoinPool makeCommonPool() {
   

    final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =
            new CommonPoolForkJoinWorkerThreadFactory();
    //初始化并发粒度为-1
    int parallelism = -1;
    ForkJoinWorkerThreadFactory factory = null;
    UncaughtExceptionHandler handler = null;
    try {
     // ignore exceptions in accessing/parsing properties
        String pp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.parallelism");
        String fp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.threadFactory");
        String hp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
        if (pp != null)
            //如果携带启动参数则设置为对应的并发粒度
            parallelism = Integer.parseInt(pp);
        if (fp != null)
            factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                       getSystemClassLoader().loadClass(fp).newInstance());
        if (hp != null)
            handler = ((UncaughtExceptionHandler)ClassLoader.
                       getSystemClassLoader().loadClass(hp).newInstance());
    } catch (Exception ignore) {
   
    }
    if (factory == null) {
   
        if (System.getSecurityManager() == null)
            factory = commonPoolForkJoinWorkerThreadFactory;
        else // use security-managed default
            factory = new InnocuousForkJoinWorkerThreadFactory();
    }
    if (parallelism < 0 && // default 1 less than #cores
        //默认情况下并发粒度 = CPU核数 - 1
        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                            "ForkJoinPool.commonPool-worker-");
}

在构建对象时,config字段 this.config = (parallelism & SMASK) | mode

其中SMASK为全1,mode为0,得到的结果是不变的,因此config的值就是parallelism并发粒度

至此我们可以得出结论:默认情况下,如果不指定线程池,当CPU核数-1超过1则会使用ForkJoin公共池(最大线程数量 = CPU核数 - 1),否则使用ThreadPerTaskExecutor(每次执行都创建线程执行)

static final class ThreadPerTaskExecutor implements Executor {
   
    public void execute(Runnable r) {
    new Thread(r).start(); }
}

ThreadPerTaskExecutor只适合执行周期长的任务,如果任务周期短,并且多的情况下,创建线程也会是很大一笔开销

使用CompletableFuture时务必指定线程池,线程池最好根据业务做好隔离

如果不指定线程池会根据CPU核数选择ForkJoinCommonPool或ThreadPerTaskExecutor,它们并不适合IO任务

线程如何执行?

在同步与异步的API中线程如何执行?

在异步的API中,如果指定线程池则交给线程池中的工作线程执行,否则选择Common Pool或ThreadPerTaskExecutor

在同步的API中,通常是当前线程进行执行任务,但如果任务B依赖的任务A未完成则由任务A的回调线程执行,任务A如果是异步则由线程池来执行

public static void testSync() {
   
        CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
   
//            try {
   
//                Thread.sleep(5000);
//            } catch (InterruptedException e) {
   
//                throw new RuntimeException(e);
//            }
            return "ok";
        }, threadPool);


        CompletableFuture<Void> taskB = taskA.thenAccept(s -> {
   
            //任务A执行完(不睡时)由当前线程执行
            //任务A未执行完(睡眠时)由线程池的工作线程执行
            System.out.println(s);
            System.out.println(s);
        });

        taskB.join();
}

总结

CompletableFuture提供串行、AND、OR、异常捕获、结果聚合等多种API,通过这些API能够更方便、快捷的实现异步任务的编排

使用CompletableFuture时务必对任务进行异常处理,并且它会使用CompletionException或ExecutionException包装异常,再打印异常时记得使用工具类处理,避免打印到包装的异常

CompletableFuture异步任务中如果指定线程池则直接使用指定的线程池

如果未指定线程池,当前服务器CPU数量小于等于2(并发粒度低)时使用ThreadPerTaskExecutor,其他情况(并发粒度高)使用ForkJoin框架的common pool(并发粒度 = CPU数量 - 1)

未指定线程池时使用的线程池适合CPU任务,并不适合IO任务,使用异步时务必指定线程池

当使用异步API时,由线程池的工作线程执行;使用同步API时,如果当前任务依赖的任务未完成,则有依赖、未完成的任务的线程来执行

🌠最后(一键三连求求拉~)

本篇文章被收入专栏 由点到线,由线到面,深入浅出构建Java并发编程知识体系,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

相关文章
|
存储 缓存 监控
美团面试:说说OOM三大场景和解决方案? (绝对史上最全)
小伙伴们,有没有遇到过程序突然崩溃,然后抛出一个OutOfMemoryError的异常?这就是我们俗称的OOM,也就是内存溢出 本文来带大家学习Java OOM的三大经典场景以及解决方案,保证让你有所收获!
5301 0
美团面试:说说OOM三大场景和解决方案? (绝对史上最全)
|
监控 Java 测试技术
实战:Springboot集成Sentinel实现流量控制、熔断降级、负载保护
实战:Springboot集成Sentinel实现流量控制、熔断降级、负载保护
异步&线程池 CompletableFuture 异步编排 【下篇】
这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。
异步&线程池 CompletableFuture 异步编排 【下篇】
|
9月前
|
存储 SQL 弹性计算
一文了解多云原生的现代化实时数仓 SelectDB Cloud
现代多云原生实时数据仓库 SelectDB Cloud,充分利用云原生能力,为客户提供极致性价比、融合统一、简单易用、安全稳定的云上数据分析服务。
413 4
一文了解多云原生的现代化实时数仓 SelectDB Cloud
|
11月前
|
NoSQL 关系型数据库 Java
以超卖为例✨各种场景下如何防止并发污染数据?
【10月更文挑战第8天】本文以商品库存扣减为例,探讨了在各种场景下如何防止并发操作导致的数据不一致问题。文章首先介绍了悲观锁和乐观锁的概念,然后分别从Java层面和中间件层面详细讲解了多种解决方案,包括使用synchronized、ReentrantLock、乐观锁(CAS)、数据库乐观锁和悲观锁、分布式锁(如Redis锁)等。最后,针对高并发场景,提出了将数据预热到Redis并使用Lua脚本保证原子性的方法。通过这些方法,可以有效防止超卖等数据污染问题。
以超卖为例✨各种场景下如何防止并发污染数据?
|
11月前
|
消息中间件 Serverless 数据安全/隐私保护
开发者如何使用云消息队列 RabbitMQ 版
【10月更文挑战第13天】开发者如何使用云消息队列 RabbitMQ 版
437 114
|
数据采集 JavaScript Java
CompletableFuture异步编排,你还不会?
本文介绍了同步与异步编程的概念,探讨了在复杂业务场景中使用异步编排的重要性。通过对比 `Future` 与 `CompletableFuture`,详细讲解了 `CompletableFuture` 的多种方法,如 `runAsync`、`supplyAsync`、`whenComplete`、`exceptionally` 等,并展示了如何通过 `CompletableFuture` 实现异步任务的组合与异常处理。最后,通过实战案例演示了如何利用线程池与 `CompletableFuture` 优化商品详情页的查询效率,显著减少响应时间。
352 3
CompletableFuture异步编排,你还不会?
|
SQL 存储 JSON
更快更强,SLS 推出高性能 SPL 日志查询模式
从海量的日志数据中,按照各种灵活的条件进行即时查询搜索,是可观测场景下的基本需求。本文介绍了 SLS 新推出的高性能 SPL 日志查询模式,支持 Unix 风格级联管道式语法,以及各种丰富的 SQL 处理函数。同时通过计算下推、向量化计算等优化,使得 SPL 查询可以在数秒内处理亿级数据,并支持 SPL 过滤结果分布图、随机翻页等特性。
13084 211
|
11月前
|
存储 分布式计算 负载均衡
分布式文件系统
【10月更文挑战第12天】
347 3
|
11月前
|
JavaScript Linux 开发者
JXcore
【10月更文挑战第23天】
109 2