简述CompletableFuture异步任务编排(下)

简介: 简述CompletableFuture异步任务编排
  • 多任务结果合并

image.png


示例代码如下:

CompletableFuture future1 = CompletableFuture
        // 任务1
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task1";
              System.out.println(ThreadName + "开始执行任务:" + taskName);
              System.out.println("正在执行任务" + taskName);
              System.out.println(taskName + "执行结束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture future2 = CompletableFuture
        // 任务2
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task2";
              System.out.println(ThreadName + "开始执行任务:" + taskName);
              System.out.println("正在执行任务" + taskName);
              System.out.println(taskName + "执行结束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture future3 = CompletableFuture
        // 任务3
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task3";
              System.out.println(ThreadName + "开始执行任务:" + taskName);
              System.out.println("正在执行任务" + taskName);
              System.out.println(taskName + "执行结束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture[] futures = new CompletableFuture[]{future1, future2, future3};
    CompletableFuture.allOf(futures)
        // 任务4
        .whenCompleteAsync(
            (v, e) -> {
              List<Object> values = new ArrayList<>();
              for (CompletableFuture future : futures) {
                try {
                  values.add(future.get());
                } catch (Exception ex) {
                }
              }
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task4";
              System.out.println(ThreadName + "开始执行任务:" + taskName);
              System.out.println("前面任务的处理结果:" + values);
              System.out.println("正在执行任务" + taskName);
              System.out.println(taskName + "执行结束");
            }, THREAD_POOL_EXECUTOR);
复制代码

执行结果如下:

completable-future-test-Thread-3开始执行任务:task2
completable-future-test-Thread-4开始执行任务:task3
completable-future-test-Thread-2开始执行任务:task1
正在执行任务task2
正在执行任务task3
正在执行任务task1
task2执行结束
task3执行结束
task1执行结束
completable-future-test-Thread-2开始执行任务:task4
前面任务的处理结果:[task1, task2, task3]
正在执行任务task4
task4执行结束
复制代码

之所以最后任务4的线程是completable-future-test-Thread-2,那是因为线程池的核心线程数设置为3,线程数设置高一点就会创建新的线程处理;

从上述代码示例中,我们可以收获到另一个知识点:allOf(),它的作用是要求所有的任务全部完成才能执行后面的任务。

任一任务完成

在一批任务中,只要有一个任务完成,那么就可以向后继续执行其他任务。

为了代码演示无异议,后续代码中,我们把线程数提升到4。

示例代码如下:

CompletableFuture future1 = CompletableFuture
        // 任务1
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task1";
              System.out.println(ThreadName + "开始执行任务:" + taskName);
              System.out.println("正在执行任务" + taskName);
              System.out.println(taskName + "执行结束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture future2 = CompletableFuture
        // 任务2
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task2";
              System.out.println(ThreadName + "开始执行任务:" + taskName);
              System.out.println("正在执行任务" + taskName);
              System.out.println(taskName + "执行结束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture future3 = CompletableFuture
        // 任务3
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task3";
              System.out.println(ThreadName + "开始执行任务:" + taskName);
              System.out.println("正在执行任务" + taskName);
              System.out.println(taskName + "执行结束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture.anyOf(future1, future2, future3)
        .thenApplyAsync((taskResult) -> {
          Thread currentThread = Thread.currentThread();
          String ThreadName = currentThread.getName();
          String taskName = "task4";
          System.out.println(ThreadName + "开始执行任务:" + taskName);
          System.out.println("前面任务的处理结果:" + taskResult);
          System.out.println("正在执行任务" + taskName);
          System.out.println(taskName + "执行结束");
          return taskName;
        }, THREAD_POOL_EXECUTOR);
复制代码

执行结果如下:

completable-future-test-Thread-2开始执行任务:task1
completable-future-test-Thread-4开始执行任务:task3
completable-future-test-Thread-3开始执行任务:task2
正在执行任务task3
正在执行任务task2
正在执行任务task1
task1执行结束
task3执行结束
task2执行结束
completable-future-test-Thread-5开始执行任务:task4
前面任务的处理结果:task1
正在执行任务task4
task4执行结束
复制代码

可以看到,任务1第一个结束,所以任务4中接收到的执行结果就是任务1的返回值。

快速失败

在一批任务当中,只要有任意一个任务执行产生异常了,那么就直接结束;否则就要等待所有任务成功执行完毕。

示例代码如下:

CompletableFuture future1 = CompletableFuture
        // 任务1
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task1";
              System.out.println(ThreadName + "开始执行任务:" + taskName);
              System.out.println("正在执行任务" + taskName);
              System.out.println(taskName + "执行结束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture future2 = CompletableFuture
        // 任务2
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task2";
              System.out.println(ThreadName + "开始执行任务:" + taskName);
              System.out.println("正在执行任务" + taskName);
              System.out.println(taskName + "执行结束");
              throw new RuntimeException("任务2异常!");
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture future3 = CompletableFuture
        // 任务3
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task3";
              System.out.println(ThreadName + "开始执行任务:" + taskName);
              System.out.println("正在执行任务" + taskName);
              System.out.println(taskName + "执行结束");
              throw new RuntimeException("任务3异常!");
            }, THREAD_POOL_EXECUTOR);
    CompletableFuture[] futures = new CompletableFuture[]{future1, future2, future3};
    CompletableFuture allCompletableFuture = CompletableFuture.allOf(futures);
    // 创建一个任务来监听异常
    CompletableFuture<?> anyException = new CompletableFuture<>();
    for (CompletableFuture<?> completableFuture : futures) {
      completableFuture.exceptionally((t) -> {
        // 任何一个任务异常都会让anyException任务完成
        anyException.completeExceptionally(t);
        return null;
      });
    }
    // 要么allCompletableFuture全部成功,要么一个出现异常就结束任务
    CompletableFuture.anyOf(allCompletableFuture, anyException)
        .whenComplete((value, exception) -> {
          if (Objects.nonNull(exception)) {
            System.out.println("产生异常,提前结束!");
            exception.printStackTrace();
            return;
          }
          System.out.println("所有任务正常完成!");
        });
复制代码

执行结果如下:

completable-future-test-Thread-2开始执行任务:task1
completable-future-test-Thread-3开始执行任务:task2
completable-future-test-Thread-4开始执行任务:task3
正在执行任务task2
正在执行任务task3
正在执行任务task1
task2执行结束
task1执行结束
task3执行结束
产生异常,提前结束!
java.util.concurrent.CompletionException: java.lang.RuntimeException: 任务2异常!
  at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
  at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
  at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: 任务2异常!
  at com.example.awesomerocketmq.completable.CompletableFutureTest.lambda$t$1(CompletableFutureTest.java:53)
  at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
  ... 3 more
复制代码

CompletableFuture没有现成的api实现快速失败的功能,所以我们只能结合allOf()anyOf()来逻辑来自定义方法完成快速失败的逻辑;

1.我们需要额外创建一个CompletableFuture来监听所有的CompletableFuture,一旦其中一个CompletableFuture产生异常,我们就设置额外的CompletableFuture立即完成。

2.把所有的CompletableFuture和额外的CompletableFuture放在anyOf()方法中,这样一旦额外的CompletableFuture完成,说明产生异常了;否则就需要等待所有的CompletableFuture完成。

注意

  • 异常处理

最后需要注意的是,所有的CompletableFuture任务一定要加上异常处理:

CompletableFuture
        // 任务1
        .supplyAsync(
            () -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task1";
              System.out.println(ThreadName + "开始执行任务:" + taskName);
              System.out.println("正在执行任务" + taskName);
              System.out.println(taskName + "执行结束");
              return taskName;
            }, THREAD_POOL_EXECUTOR)
        .whenComplete((v,e)->{
          if(Objects.nonNull(e)){
            // todo
            // 处理异常
          }
          if(Objects.nonNull(v)){
            // todo
          }
        });
复制代码

还可以通过另外两个方法处理:exceptionally()或者handle()

  • 自定义线程池

CompletableFuture默认的线程池是ForkJoinThreadPool,建议大家在使用的时候尽可能地使用自定义线程池,这样方便后续的代码优化以及相关的日志查看。


相关文章
|
1月前
|
Java API
异步任务编排神器CompletableFuture
【10月更文挑战第10天】CompletableFuture是JDK8并发包中引入的强大工具,用于处理复杂的异步任务编排。它提供了丰富的API,支持任务的串行、并行、组合及异常处理,适用于需要高效管理和协调多个异步操作的场景。例如,网页加载时需从多个服务异步获取数据,CompletableFuture可以有效提升性能和响应速度。使用时应注意异常处理和合理选择线程池,以确保程序稳定性和效率。
异步任务编排神器CompletableFuture
|
2月前
|
数据采集 JavaScript Java
CompletableFuture异步编排,你还不会?
本文介绍了同步与异步编程的概念,探讨了在复杂业务场景中使用异步编排的重要性。通过对比 `Future` 与 `CompletableFuture`,详细讲解了 `CompletableFuture` 的多种方法,如 `runAsync`、`supplyAsync`、`whenComplete`、`exceptionally` 等,并展示了如何通过 `CompletableFuture` 实现异步任务的组合与异常处理。最后,通过实战案例演示了如何利用线程池与 `CompletableFuture` 优化商品详情页的查询效率,显著减少响应时间。
CompletableFuture异步编排,你还不会?
|
3月前
|
Java 数据库
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
这篇文章通过一个电商商品详情页的实战案例,展示了如何使用`CompletableFuture`进行异步编排,以解决在不同数据库表中查询商品信息的问题,并提供了详细的代码实现和遇到问题(如图片未显示)的解决方案。
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
|
3月前
|
Java
异步&线程池 CompletableFuture 异步编排 【下篇】
这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。
异步&线程池 CompletableFuture 异步编排 【下篇】
|
6月前
|
前端开发 Java API
Java并发基础:CompletableFuture全面解析
CompletableFuture类使得并发任务的处理变得简单而高效,通过简洁的API,开发者能轻松创建、组合和链式调用异步操作,无需关心底层线程管理,这不仅提升了程序的响应速度,还优化了资源利用率,让复杂的并发逻辑变得易于掌控。
191 1
Java并发基础:CompletableFuture全面解析
|
安全 Java
任务编排:CompletableFuture从入门到精通
最近遇到了一个业务场景,涉及到多数据源之间的请求的流程编排,正好看到了一篇某团介绍CompletableFuture原理和使用的技术文章,主要还是涉及使用层面。网上很多文章涉及原理的部分讲的不是特别详细且比较抽象。因为涉及到多线程的工具必须要理解原理,不然一旦遇到问题排查起来就只能凭玄学,正好借此梳理一下CompletableFuture的工作原理
349 0
|
6月前
|
Java
CompletableFuture 异步编排、案例及应用小案例1
CompletableFuture 异步编排、案例及应用小案例
164 0
|
6月前
|
Java
CompletableFuture 异步编排、案例及应用小案例2
CompletableFuture 异步编排、案例及应用小案例
78 0
|
存储 SpringCloudAlibaba Java
Java新特性:异步编排CompletableFuture
CompletableFuture由Java 8提供,是实现异步化的工具类,上手难度较低,且功能强大,支持通过函数式编程的方式对各类操作进行组合编排。 CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步[回调](https://so.csdn.net/so/search?q=回调&spm=1001.2101.3001.7020)、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
364 1
Java新特性:异步编排CompletableFuture
|
Java
异步编程 - 06 基于JDK中的Future实现异步编程(中)_CompletableFuture源码解析
异步编程 - 06 基于JDK中的Future实现异步编程(中)_CompletableFuture源码解析
62 0