JUC并发编程之CompletableFuture

简介: future是java5新加的一个接口,他提供了一种异步并行计算的功能接口定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务是否执行完毕目的:异步多线程执行且有返回结果,特点:多线程/有返回/异步任务补充:Runnable实现的是run方法,没有返回值,没有异常,Callable实现的是call方法,有返回值,需要处理异常

Future

future是java5新加的一个接口,他提供了一种异步并行计算的功能

接口定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务是否执行完毕

目的:异步多线程执行且有返回结果,特点:多线程/有返回/异步任务

补充:Runnable实现的是run方法,没有返回值,没有异常,Callable实现的是call方法,有返回值,需要处理异常

Future接口常用实现类Future Task异步任务

代码实现

1. public class CompletableFutureDemo {
2. public static void main(String[] args) throws ExecutionException, InterruptedException {
3.         FutureTask<String> futureTask = new FutureTask<>(new MyThread());
4. Thread t1 = new Thread(futureTask,"t1");
5.         t1.start();
6.         System.out.println(futureTask.get());
7.     }
8. }
9. 
10. 
11. 
12. class MyThread implements Callable<String>{
13. 
14. @Override
15. public String call() throws Exception {
16.         System.out.println("----come in call()");
17. return "hello Callable";
18.     }
19. }

 

future的优缺点

NO: 1、get方法容易阻塞:一旦调用,如果计算没有完成,容易程序阻塞

2、isDone轮询:轮询的方式会耗费CPU资源,而且也不见得能及时得到计算结果

小结:future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务结果

注:实际程序中轮询的方式并不比阻塞好,这么写是给用户看的,用户不可能看你的程序就停在那了,这么写可以给个提示,也不会直接异常

CompletableFuture的异步优化思路

背景:对于简单的业务场景使用Future完全OK,但是对于复杂的业务场景,比如:

1、回调通知:应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知,通过轮询的方式去判断任务是否完成这样非常的占用CPU并且代码也不优雅

2、创建异步任务:Future+线程池配合

3、多个任务前后依赖可以组合处理:想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务结果的值,将两个或多个异步计算合成一个异步计算,这几个异步计算相互独立,同时后面这个又依赖前一个处理的结果

4、对计算速度最快:当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果

CompletableFuture提供了一种观察者模式类似的机制,可以让任务完成后通知监听的一方

类架构说明

接口CompletionStage

1、CompletionStage代表异步计算过程中某一个阶段,一个阶段完成以后可能会触发另外一个阶段

2、一个阶段的执行可以是一个Function,Consumer或者Runnable

3、一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段

类CompletableFuture

1、在java8当中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合ComplttableFuture的方法

2、他可能代表一个明确完成的Future,也有可能代表一个完成阶段,它支持在计算完成以后触发一些函数或执行某个动作

3、它实现了Future和CompletionStage接口

注:从java8开始引入了CompletableFuture,他是Future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

优点:

1、异步任务结束时,会自动回调某个对象的方法

2、主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行

3、异步任务出错时,会自动回调某个对象的方法

函数式接口复习

CompletableFuture常用方法

1、获得结果和触发计算

1. public class CompletableFutureAPIDemo {
2. public static void main(String[] args) {
3.         CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
4. try {
5.                 TimeUnit.SECONDS.sleep(1);
6.             } catch (InterruptedException e) {
7.                 e.printStackTrace();
8.             }
9. return "abc";
10.         });
11. 
12. //System.out.println(completableFuture.get());   阻塞主线程获取结果
13. //System.out.println(completableFuture.get(2L,TimeUnit.SECONDS)); 等待指定时间获取结果
14. //System.out.println(completableFuture.join());  阻塞线程获取结果
15.         System.out.println(completableFuture.getNow("xxx")); //立即获取结果不阻塞,如果没计算完成返回XXX
16.         System.out.println(completableFuture.complete("completeValue")+"\t"+completableFuture.join());
17. //是否打断get方法立刻获得括号值
18.     }
19. }

2、对计算结果进行处理

1. public class CompletableFutureAPI2Demo {
2. public static void main(String[] args) {
3. 
4. ExecutorService threadPool = Executors.newFixedThreadPool(3);
5. 
6. //thenApply
7. //计算结果存在依赖关系,这两个线程串行化
8. //异常相关:由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停
9. //handle:区别:有异常也可以往下一步走,根据带的异常参数可以进一步处理
10.         CompletableFuture.supplyAsync(() -> {
11. try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}
12.             System.out.println("111");
13. return 1;
14.         },threadPool).handle((f,e) -> {
15. int i = 10/0;
16.             System.out.println("222");
17. return f+2;
18.         }).thenApply(f -> {
19.             System.out.println("333");
20. return f+3;
21.         }).whenComplete((v,e) -> {
22. if (e == null){
23.                 System.out.println("------计算结果:"+v);
24.             }
25.         }).exceptionally(e -> {
26.             e.printStackTrace();
27.             System.out.println("e = " + e.getMessage());
28. return null;
29.         });
30.         threadPool.shutdown();
31.         System.out.println(Thread.currentThread().getName()+"-----主线程先去忙其他任务");
32.     }
33. }

3、对计算结果进行消费

