CompletableFuture原理及应用场景详解

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
函数计算FC,每月15万CU 3个月
简介: CompletableFuture是Java 8引入的异步编程工具,用于优化多任务并行处理。相比传统Future,它支持可组合操作(如thenApply、thenCombine),避免回调地狱,同时降低依赖间的阻塞。其核心通过result存储结果,stack管理依赖动作,基于观察者模式实现回调通知。使用中需注意:异步方法建议显式传入线程池以隔离资源;异常信息需通过get()或exceptionally捕获。适用于复杂业务场景,如APP页面加载涉及多服务API调用时,可显著提升性能与代码可读性。

CompletableFuture.png

1.应用场景

现在我们打开各个APP上的一个页面,可能就需要涉及后端几十个服务的API调用,比如某宝、某个外卖APP上,下面是某个外卖APP的首页。首页上的页面展示会关联很多服务的API调用,如果使用同步调用的方式,接口耗时完全不能满足需求,因此,需要用到异步调用的方式。

CF0_1.jpg

2.使用线程池的弊端

说起异步调用,我们通常是创建一个线程池来实现多个请求的并行调用,这样接口的整体耗时由执行时间最长的线程决定。

CF0_2.png

但是线程池存在的问题是资源利用率较低:

  • CPU资源大量浪费在阻塞等待上
  • CPU调度的线程数增加了,在上下文切换上的资源消耗更大了。而且线程本身也占用系统资源

3.CompletableFuture的特性

我们引入CompletableFuture对业务流程进行编排,降低依赖之间的阻塞。本文主要讲述CompletableFuture的使用和原理。并对比Future、CompletableFuture、RxJava、Reactor的特性

Future CompletableFuture RxJava Reactor
Composable(可组合) ✔️ ✔️ ✔️
Asynchronous(异步) ✔️ ✔️ ✔️ ✔️
Operator fusion(操作融合) ✔️ ✔️
Lazy(延迟执行) ✔️ ✔️
Backpressure(回压) ✔️ ✔️
  • 可组合:将多个依赖操作通过不同方式进行编排,例如CompletableFuture提供thenCompose、thenCombine等方法,这些方法支持了可组合的特性
  • 操作融合:将数据流中的多个操作符以某种方式结合起来,进而降低开销
  • 延迟执行:操作不会立即执行,当收到明确指示时才会触发操作
  • 回压:异步阶段的处理速度跟不上,直接失败会导致大量数据丢失,这是需要反馈上游生产者降低调用量

RxJava和Reactor虽然功能更强大,但是学习成本也更高,我们选择学习成本较低的CompletableFuture

4 一个例子回顾Future

CompletableFuture是由Java 8引入的,在Java8之前我们一般通过Future实现异步,而Future是Java5新加的接口,提供异步并行计算的功能

  • Future只能通过阻塞或者轮询的方式获取结果,且不支持设置回调方法
  • Future.get()方法是阻塞调用获取结果,还提供了isDone方法,在程序中轮询这个方法可查询执行结果

创建任务方法类

public class UserService {
   

    public String getUserInfo() throws InterruptedException {
   
        Thread.sleep(300L);
        return "getUserInfo() 返回结果";
    }

    public String getUserAddress() throws InterruptedException {
   
        Thread.sleep(500L);
        return "getUserAddress() 返回结果";
    }
}

创建Future测试

public class FutureTest {
   

    public static void main(String[] args) {
   
        ExecutorService executor = Executors.newFixedThreadPool(2);
        UserService userService = new UserService();
        try {
   
            Long start = System.currentTimeMillis();
            Future<String> future1 = executor.submit(new Callable<String>() {
   
                @Override
                public String call() throws Exception {
   
                    return userService.getUserInfo();
                }
            });
            Future<String> future2 = executor.submit(new Callable<String>() {
   
                @Override
                public String call() throws Exception {
   
                    return userService.getUserAddress();
                }
            });
            String result1 = future1.get();
            System.out.println(result1);
            String result2 = future2.get();
            System.out.println(result2);

            System.out.println("两个任务执行耗时:" + (System.currentTimeMillis() - start) + " ms");

        } catch (Exception e) {
   
            e.printStackTrace();
        } finally {
   
            executor.shutdown();
        }
    }
}

最后执行结果为:

getUserInfo() 返回结果
getUserAddress() 返回结果
两个任务执行耗时:505 ms

使用Future后任务的整体耗时,由最长的任务耗时决定

前面也说过,Future对结果的获取不友好,没有提供回调方法,只能阻塞或者轮询的方式。

