CompletableFuture 异步编排、案例及应用小案例1:https://developer.aliyun.com/article/1394503
3.3、thenApply和thenApplyAsync
thenApply 和 thenApplyAsync 让线程成为了一种串行化的关系,第一个任务执行完的返回值会作为第二个的任务的入参.
案例的话,比较简单.
package com.nzc; import java.util.concurrent.*; /** * @description: * @author: Yihui Wang * @date: 2022年08月21日 16:32 */ public class ThenApplyAndAsyncDemo { public static ExecutorService executorService = new ThreadPoolExecutor( 10, 100, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); /** * @param args */ public static void main(String[] args) throws ExecutionException, InterruptedException { thenApply(); thenApplyAsync(); } /** * 线程串行化 * 1、我进入商场 * 2、找到了我要买的商品 * 3、准备付款结账 * 4、拿着东西回家!!! * 你会发现这是一步扣一步的在走,其实业务场景中也有很多这样的场景,希望大家在遇到的时候能够想到 * * @return * @throws ExecutionException * @throws InterruptedException */ public static String thenApply() throws ExecutionException, InterruptedException { System.out.println("主线程开始1"); // CompletableFuture<String> future = // CompletableFuture.supplyAsync(() -> { // return "我进入商场, "; // }); // CompletableFuture<String> future1 = future.thenApply(res -> { // return res += "找到了我要买的商品,"; // }); // future.thenApply(res->{ // return res+="准备付款结账,"; // }).thenApply(res->{ // return res+="拿着东西回家!!!"; // }); // 上面那种分开写和下面这种链式写法是一样的 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getId()); return "我进入商场, "; }).thenApply(res -> { System.out.println(Thread.currentThread().getId()); return res += "找到了我要买的商品,"; }).thenApply(res -> { System.out.println(Thread.currentThread().getId()); return res += "准备付款结账,"; }).thenApply(res -> { return res += "拿着东西回家!!!"; }); String result = future.get(); System.out.println("主线程1结束, 子线程的结果为:" + result); return result; } /** * 这里因为是异步的原因,它们之间倒是没有一个顺序上的规范 * * @return * @throws ExecutionException * @throws InterruptedException */ public static String thenApplyAsync() throws ExecutionException, InterruptedException { System.out.println("主线程2开始"); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "我进入商场, "; },executorService).thenApplyAsync(res -> { System.out.println(Thread.currentThread().getId()); return res += "找到了我要买的商品,"; },executorService).thenApplyAsync(res -> { try { System.out.println(Thread.currentThread().getId()); Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } return res += "准备付款结账,"; },executorService).thenApplyAsync(res -> { System.out.println(Thread.currentThread().getId()); return res += "拿着东西回家!!!"; }); String result = future.get(); System.out.println("主线程2结束, 子线程的结果为:" + result); return result; } }
小结:
thenApply 和 thenApplyAsync 本质上就是将它们串起来了,必须要先完成第一个任务,才能接着做下面的任务
这里的本质区别和前面和之前说的还是一样
但是你如果仔细看了案例代码,你会发现我在里面打印了线程ID. 并且我在测试的时候,尝试将放入自定义线程池和不放入两种情况,实际上 thenApplyAsync 执行的任务线程确实不是一个.
但效果其实和 thenApply 是一样的,都需要等待上一个任务完成。
注意我说的是效果,并非是内部的执行机制。再说就又得进去看源码了...
3.4、thenAccept 和 thenAcceptAsync
如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用thenAccept()
和 thenRun()
方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。
thenAccept
消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。
thenAcceptAsync
则是异步的消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。
package com.nzc; import java.util.concurrent.*; import java.util.function.Consumer; /** * @description: * @author: Yihui Wang * @date: 2022年08月21日 17:21 */ public class ThenAcceptDemo { public static ExecutorService executorService = new ThreadPoolExecutor( 10, 100, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); public static void main(String[] args) throws Exception { thenAccept(); thenAcceptAsync(); } private static String action1 = ""; public static void thenAccept() { System.out.println("主线程开始"); CompletableFuture.supplyAsync(() -> { try { action1 = "逛jd,想买台电脑~ "; } catch (Exception e) { e.printStackTrace(); } return action1; }).thenApply(string -> { return action1 + "选中了,付款,下单成功!!"; }).thenApply(String -> { return action1 + "等待快递到来"; }).thenAccept((res) -> { System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" + res); }); } private static String action2 = ""; public static void thenAcceptAsync() { System.out.println("主线程开始"); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { action2 = "逛jd,想买台电脑~ "; } catch (Exception e) { e.printStackTrace(); } return action2; }).thenApply(string -> { return action2 + "选中了,付款,下单成功!!"; }).thenApply(String -> { return action2 + "等待快递到来"; }); // 这里不采用链式写法,是因为thenAcceptAsync 无返回值, // 第二个thenAcceptAsync 连接在第一个thenAcceptAsync后,会没有入参值 // 就都拿出来了。 future.thenAcceptAsync((res) -> { System.out.println("线程ID"+Thread.currentThread().getId()+"拿到依任务一二的返回结果,===>异步的执行任务三,晚饭时间了,打算一边看电影"); },executorService); future.thenAcceptAsync((res) -> { System.out.println("线程ID"+Thread.currentThread().getId()+"拿到依任务一二的返回结果,===>异步的执行任务四,一边干饭~"); },executorService); } }
thenAcceptAsync
也是我们今天文章开头中用到的,异步编排式的组合视图结果集。
这一部分平时用的倒是不少,也比较方便~
上面说了这么多,但是万一我们在执行某个任务的时候出现异常该如何处理呢?
别慌,它也封装好了的。
3.5、exceptionally 和 handle
exceptionally
异常处理,出现异常时触发,可以回调给你一个从原始Future
中生成的错误恢复的机会。你可以在这里记录这个异常并返回一个默认值。
一般而言,exceptionally
都是写到方法调用的末尾,以来出来中间过程中会出现的异常。
另外就是 handle 也可以用来处理异常。
public class ExceptionallyDemo { public static void main(String[] args) throws Exception{ System.out.println("主线程开始"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int i= 1/0; System.out.println("子线程执行中"); return i; }).exceptionally(ex -> { System.out.println(ex.getMessage()); return -1; }); System.out.println(future.get()); } } //主线程开始 //java.lang.ArithmeticException: / by zero //-1
public static void main(String[] args) throws Exception { CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("任务开始"); int i = 0 / 1; return i; }).handle((i, ex) -> { System.out.println("进入 handle 方法"); if (ex != null) { System.out.println("发生了异常,内容为:" + ex.getMessage()); return -1; } else { System.out.println("正常完成,内容为: " + i); return i; } }); }
handle是有入参和带返回值的,入参是之前任务执行的结果。
当然一切具体的使用还是要看业务场景啦
3.6、结果合并
thenCompose
合并两个有依赖关系的 CompletableFutures
的执行结果,有入参有返回值。
它的入参是第一个future
和第一二两个的任何的返回结果。
thenAcceptBoth
则是会将两个任务的执行结果作为方法入参,传递到指定方法中,但无返回值
runAfterBoth
则是不会把执行结果当做方法入参,也没有返回值。
package com.nzc; import java.util.WeakHashMap; import java.util.concurrent.*; /** * @description: * @author: Yihui Wang * @date: 2022年08月21日 17:53 */ public class ThenCombineDemo { public static void main(String[] args) throws Exception { test(); } private static Integer num = 10; public static void test() throws Exception { System.out.println("主线程开始"); //第一步加 10 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("第一个任务:让num+10;任务开始"); num += 10; return num; }); CompletableFuture<String > future1 = CompletableFuture.supplyAsync(() -> { System.out.println("第二个任务:让num+1;任务开始"); return num + 1; //它的入参是第一个future和第一二两个的任何的返回结果。 }).thenCombine(future,(w,s)->{ System.out.println("任务一的结果==>"+w); System.out.println("任务二的结果==>"+s); return "我是两个任务的合并"+(w+s); }); System.out.println(future.get()); System.out.println(future1.get()); } } /** * 主线程开始 * 第一个任务:让num+10;任务开始 * 第二个任务:让num+1;任务开始 * 任务一的结果==>21 * 任务二的结果==>20 * 20 * 我是两个任务的合并41 */
thenAcceptBoth
package com.nzc; import java.util.WeakHashMap; import java.util.concurrent.*; /** * @description: * @author: Yihui Wang * @date: 2022年08月21日 17:53 */ public class ThenCombineDemo { public static void main(String[] args) throws Exception { test(); } private static Integer num = 10; public static void test() throws Exception { System.out.println("主线程开始"); //第一步加 10 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("第一个任务:让num+10;任务开始"); num += 10; return num; }); CompletableFuture<Void > future1 = CompletableFuture.supplyAsync(() -> { System.out.println("第二个任务:让num+1;任务开始"); return num + 1; }).thenAcceptBoth(future,(w,s)->{ System.out.println("任务一的结果==>"+w); System.out.println("任务二的结果==>"+s); System.out.println( "我是两个任务的合并"+(w+s)+"但是我没有返回值"); }); System.out.println("任务一的结果==>"+future.get()); // 不采用链式写法,任务二实际上是有返回值,大家看业务场景写即可 System.out.println("任务二后接上thenAcceptBoth方法的结果==>"+future1.get()); } } /** 主线程开始 第一个任务:让num+10;任务开始 第二个任务:让num+1;任务开始 任务一的结果==>21 任务二的结果==>20 我是两个任务的合并41但是我没有返回值 任务一的结果==>20 任务二后接上thenAcceptBoth方法的结果==>null */
runAfterBoth
public static void test2(){ //第一步加 10 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("第一个任务:让num+10;任务开始"); num += 10; return num; }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("第一个任务:让num+10;任务开始"); num += 10; return num; }); future2.runAfterBoth(future,()->{ System.out.println("不会把执行结果当做方法入参,也没有返回值"); }); }
除了这些外,CompletableFuture
还有我之前案例中就已经用到的allof
和anyOf
3.7、allof 合并多个任务结果
allOf
: 一系列独立的 future
任务,等其所有的任务执行完后做一些事情.
public class CompletableFutureDemo9 { private static Integer num = 10; public static void main(String[] args) throws Exception{ System.out.println("主线程开始"); List<CompletableFuture> list = new ArrayList<>(); CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { System.out.println("加 10 任务开始"); num += 10; return num; }); list.add(job1); CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> { System.out.println("乘以 10 任务开始"); num = num * 10; return num; }); list.add(job2); CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> { System.out.println("减以 10 任务开始"); num = num - 10; return num; }); list.add(job3); CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> { System.out.println("除以 10 任务开始"); num = num / 10; return num; }); list.add(job4); //多任务合并 List<Integer> collect = list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList()); System.out.println(collect); } } /**主线程开始 乘以 10 任务开始 加 10 任务开始 减以 10 任务开始 除以 10 任务开始 [110, 100, 100, 10] */
allof的除了在合并结果时经常用到以外,像我们今天案例它也用到了它的get()方法,在那里使用的作用时,阻塞式的等待所有的任务结束,才结束方法的调用。
3.8、anyof
anyOf
: 只要在多个 future
里面有一个返回,整个任务就可以结束,而不需要等到每一个 future 结束
public class CompletableFutureDemo10 { private static Integer num = 10; /** * 先对一个数加 10,然后取平方 * @param args */ public static void main(String[] args) throws Exception{ System.out.println("主线程开始"); CompletableFuture<Integer>[] futures = new CompletableFuture[4]; CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(5000); System.out.println("加 10 任务开始"); num += 10; return num; }catch (Exception e){ return 0; } }); futures[0] = job1; CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(2000); System.out.println("乘以 10 任务开始"); num = num * 10; return num; }catch (Exception e){ return 1; } }); futures[1] = job2; CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(3000); System.out.println("减以 10 任务开始"); num = num - 10; return num; }catch (Exception e){ return 2; } }); futures[2] = job3; CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(4000); System.out.println("除以 10 任务开始"); num = num / 10; return num; }catch (Exception e){ return 3; } }); futures[3] = job4; CompletableFuture<Object> future = CompletableFuture.anyOf(futures); System.out.println(future.get()); } } //主线程开始 //乘以 10 任务开始 //100
3.9、注意的小问题
1、一般来讲,如果要使用线程的话,都应该是自定义线程,这点阿里Java开发规范中也有说到。
2、自定义线程池的话,一定要把参数设置合理,这点没啥可说的,都得测,空谈都是大话,线程池的话有一篇美团技术团队的文章,讲的很好。Java线程池实现原理及其在美团业务中的实践
3、今天的案例,我在最后调用了 get()方法,一直阻塞到所有任务完成,所以你在编排的时候,一定要注意你需不需要任务的返回结果,不然很可能会产生一些数据方面问题。
4、关于异常我写到后面心有些浮躁,写的不是非常精细。获取异常信息,future需要获取返回值,才能获取异常信息。
后记
今天最想说的就是 “温故而知新”
这方面的知识在去年,我其实已经学过一遍,但应用场景一少,你就会慢慢忘记它的存在。
另外想要说明的是基础我觉得是十分重要的。
最近在翻阅 Java 8 实战这本书,Lamda表达式一直会写,但是对于那些思想,我一直处于一直很模糊的状态,这次在看书的时候,发现了很多以前不知道的知识,也让自己恍然大悟。