CompletableFuture 异步编排、案例及应用小案例2

简介: CompletableFuture 异步编排、案例及应用小案例

CompletableFuture 异步编排、案例及应用小案例1:https://developer.aliyun.com/article/1394503

3.3、thenApply和thenApplyAsync

image.png

thenApply 和 thenApplyAsync 让线程成为了一种串行化的关系,第一个任务执行完的返回值会作为第二个的任务的入参.

案例的话,比较简单.

 package com.nzc;
 ​
 import java.util.concurrent.*;
 ​
 /**
  * @description:
  * @author: Yihui Wang
  * @date: 2022年08月21日 16:32
  */
 public class ThenApplyAndAsyncDemo {
 ​
 ​
     public static ExecutorService executorService = new ThreadPoolExecutor(
             10,
             100,
             30L,
             TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(100),
             Executors.defaultThreadFactory(),
             new ThreadPoolExecutor.DiscardOldestPolicy());
 ​
     /**
      * @param args
      */
     public static void main(String[] args) throws ExecutionException, InterruptedException {
         thenApply();
         thenApplyAsync();
     }
 ​
 ​
     /**
      * 线程串行化
      * 1、我进入商场
      * 2、找到了我要买的商品
      * 3、准备付款结账
      * 4、拿着东西回家!!!
      * 你会发现这是一步扣一步的在走,其实业务场景中也有很多这样的场景,希望大家在遇到的时候能够想到
      *
      * @return
      * @throws ExecutionException
      * @throws InterruptedException
      */
     public static String thenApply() throws ExecutionException, InterruptedException {
         System.out.println("主线程开始1");
 //        CompletableFuture<String> future =
 //                CompletableFuture.supplyAsync(() -> {
 //                    return "我进入商场, ";
 //                });
 //        CompletableFuture<String> future1 = future.thenApply(res -> {
 //            return res += "找到了我要买的商品,";
 //        });
 //        future.thenApply(res->{
 //            return  res+="准备付款结账,";
 //        }).thenApply(res->{
 //            return  res+="拿着东西回家!!!";
 //        });
 ​
         // 上面那种分开写和下面这种链式写法是一样的
         CompletableFuture<String> future =
                 CompletableFuture.supplyAsync(() -> {
                     System.out.println(Thread.currentThread().getId());
                     return "我进入商场, ";
 ​
                 }).thenApply(res -> {
                     System.out.println(Thread.currentThread().getId());
                     return res += "找到了我要买的商品,";
                 }).thenApply(res -> {
                     System.out.println(Thread.currentThread().getId());
                     return res += "准备付款结账,";
                 }).thenApply(res -> {
                     return res += "拿着东西回家!!!";
                 });
         String result = future.get();
         System.out.println("主线程1结束, 子线程的结果为:" + result);
         return result;
     }
 ​
     /**
      * 这里因为是异步的原因,它们之间倒是没有一个顺序上的规范
      *
      * @return
      * @throws ExecutionException
      * @throws InterruptedException
      */
     public static String thenApplyAsync() throws ExecutionException, InterruptedException {
         System.out.println("主线程2开始");
         CompletableFuture<String> future =
                 CompletableFuture.supplyAsync(() -> {
                     return "我进入商场, ";
                 },executorService).thenApplyAsync(res -> {
                     System.out.println(Thread.currentThread().getId());
                     return res += "找到了我要买的商品,";
                 },executorService).thenApplyAsync(res -> {
                     try {
                         System.out.println(Thread.currentThread().getId());
                         Thread.sleep(1000L);
                     } catch (InterruptedException e) {
                         e.printStackTrace();
                     }
 ​
                     return res += "准备付款结账,";
                 },executorService).thenApplyAsync(res -> {
                     System.out.println(Thread.currentThread().getId());
 ​
                     return res += "拿着东西回家!!!";
                 });
         String result = future.get();
         System.out.println("主线程2结束, 子线程的结果为:" + result);
         return result;
     }
 }