Java8之前也可以用guava的ListenableFuture,来设置回调,但是这样又会导致臭名昭著的回调地狱(异步编程中因多层嵌套回调函数导致的代码可读性、可维护性急剧下降的现象),这里不展开了

5.CompletableFuture的使用

CompletableFuture实现了两个接口:Future和CompletionStage,Future用于异步计算,CompletionStage用于表示异步执行过程汇总的一个步骤Stage

CF1.png

5.1一个例子入门CompletableFuture

这里创建一个流程,多个任务之间存在依赖关系

根据依赖数量,可以分为:零依赖、一元依赖、二元依赖、多元依赖

CF2.png

5.1.1零依赖:创建异步任务

CF3.png

上面两个任务CF1、CF2就是零依赖,可以直接创建,主要有三种创建方式:

        ExecutorService executor = Executors.newFixedThreadPool(5);
        UserService userService = new UserService();
        //1、使用runAsync或supplyAsync发起异步调用
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() ->
                userService.getUserInfo(), executor);
        //2、CompletableFuture.completedFuture()直接创建一个已完成状态的CompletableFuture
        CompletableFuture<String> cf2 = CompletableFuture.completedFuture("result2");
        //3、先初始化一个未完成的CompletableFuture,然后通过complete() completeExceptionally(),完成该CompletableFuture
        CompletableFuture<String> cf = new CompletableFuture<>();
        cf.complete("success");

5.1.2 一元依赖:依赖一个CF

任务的执行存在一个上游依赖,可以通过thenApply、thenAccept、thenCompose方法来实现

CF4.png

CompletableFuture<String> cf3 = cf1.thenApply(result1 -> {
   
  //result1为CF1的结果
  //......
  return "result3";
});
CompletableFuture<String> cf5 = cf2.thenApply(result2 -> {
   
  //result2为CF2的结果
  //......
  return "result5";
});

5.1.3 二元依赖:依赖两个CF

上图中的CF4就是个二元依赖,它依赖CF1和CF2,我们通过thenCombine等回调来实现。代码如下:

CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (result1, result2) -> {
   
            //result1和result2分别为cf1和cf2的结果
            return "result4";
        });

5.1.4 多元依赖:依赖多个CF

CF5.png

CF6是多元依赖,这种关系可以通过allOfanyOf方法来实现:

  • allOf方法:多个依赖需全部完成
  • anyOf方法:任意一个依赖完成即可
        //多元依赖
CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);
CompletableFuture<String> result = cf6.thenApply(v -> {
   
            //这里的join并不会阻塞,因为传给thenApply的函数是在CF3、CF4、CF5全部完成时,才会执行 。
            String result3 = cf3.join();
            String result4 = cf4.join();
            String result5 = cf5.join();
            //根据result3、result4、result5组装最终result;
            return result3 + result4 + result5;
        });

6.CompletableFuture原理

CompletableFuture包含了两个volatile修饰的变量:result和stack

  • result存储当前CF的结果
  • stack表示当前CF完成后需要触发的依赖动作,依赖动作可以有多个,以栈形成存储,stack表示栈顶元素
    volatile Object result;       // Either the result or boxed AltResult
    volatile Completion stack;    // Top of Treiber stack of dependent actions

Completion类本身是观察者的基类

CF6.png

被观察者:每个CF都是一个被观察者,stack中存储的是注册的所有观察者,当CF执行完成后,会弹栈stack,依次通知观察者。result用于存储CF执行的结果数据

观察者:回调方法如thenApply、thenAccept会生成一个Completion类型的对象,就是观察者。检查当前CF是否已完成,如果已完成则执行Completion,否则加入观察者链stack中

7.使用问题

7.1代码执行在哪个线程上?

CompletableFuture的组合操作都有同步和异步两种方法:

同步方法(即不带Async后缀的):

  • 如果注册时被依赖的操作已经执行完成,则直接由当前线程执行
  • 如果注册时被依赖操作未执行完,则由回调线程执行

异步方法(带Async后缀的):

  • 不传递线程池参数Executor时,由公共线程池CommonPool(CPU核数-1)执行
  • 传递时用的传入的指定线程池

7.2异步回调要传线程池

异步回调时强制传入线程池,并根据实际情况做线程池隔离

不传递时,使用的都是公共线程池CommonPool,容易形成性能瓶颈。手动传递线程池参数可以更方便调节参数,并给不同业务分配不同线程池,做到资源隔离

7.3 Future需要获取返回值,才能获取异常信息

CompletableFuture<Void> future = CompletableFuture.supplyAsync(
     ......
)

  //如果不加get()方法这一行,看不到异常信息
  future.get();

Future需要获取返回值时,才能获取到异常信息,不加get()方法是看不到的。

