CompletableFuture 异步编排、案例及应用小案例1

简介: CompletableFuture 异步编排、案例及应用小案例

前言

今天的话,就来以一个应用场景来进行一步一步的推导,在实现案例的过程中,将CompletableFuture相关的知识点逐步讲述明白。

应用场景如下

我们在查询掘金文章页面数据为应用场景,如何使用异步编程进行优化。

以掘金展示页面为例,点进文章页面时,大致需要渲染的数据为以下几点:

 //1. 文章内容信息 1s
 //2. 作者相关信息 0.5s 依赖于1的查询结果
 //3. 文章评论信息 0.5s 依赖于1的查询结果
 //4. 文章分类信息 0.5s 依赖于1的查询结果
 //5. 文章专栏信息 0.5s 依赖于1的查询结果
 //6. 相关文章信息 0.5s 依赖于1的查询结果
 //...

补充:这是我随意拆分的,里面具体的表结构和接口请求先后的关系,以及具体的请求时间都是比较随意的,具体想要陈述的就是同步和异步编程的关系。

那么我们就要根据这个组装一个视图数据来进行返回。

注意:实际上并非是如此,掘金文章页面内容的数据是多个接口返回的,我只是为了模拟内容,这么写罢了,切勿当真,真正应用还需要分业务场景,或者应用场景中可异步编排,到那个时候希望大家能应用上。


现在来说:按照以前我们以前串行化的执行方式,那么总花费的时间就是3.5s,也就是从一开始执行到六,无疑这样是非常慢的,并且 2,3,4,5,6都是依赖于 1的结果查询,但2,3,4,5,6并不互相依赖,此时我们可以将他们从串行化变成异步执行,自己准备一个线程池,然后在执行的时候,将它们放进线程池中异步运行。

如此总耗费时间就从原来的 3.5s 变成了 1.5s,编程思想的改变,对于性能还是有一定程度的提高的

接下来我们就开始接触CompletableFuture

一、CompletableFuture 引入之前

再讲CompletableFuture之前,我还是秉承着一贯的理念,先讲述一些之前的东西,然后再将它引入进来,不至于让大家对于它的出现处于一种非常迷茫的状态。


在之前我们如果只是普通异步的执行一个任务,无需返回结果的话,只要将一个任务实现 Runnable接口,然后将放进线程池即可。

如果需要返回结果,就让任务实现Callable接口,但实际上Callable与 Thread 并没有任何关系,Callable 还需要使用Future与线程建立关系,然后再让线程池执行,最后通过futureTask.get()方法来获取执行的返回结果。

futureTaskFuture接口的一个基本实现。

(Callable 类似于Runnable 接口,但 Runnable 接口中的 run()方法不会返回结果,并且也无法抛出经过检查的异常,但是 Callable中 call()方法能够返回计算结果,并且也能够抛出经过检查的异常。)

一个小案例:

 /**
  * @description:
  * @author: Yihui Wang
  * @date: 2022年08月21日 11:34
  */
 public class Demo {
     public static ExecutorService executorService = new ThreadPoolExecutor(
             10,
             100,
             30L,
             TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(100),
             Executors.defaultThreadFactory(),
             new ThreadPoolExecutor.DiscardOldestPolicy());
 ​
     public static void runnableTest(){
         RunnableTest runnableTest = new RunnableTest();
         executorService.submit(runnableTest);
     }
 ​
     public static void callableTest() throws ExecutionException, InterruptedException, TimeoutException {
         CallableAndFutureTest callableAndFutureTest = new CallableAndFutureTest();
         FutureTask<String> task = new FutureTask<>(callableAndFutureTest);
         // 采用线程池执行完程序并不会结束, 如果是想测试一次性的那种 可以采用
         // new Thread(task).start();
         executorService.submit(task);
         //System.out.println("尝试取消任务,传true表示取消任务,false则不取消任务::"+task.cancel(true));
         System.out.println("判断任务是否已经完成::"+task.isDone());
         //结果已经计算出来,则立马取出来,如若摸没有计算出来,则一直等待,直到结果出来,或任务取消或发生异常。
         System.out.println("阻塞式获取结果::"+task.get());
         System.out.println("在获取结果时,给定一个等待时间,如果超过等待时间还未获取到结果,则会主动抛出超时异常::"+task.get(2, TimeUnit.SECONDS));
     }
     public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
         runnableTest();
         callableTest();
     }
 ​
 }
 ​
 class RunnableTest implements Runnable{
     @Override
     public void run() {
         System.out.println("我是Runnable执行的结果,我无法返回结果");
     }
 }
 class CallableAndFutureTest implements Callable<String> {
     @Override
     public String call() throws Exception {
         String str = "";
         for (int i = 0; i < 10; i++) {
             str += String.valueOf(i);
             Thread.sleep(100);
         }
         return str;
     }
 }
 ​