小结:

thenApply 和 thenApplyAsync 本质上就是将它们串起来了,必须要先完成第一个任务,才能接着做下面的任务

这里的本质区别和前面和之前说的还是一样

但是你如果仔细看了案例代码,你会发现我在里面打印了线程ID. 并且我在测试的时候,尝试将放入自定义线程池和不放入两种情况,实际上 thenApplyAsync 执行的任务线程确实不是一个.

但效果其实和 thenApply 是一样的,都需要等待上一个任务完成。

注意我说的是效果,并非是内部的执行机制。再说就又得进去看源码了...

3.4、thenAccept 和 thenAcceptAsync

image.png

如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用thenAccept()thenRun()方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。

thenAccept消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。

thenAcceptAsync则是异步的消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。

 package com.nzc;
 ​
 import java.util.concurrent.*;
 import java.util.function.Consumer;
 ​
 /**
  * @description:
  * @author: Yihui Wang
  * @date: 2022年08月21日 17:21
  */
 public class ThenAcceptDemo {
 ​
     public static ExecutorService executorService = new ThreadPoolExecutor(
             10,
             100,
             30L,
             TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(100),
             Executors.defaultThreadFactory(),
             new ThreadPoolExecutor.DiscardOldestPolicy());
 ​
 ​
     public static void main(String[] args) throws Exception {
         thenAccept();
         thenAcceptAsync();
     }
 ​
     private static String action1 = "";
 ​
     public static void thenAccept() {
         System.out.println("主线程开始");
         CompletableFuture.supplyAsync(() -> {
             try {
                 action1 = "逛jd,想买台电脑~ ";
             } catch (Exception e) {
                 e.printStackTrace();
             }
             return action1;
         }).thenApply(string -> {
             return action1 + "选中了,付款,下单成功!!";
         }).thenApply(String -> {
             return action1 + "等待快递到来";
         }).thenAccept((res) -> {
             System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" + res);
         });
     }
 ​
     private static String action2 = "";
 ​
     public static void thenAcceptAsync() {
         System.out.println("主线程开始");
         CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
             try {
                 action2 = "逛jd,想买台电脑~ ";
             } catch (Exception e) {
                 e.printStackTrace();
             }
             return action2;
         }).thenApply(string -> {
             return action2 + "选中了,付款,下单成功!!";
         }).thenApply(String -> {
             return action2 + "等待快递到来";
         });
         
         // 这里不采用链式写法,是因为thenAcceptAsync 无返回值,
         // 第二个thenAcceptAsync 连接在第一个thenAcceptAsync后,会没有入参值
         // 就都拿出来了。
         future.thenAcceptAsync((res) -> {
             System.out.println("线程ID"+Thread.currentThread().getId()+"拿到依任务一二的返回结果,===>异步的执行任务三,晚饭时间了,打算一边看电影");
         },executorService);
         future.thenAcceptAsync((res) -> {
             System.out.println("线程ID"+Thread.currentThread().getId()+"拿到依任务一二的返回结果,===>异步的执行任务四,一边干饭~");
         },executorService);
     }
 }
 ​

thenAcceptAsync也是我们今天文章开头中用到的,异步编排式的组合视图结果集。

这一部分平时用的倒是不少,也比较方便~

上面说了这么多,但是万一我们在执行某个任务的时候出现异常该如何处理呢?

别慌,它也封装好了的。

3.5、exceptionally 和 handle

exceptionally 异常处理,出现异常时触发,可以回调给你一个从原始Future中生成的错误恢复的机会。你可以在这里记录这个异常并返回一个默认值。

一般而言,exceptionally都是写到方法调用的末尾,以来出来中间过程中会出现的异常。

另外就是 handle 也可以用来处理异常。