1. //接受任务的处理结果,并消费处理,无返回结果
2. public class CompletableFutureAPI3Demo {
3. public static void main(String[] args) {
4. //        CompletableFuture.supplyAsync(() -> {
5. //            return 1;
6. //        }).thenApply(f -> {
7. //            return f+2;
8. //        }).thenApply(f -> {
9. //            return f+3;
10. //        }).thenAccept(System.out::println);
11. 
12. //任务之间的顺序执行
13. /**
14.          * 1、thenRun:任务A执行完执行任务B,并且B不需要A的结果
15.          * 2、thenAccept:任务  A执行完执行B,B需要A的结果,但是任务B无返回值
16.          * 3、thenApply:任务A执行完执行B,B需要A的结果,同时任务B有返回值
17.          */
18. 
19.         System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
20.         System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r-> System.out.println("r = " + r)).join());
21.         System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r+"ResuleB").join());
22. 
23.     }
24. }

4、对计算速度进行选用

1. public class CompletableFutureFastDemo {
2. public static void main(String[] args) {
3.         CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
4.             System.out.println("A come in");
5. try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
6. return "playA";
7.         });
8. 
9.         CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
10.             System.out.println("B come in");
11. try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}
12. return "playB";
13.         });
14.         CompletableFuture<String> result = playA.applyToEither(playB, f -> f  + " is winer");
15.         System.out.println(Thread.currentThread().getName()+"\t"+"--------"+result.join());
16.     }
17. }

5、对计算结果进行合并

1. public class CompletableFutureCombineDemo {
2. public static void main(String[] args) {
3.         CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
4.             System.out.println(Thread.currentThread().getName() + "\t ---启动");
5. try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}
6. return 10;
7.         });
8. 
9.         CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
10.             System.out.println(Thread.currentThread().getName() + "\t ---启动");
11. try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
12. return 20;
13.         });
14. 
15.         CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
16.             System.out.println("开始两个结果合并:" + x + y);
17. return x + y;
18.         });
19.         System.out.println("result.join() = " + result.join());
20.     }
21. }

注:CompletableFuture线程池运行选择

1、没有传入自定义线程池,都用默认线程池ForkJoinPool

2、传入了一个自定义线程池

如果你执行第一个任务的时候,传入了一个自定义线程池

调用thenRun方法执行第二个任务时,第二个任务和第一个任务使用的是同一个线程池

调用thenRunAsync方法执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

备注:

有可能处理太快,系统优化切换原则,直接使用main线程处理

1. public class CompletableFutureWithThreadPoolDemo {
2. public static void main(String[] args) {
3. ExecutorService threadPool = Executors.newFixedThreadPool(5);
4. 
5. try {
6.             CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
7. try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
8.                 System.out.println("1号任务" + "\t" + Thread.currentThread().getName());
9. return "abcd";
10.             },threadPool).thenRunAsync(() -> {
11. try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
12.                 System.out.println("2号任务" + "\t" + Thread.currentThread().getName());
13.             }).thenRun(() -> {
14. try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}
15.                 System.out.println("3号任务" + "\t" + Thread.currentThread().getName());
16.             }).thenRun(() -> {
17. try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}
18.                 System.out.println("4号任务" + "\t" + Thread.currentThread().getName());
19.             });
20.             System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
21.         } catch (Exception e) {
22.             e.printStackTrace();
23.         }finally {
24.             threadPool.shutdown();
25.         }
26.     }
27. }


目录
相关文章
|
8月前
|
安全 Java 编译器
高并发编程之什么是 JUC
高并发编程之什么是 JUC
68 1
|
监控 Java API
并发编程 - CompletableFuture
并发编程 - CompletableFuture
90 0
|
资源调度
JUC并发编程之同步器(Semaphore、CountDownLatch、CyclicBarrier、Exchanger、CompletableFuture)附带相关面试题
1.Semaphore(资源调度) 2.CountDownLatch(子线程优先) 3.CyclicBarrier(栅栏) 4.Exchanger(公共交换区) 5.CompletableFuture(异步编程)
189 0
|
7月前
|
安全 算法 Java
|
8月前
|
Java 编译器
JUC并发编程之CompletableFuture详解
JUC并发编程中的Future接口是Java 5中引入的一种异步编程机制,用于表示一个可能在未来完成的计算结果。它允许我们提交一个任务给线程池或其他执行器执行,并且可以通过Future对象获取任务执行的结果或者判断任务是否已经完成。
179 0
|
并行计算 Java 应用服务中间件
JUC并发编程超详细详解篇(一)
JUC并发编程超详细详解篇
1693 1
JUC并发编程超详细详解篇(一)
|
安全 Java 调度
JUC并发编程(上)
JUC并发编程(上)
83 0
|
存储 缓存 监控
JUC并发编程(下)
JUC并发编程(下)
48 0
|
存储 Dubbo Java
JUC第二十五讲:JUC线程池-CompletableFuture 实现原理与实践
JUC第二十五讲:JUC线程池-CompletableFuture 实现原理与实践
179 0
|
安全 Java 数据库
JUC第二十二讲:JUC线程池-FutureTask详解
JUC第二十二讲:JUC线程池-FutureTask详解