多线程一直Java开发中的难点,也是面试中的常客,趁着还有时间,打算巩固一下JUC方面知识,我想机会随处可见,但始终都是留给有准备的人的,希望我们都能加油!!!
沉下去,再浮上来
,我想我们会变的不一样的。
一、什么是CompletableFuture?
在Java中CompletableFuture
用于异步编程,异步通常意味着非阻塞,可以使我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。
在这种方式中,主线程不会被阻塞,因为子线程是另外一条线程在执行,所以不需要一直等到子线程完成。主线程就可以并行的执行其他任务。这种并行方式,可以极大的提供程序性能。
CompletableFuture
实现了 Future
, CompletionStage
接口。
- 实现了
Future
接口CompletableFuture
就可以兼容现在有线程池框架;
CompletionStage
接口是异步编程的接口抽象,里面定义多种异步方法,实现了CompletionStage
多种抽象方法和Future
并与一起使用,从而才打造出了强大的CompletableFuture 类。
二、Future 与 CompletableFuture
CompletableFuture
是 Future
API的扩展。
Future
接口源码上说明:
Future表示异步计算的结果。 提供了检查计算是否完成、等待计算完成以及检索计算结果的方法。 结果只能在计算完成后使用get方法检索,必要时阻塞,直到它准备好。 取消由cancel方法执行。 提供了其他方法来确定任务是正常完成还是被取消。 一旦计算完成,就不能取消计算。 --来自谷歌翻译
Future 的主要缺点如下:
(1)不能够手动的主动给完成任务(即不能手动的主动结束任务)
(2)Future 的结果在非阻塞的情况下,不能执行更进一步的操作
Future
不会通知你它已经完成了,它提供了一个阻塞的get()
方法通知你结果。就是它完成了,你不会被通知,只能主动去询问它。
(3)不能够支持链式调用
- 就是不能将上一个
Future
的计算结果传递给下一个Future
使用,即不能构成像Web中的Filter模式一样.
(4)不支持多个 Future 合并
- 就是不能将多个
Future
合并起来。
(5)不支持异常处理
Future
的 API 没有任何的异常处理的 api,所以运行时,很有可能无法定位到错误。
- Future API:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); //尝试取消此任务的执行。 boolean isCancelled();//如果此任务在正常完成之前被取消,则返回true boolean isDone(); //如果此任务完成,则返回true 。 完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法将返回true V get() throws InterruptedException, ExecutionException; //获得任务计算结果 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;//可等待多少时间去获得任务计算结果 }
三、应用
3.1、创建CompletableFuture对象
CompletableFuture
提供了四个静态方法用来创建CompletableFuture对象:
//runAsync 返回void 函数第二个参数表示是用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool() public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) //supplyAsync 异步返回一个结果 函数第二个参数表示是用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool() //Supplier 是一个函数式接口,代表是一个生成者的意思 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
3.2、场景一:主动完成任务
场景:主线程里面创建一个 CompletableFuture
,然后主线程调用 get 方法会 阻塞,最后我们在一个子线程中使其终止。
/** * @Author: crush * @Date: 2021-08-23 9:08 * version 1.0 */ public class CompletableFutureDemo1 { /** * 主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们在一个子线程中使其终止 * * @param args */ public static void main(String[] args) throws Exception { CompletableFuture<String> future = new CompletableFuture<>(); new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "子线程开始干活"); //子线程睡 5 秒 Thread.sleep(5000); // //在子线程中完成主线程 如果注释掉这一行代码将会一直停住 future.complete("success"); } catch (Exception e) { e.printStackTrace(); } }, "A").start(); //主线程调用 get 方法阻塞 System.out.println("主线程调用 get 方法获取结果为: " + future.get()); System.out.println("主线程完成,阻塞结束!!!!!!"); } }
3.3、场景二:没有返回值的异步任务
runAsync:返回一个新的 CompletableFuture
,它在运行给定操作后由在ForkJoinPool.commonPool()运行的任务异步完成。
如果你想异步的运行一个后台任务并且不需要任务返回结果,就可以使用runAsync
/** * @Author: crush * @Date: 2021-08-23 9:08 * version 1.0 */ public class CompletableFutureDemo2 { /** * 没有返回值的异步任务 * * @param args */ public static void main(String[] args) throws Exception { System.out.println("主线程开始"); //运行一个没有返回值的异步任务 CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { System.out.println("子线程启动干活"); Thread.sleep(5000); System.out.println("子线程完成"); } catch (Exception e) { e.printStackTrace(); } }); //主线程阻塞 future.get(); System.out.println("主线程结束"); } }
3.4、场景三:有返回值的异步任务
supplyAsync:返回任务结果。
CompletableFuture.supplyAsync()
它持有supplier
并且返回CompletableFuture
,T
是通过调用 传入的supplier取得的值的类型。
Supplier
是一个简单的函数式接口,表示supplier的结果。它有一个get()
方法,该方法可以写入你的后台任务中,并且返回结果。
public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) { return asyncSupplyStage(ASYNC_POOL, supplier); }
/** * @Author: crush * @Date: 2021-08-23 9:08 * version 1.0 */ public class CompletableFutureDemo2 { /** * 有返回值的异步任务 * * @param args */ public static void main(String[] args) throws Exception { System.out.println("主线程开始"); //运行一个没有返回值的异步任务 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { System.out.println("子线程启动干活"); Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } return "子线程任务完成"; }); //主线程阻塞 System.out.println(future.get()); System.out.println("主线程结束"); } } /** * 主线程开始 * 子线程启动干活 * 子线程任务完成 * 主线程结束 */
3.5、场景四:线程串行化
当一个线程依赖另一个线程时,可以使用 thenApply
方法来把这两个线程串行化。
/** * @Author: crush * @Date: 2021-08-23 9:08 * version 1.0 */ public class CompletableFutureDemo4 { private static String action=""; /** * 线程依赖 * 1、我到了烧烤店, * 2、开始点烧烤 * 3、和朋友次完烧烤 ,给女朋友带奶茶回去 * @param args */ public static void main(String[] args) throws Exception { System.out.println("主线程开始"); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { action="和朋友一起去次烧烤!!!! "; return action; }).thenApply(string -> { return action+="到店里——>开始点烧烤!!"; }).thenApply(String->{ return action+="和朋友们次完烧烤,给女朋友带杯奶茶回去!!"; }); String str = future.get(); System.out.println("主线程结束, 子线程的结果为:" + str); } } /** 主线程开始 主线程结束, 子线程的结果为:和朋友一起去次烧烤!!!到店里——>开始点烧烤!!和朋友们次完烧烤,给女朋友带杯奶茶回去!! */
3.6、场景五:thenAccept 消费处理结果
如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用thenAccept()
和 thenRun()
方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。
thenAccept
消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。
/** * @Author: crush * @Date: 2021-08-23 9:08 * version 1.0 */ public class CompletableFutureDemo5 { private static String action = ""; public static void main(String[] args) throws Exception { System.out.println("主线程开始"); CompletableFuture.supplyAsync(() -> { try { action = "逛淘宝,想买双鞋 "; } catch (Exception e) { e.printStackTrace(); } return action; }).thenApply(string -> { return action + "选中了,下单成功!!"; }).thenApply(String -> { return action + "等待快递到来"; }).thenAccept(new Consumer<String>() { @Override public void accept(String s) { System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" + s); } }); } } /** 主线程开始 子线程全部处理完成,最后调用了 accept,结果为:逛淘宝,想买双鞋 等待快递到来 */
3.7、场景六:异常处理
exceptionally
异常处理,出现异常时触发,可以回调给你一个从原始Future
中生成的错误恢复的机会。你可以在这里记录这个异常并返回一个默认值。
/** * @Author: crush * @Date: 2021-08-23 9:08 * version 1.0 */ public class CompletableFutureDemo6 { public static void main(String[] args) throws Exception{ System.out.println("主线程开始"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int i= 1/0; System.out.println("子线程执行中"); return i; }).exceptionally(ex -> { System.out.println(ex.getMessage()); return -1; }); System.out.println(future.get()); } } /** * 主线程开始 * java.lang.ArithmeticException: / by zero * -1 */
使用 handle() 方法处理异常API提供了一个更通用的方法 - handle()
从异常恢复,无论一个异常是否发生它都会被调用
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("任务开始"); int i=0/1; return i; }).handle((i,ex) -> { System.out.println("进入 handle 方法"); if (ex != null) { System.out.println("发生了异常,内容为:" + ex.getMessage()); return -1; } else { System.out.println("正常完成,内容为: " + i); return i; } });
3.8、场景七: 结果合并
thenCompose
合并两个有依赖关系的 CompletableFutures
的执行结果
/** * @Author: crush * @Date: 2021-08-23 9:08 * version 1.0 */ public class CompletableFutureDemo7 { private static Integer num = 10; public static void main(String[] args) throws Exception { System.out.println("主线程开始"); //第一步加 10 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("让num+10;任务开始"); num += 10; return num; }); //合并 CompletableFuture<Integer> future1 = future.thenCompose(i -> //再来一个 CompletableFuture CompletableFuture.supplyAsync(() -> { return i + 1; })); System.out.println(future.get()); System.out.println(future1.get()); } } /** * 主线程开始 * 让num+10;任务开始 * 20 * 21 */
thenCombine
合并两个没有依赖关系的 CompletableFutures
任务
package com.crush.juc09; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; /** * @Author: crush * @Date: 2021-08-23 9:08 * version 1.0 */ public class CompletableFutureDemo8 { private static Integer sum = 0; private static Integer count = 1; public static void main(String[] args) throws Exception{ System.out.println("主线程开始"); CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { System.out.println("从1+...+50开始"); for (int i=1;i<=50;i++){ sum+=i; } System.out.println("sum::"+sum); return sum; }); CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> { System.out.println("从1*...*10开始"); for (int i=1;i<=10;i++){ count=count*i; } System.out.println("count::"+count); return count; }); //合并两个结果 CompletableFuture<Object> future = job1.thenCombine(job2, new BiFunction<Integer, Integer, List<Integer>>() { @Override public List<Integer> apply(Integer a, Integer b) { List<Integer> list = new ArrayList<>(); list.add(a); list.add(b); return list; } }); System.out.println("合并结果为:" + future.get()); } } /** 主线程开始 从1*...*10开始 从1+...+50开始 sum::1275 count::3628800 合并结果为:[1275, 3628800] */
3.9、场景八:合并多个任务的结果
allOf 与 anyOf
allOf
: 一系列独立的 future
任务,等其所有的任务执行完后做一些事情
/** * @Author: crush * @Date: 2021-08-23 9:08 * version 1.0 */ public class CompletableFutureDemo9 { private static Integer num = 10; public static void main(String[] args) throws Exception{ System.out.println("主线程开始"); List<CompletableFuture> list = new ArrayList<>(); CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { System.out.println("加 10 任务开始"); num += 10; return num; }); list.add(job1); CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> { System.out.println("乘以 10 任务开始"); num = num * 10; return num; }); list.add(job2); CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> { System.out.println("减以 10 任务开始"); num = num - 10; return num; }); list.add(job3); CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> { System.out.println("除以 10 任务开始"); num = num / 10; return num; }); list.add(job4); //多任务合并 List<Integer> collect = list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList()); System.out.println(collect); } } /**主线程开始 乘以 10 任务开始 加 10 任务开始 减以 10 任务开始 除以 10 任务开始 [110, 100, 100, 10] */
anyOf
: 只要在多个 future
里面有一个返回,整个任务就可以结束,而不需要等到每一个 future 结束
package com.crush.juc09; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; /** * @Author: crush * @Date: 2021-08-23 9:08 * version 1.0 */ public class CompletableFutureDemo10 { private static Integer num = 10; /** * 先对一个数加 10,然后取平方 * @param args */ public static void main(String[] args) throws Exception{ System.out.println("主线程开始"); CompletableFuture<Integer>[] futures = new CompletableFuture[4]; CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(5000); System.out.println("加 10 任务开始"); num += 10; return num; }catch (Exception e){ return 0; } }); futures[0] = job1; CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(2000); System.out.println("乘以 10 任务开始"); num = num * 10; return num; }catch (Exception e){ return 1; } }); futures[1] = job2; CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(3000); System.out.println("减以 10 任务开始"); num = num - 10; return num; }catch (Exception e){ return 2; } }); futures[2] = job3; CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(4000); System.out.println("除以 10 任务开始"); num = num / 10; return num; }catch (Exception e){ return 3; } }); futures[3] = job4; CompletableFuture<Object> future = CompletableFuture.anyOf(futures); System.out.println(future.get()); } } //主线程开始 //乘以 10 任务开始 //100
四、小结
本文只是做了一点简单介绍,还需要大家更深入的了解。
🌈自言自语
最近又开始了JUC的学习,感觉Java内容真的很多,但是为了能够走的更远,还是觉得应该需要打牢一下基础。
最近在持续更新中,如果你觉得对你有所帮助,也感兴趣的话,关注我吧,让我们
一起学习,一起讨论吧。
你好,我是博主宁在春
,Java学习路上的一颗小小的种子,也希望有一天能扎根长成苍天大树。
希望与君共勉
😁
我们:待别时相见时,都已有所成。