image.png

 public class ExceptionallyDemo {
     public static void main(String[] args) throws Exception{
         System.out.println("主线程开始");
         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
             int i= 1/0;
             System.out.println("子线程执行中");
             return i;
         }).exceptionally(ex -> {
             System.out.println(ex.getMessage());
             return -1;
         });
         System.out.println(future.get());
     }
 }
 ​
 //主线程开始
 //java.lang.ArithmeticException: / by zero
 //-1
 public static void main(String[] args) throws Exception {
     CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
         System.out.println("任务开始");
         int i = 0 / 1;
         return i;
     }).handle((i, ex) -> {
         System.out.println("进入 handle 方法");
         if (ex != null) {
             System.out.println("发生了异常,内容为:" + ex.getMessage());
             return -1;
         } else {
             System.out.println("正常完成,内容为: " + i);
             return i;
         }
     });
 }

handle是有入参和带返回值的,入参是之前任务执行的结果。

当然一切具体的使用还是要看业务场景啦

3.6、结果合并

thenCompose 合并两个有依赖关系的 CompletableFutures的执行结果,有入参有返回值。

它的入参是第一个future和第一二两个的任何的返回结果。

thenAcceptBoth则是会将两个任务的执行结果作为方法入参,传递到指定方法中,但无返回值

runAfterBoth 则是不会把执行结果当做方法入参,也没有返回值。

 package com.nzc;
 ​
 import java.util.WeakHashMap;
 import java.util.concurrent.*;
 ​
 /**
  * @description:
  * @author: Yihui Wang
  * @date: 2022年08月21日 17:53
  */
 public class ThenCombineDemo {
 ​
     public static void main(String[] args) throws Exception {
         test();
     }
 ​
     private static Integer num = 10;
     public static void test() throws Exception {
         System.out.println("主线程开始");
         //第一步加 10
         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
             System.out.println("第一个任务:让num+10;任务开始");
             num += 10;
             return num;
         });
         CompletableFuture<String > future1 = CompletableFuture.supplyAsync(() -> {
             System.out.println("第二个任务:让num+1;任务开始");
             return num + 1;
             //它的入参是第一个future和第一二两个的任何的返回结果。
         }).thenCombine(future,(w,s)->{
             System.out.println("任务一的结果==>"+w);
             System.out.println("任务二的结果==>"+s);
             return "我是两个任务的合并"+(w+s);
         });
         System.out.println(future.get());
         System.out.println(future1.get());
     }
 }
 /**
  * 主线程开始
  * 第一个任务:让num+10;任务开始
  * 第二个任务:让num+1;任务开始
  * 任务一的结果==>21
  * 任务二的结果==>20
  * 20
  * 我是两个任务的合并41
  */

thenAcceptBoth

 package com.nzc;
 ​
 import java.util.WeakHashMap;
 import java.util.concurrent.*;
 ​
 /**
  * @description:
  * @author: Yihui Wang
  * @date: 2022年08月21日 17:53
  */
 public class ThenCombineDemo {
 ​
     public static void main(String[] args) throws Exception {
         test();
     }
 ​
     private static Integer num = 10;
     public static void test() throws Exception {
         System.out.println("主线程开始");
         //第一步加 10
         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
             System.out.println("第一个任务:让num+10;任务开始");
             num += 10;
             return num;
         });
         CompletableFuture<Void > future1 = CompletableFuture.supplyAsync(() -> {
             System.out.println("第二个任务:让num+1;任务开始");
             return num + 1;
         }).thenAcceptBoth(future,(w,s)->{
             System.out.println("任务一的结果==>"+w);
             System.out.println("任务二的结果==>"+s);
             System.out.println( "我是两个任务的合并"+(w+s)+"但是我没有返回值");
         });
         System.out.println("任务一的结果==>"+future.get());
         // 不采用链式写法,任务二实际上是有返回值,大家看业务场景写即可
         System.out.println("任务二后接上thenAcceptBoth方法的结果==>"+future1.get());
     }
 }
 /**
  主线程开始
  第一个任务:让num+10;任务开始
  第二个任务:让num+1;任务开始
  任务一的结果==>21
  任务二的结果==>20
  我是两个任务的合并41但是我没有返回值
  任务一的结果==>20
  任务二后接上thenAcceptBoth方法的结果==>null
  */