看起来Callable搭配Future好像已经可以实现我们今天要实现的效果了,从结果的意义上来说,确实可以,但是并不优雅,也会存在一些问题。

如果多个线程之间存在依赖组合,该如何呢?

这个时候就轮到 CompletableFuture出现了~

二、CompletableFuture 案例

我先直接将实现应用场景的效果代码写出来,然后再接着慢慢的去讲

 package com.nzc;
 ​
 import lombok.Data;
 ​
 import java.util.concurrent.*;
 ​
 /**
  * @description:
  * @author: Yihui Wang
  * @date: 2022年08月21日 11:48
  */
 public class CompletableFutureDemo {
 ​
     public static ExecutorService executorService = new ThreadPoolExecutor(
             10,
             100,
             30L,
             TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(100),
             Executors.defaultThreadFactory(),
             new ThreadPoolExecutor.DiscardOldestPolicy());
 ​
     public static ArticleVO asyncReturn(){
         ArticleVO article=new ArticleVO();
         long startTime=System.currentTimeMillis();
         CompletableFuture<ArticleVO> articleContent = CompletableFuture.supplyAsync(() -> {
             try {
                 article.setId(1L);
                 article.setContent("我是宁在春写的文章内容");
                 Thread.sleep(1000);
             } catch (Exception e) {
                 e.printStackTrace();
             }
             return article;
         },executorService);
 ​
         // 这里的res 就是第一个个 CompletableFuture 执行完返回的结果
         // 如果要测试它们的异步性,其实应该在下面的所有执行中,都让它们沉睡一会,这样效果会更加明显
         // executorService 是放到我们自己创建的线程池中运行
         CompletableFuture<Void> author = articleContent.thenAcceptAsync((res) -> {
             res.setAuthor(res.getId()+"的作者是宁在春");
         },executorService);
 ​
         CompletableFuture<Void> articleComment = articleContent.thenAcceptAsync((res) -> {
             res.setComment(res.getId()+"的相关评论");
         },executorService);
 ​
         CompletableFuture<Void> articleCategory = articleContent.thenAcceptAsync((res) -> {
             res.setCategory(res.getId()+"的分类信息");
         },executorService);
 ​
         CompletableFuture<Void> articleColumn = articleContent.thenAcceptAsync((res) -> {
             res.setColumn(res.getId()+"的文章专栏信息");
         },executorService);
 ​
         CompletableFuture<Void> recommend = articleContent.thenAcceptAsync((res) -> {
             res.setRecommend(res.getId()+"的文章推荐信息");
         },executorService);
 ​
         CompletableFuture<Void> futureAll = CompletableFuture.allOf(articleContent, author, articleComment, articleCategory, articleColumn, recommend);
 ​
         try {
             // get() 是一个阻塞式方法 这里是阻塞式等待所有结果返回
             // 因为要等待所有结果返回,才允许方法的结束,否则一些还在执行中,但是方法已经返回,就会造成一些错误。
             futureAll.get();
         } catch (InterruptedException e) {
             e.printStackTrace();
         } catch (ExecutionException e) {
             e.printStackTrace();
         }
         long endTime=System.currentTimeMillis();
         System.out.println("耗费的总时间===>"+(endTime-startTime));
 ​
         // 所有任务执行完成后,将构建出来的视图结果进行返回
         return article;
     }
 ​
     public static void main(String[] args) {
         ArticleVO articleVO = asyncReturn();
         System.out.println(articleVO);
     }
 ​
 }
 ​
 @Data
 class ArticleVO {
     private Long id;
     private String content;
     private String author;
     private String comment;
     private String category;
     private String column;
     private String recommend;
 }

这里就是对应着应用场景里的那几步,引入以下lombok包就可以直接测试了。

为了更好的看出效果,也可以在执行某个任务的时候,让它睡上一会。

三、CompletableFuture 详解

看完上面的例子,算是看到他是如何的啦,接下来还是需要详细说一说的,思维导图如下:

image.png

3.1、通过 CompletableFuture 创建普通异步任务

CompletableFuture.runAsync()创建无返回值的简单异步任务 Executor表示线程池~

 package com.nzc;
 ​
 import java.util.concurrent.*;
 ​
 /**
  * @description:
  * @author: Yihui Wang
  * @date: 2022年08月21日 15:58
  */
 public class AsyncDemo {
 ​
     public static ExecutorService executorService = new ThreadPoolExecutor(
             10,
             100,
             30L,
             TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(100),
             Executors.defaultThreadFactory(),
             new ThreadPoolExecutor.DiscardOldestPolicy());
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
         System.out.println("主线程开始");
         CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
             try {
                 Thread.sleep(500L);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             System.out.println("通过CompletableFuture.runAsync创建一个简单的异步任务~");
             // 另外此处还可以填写第二个参数,放进自定义线程池中执行
         },executorService);
         //runAsync.isDone() 可以判断任务是否已经 完成
         System.out.println("任务是否完成==>" + runAsync.isDone());
         //这里是阻塞式等待任务完成
         runAsync.get();
         System.out.println("主线程结束");
         System.out.println("任务是否完成==>" + runAsync.isDone());
     }
 }
 ​

