前言
今天的话,就来以一个应用场景来进行一步一步的推导,在实现案例的过程中,将CompletableFuture
相关的知识点逐步讲述明白。
应用场景如下:
我们在查询掘金文章页面数据为应用场景,如何使用异步编程进行优化。
以掘金展示页面为例,点进文章页面时,大致需要渲染的数据为以下几点:
//1. 文章内容信息 1s //2. 作者相关信息 0.5s 依赖于1的查询结果 //3. 文章评论信息 0.5s 依赖于1的查询结果 //4. 文章分类信息 0.5s 依赖于1的查询结果 //5. 文章专栏信息 0.5s 依赖于1的查询结果 //6. 相关文章信息 0.5s 依赖于1的查询结果 //...
补充
:这是我随意拆分的,里面具体的表结构和接口请求先后的关系,以及具体的请求时间都是比较随意的,具体想要陈述的就是同步和异步编程的关系。
那么我们就要根据这个组装一个视图数据来进行返回。
注意
:实际上并非是如此,掘金文章页面内容的数据是多个接口返回的,我只是为了模拟内容,这么写罢了,切勿当真,真正应用还需要分业务场景,或者应用场景中可异步编排,到那个时候希望大家能应用上。
现在来说:按照以前我们以前串行化的执行方式,那么总花费的时间就是3.5s
,也就是从一开始执行到六,无疑这样是非常慢的,并且 2,3,4,5,6
都是依赖于 1的结果查询,但2,3,4,5,6
并不互相依赖,此时我们可以将他们从串行化变成异步执行,自己准备一个线程池,然后在执行的时候,将它们放进线程池中异步运行。
如此总耗费时间就从原来的 3.5s
变成了 1.5s
,编程思想的改变,对于性能还是有一定程度的提高的
接下来我们就开始接触CompletableFuture
吧
一、CompletableFuture 引入之前
再讲CompletableFuture
之前,我还是秉承着一贯的理念,先讲述一些之前的东西,然后再将它引入进来,不至于让大家对于它的出现处于一种非常迷茫的状态。
在之前我们如果只是普通异步的执行一个任务,无需返回结果的话,只要将一个任务实现 Runnable
接口,然后将放进线程池即可。
如果需要返回结果,就让任务实现Callable
接口,但实际上Callable
与 Thread 并没有任何关系,Callable
还需要使用Future
与线程建立关系,然后再让线程池执行,最后通过futureTask.get()
方法来获取执行的返回结果。
futureTask
是Future
接口的一个基本实现。
(Callable 类似于Runnable 接口,但 Runnable 接口中的 run()方法不会返回结果,并且也无法抛出经过检查的异常,但是 Callable中 call()方法能够返回计算结果,并且也能够抛出经过检查的异常。)
一个小案例:
/** * @description: * @author: Yihui Wang * @date: 2022年08月21日 11:34 */ public class Demo { public static ExecutorService executorService = new ThreadPoolExecutor( 10, 100, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); public static void runnableTest(){ RunnableTest runnableTest = new RunnableTest(); executorService.submit(runnableTest); } public static void callableTest() throws ExecutionException, InterruptedException, TimeoutException { CallableAndFutureTest callableAndFutureTest = new CallableAndFutureTest(); FutureTask<String> task = new FutureTask<>(callableAndFutureTest); // 采用线程池执行完程序并不会结束, 如果是想测试一次性的那种 可以采用 // new Thread(task).start(); executorService.submit(task); //System.out.println("尝试取消任务,传true表示取消任务,false则不取消任务::"+task.cancel(true)); System.out.println("判断任务是否已经完成::"+task.isDone()); //结果已经计算出来,则立马取出来,如若摸没有计算出来,则一直等待,直到结果出来,或任务取消或发生异常。 System.out.println("阻塞式获取结果::"+task.get()); System.out.println("在获取结果时,给定一个等待时间,如果超过等待时间还未获取到结果,则会主动抛出超时异常::"+task.get(2, TimeUnit.SECONDS)); } public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { runnableTest(); callableTest(); } } class RunnableTest implements Runnable{ @Override public void run() { System.out.println("我是Runnable执行的结果,我无法返回结果"); } } class CallableAndFutureTest implements Callable<String> { @Override public String call() throws Exception { String str = ""; for (int i = 0; i < 10; i++) { str += String.valueOf(i); Thread.sleep(100); } return str; } }
看起来Callable
搭配Future
好像已经可以实现我们今天要实现的效果了,从结果的意义上来说,确实可以,但是并不优雅,也会存在一些问题。
如果多个线程之间存在依赖组合,该如何呢?
这个时候就轮到 CompletableFuture
出现了~
二、CompletableFuture 案例
我先直接将实现应用场景的效果代码写出来,然后再接着慢慢的去讲
package com.nzc; import lombok.Data; import java.util.concurrent.*; /** * @description: * @author: Yihui Wang * @date: 2022年08月21日 11:48 */ public class CompletableFutureDemo { public static ExecutorService executorService = new ThreadPoolExecutor( 10, 100, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); public static ArticleVO asyncReturn(){ ArticleVO article=new ArticleVO(); long startTime=System.currentTimeMillis(); CompletableFuture<ArticleVO> articleContent = CompletableFuture.supplyAsync(() -> { try { article.setId(1L); article.setContent("我是宁在春写的文章内容"); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } return article; },executorService); // 这里的res 就是第一个个 CompletableFuture 执行完返回的结果 // 如果要测试它们的异步性,其实应该在下面的所有执行中,都让它们沉睡一会,这样效果会更加明显 // executorService 是放到我们自己创建的线程池中运行 CompletableFuture<Void> author = articleContent.thenAcceptAsync((res) -> { res.setAuthor(res.getId()+"的作者是宁在春"); },executorService); CompletableFuture<Void> articleComment = articleContent.thenAcceptAsync((res) -> { res.setComment(res.getId()+"的相关评论"); },executorService); CompletableFuture<Void> articleCategory = articleContent.thenAcceptAsync((res) -> { res.setCategory(res.getId()+"的分类信息"); },executorService); CompletableFuture<Void> articleColumn = articleContent.thenAcceptAsync((res) -> { res.setColumn(res.getId()+"的文章专栏信息"); },executorService); CompletableFuture<Void> recommend = articleContent.thenAcceptAsync((res) -> { res.setRecommend(res.getId()+"的文章推荐信息"); },executorService); CompletableFuture<Void> futureAll = CompletableFuture.allOf(articleContent, author, articleComment, articleCategory, articleColumn, recommend); try { // get() 是一个阻塞式方法 这里是阻塞式等待所有结果返回 // 因为要等待所有结果返回,才允许方法的结束,否则一些还在执行中,但是方法已经返回,就会造成一些错误。 futureAll.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } long endTime=System.currentTimeMillis(); System.out.println("耗费的总时间===>"+(endTime-startTime)); // 所有任务执行完成后,将构建出来的视图结果进行返回 return article; } public static void main(String[] args) { ArticleVO articleVO = asyncReturn(); System.out.println(articleVO); } } @Data class ArticleVO { private Long id; private String content; private String author; private String comment; private String category; private String column; private String recommend; }
这里就是对应着应用场景里的那几步,引入以下lombok包就可以直接测试了。
为了更好的看出效果,也可以在执行某个任务的时候,让它睡上一会。
三、CompletableFuture 详解
看完上面的例子,算是看到他是如何的啦,接下来还是需要详细说一说的,思维导图如下:
3.1、通过 CompletableFuture 创建普通异步任务
CompletableFuture.runAsync()
创建无返回值的简单异步任务 Executor
表示线程池~
package com.nzc; import java.util.concurrent.*; /** * @description: * @author: Yihui Wang * @date: 2022年08月21日 15:58 */ public class AsyncDemo { public static ExecutorService executorService = new ThreadPoolExecutor( 10, 100, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("主线程开始"); CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> { try { Thread.sleep(500L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("通过CompletableFuture.runAsync创建一个简单的异步任务~"); // 另外此处还可以填写第二个参数,放进自定义线程池中执行 },executorService); //runAsync.isDone() 可以判断任务是否已经 完成 System.out.println("任务是否完成==>" + runAsync.isDone()); //这里是阻塞式等待任务完成 runAsync.get(); System.out.println("主线程结束"); System.out.println("任务是否完成==>" + runAsync.isDone()); } }
CompletableFuture.supplyAsync()
创建有返回值的简单异步任务
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> { return "我是由宁在春创建的有返回结果的异步任务"; }, executorService); // 如果只有一条返回语句,还可以写的更加简便 //CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> "我是有返回结果的异步任务", executorService); //这里同样也是阻塞式的 String result = supplyAsync.get(); System.out.println("异步任务执行的回调结果:==>"+result); }
3.2、thenRun/thenRunAsync
简单说就是,这两个方法就是将执行任务的线程排起来,执行完一个,接着再执行第二个。并且它不需要接收上一个任务的结果,也不会返回结果,一定程度上来说,它的应用场景并不是特别高。
public static ExecutorService executorService = new ThreadPoolExecutor( 10, 100, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); /** * @param args */ public static void main(String[] args) throws Exception { thenRun(); thenRunAsync(); } public static void thenRun() throws ExecutionException, InterruptedException { long startTime = System.currentTimeMillis(); System.out.println("主线程开始1"); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("我是一个无需传参也没有返回值的简单异步任务1"); return "我是宁在春"; }); CompletableFuture<Void> thenRun = future.thenRun(() -> { try { Thread.sleep(500L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("等待任务1执行完后,我再执行任务2"); }); CompletableFuture<Void> thenRun1 = future.thenRun(() -> { try { Thread.sleep(500L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("等待任务1执行完后,我再执行任务3"); }); CompletableFuture<Void> thenRun2 = future.thenRun(() -> { try { Thread.sleep(500L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("等待任务1执行完后,我再执行任务4"); }); future.get(); long endTime = System.currentTimeMillis(); System.out.println("主线程结束1,耗费时间为:"+(endTime-startTime)); } public static void thenRunAsync() throws ExecutionException, InterruptedException { long startTime = System.currentTimeMillis(); System.out.println("主线程开始2"); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("我是一个无需传参也没有返回值的简单异步任务 一"); return "我是宁在春"; },executorService); CompletableFuture<Void> thenRunAsync1 = future.thenRunAsync(() -> { try { Thread.sleep(500L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("等待任务一执行完后,我再执行任务二"); },executorService); CompletableFuture<Void> thenRunAsync2 = future.thenRunAsync(() -> { try { Thread.sleep(500L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("等待任务一执行完后,我再执行任务三"); },executorService); CompletableFuture<Void> thenRunAsync3 = future.thenRunAsync(() -> { try { Thread.sleep(500L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("等待任务一执行完后,我再执行任务四"); },executorService); // 这里是让所有的阻塞,等待所有任务完成,才结束整个任务 CompletableFuture<Void> allOf = CompletableFuture.allOf(future,thenRunAsync1, thenRunAsync2, thenRunAsync3); allOf.get(); long endTime = System.currentTimeMillis(); System.out.println("主线程结束2,耗费时间为:"+(endTime-startTime)); } }
小结:
浅显的说它们两个的区别的话,其实就是thenRunAsync
可异步执行,thenRun
不可异步执行,不过都可以异步的阻塞式等待结果的返回。
在案例中我是自己手动创建了线程池,但其实就算我没有手动创建线程池,当调用thenRunAsync
方法,它也是放在异步线程中执行的。
源码比较:
public CompletableFuture<Void> thenRun(Runnable action) { return uniRunStage(null, action); } public CompletableFuture<Void> thenRunAsync(Runnable action) { return uniRunStage(asyncPool, action); } public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) { return uniRunStage(screenExecutor(executor), action); } /** * Default executor -- ForkJoinPool.commonPool() unless it cannot * support parallelism. */ private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
小结:
浅显的说它们两个的区别的话,其实就是thenRunAsync
可异步执行,thenRun
不可异步执行,不过都可以异步的阻塞式等待结果的返回。
在案例中我是自己手动创建了线程池,但其实就算我没有手动创建线程池,当调用thenRunAsync
方法,它也是放在异步线程中执行的。
源码比较:
public CompletableFuture<Void> thenRun(Runnable action) { return uniRunStage(null, action); } public CompletableFuture<Void> thenRunAsync(Runnable action) { return uniRunStage(asyncPool, action); } public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) { return uniRunStage(screenExecutor(executor), action); } /** * Default executor -- ForkJoinPool.commonPool() unless it cannot * support parallelism. */ private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
thenRun
它是同第一个任务是同一个线程,所以当第一个任务结束后,它才会开始执行任务。thenRunAsync
它则是不一样的,如果我传入我自定义的线程池,它就是放入我们自定义的线程池进行运行,如果传线程池这个参数的话,就是默认使用ForkJoin线程池
之后的类比区别也是同样的,总共是三组这样的方法。
CompletableFuture 异步编排、案例及应用小案例2:https://developer.aliyun.com/article/1394508