runAfterBoth

 public static  void test2(){
     //第一步加 10
     CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
         System.out.println("第一个任务:让num+10;任务开始");
         num += 10;
         return num;
     });
     CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
         System.out.println("第一个任务:让num+10;任务开始");
         num += 10;
         return num;
     });
     future2.runAfterBoth(future,()->{
         System.out.println("不会把执行结果当做方法入参,也没有返回值");
     });
 ​
 }

除了这些外,CompletableFuture还有我之前案例中就已经用到的allofanyOf

3.7、allof 合并多个任务结果

allOf: 一系列独立的 future任务,等其所有的任务执行完后做一些事情.

 public class CompletableFutureDemo9 {
 ​
     private static Integer num = 10;
 ​
 ​
     public static void main(String[] args) throws Exception{
         System.out.println("主线程开始");
         List<CompletableFuture> list = new ArrayList<>();
 ​
         CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
             System.out.println("加 10 任务开始");
             num += 10;
             return num;
         });
         list.add(job1);
 ​
         CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
             System.out.println("乘以 10 任务开始");
             num = num * 10;
             return num;
         });
         list.add(job2);
 ​
         CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
             System.out.println("减以 10 任务开始");
             num = num - 10;
             return num;
         });
         list.add(job3);
 ​
         CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
             System.out.println("除以 10 任务开始");
             num = num / 10;
             return num;
         });
         list.add(job4);
 ​
         //多任务合并
         List<Integer> collect =
             list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());
         System.out.println(collect);
     }
 ​
 }
 /**主线程开始
  乘以 10 任务开始
  加 10 任务开始
  减以 10 任务开始
  除以 10 任务开始
  [110, 100, 100, 10]
 */

allof的除了在合并结果时经常用到以外,像我们今天案例它也用到了它的get()方法,在那里使用的作用时,阻塞式的等待所有的任务结束,才结束方法的调用。

3.8、anyof

anyOf: 只要在多个 future里面有一个返回,整个任务就可以结束,而不需要等到每一个 future 结束

 public class CompletableFutureDemo10 {
 ​
     private static Integer num = 10;
     /**
      * 先对一个数加 10,然后取平方
      * @param args
      */
     public static void main(String[] args) throws Exception{
         System.out.println("主线程开始");
 ​
         CompletableFuture<Integer>[] futures = new CompletableFuture[4];
         CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
             try{
                 Thread.sleep(5000);
                 System.out.println("加 10 任务开始");
                 num += 10;
                 return num;
             }catch (Exception e){
                 return 0;
             }
         });
         futures[0] = job1;
         CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
             try{
                 Thread.sleep(2000);
                 System.out.println("乘以 10 任务开始");
                 num = num * 10;
                 return num;
             }catch (Exception e){
                 return 1;
             }
         });
         futures[1] = job2;
         CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
             try{
                 Thread.sleep(3000);
                 System.out.println("减以 10 任务开始");
                 num = num - 10;
                 return num;
             }catch (Exception e){
                 return 2;
             }
         });
         futures[2] = job3;
         CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
             try{
                 Thread.sleep(4000);
                 System.out.println("除以 10 任务开始");
                 num = num / 10;
                 return num;
             }catch (Exception e){
                 return 3;
             }
         });
         futures[3] = job4;
         CompletableFuture<Object> future = CompletableFuture.anyOf(futures);
         System.out.println(future.get());
     }
 }
 //主线程开始
 //乘以 10 任务开始
 //100

3.9、注意的小问题

1、一般来讲,如果要使用线程的话,都应该是自定义线程,这点阿里Java开发规范中也有说到。

2、自定义线程池的话,一定要把参数设置合理,这点没啥可说的,都得测,空谈都是大话,线程池的话有一篇美团技术团队的文章,讲的很好。Java线程池实现原理及其在美团业务中的实践