CompletableFuture还提供了异常捕获回调exceptionally方法,相当于同步调用中的try/catch方法可获取异常

public CompletableFuture<Integer> getCancelTypeAsync(long orderId) {
   
    CompletableFuture<WmOrderOpRemarkResult> remarkResultFuture = wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);//业务方法,内部会发起异步rpc调用
    return remarkResultFuture
      .exceptionally(err -> {
   //通过exceptionally 捕获异常,打印日志并返回默认值
         log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, err);
         return 0;
      });
}
相关文章
|
存储 缓存 监控
美团面试:说说OOM三大场景和解决方案? (绝对史上最全)
小伙伴们,有没有遇到过程序突然崩溃,然后抛出一个OutOfMemoryError的异常?这就是我们俗称的OOM,也就是内存溢出 本文来带大家学习Java OOM的三大经典场景以及解决方案,保证让你有所收获!
6021 0
美团面试:说说OOM三大场景和解决方案? (绝对史上最全)
|
6月前
|
监控 安全 Java
Spring AOP实现原理
本内容主要介绍了Spring AOP的核心概念、实现机制及代理生成流程。涵盖切面(Aspect)、连接点(Join Point)、通知(Advice)、切点(Pointcut)等关键概念,解析了JDK动态代理与CGLIB代理的原理及对比,并深入探讨了通知执行链路和责任链模式的应用。同时,详细分析了AspectJ注解驱动的AOP解析过程,包括切面识别、切点表达式匹配及通知适配为Advice的机制,帮助理解Spring AOP的工作原理与实现细节。
1027 13
|
8月前
|
缓存 监控 NoSQL
场景题:线上接口响应慢,应该如何排查问题?
面试中常见的接口响应慢排查题旨在考察研发人员的系统性解决问题的能力。回答时需结合业务场景(如大促、高峰期),并运用工具(Arthas、SkyWalking等)进行监控告警、链路追踪和日志分析,明确问题范围及原因。具体步骤包括:1. 定位问题(确认单个接口或整体系统、查看APM指标、分析链路和日志);2. 排查网络、中间件及外部依赖(检测延迟、检查Redis、RocketMQ、MySQL等);3. 服务端性能分析(CPU、内存、磁盘IO、JVM调优)。最后提出优化方案,如代码逻辑、数据库、缓存策略及资源扩容等。总结时可结合实际案例,展示完整的排查与优化流程。
1432 3
|
10月前
|
Prometheus 监控 Cloud Native
高频面题: 你们线上 QPS 多少?你 怎么知道的?
本文由45岁资深架构师尼恩撰写,针对高级开发和架构师面试中的高频问题提供详细解答。文章涵盖了QPS、TPS、RT等性能指标的定义及计算方法,详解了如何配置Prometheus与Grafana监控系统QPS,并提供了应对高并发场景(如双十一抢购)的系统部署策略。此外,还分享了多个大厂面试真题及解决方案,帮助读者在面试中充分展示技术实力,提升求职竞争力。建议收藏并深入学习,为面试做好充分准备。更多内容可参考《尼恩Java面试宝典》及相关技术圣经系列PDF。
|
10月前
|
Java
Java中的CompletableFuture详解
`CompletableFuture`提供了一种简洁而强大的方式来处理Java中的异步编程。通过它,我们可以轻松地创建和组合异步任务,并处理任务中的异常。掌握 `CompletableFuture`的使用,将显著提升Java并发编程的效率和代码可读性。
536 16
|
监控 负载均衡 Java
5 大 SpringCloud 核心组件详解,8 张图彻底弄懂
本文图文详解 Spring Cloud 的五大核心组件,帮助深入理解和掌握微服务架构。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
5 大 SpringCloud 核心组件详解,8 张图彻底弄懂
|
8月前
|
分布式计算 算法 Java
|
8月前
|
Java Spring
SpringBoot自动配置原理
本文深入解析了SpringBoot的核心功能——自动配置,重点探讨了`org.springframework.boot.autoconfigure`及相关注解的工作机制。通过分析`@SpringBootApplication`、`@EnableAutoConfiguration`等注解,揭示了SpringBoot如何基于类路径和条件自动装配Bean
437 8
|
11月前
|
NoSQL Java Redis
Spring Boot 自动配置机制:从原理到自定义
Spring Boot 的自动配置机制通过 `spring.factories` 文件和 `@EnableAutoConfiguration` 注解,根据类路径中的依赖和条件注解自动配置所需的 Bean,大大简化了开发过程。本文深入探讨了自动配置的原理、条件化配置、自定义自动配置以及实际应用案例,帮助开发者更好地理解和利用这一强大特性。
1834 15