CompletableFuture使用详解

简介: CompletableFuture使用详解

 目录

异步计算

Future 接口的局限性

CompletionStage

CompletableFuture

下面是CompletableFuture的一些常用的方法:


异步计算

    • 所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。
    • JDK5新增了Future接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。
    • 以前我们获取一个异步任务的结果可能是这样写的
    • image.gif编辑

    Future 接口的局限性

    Future接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:

      1. 将多个异步计算的结果合并成一个
      2. 等待Future集合中的所有任务都完成
      3. Future完成事件(即,任务完成以后触发执行动作)
      4. 。。。

      CompletionStage

        • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
        • 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
        • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

        CompletableFuture

          • 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
          • 它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
          • 它实现了Future和CompletionStage接口

          image.gif编辑

          在JAVA中你可以理解为可以使用它来创建一个异步线程处理你想要处理的内容,并且这个异步线程是非阻塞的。同时你要注意的是,每一个任务都可以是一个Feature 或者是 CompletionStage 都是后台守护线程(Daemon),也就是说,随着主线程的死亡,守护线程不管有没有完成任务,也会结束。

          下面是CompletableFuture的一些常用的方法:

          public class WelfareImageController {
              /**
               * runAsync 无返回值
               *
               * @throws Exception
               */
              public static void runAsync() throws Exception {
                  CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                      try {
                          TimeUnit.SECONDS.sleep(1);
                      } catch (InterruptedException e) {
                      }
                      System.out.println("run end ...");
                  });
                  future.get();
              }
              /**
               * supplyAsync
               * 有返回值
               *
               * @throws Exception
               */
              public static void supplyAsync() throws Exception {
                  Instant start = Instant.now();
                  CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
                      try {
                          TimeUnit.SECONDS.sleep(1);
                      } catch (InterruptedException e) {
                      }
                      System.out.println("run end ...");
                      return System.currentTimeMillis();
                  });
                  long time = future.get();
                  System.out.println("time = " + time);
                  Instant end = Instant.now();
                  System.out.println("总耗时:" + Duration.between(start, end).getSeconds());
              }
              /**
               * whenComplete 方法
               * 异步线程任务执行完后调用 whenComplete 方法
               *
               * @throws Exception
               */
              public static void whenComplete() throws Exception {
                  CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                      try {
                          TimeUnit.SECONDS.sleep(1);
                      } catch (InterruptedException e) {
                      }
                      if (new Random().nextInt() % 2 >= 0) {
                          int i = 12 / 0;
                      }
                      System.out.println("run end ...");
                  });
                  future.whenComplete(new BiConsumer<Void, Throwable>() {
                      @Override
                      public void accept(Void t, Throwable action) {
                          System.out.println("执行完成!");
                      }
                  });
                  // runAsync 执行异常的时候就执行此方法
                  future.exceptionally(new Function<Throwable, Void>() {
                      @Override
                      public Void apply(Throwable t) {
                          System.out.println("执行失败!" + t.getMessage());
                          return null;
                      }
                  });
                  TimeUnit.SECONDS.sleep(2);
              }
              /**
               * thenApply 方法
               * 异步线程执行完返回的结果作为下个任务的参数
               * 注意: 任务出现异常则不执行 thenApply 方法
               *
               * @throws Exception
               */
              private static void thenApplyOld() throws Exception {
                  CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
                      @Override
                      public Long get() {
                          long result = new Random().nextInt(100);
                          System.out.println("result1=" + result);
                          return result;
                      }
                  }).thenApply(new Function<Long, Long>() {
                      @Override
                      public Long apply(Long t) {
                          long result = t * 5;
                          System.out.println("result2=" + result);
                          return result;
                      }
                  });
                  long result = future.get();
                  System.out.println(result);
              }
              /**
               * thenApply 方法
               * 上一个方法的简化
               *
               * @throws Exception
               */
              private static void thenApply() throws Exception {
                  CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
                      long result = new Random().nextInt(100);
                      System.out.println("result1=" + result);
                      return result;
                  }).thenApply(t -> {
                      long result = t * 5;
                      System.out.println("result2=" + result);
                      return result;
                  });
                  long result = future.get();
                  System.out.println(result);
              }
              /**
               * handle 方法
               * handle 方法和 thenApply 方法处理方式基本一样。
               * 不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。
               * thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
               *
               * @throws Exception
               */
              public static void handle() throws Exception {
                  CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int i = 10 / 0;
                          return new Random().nextInt(10);
                      }
                  }).handle(new BiFunction<Integer, Throwable, Integer>() {
                      @Override
                      public Integer apply(Integer param, Throwable throwable) {
                          int result = -1;
                          if (throwable == null) {
                              result = param * 2;
                          } else {
                              System.out.println(throwable.getMessage());
                          }
                          return result;
                      }
                  });
                  System.out.println(future.get());
              }
              /**
               * thenAccept
               * 接收异步任务的返回结果进行处理 无返回值
               *
               * @param
               * @throws Exception
               */
              public static void thenAccept() throws Exception {
                  CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          return new Random().nextInt(10);
                      }
                  }).thenAccept(integer -> {
                      System.out.println(integer);
                  });
                  future.get();
              }
              /**
               * thenRun
               * 与上面的thenAccept 不同的是 不关心任务的处理结果 只要任务处理完就执行此方法
               * 注意: 任务遇到未抛出的异常则不会执行
               *
               * @param
               * @throws Exception
               */
              public static void thenRun() throws Exception {
                  CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          return new Random().nextInt(10);
                      }
                  }).thenRun(() -> {
                      System.out.println("thenRun ...");
                  });
                  future.get();
              }
              /**
               * thenCombine 合并任务
               * thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
               * 这里有点像stream流的reduce
               *
               * @param
               * @throws Exception
               */
              private static void thenCombine() throws Exception {
                  CompletableFuture<String> future1 = CompletableFuture.supplyAsync(new Supplier<String>() {
                      @Override
                      public String get() {
                          return "hello";
                      }
                  });
                  CompletableFuture<String> future2 = CompletableFuture.supplyAsync(new Supplier<String>() {
                      @Override
                      public String get() {
                          return "hello";
                      }
                  });
                  CompletableFuture<String> result = future1.thenCombine(future2, new BiFunction<String, String, String>() {
                      @Override
                      public String apply(String t, String u) {
                          return t + " " + u;
                      }
                  });
                  System.out.println(result.get());
              }
              /**
               * reduce 并行计算:
               * 解释一下: 后一个 Integer::sum 就是对前面 一个Integer::sum 阶段性累加结果进行合并
               *
               * @param
               * @throws Exception
               */
              public static void reduceTest() {
                  List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
                  Integer reduce = integers.parallelStream().reduce(0, Integer::sum, Integer::sum);
                  System.out.println(reduce);
              }
              /**
               * thenAcceptBoth
               * 当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗
               *
               * @param
               * @throws Exception
               */
              private static void thenAcceptBoth() throws Exception {
                  CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f1=" + t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f2=" + t);
                          return t;
                      }
                  });
                  f1.thenAcceptBoth(f2, new BiConsumer<Integer, Integer>() {
                      @Override
                      public void accept(Integer t, Integer u) {
                          System.out.println("f1=" + t + ";f2=" + u + ";");
                      }
                  });
              }
              private static void thenAcceptBothNew() throws Exception {
                  CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          try {
                              TimeUnit.SECONDS.sleep(t);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println("f1="+t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          try {
                              TimeUnit.SECONDS.sleep(t);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println("f2="+t);
                          return t;
                      }
                  });
                  f1.thenAcceptBoth(f2, new BiConsumer<Integer, Integer>() {
                      @Override
                      public void accept(Integer t, Integer u) {
                          System.out.println("f1="+t+";f2="+u+";");
                      }
                  });
              }
              /**
               * applyToEither 方法
               * 两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。
               * 慢的那个任务也会执行 但不会被applyToEither调用返回值做任何处理
               *
               * @param
               * @throws Exception
               */
              private static void applyToEither() throws Exception {
                  CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          try {
                              TimeUnit.SECONDS.sleep(t);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println("f1=" + t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          try {
                              TimeUnit.SECONDS.sleep(t);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println("f2=" + t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> result = f1.applyToEither(f2, new Function<Integer, Integer>() {
                      @Override
                      public Integer apply(Integer t) {
                          System.out.println(t);
                          return t * 2;
                      }
                  });
                  System.out.println(result.get());
              }
              /**
               * acceptEither 方法
               * 两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。
               *
               * @param
               * @throws Exception
               */
              private static void acceptEither() throws Exception {
                  CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f1=" + t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f2=" + t);
                          return t;
                      }
                  });
                  f1.acceptEither(f2, new Consumer<Integer>() {
                      @Override
                      public void accept(Integer t) {
                          System.out.println(t);
                      }
                  });
              }
              private static void acceptEitherNew() throws Exception {
                  CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          try {
                              TimeUnit.SECONDS.sleep(t);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println("f1="+t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          try {
                              TimeUnit.SECONDS.sleep(t);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println("f2="+t);
                          return t;
                      }
                  });
                  f1.acceptEither(f2, new Consumer<Integer>() {
                      @Override
                      public void accept(Integer t) {
                          System.out.println(t);
                      }
                  });
                  f1.get();
                  f2.get();
              }
              /**
               * runAfterEither 方法
               * 两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)
               *
               * @param
               * @throws Exception
               */
              private static void runAfterEither() throws Exception {
                  CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f1=" + t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f2=" + t);
                          return t;
                      }
                  });
                  f1.runAfterEither(f2, new Runnable() {
                      @Override
                      public void run() {
                          System.out.println("上面有一个已经完成了。");
                      }
                  });
                  System.out.println("f1:" + f1.get());
                  System.out.println("f2:" + f2.get());
              }
              /**
               * runAfterBoth
               * 两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)
               *
               * @param
               * @throws Exception
               */
              private static void runAfterBoth() throws Exception {
                  CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f1=" + t);
                          return t;
                      }
                  });
                  CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("f2=" + t);
                          return t;
                      }
                  });
                  f1.runAfterBoth(f2, new Runnable() {
                      @Override
                      public void run() {
                          System.out.println("上面两个任务都执行完成了。");
                      }
                  });
              }
              /**
               * thenCompose 方法
               * thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。
               *
               * @param
               * @throws Exception
               */
              private static void thenCompose() throws Exception {
                  CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                      @Override
                      public Integer get() {
                          int t = new Random().nextInt(3);
                          System.out.println("t1=" + t);
                          return t;
                      }
                  }).thenCompose(new Function<Integer, CompletionStage<Integer>>() {
                      @Override
                      public CompletionStage<Integer> apply(Integer param) {
                          return CompletableFuture.supplyAsync(new Supplier<Integer>() {
                              @Override
                              public Integer get() {
                                  int t = param * 2;
                                  System.out.println("t2=" + t);
                                  return t;
                              }
                          });
                      }
                  });
                  System.out.println("thenCompose result : " + f.get());
              }
              /**
               * allOf() 方法
               *
               * @param
               * @throws Exception
               */
              static void futureTest() {
                  CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                      try {
                          Thread.sleep(10);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      System.out.println("future1 finished!");
                      return "future1 finished!";
                  });
                  CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                      System.out.println("future2 finished!");
                      return "future2 finished!";
                  });
                  //阻塞,直到所有任务结束。
                  System.out.println(System.currentTimeMillis() + ":阻塞");
                  CompletableFuture.allOf(future1, future2).join();
                  System.out.println(System.currentTimeMillis() + ":阻塞结束");
                  System.out.println("future1: " + future1.isDone() + " future2: " + future2.isDone());
              }
              static String queryCode(String name) {
                  try {
                      Thread.sleep(100);
                  } catch (InterruptedException e) {
                  }
                  return "601857";
              }
              static Double fetchPrice(String code) {
                  try {
                      Thread.sleep(100);
                  } catch (InterruptedException e) {
                  }
                  return 5 + Math.random() * 20;
              }
              public static void main(String[] args) throws Exception {
          //        Thread.sleep(4000);
          //        WelfareImageController.thenAcceptBothNew();
                  // 第一个任务:
                  CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
                      return queryCode("中国石油");
                  });
                  // cfQuery成功后继续执行下一个任务:
                  CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
                      return fetchPrice(code);
                  });
                  // cfFetch成功后打印结果:
                  cfFetch.thenAccept((result) -> {
                      System.out.println("price: " + result);
                  });
                  // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
                  Thread.sleep(1000);
              }
          }

          image.gif

          参考:CompletableFuture 使用详解 - 简书

          java8 CompletableFuture,allOf多实例返回 - 简书

          CompletableFuture基本用法 - 废物大师兄 - 博客园

          JDK1.8新特性CompletableFuture总结_finalheart的博客-CSDN博客_completablefuture

          目录
          相关文章
          |
          10天前
          |
          Java API
          java多线程之FutureTask、Future、CompletableFuture
          java多线程之FutureTask、Future、CompletableFuture
          |
          5月前
          CompletableFuture
          CompletableFuture
          23 0
          |
          6月前
          |
          Java 数据处理 数据库
          CompletableFuture 使用
          CompletableFuture 使用
          34 0
          |
          7月前
          |
          消息中间件 Java 中间件
          Future and CompletableFuture
          Future代表异步执行的结果,也就是说异步执行完毕后,结果保存在Future里, 我们在使用线程池submit()时需要传入Callable接口,线程池的返回值为一个Future,而Future则保存了执行的结果 ,可通过Future的get()方法取出结果,如果线程池使用的是execute(),则传入的是Runnable接口 无返回值。
          36 0
          |
          7月前
          |
          Java 调度
          并发编程——Future & CompletableFuture
          Java创建线程的方式,一般常用的是Thread,Runnable。如果需要当前处理的任务有返回结果的话,需要使用Callable。Callable运行需要配合Future。
          25 0
          |
          8月前
          |
          Java Linux
          JUC--CompletableFuture上
          Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果,取消任务带带执行,判断任务是否被取消,判断任务执行是否完毕等。
          |
          8月前
          JUC--CompletableFuture下
          简单介绍CompletableFuture
          |
          8月前
          |
          Java
          CompletableFuture总结和实践
          CompletableFuture被设计在Java中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。
          217 0
          Zp
          【CompletableFuture】CompletableFuture中join()和get()方法的区别
          【CompletableFuture】CompletableFuture中join()和get()方法的区别
          Zp
          502 0
          |
          Java 测试技术
          CompletableFuture使用详解
          CompletableFuture是jdk8的新特性。CompletableFuture实现了CompletionStage接口和Future接口
          216 0
          CompletableFuture使用详解