CompletableFuture使用详解

简介: CompletableFuture使用详解

 目录

异步计算

Future 接口的局限性

CompletionStage

CompletableFuture

下面是CompletableFuture的一些常用的方法:


异步计算

    • 所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。
    • JDK5新增了Future接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。
    • 以前我们获取一个异步任务的结果可能是这样写的
    • image.gif编辑

    Future 接口的局限性

    Future接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:

      1. 将多个异步计算的结果合并成一个
      2. 等待Future集合中的所有任务都完成
      3. Future完成事件(即,任务完成以后触发执行动作)
      4. 。。。

      CompletionStage

        • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
        • 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
        • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

        CompletableFuture

          • 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
          • 它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
          • 它实现了Future和CompletionStage接口

          image.gif编辑

          在JAVA中你可以理解为可以使用它来创建一个异步线程处理你想要处理的内容,并且这个异步线程是非阻塞的。同时你要注意的是,每一个任务都可以是一个Feature 或者是 CompletionStage 都是后台守护线程(Daemon),也就是说,随着主线程的死亡,守护线程不管有没有完成任务,也会结束。

          下面是CompletableFuture的一些常用的方法:

          public class WelfareImageController {
              /**
               * runAsync 无返回值
               *
               * @throws Exception
               */
              public static void runAsync() throws Exception {
                  CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                      try {
                          TimeUnit.SECONDS.sleep(1);
                      } catch (InterruptedException e) {
                      }
                      System.out.println("run end ...");
                  });
                  future.get();
              }
              /**
               * supplyAsync
               * 有返回值
               *
               * @throws Exception
               */
              public static void supplyAsync() throws Exception {
                  Instant start = Instant.now();
                  CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
                      try {
                          TimeUnit.SECONDS.sleep(1);
                      } catch (InterruptedException e) {
                      }
                      System.out.println("run end ...");
                      return System.currentTimeMillis();
                  });
                  long time = future.get();
                  System.out.println("time = " + time);
                  Instant end = Instant.now();
                  System.out.println("总耗时:" + Duration.between(start, end).getSeconds());
              }
              /**
               * whenComplete 方法
               * 异步线程任务执行完后调用 whenComplete 方法
               *
               * @throws Exception
               */
              public static void whenComplete() throws Exception {
                  CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                      try {
                          TimeUnit.SECONDS.sleep(1);
                      } catch (InterruptedException e) {
                      }
                      if (new Random().nextInt() % 2 >= 0) {
                          int i = 12 / 0;
                      }
                      System.out.println("run end ...");
                  });
                  future.whenComplete(new BiConsumer<Void, Throwable>() {
                      @Override
                      public void accept(Void t, Throwable action) {
                          System.out.println("执行完成!");
                      }
                  });
                  // runAsync 执行异常的时候就执行此方法
                  future.exceptionally(new Function<Throwable, Void>() {
                      @Override
                      public Void apply(Throwable t) {
                          System.out.println("执行失败!" + t.getMessage());
                          return null;
                      }
                  });
                  TimeUnit.SECONDS.sleep(2);
              }
              /**
               * thenApply 方法
               * 异步线程执行完返回的结果作为下个任务的参数
               * 注意: 任务出现异常则不执行 thenApply 方法
               *
               * @throws Exception
               */
              private static void thenApplyOld() throws Exception {
                  CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
                      @Override
                      public Long get() {
                          long result = new Random().nextInt(100);
                          System.out.println("result1=" + result);
                          return result;
                      }
                  }).thenApply(new Function<Long, Long>() {
                      @Override
                      public Long apply(Long t) {
                          long result = t * 5;
                          System.out.println("result2=" + result);
                          return result;
                      }
                  });
                  long result = future.get();
                  System.out.println(result);
              }
              /**
               * thenApply 方法
               * 上一个方法的简化
               *
               * @throws Exception
               */
              private static void thenApply() throws Exception {
                  CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
                      long result = new Random().nextInt(100);
                      System.out.println("result1=" + result);
                      return result;
                  }).thenApply(t -> {
                      long result = t * 5;
                      System.out.println("result2=" + result);
                      return result;
                  });
                  long result = future.get();
                  System.out.println(result);
              }
              /**
               * handle 方法
               * handle 方法和 thenApply 方法处理方式基本一样。
               * 不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。
               * thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
               *
               * @throws Exception
               */
              public static void handle() throws Exception {
                  CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int i = 10 / 0;
                          return new Random().nextInt(10);
                      }
                  }).handle(new BiFunction<Integer, Throwable, Integer>() {
                      @Override
                      public Integer apply(Integer param, Throwable throwable) {
                          int result = -1;
                          if (throwable == null) {
                              result = param * 2;
                          } else {
                              System.out.println(throwable.getMessage());
                          }
                          return result;
                      }
                  });
                  System.out.println(future.get());
              }
              /**
               * thenAccept
               * 接收异步任务的返回结果进行处理 无返回值
               *
               * @param
               * @throws Exception
               */
              public static void thenAccept() throws Exception {
                  CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          return new Random().nextInt(10);
                      }
                  }).thenAccept(integer -> {
                      System.out.println(integer);
                  });
                  future.get();
              }
              /**
               * thenRun
               * 与上面的thenAccept 不同的是 不关心任务的处理结果 只要任务处理完就执行此方法
               * 注意: 任务遇到未抛出的异常则不会执行
               *
               * @param
               * @throws Exception
               */
              public static void thenRun() throws Exception {
                  CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          return new Random().nextInt(10);
                      }
                  }).thenRun(() -> {
                      System.out.println("thenRun ...");
                  });
                  future.get();
              }
              /**
               * thenCombine 合并任务
               * thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
               * 这里有点像stream流的reduce
               *
               * @param
               * @throws Exception
               */
              private static void thenCombine() throws Exception {
                  CompletableFuture<String> future1 = CompletableFuture.supplyAsync(new Supplier<String>() {
                      @Override
                      public String get() {
                          return "hello";
                      }
                  });
                  CompletableFuture<String> future2 = CompletableFuture.supplyAsync(new Supplier<String>() {
                      @Override
                      public String get() {
                          return "hello";
                      }
                  });
                  CompletableFuture<String> result = future1.thenCombine(future2, new BiFunction<String, String, String>() {
                      @Override
                      public String apply(String t, String u) {
                          return t + " " + u;
                      }
                  });
                  System.out.println(result.get());
              }
              /**
               * reduce 并行计算:
               * 解释一下: 后一个 Integer::sum 就是对前面 一个Integer::sum 阶段性累加结果进行合并
               *
               * @param
               * @throws Exception
               */
              public static void reduceTest() {
                  List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
                  Integer reduce = integers.parallelStream().reduce(0, Integer::sum, Integer::sum);
                  System.out.println(reduce);
              }
              /**
               * thenAcceptBoth
               * 当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗
               *
               * @param
               * @throws Exception
               */
              private static void thenAcceptBoth() throws Exception {
                  CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f1=" + t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f2=" + t);
                          return t;
                      }
                  });
                  f1.thenAcceptBoth(f2, new BiConsumer<Integer, Integer>() {
                      @Override
                      public void accept(Integer t, Integer u) {
                          System.out.println("f1=" + t + ";f2=" + u + ";");
                      }
                  });
              }
              private static void thenAcceptBothNew() throws Exception {
                  CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          try {
                              TimeUnit.SECONDS.sleep(t);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println("f1="+t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          try {
                              TimeUnit.SECONDS.sleep(t);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println("f2="+t);
                          return t;
                      }
                  });
                  f1.thenAcceptBoth(f2, new BiConsumer<Integer, Integer>() {
                      @Override
                      public void accept(Integer t, Integer u) {
                          System.out.println("f1="+t+";f2="+u+";");
                      }
                  });
              }
              /**
               * applyToEither 方法
               * 两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。
               * 慢的那个任务也会执行 但不会被applyToEither调用返回值做任何处理
               *
               * @param
               * @throws Exception
               */
              private static void applyToEither() throws Exception {
                  CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          try {
                              TimeUnit.SECONDS.sleep(t);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println("f1=" + t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          try {
                              TimeUnit.SECONDS.sleep(t);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println("f2=" + t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> result = f1.applyToEither(f2, new Function<Integer, Integer>() {
                      @Override
                      public Integer apply(Integer t) {
                          System.out.println(t);
                          return t * 2;
                      }
                  });
                  System.out.println(result.get());
              }
              /**
               * acceptEither 方法
               * 两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。
               *
               * @param
               * @throws Exception
               */
              private static void acceptEither() throws Exception {
                  CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f1=" + t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f2=" + t);
                          return t;
                      }
                  });
                  f1.acceptEither(f2, new Consumer<Integer>() {
                      @Override
                      public void accept(Integer t) {
                          System.out.println(t);
                      }
                  });
              }
              private static void acceptEitherNew() throws Exception {
                  CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          try {
                              TimeUnit.SECONDS.sleep(t);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println("f1="+t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          try {
                              TimeUnit.SECONDS.sleep(t);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println("f2="+t);
                          return t;
                      }
                  });
                  f1.acceptEither(f2, new Consumer<Integer>() {
                      @Override
                      public void accept(Integer t) {
                          System.out.println(t);
                      }
                  });
                  f1.get();
                  f2.get();
              }
              /**
               * runAfterEither 方法
               * 两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)
               *
               * @param
               * @throws Exception
               */
              private static void runAfterEither() throws Exception {
                  CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f1=" + t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f2=" + t);
                          return t;
                      }
                  });
                  f1.runAfterEither(f2, new Runnable() {
                      @Override
                      public void run() {
                          System.out.println("上面有一个已经完成了。");
                      }
                  });
                  System.out.println("f1:" + f1.get());
                  System.out.println("f2:" + f2.get());
              }
              /**
               * runAfterBoth
               * 两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)
               *
               * @param
               * @throws Exception
               */
              private static void runAfterBoth() throws Exception {
                  CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f1=" + t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f2=" + t);
                          return t;
                      }
                  });
                  f1.runAfterBoth(f2, new Runnable() {
                      @Override
                      public void run() {
                          System.out.println("上面两个任务都执行完成了。");
                      }
                  });
              }
              /**
               * thenCompose 方法
               * thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。
               *
               * @param
               * @throws Exception
               */
              private static void thenCompose() throws Exception {
                  CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("t1=" + t);
                          return t;
                      }
                  }).thenCompose(new Function<Integer, CompletionStage<Integer>>() {
                      @Override
                      public CompletionStage<Integer> apply(Integer param) {
                          return CompletableFuture.supplyAsync(new Supplier<Integer>() {
                              @Override
                              public Integer get() {
                                  int t = param * 2;
                                  System.out.println("t2=" + t);
                                  return t;
                              }
                          });
                      }
                  });
                  System.out.println("thenCompose result : " + f.get());
              }
              /**
               * allOf() 方法
               *
               * @param
               * @throws Exception
               */
              static void futureTest() {
                  CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                      try {
                          Thread.sleep(10);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      System.out.println("future1 finished!");
                      return "future1 finished!";
                  });
                  CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                      System.out.println("future2 finished!");
                      return "future2 finished!";
                  });
                  //阻塞,直到所有任务结束。
                  System.out.println(System.currentTimeMillis() + ":阻塞");
                  CompletableFuture.allOf(future1, future2).join();
                  System.out.println(System.currentTimeMillis() + ":阻塞结束");
                  System.out.println("future1: " + future1.isDone() + " future2: " + future2.isDone());
              }
              static String queryCode(String name) {
                  try {
                      Thread.sleep(100);
                  } catch (InterruptedException e) {
                  }
                  return "601857";
              }
              static Double fetchPrice(String code) {
                  try {
                      Thread.sleep(100);
                  } catch (InterruptedException e) {
                  }
                  return 5 + Math.random() * 20;
              }
              public static void main(String[] args) throws Exception {
          //        Thread.sleep(4000);
          //        WelfareImageController.thenAcceptBothNew();
                  // 第一个任务:
                  CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
                      return queryCode("中国石油");
                  });
                  // cfQuery成功后继续执行下一个任务:
                  CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
                      return fetchPrice(code);
                  });
                  // cfFetch成功后打印结果:
                  cfFetch.thenAccept((result) -> {
                      System.out.println("price: " + result);
                  });
                  // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
                  Thread.sleep(1000);
              }
          }

          image.gif

          参考:CompletableFuture 使用详解 - 简书

          java8 CompletableFuture,allOf多实例返回 - 简书

          CompletableFuture基本用法 - 废物大师兄 - 博客园

          JDK1.8新特性CompletableFuture总结_finalheart的博客-CSDN博客_completablefuture

          目录
          相关文章
          |
          机器学习/深度学习 存储 自然语言处理
          简单聊一聊大模型微调技术-LoRA
          LoRA(Low-Rank Adaptation)是一种用于减少大模型微调中参数数量和计算资源的技术。通过引入低秩分解,LoRA 仅更新少量参数,从而显著降低显存消耗和计算需求。适用于大规模预训练模型的微调、跨领域迁移学习、低资源设备部署和多任务学习等场景。例如,在微调 BERT 模型时,LoRA 可以仅调整约 0.1% 的参数,保持与全量微调相近的性能。
          2256 0
          异步任务编排神器CompletableFuture
          【10月更文挑战第10天】CompletableFuture是JDK8并发包中引入的强大工具,用于处理复杂的异步任务编排。它提供了丰富的API,支持任务的串行、并行、组合及异常处理,适用于需要高效管理和协调多个异步操作的场景。例如,网页加载时需从多个服务异步获取数据,CompletableFuture可以有效提升性能和响应速度。使用时应注意异常处理和合理选择线程池,以确保程序稳定性和效率。
          异步任务编排神器CompletableFuture
          |
          8月前
          |
          人工智能 JavaScript 开发工具
          MCP详解:背景、架构与应用
          模型上下文协议(MCP)是由Anthropic提出的开源标准,旨在解决大语言模型与外部数据源和工具集成的难题。作为AI领域的“USB-C接口”,MCP通过标准化、双向通信通道连接模型与外部服务,支持资源访问、工具调用及提示模板交互。其架构基于客户端-服务器模型,提供Python、TypeScript等多语言SDK,方便开发者快速构建服务。MCP已广泛应用于文件系统、数据库、网页浏览等领域,并被阿里云百炼平台引入,助力快速搭建智能助手。未来,MCP有望成为连接大模型与现实世界的通用标准,推动AI生态繁荣发展。
          7087 66
          |
          10月前
          |
          人工智能 资源调度 自然语言处理
          钉钉项目 Teambition AI 能力重塑项目管理100种可能!
          钉钉项目Teambition AI迎来重磅升级,通义千问与DeepSeek两大模型助力AI项目管理。从项目规划、任务创建到执行建议、字段管理,再到周报总结和数据分析,Teambition AI贯穿项目全流程,重塑项目管理100种可能。AI技术赋能项目管理智能化,提升团队协作效率,确保项目进度精准把控,让任务分配、资源调度和风险管理更加轻松高效。
          钉钉项目 Teambition AI 能力重塑项目管理100种可能!
          |
          Java 调度
          利用 XXL-JOB 实现灵活控制的分片处理
          本文讲述了一种利用 XXL-JOB 来进行分片任务处理的方法,另外加入对执行节点数的灵活控制。
          614 2
          |
          SQL Rust Java
          Java 8 异步编程利器:CompletableFuture
          Java 8引入了CompletableFuture,这是一个强大的异步编程工具,增强了Future的功能,支持链式调用、任务组合与异常处理等特性,使异步编程更加直观和高效。本文详细介绍了CompletableFuture的基本概念、用法及高级功能,帮助开发者更好地掌握这一工具。
          228 0
          |
          SQL 数据库 HIVE
          hive数仓 ods层增量数据导入
          根据业务需求,当表数据量超过10万条时采用增量数据导入,否则全量导入。增量导入基于`create_date`和`modify_date`字段进行,并确保时间字段已建立索引以提升查询效率。避免在索引字段上执行函数操作。创建增量表和全量表,并按日期进行分区。首次导入全量数据,后续每日新增或变更数据保存在增量表中,通过全量表与增量表的合并保持数据一致性。
          525 13
          |
          JSON 安全 API
          抖音店铺商品信息的 API
          抖音店铺商品信息的 API 主要用于获取商品的详细信息,包括基本信息、属性、库存、评价、推广信息等。开发者需注册账号、申请权限、阅读文档、发送请求并处理响应。此外,还提供商品搜索和管理接口,帮助商家优化商品展示和管理订单,提高运营效率。使用时需遵守平台规则,确保数据安全和合法性。
          |
          存储 算法 Java
          Java8 CompletableFuture:异步编程的瑞士军刀
          Java8 CompletableFuture:异步编程的瑞士军刀
          394 2
          |
          消息中间件 运维 监控
          阿里云中间件、aPaaS 产品与解决方案介绍|学习笔记
          快速学习阿里云中间件、aPaaS 产品与解决方案介绍
          1213 94
          阿里云中间件、aPaaS 产品与解决方案介绍|学习笔记