- 引言
- 一、为什么需要CompletableFuture?
- 二、核心概念:Promise与异步任务
- 三、创建CompletableFuture
- 四、任务链式编排:thenApply、thenAccept、thenRun
- 五、组合多个Future:thenCompose与thenCombine
- 六、多任务组合:allOf与anyOf
- 七、异常处理:exceptionally与handle
- 八、总结与最佳实践
- 互动环节
引言
在传统的Java并发编程中,我们使用Future来获取异步任务的执行结果。但它有一个致命的弱点:获取结果会阻塞线程,而且难以优雅地处理多个异步任务之间的依赖关系。
想象一下这样的场景:你需要调用三个不同的远程服务,然后将它们的结果合并返回。如果用Future.get(),你只能在每个调用后傻傻地等待,性能极差。
CompletableFuture的出现,彻底改变了这一切!它不仅是Future的增强版,更是一个强大的异步编程工具,允许你以声明式的风格构建复杂的异步任务流水线,无需阻塞等待,极大提升了程序的吞吐量和响应能力。
一、为什么需要CompletableFuture?
传统Future的局限性:
- 阻塞获取结果:get()方法是阻塞的,调用它时线程只能等待,无法在等待时执行其他有用工作。
- 无法手动完成:不能手动设置Future的结果或异常。
- 无法链式处理:很难表达“当一个任务完成后,接着做另一件事”这样的逻辑。需要手动用get()获取结果后再提交新任务,代码繁琐且易出错。
- 无法组合多个Future:合并多个异步任务的结果非常困难且不优雅。
CompletableFuture的核心优势:
- 非阻塞回调:提供了一系列方法(如thenApply, thenAccept),可以在任务完成后自动触发回调,无需阻塞等待。
- 手动完成:可以手动设置结果或异常,使任务完成。
- 强大的链式编程:支持将多个异步任务以链式(流水线)的方式组合起来。
- 多任务组合:可以轻松地等待所有任务完成(allOf),或等待任意一个任务完成(anyOf)。
- 优雅的异常处理:提供了专门的异常处理方法,可以在异步链的任何阶段处理异常。
简单说,CompletableFuture让异步编程变得像搭积木一样简单直观。
二、核心概念:Promise与异步任务
CompletableFuture实现了两个接口:
- Future:提供了异步计算的结果(get(), isDone()等)。
- CompletionStage:代表了异步计算中的一个阶段(Stage)。一个阶段的完成可以触发另一个依赖阶段的执行。这才是CompletableFuture强大功能的源泉。
你可以把它理解为一个Promise:我给你一个承诺(CompletableFuture对象),将来某个时刻我会给你一个结果(或一个异常)。在这个承诺兑现之前,你可以告诉它:“结果出来后,请接着做A,然后再做B...”。
三、创建CompletableFuture
1. 使用runAsync和supplyAsync
这是最常用的创建方式,它们都会在ForkJoinPool.commonPool()(默认的ForkJoin线程池)中异步执行任务。
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CreationExample { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1. runAsync: 执行没有返回值的Runnable任务 CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { System.out.println("正在运行一个无返回值的异步任务..."); sleep(1000); }); future1.get(); // 等待任务完成 // 2. supplyAsync: 执行有返回值的Supplier任务(最常用) CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("正在运行一个有返回值的异步任务..."); sleep(1000); return "Hello, CompletableFuture!"; }); // 阻塞获取结果 String result = future2.get(); System.out.println("获取到的结果: " + result); } private static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
提示:这两个方法都有重载版本,可以传入自定义的Executor来指定任务在哪个线程池中运行。
四、任务链式编排:thenApply、thenAccept、thenRun
这是CompletableFuture最核心、最常用的功能。它允许你将多个任务串联起来,形成一个异步流水线。
类比:就像在餐厅点餐。
- supplyAsync(() -> 下单) -> 后厨开始做菜(返回一个Future代表将来的菜)
- .thenApply(菜 -> 切好的菜) -> 对菜进行加工(转换结果)
- .thenAccept(切好的菜 -> 装盘) -> 消费结果,但不产生新结果
- .thenRun(() -> 擦桌子) -> 不关心上一步的结果,只在前一步完成后执行一个动作
public class ChainingExample { public static void main(String[] args) { // 创建一个初始的异步任务 CompletableFuture.supplyAsync(() -> { System.out.println("任务1: 查询用户ID - " + Thread.currentThread().getName()); sleep(500); return "User123"; }) // thenApply: 接收上一步的结果,进行转换,返回新值 .thenApply(userId -> { System.out.println("任务2: 根据ID(" + userId + ")查询用户详情"); sleep(500); return userId + " 的详细信息"; }) // thenAccept: 接收上一步的结果,进行消费,不返回新值 .thenAccept(userDetail -> { System.out.println("任务3: 发送用户详情(" + userDetail + ")到界面"); }) // thenRun: 不关心上一步结果,只在前序步骤完成后执行 .thenRun(() -> { System.out.println("任务4: 所有操作已完成,记录日志"); }); // 主线程继续执行其他工作... System.out.println("主线程不会被阻塞,可以继续做其他事"); sleep(3000); // 等待异步任务链执行完毕 } }
关键点:
- 异步执行:默认情况下,每个回调任务可能会在同一个线程或其他线程中执行(取决于CompletableFuture的内部优化)。可以使用thenApplyAsync等方法强制指定在新的异步线程中执行。
- 非阻塞:整个链条的构建是瞬间完成的,不会阻塞主线程。
五、组合多个Future:thenCompose与thenCombine
1.thenCompose:扁平化嵌套的Future
用于解决“回调地狱”(Callback Hell)。当一个异步任务依赖另一个异步任务的结果时使用。
// 假设我们有两个服务方法,都返回CompletableFuture CompletableFuture<String> getUserDetail(String userId) { return CompletableFuture.supplyAsync(() -> "Detail of " + userId); } CompletableFuture<Double> getCreditRating(String userDetail) { return CompletableFuture.supplyAsync(() -> 99.5); } public void thenComposeExample() { // 不使用thenCompose会导致嵌套(丑陋) // CompletableFuture<CompletableFuture<Double>> badNesting = getUserDetail("User123").thenApply(detail -> getCreditRating(detail)); // 使用thenCompose: 扁平化结果 CompletableFuture<Double> result = getUserDetail("User123") .thenCompose(this::getCreditRating); // this::getCreditRating 等价于 detail -> getCreditRating(detail) result.thenAccept(rating -> System.out.println("用户信用分: " + rating)); }
2.thenCombine:合并两个独立Future的结果
当两个互不依赖的异步任务都完成后,需要对它们的结果进行合并处理时使用。
public void thenCombineExample() { CompletableFuture<Double> weightFuture = CompletableFuture.supplyAsync(() -> { sleep(500); return 65.5; // 体重 }); CompletableFuture<Double> heightFuture = CompletableFuture.supplyAsync(() -> { sleep(800); return 1.75; // 身高 }); // 合并两个独立任务的结果 CompletableFuture<Double> bmiFuture = weightFuture.thenCombine(heightFuture, (weight, height) -> { System.out.println("计算BMI,体重: " + weight + ", 身高: " + height); return weight / (height * height); }); bmiFuture.thenAccept(bmi -> System.out.println("您的BMI指数: " + bmi)); }
六、多任务组合:allOf与anyOf
1.allOf:等待所有任务完成
当你需要并行执行多个独立任务,并等待它们全部完成后再继续时使用。
public void allOfExample() { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {sleep(1000); return "结果1";}); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {sleep(2000); return "结果2";}); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {sleep(3000); return "结果3";}); CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3); // allOf返回的Future在所有任务完成时完成,但没有聚合结果 allFutures.thenRun(() -> { // 此时所有任务都已完成,可以安全地调用get()了(不会阻塞) try { String result1 = future1.get(); String result2 = future2.get(); String result3 = future3.get(); System.out.println("所有任务完成,结果: " + result1 + ", " + result2 + ", " + result3); } catch (Exception e) { e.printStackTrace(); } }).join(); // 等待所有任务完成 }
2.anyOf:等待任意一个任务完成
当你只需要多个任务中的任意一个完成时使用。
public void anyOfExample() { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {sleep(1000); return "快速结果";}); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {sleep(5000); return "慢速结果";}); CompletableFuture<Object> winner = CompletableFuture.anyOf(future1, future2); winner.thenAccept(result -> System.out.println("最先完成的任务结果是: " + result)); // 输出: "最先完成的任务结果是: 快速结果" }
七、异常处理:exceptionally与handle
在异步链中,任何一步都可能抛出异常。CompletableFuture提供了优雅的方式来处理异常,而不需要丑陋的try-catch块。
1.exceptionally:捕获异常并提供默认值
类似于try-catch,只在发生异常时被调用。
public void exceptionallyExample() { CompletableFuture.supplyAsync(() -> { if (true) { // 模拟一个错误 throw new RuntimeException("Oops! Something went wrong!"); } return "Success"; }) .exceptionally(ex -> { // 只有当异常发生时,才会进入这个回调 System.err.println("哎呀,出错了: " + ex.getMessage()); return "Default Value"; // 提供一个默认的恢复值 }) .thenAccept(result -> System.out.println("最终结果: " + result)); // 输出: "最终结果: Default Value" }
2.handle:无论成功或失败都会执行
类似于try-catch-finally,无论成功还是失败都会被调用,并接收结果和异常两个参数。
public void handleExample() { CompletableFuture.supplyAsync(() -> { // 可能成功,也可能失败 return "Success"; // throw new RuntimeException("Failed"); }) .handle((result, ex) -> { if (ex != null) { // 处理异常情况 System.err.println("任务失败: " + ex.getMessage()); return "Recovered from failure"; } // 处理成功情况 System.out.println("任务成功,结果为: " + result); return result.toUpperCase(); }) .thenAccept(finalResult -> System.out.println("处理后的结果: " + finalResult)); }
八、总结与最佳实践
- 核心思想:CompletableFuture将复杂的异步回调、任务编排和异常处理,转变为声明式、函数式的链式调用,代码更加清晰和易于维护。
- ** vs 传统Future**:它解决了Future.get()阻塞线程的问题,提供了无比强大的任务组合能力。
- 最佳实践:
- 指定自定义线程池:默认使用ForkJoinPool.commonPool(),在生产环境中,强烈建议根据业务类型传入自定义的Executor,以避免不同业务相互干扰或耗尽公共线程池。
- 善用异常处理:一定要在异步链的末尾或关键节点使用exceptionally或handle,避免异常被悄无声息地吞掉。
- 理解异步边界:明确每个thenApply/thenAccept是在哪个线程上执行的(默认与上一个任务相同,使用Async后缀则不同)。
- 适用场景:
- 并行调用多个远程服务,然后聚合结果。
- 构建异步、非阻塞的响应式流程。
- 处理复杂的、多步骤的异步业务逻辑。
CompletableFuture是Java 8引入的最重要的并发工具之一,它彻底改变了Java的异步编程模式,是学习和开发现代高并发、高性能Java应用的必备利器。