CompletableFuture.supplyAsync()创建有返回值的简单异步任务

 public static void main(String[] args) throws ExecutionException, InterruptedException {
     CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
         return "我是由宁在春创建的有返回结果的异步任务";
     }, executorService);
     // 如果只有一条返回语句,还可以写的更加简便
     //CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> "我是有返回结果的异步任务", executorService);
 ​
     //这里同样也是阻塞式的
     String result = supplyAsync.get();
     System.out.println("异步任务执行的回调结果:==>"+result);
 }

3.2、thenRun/thenRunAsync

image.png

简单说就是,这两个方法就是将执行任务的线程排起来,执行完一个,接着再执行第二个。并且它不需要接收上一个任务的结果,也不会返回结果,一定程度上来说,它的应用场景并不是特别高。

 public static ExecutorService executorService = new ThreadPoolExecutor(
     10,
     100,
     30L,
     TimeUnit.SECONDS,
     new LinkedBlockingQueue<Runnable>(100),
     Executors.defaultThreadFactory(),
     new ThreadPoolExecutor.DiscardOldestPolicy());
 /**
      * @param args
      */
 public static void main(String[] args) throws Exception {
     thenRun();
     thenRunAsync();
 }
 ​
 public static void thenRun() throws ExecutionException, InterruptedException {
     long startTime = System.currentTimeMillis();
     System.out.println("主线程开始1");
     CompletableFuture<String> future =
         CompletableFuture.supplyAsync(() -> {
             System.out.println("我是一个无需传参也没有返回值的简单异步任务1");
             return "我是宁在春";
         });
     CompletableFuture<Void> thenRun = future.thenRun(() -> {
         try {
             Thread.sleep(500L);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         System.out.println("等待任务1执行完后,我再执行任务2");
     });
     CompletableFuture<Void> thenRun1 = future.thenRun(() -> {
         try {
             Thread.sleep(500L);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         System.out.println("等待任务1执行完后,我再执行任务3");
     });
     CompletableFuture<Void> thenRun2 = future.thenRun(() -> {
         try {
             Thread.sleep(500L);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         System.out.println("等待任务1执行完后,我再执行任务4");
     });
     future.get();
     long endTime = System.currentTimeMillis();
     System.out.println("主线程结束1,耗费时间为:"+(endTime-startTime));
 }
 ​
 public static void thenRunAsync() throws ExecutionException, InterruptedException {
     long startTime = System.currentTimeMillis();
     System.out.println("主线程开始2");
     CompletableFuture<String> future =
         CompletableFuture.supplyAsync(() -> {
             System.out.println("我是一个无需传参也没有返回值的简单异步任务 一");
             return "我是宁在春";
         },executorService);
     CompletableFuture<Void> thenRunAsync1 = future.thenRunAsync(() -> {
         try {
             Thread.sleep(500L);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         System.out.println("等待任务一执行完后,我再执行任务二");
     },executorService);
 ​
     CompletableFuture<Void> thenRunAsync2 = future.thenRunAsync(() -> {
         try {
             Thread.sleep(500L);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         System.out.println("等待任务一执行完后,我再执行任务三");
     },executorService);
 ​
     CompletableFuture<Void> thenRunAsync3 = future.thenRunAsync(() -> {
         try {
             Thread.sleep(500L);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         System.out.println("等待任务一执行完后,我再执行任务四");
     },executorService);
 ​
     // 这里是让所有的阻塞,等待所有任务完成,才结束整个任务
     CompletableFuture<Void> allOf = CompletableFuture.allOf(future,thenRunAsync1, thenRunAsync2, thenRunAsync3);
     allOf.get();
     long endTime = System.currentTimeMillis();
     System.out.println("主线程结束2,耗费时间为:"+(endTime-startTime));
 }
 }

小结

浅显的说它们两个的区别的话,其实就是thenRunAsync可异步执行,thenRun不可异步执行,不过都可以异步的阻塞式等待结果的返回。

在案例中我是自己手动创建了线程池,但其实就算我没有手动创建线程池,当调用thenRunAsync方法,它也是放在异步线程中执行的。

源码比较:

 public CompletableFuture<Void> thenRun(Runnable action) {
     return uniRunStage(null, action);
 }
 ​
 public CompletableFuture<Void> thenRunAsync(Runnable action) {
     return uniRunStage(asyncPool, action);
 }
 ​
 public CompletableFuture<Void> thenRunAsync(Runnable action,
                                             Executor executor) {
     return uniRunStage(screenExecutor(executor), action);
 }
 ​
 /**
      * Default executor -- ForkJoinPool.commonPool() unless it cannot
      * support parallelism.
      */
 private static final Executor asyncPool = useCommonPool ?
     ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
 ​

小结

浅显的说它们两个的区别的话,其实就是thenRunAsync可异步执行,thenRun不可异步执行,不过都可以异步的阻塞式等待结果的返回。

在案例中我是自己手动创建了线程池,但其实就算我没有手动创建线程池,当调用thenRunAsync方法,它也是放在异步线程中执行的。

源码比较:

 public CompletableFuture<Void> thenRun(Runnable action) {
     return uniRunStage(null, action);
 }
 ​
 public CompletableFuture<Void> thenRunAsync(Runnable action) {
     return uniRunStage(asyncPool, action);
 }
 ​
 public CompletableFuture<Void> thenRunAsync(Runnable action,
                                             Executor executor) {
     return uniRunStage(screenExecutor(executor), action);
 }
 ​
 /**
      * Default executor -- ForkJoinPool.commonPool() unless it cannot
      * support parallelism.
      */
 private static final Executor asyncPool = useCommonPool ?
     ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
 ​


  • thenRun它是同第一个任务是同一个线程,所以当第一个任务结束后,它才会开始执行任务。
  • thenRunAsync它则是不一样的,如果我传入我自定义的线程池,它就是放入我们自定义的线程池进行运行,如果传线程池这个参数的话,就是默认使用ForkJoin线程池

之后的类比区别也是同样的,总共是三组这样的方法。

CompletableFuture 异步编排、案例及应用小案例2:https://developer.aliyun.com/article/1394508

目录
相关文章
|
1天前
|
Java 关系型数据库 MySQL
CompletableFuture基础实践小结
CompletableFuture基础实践小结
11 1
|
4天前
|
前端开发 Java
CompletableFuture的高级用法与实战
【4月更文挑战第20天】
23 1
|
4月前
|
Java
CompletableFuture 异步编排、案例及应用小案例2
CompletableFuture 异步编排、案例及应用小案例
31 0
|
安全 Java
任务编排:CompletableFuture从入门到精通
最近遇到了一个业务场景,涉及到多数据源之间的请求的流程编排,正好看到了一篇某团介绍CompletableFuture原理和使用的技术文章,主要还是涉及使用层面。网上很多文章涉及原理的部分讲的不是特别详细且比较抽象。因为涉及到多线程的工具必须要理解原理,不然一旦遇到问题排查起来就只能凭玄学,正好借此梳理一下CompletableFuture的工作原理
286 0
|
9月前
|
设计模式 JavaScript 前端开发
CompletableFuture 异步编排
CompletableFuture 异步编排
|
9月前
|
存储 SpringCloudAlibaba Java
Java新特性:异步编排CompletableFuture
CompletableFuture由Java 8提供,是实现异步化的工具类,上手难度较低,且功能强大,支持通过函数式编程的方式对各类操作进行组合编排。 CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步[回调](https://so.csdn.net/so/search?q=回调&spm=1001.2101.3001.7020)、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
172 1
Java新特性:异步编排CompletableFuture
|
Java API
CompletableFuture实现异步编排
场景:电商系统中获取一个完整的商品信息可能分为以下几步:①获取商品基本信息 ②获取商品图片信息 ③获取商品促销活动信息 ④获取商品各种类的基本信息 等操作,如果使用串行方式去执行这些操作,假设每个操作执行1s,那么用户看到完整的商品详情就需要4s的时间,如果使用并行方式执行这些操作,可能只需要1s就可以完成。所以这就是异步执行的好处。
132 0
CompletableFuture实现异步编排
|
7天前
|
弹性计算 运维 Serverless
基于函数计算搭建的异步任务执行框架
本文介绍基于函数计算实现的异步任务执行框架(编程语言:Python3),把跟阿里云资源开通相关的API封装到一个独立的模块,提供标准的API跟企业内部在用的ITSM或OA进行集成,降低客户对接API门槛,更快上阿里云。
基于函数计算搭建的异步任务执行框架
|
Java
简述CompletableFuture异步任务编排(上)
简述CompletableFuture异步任务编排
308 0
简述CompletableFuture异步任务编排(上)
|
Java API
简述CompletableFuture异步任务编排(下)
简述CompletableFuture异步任务编排
172 0
简述CompletableFuture异步任务编排(下)

热门文章

最新文章