Future
future是java5新加的一个接口,他提供了一种异步并行计算的功能
接口定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务是否执行完毕
目的:异步多线程执行且有返回结果,特点:多线程/有返回/异步任务
补充:Runnable实现的是run方法,没有返回值,没有异常,Callable实现的是call方法,有返回值,需要处理异常
Future接口常用实现类Future Task异步任务
代码实现
1. public class CompletableFutureDemo { 2. public static void main(String[] args) throws ExecutionException, InterruptedException { 3. FutureTask<String> futureTask = new FutureTask<>(new MyThread()); 4. Thread t1 = new Thread(futureTask,"t1"); 5. t1.start(); 6. System.out.println(futureTask.get()); 7. } 8. } 9. 10. 11. 12. class MyThread implements Callable<String>{ 13. 14. @Override 15. public String call() throws Exception { 16. System.out.println("----come in call()"); 17. return "hello Callable"; 18. } 19. }
future的优缺点
NO: 1、get方法容易阻塞:一旦调用,如果计算没有完成,容易程序阻塞
2、isDone轮询:轮询的方式会耗费CPU资源,而且也不见得能及时得到计算结果
小结:future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务结果
注:实际程序中轮询的方式并不比阻塞好,这么写是给用户看的,用户不可能看你的程序就停在那了,这么写可以给个提示,也不会直接异常
CompletableFuture的异步优化思路
背景:对于简单的业务场景使用Future完全OK,但是对于复杂的业务场景,比如:
1、回调通知:应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知,通过轮询的方式去判断任务是否完成这样非常的占用CPU并且代码也不优雅
2、创建异步任务:Future+线程池配合
3、多个任务前后依赖可以组合处理:想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务结果的值,将两个或多个异步计算合成一个异步计算,这几个异步计算相互独立,同时后面这个又依赖前一个处理的结果
4、对计算速度最快:当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果
CompletableFuture提供了一种观察者模式类似的机制,可以让任务完成后通知监听的一方
类架构说明
接口CompletionStage
1、CompletionStage代表异步计算过程中某一个阶段,一个阶段完成以后可能会触发另外一个阶段
2、一个阶段的执行可以是一个Function,Consumer或者Runnable
3、一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
类CompletableFuture
1、在java8当中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合ComplttableFuture的方法
2、他可能代表一个明确完成的Future,也有可能代表一个完成阶段,它支持在计算完成以后触发一些函数或执行某个动作
3、它实现了Future和CompletionStage接口
注:从java8开始引入了CompletableFuture,他是Future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法
优点:
1、异步任务结束时,会自动回调某个对象的方法
2、主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
3、异步任务出错时,会自动回调某个对象的方法
函数式接口复习
CompletableFuture常用方法
1、获得结果和触发计算
1. public class CompletableFutureAPIDemo { 2. public static void main(String[] args) { 3. CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { 4. try { 5. TimeUnit.SECONDS.sleep(1); 6. } catch (InterruptedException e) { 7. e.printStackTrace(); 8. } 9. return "abc"; 10. }); 11. 12. //System.out.println(completableFuture.get()); 阻塞主线程获取结果 13. //System.out.println(completableFuture.get(2L,TimeUnit.SECONDS)); 等待指定时间获取结果 14. //System.out.println(completableFuture.join()); 阻塞线程获取结果 15. System.out.println(completableFuture.getNow("xxx")); //立即获取结果不阻塞,如果没计算完成返回XXX 16. System.out.println(completableFuture.complete("completeValue")+"\t"+completableFuture.join()); 17. //是否打断get方法立刻获得括号值 18. } 19. }
2、对计算结果进行处理
1. public class CompletableFutureAPI2Demo { 2. public static void main(String[] args) { 3. 4. ExecutorService threadPool = Executors.newFixedThreadPool(3); 5. 6. //thenApply 7. //计算结果存在依赖关系,这两个线程串行化 8. //异常相关:由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停 9. //handle:区别:有异常也可以往下一步走,根据带的异常参数可以进一步处理 10. CompletableFuture.supplyAsync(() -> { 11. try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();} 12. System.out.println("111"); 13. return 1; 14. },threadPool).handle((f,e) -> { 15. int i = 10/0; 16. System.out.println("222"); 17. return f+2; 18. }).thenApply(f -> { 19. System.out.println("333"); 20. return f+3; 21. }).whenComplete((v,e) -> { 22. if (e == null){ 23. System.out.println("------计算结果:"+v); 24. } 25. }).exceptionally(e -> { 26. e.printStackTrace(); 27. System.out.println("e = " + e.getMessage()); 28. return null; 29. }); 30. threadPool.shutdown(); 31. System.out.println(Thread.currentThread().getName()+"-----主线程先去忙其他任务"); 32. } 33. }
3、对计算结果进行消费
1. //接受任务的处理结果,并消费处理,无返回结果 2. public class CompletableFutureAPI3Demo { 3. public static void main(String[] args) { 4. // CompletableFuture.supplyAsync(() -> { 5. // return 1; 6. // }).thenApply(f -> { 7. // return f+2; 8. // }).thenApply(f -> { 9. // return f+3; 10. // }).thenAccept(System.out::println); 11. 12. //任务之间的顺序执行 13. /** 14. * 1、thenRun:任务A执行完执行任务B,并且B不需要A的结果 15. * 2、thenAccept:任务 A执行完执行B,B需要A的结果,但是任务B无返回值 16. * 3、thenApply:任务A执行完执行B,B需要A的结果,同时任务B有返回值 17. */ 18. 19. System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join()); 20. System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r-> System.out.println("r = " + r)).join()); 21. System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r+"ResuleB").join()); 22. 23. } 24. }
4、对计算速度进行选用
1. public class CompletableFutureFastDemo { 2. public static void main(String[] args) { 3. CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> { 4. System.out.println("A come in"); 5. try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();} 6. return "playA"; 7. }); 8. 9. CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> { 10. System.out.println("B come in"); 11. try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();} 12. return "playB"; 13. }); 14. CompletableFuture<String> result = playA.applyToEither(playB, f -> f + " is winer"); 15. System.out.println(Thread.currentThread().getName()+"\t"+"--------"+result.join()); 16. } 17. }
5、对计算结果进行合并
1. public class CompletableFutureCombineDemo { 2. public static void main(String[] args) { 3. CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { 4. System.out.println(Thread.currentThread().getName() + "\t ---启动"); 5. try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();} 6. return 10; 7. }); 8. 9. CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { 10. System.out.println(Thread.currentThread().getName() + "\t ---启动"); 11. try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();} 12. return 20; 13. }); 14. 15. CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> { 16. System.out.println("开始两个结果合并:" + x + y); 17. return x + y; 18. }); 19. System.out.println("result.join() = " + result.join()); 20. } 21. }
注:CompletableFuture线程池运行选择
1、没有传入自定义线程池,都用默认线程池ForkJoinPool
2、传入了一个自定义线程池
如果你执行第一个任务的时候,传入了一个自定义线程池
调用thenRun方法执行第二个任务时,第二个任务和第一个任务使用的是同一个线程池
调用thenRunAsync方法执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
备注:
有可能处理太快,系统优化切换原则,直接使用main线程处理
1. public class CompletableFutureWithThreadPoolDemo { 2. public static void main(String[] args) { 3. ExecutorService threadPool = Executors.newFixedThreadPool(5); 4. 5. try { 6. CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> { 7. try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} 8. System.out.println("1号任务" + "\t" + Thread.currentThread().getName()); 9. return "abcd"; 10. },threadPool).thenRunAsync(() -> { 11. try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} 12. System.out.println("2号任务" + "\t" + Thread.currentThread().getName()); 13. }).thenRun(() -> { 14. try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();} 15. System.out.println("3号任务" + "\t" + Thread.currentThread().getName()); 16. }).thenRun(() -> { 17. try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();} 18. System.out.println("4号任务" + "\t" + Thread.currentThread().getName()); 19. }); 20. System.out.println(completableFuture.get(2L, TimeUnit.SECONDS)); 21. } catch (Exception e) { 22. e.printStackTrace(); 23. }finally { 24. threadPool.shutdown(); 25. } 26. } 27. }