3、今天的案例,我在最后调用了 get()方法,一直阻塞到所有任务完成,所以你在编排的时候,一定要注意你需不需要任务的返回结果,不然很可能会产生一些数据方面问题。

4、关于异常我写到后面心有些浮躁,写的不是非常精细。获取异常信息,future需要获取返回值,才能获取异常信息。

后记

今天最想说的就是 “温故而知新

这方面的知识在去年,我其实已经学过一遍,但应用场景一少,你就会慢慢忘记它的存在。

另外想要说明的是基础我觉得是十分重要的

最近在翻阅 Java 8 实战这本书,Lamda表达式一直会写,但是对于那些思想,我一直处于一直很模糊的状态,这次在看书的时候,发现了很多以前不知道的知识,也让自己恍然大悟。

目录
相关文章
|
2月前
|
并行计算 Java 数据处理
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
215 0
|
2月前
|
Java API
异步任务编排神器CompletableFuture
【10月更文挑战第10天】CompletableFuture是JDK8并发包中引入的强大工具,用于处理复杂的异步任务编排。它提供了丰富的API,支持任务的串行、并行、组合及异常处理,适用于需要高效管理和协调多个异步操作的场景。例如,网页加载时需从多个服务异步获取数据,CompletableFuture可以有效提升性能和响应速度。使用时应注意异常处理和合理选择线程池,以确保程序稳定性和效率。
异步任务编排神器CompletableFuture
|
3月前
|
数据采集 JavaScript Java
CompletableFuture异步编排,你还不会?
本文介绍了同步与异步编程的概念,探讨了在复杂业务场景中使用异步编排的重要性。通过对比 `Future` 与 `CompletableFuture`,详细讲解了 `CompletableFuture` 的多种方法,如 `runAsync`、`supplyAsync`、`whenComplete`、`exceptionally` 等,并展示了如何通过 `CompletableFuture` 实现异步任务的组合与异常处理。最后,通过实战案例演示了如何利用线程池与 `CompletableFuture` 优化商品详情页的查询效率,显著减少响应时间。
CompletableFuture异步编排,你还不会?
|
4月前
|
Java 数据库
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
这篇文章通过一个电商商品详情页的实战案例,展示了如何使用`CompletableFuture`进行异步编排,以解决在不同数据库表中查询商品信息的问题,并提供了详细的代码实现和遇到问题(如图片未显示)的解决方案。
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
|
4月前
|
Java
异步&线程池 CompletableFuture 异步编排 【下篇】
这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。
异步&线程池 CompletableFuture 异步编排 【下篇】
|
7月前
|
前端开发 Java
CompletableFuture的高级用法与实战
【4月更文挑战第20天】
300 1
|
安全 Java
任务编排:CompletableFuture从入门到精通
最近遇到了一个业务场景,涉及到多数据源之间的请求的流程编排,正好看到了一篇某团介绍CompletableFuture原理和使用的技术文章,主要还是涉及使用层面。网上很多文章涉及原理的部分讲的不是特别详细且比较抽象。因为涉及到多线程的工具必须要理解原理,不然一旦遇到问题排查起来就只能凭玄学,正好借此梳理一下CompletableFuture的工作原理
355 0
|
7月前
|
Java
CompletableFuture 异步编排、案例及应用小案例1
CompletableFuture 异步编排、案例及应用小案例
172 0
|
存储 SpringCloudAlibaba Java
Java新特性:异步编排CompletableFuture
CompletableFuture由Java 8提供,是实现异步化的工具类,上手难度较低,且功能强大,支持通过函数式编程的方式对各类操作进行组合编排。 CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步[回调](https://so.csdn.net/so/search?q=回调&spm=1001.2101.3001.7020)、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
372 1
Java新特性:异步编排CompletableFuture
|
设计模式 JavaScript 前端开发
CompletableFuture 异步编排
CompletableFuture 异步编排