异步
传统单线程环境下,调用函数是同步的,必须等待程序返回结果后,才可进行其他处理。因此为了提高系统整体的并发性能,引入了异步执行~
jdk中已经内置future模式的实现。Future是Java5添加的类,用来描述一个异步计算的结果。可以用isDone方法来检查计算是否完成,或者使用get阻塞住调用线程,直至计算完成返回结果,也可以用cancel方法来停止任务的执行。
Futrue异步模式存在的问题
Future以及相关使用方法提供了异步执行任务的能力,但对于结果的获取却是不方便,只能通过阻塞或轮询的方式得到任务结果。
阻塞的方式与我们理解的异步编程其实是相违背的,而轮询又会耗无谓的CPU资源。而且还不能及时得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
很多语言像Node.js,采用回调的方式实现异步编程。Java的一些框架像Netty,自己扩展Java的Future接口,提供了addListener等多个扩展方法。
guava里面也提供了通用的扩展Future: ListenableFuture\SettableFuture以及辅助类Futures等,方便异步编程
Spring4.0也扩展了Futrue,提供了ListenableFuture来addCallback()采用回调函数的形式来提高整体异步性能~
作为正统Java类库,是不是应该加点什么特性,可以加强一下自身库的功能?
JDK8引入中重磅类库:CompletableFuture
Java8里面新增加了一个包含50个方法左右的类:CompletableFuture. 提供了非常强大的Future的扩展功能,可以帮助简化异步编程的复杂性,提供了函数式编程能力,可以通过回调的方式计算处理结果,并且提供了转换和组织CompletableFuture的方法。
JDK1.8才新加入的一个实现类CompletableFuture,实现了Future, CompletionStage两个接口。
CompletableFuture实现了CompletionStage接口的如下策略:
1.为了完成当前的CompletableFuture接口或者其他完成方法的回调函数的线程,提供了非异步的完成操作。
2.没有显式入参Executor的所有async方法都使用ForkJoinPool.commonPool()为了简化监视、调试和跟踪,所有生成的异步任务都是标记接口AsynchronousCompletionTask的实例。
3.所有的CompletionStage方法都是独立于其他共有方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖
CompletableFuture实现了Futurre接口的如下策略:
1.CompletableFuture无法直接控制完成,所以cancel操作被视为是另一种异常完成形式。方法isCompletedExceptionally可以用来确定一个CompletableFuture是否以任何异常的方式完成。
2.以一个CompletionException为例,方法get()和get(long,TimeUnit)抛出一个ExecutionException,对应CompletionException。为了在大多数上下文中简化用法,这个类还定义了方法join()和getNow,而不是直接在这些情况中直接抛出CompletionException。
CompletableFuture中4个异步执行任务静态方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
其中supplyAsync用于有返回值的任务,runAsync则用于没有返回值的任务。Executor参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()系统级公共线程池,
注意:这些线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM关闭时,生命周期终止
主动完成计算
CompletableFuture 类实现了CompletionStage和Future接口,所以还是可以像以前一样通过阻塞或轮询的方式获得结果。尽管这种方式不推荐使用。
如下四个方法都可以获取结果:
public T get() //Futrue的方法 阻塞 public T get(long timeout, TimeUnit unit) //Futrue的方法 阻塞 // 新提供的方法 public T getNow(T valueIfAbsent) //getNow有点特殊,如果结果已经计算完则返回结果或抛异常,否则返回给定的valueIfAbsent的值(此方法有点反人类有木有) public T join() // 返回计算的结果或抛出一个uncheckd异常。 推荐使用
上面4个方法,推荐使用join,还有带超时时间的get方法
CompletableFuture并非一定要交给线程池执行才能实现异步,你可以像下面这样实现异步运行:
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = new CompletableFuture(); //自己开个线程去执行 执行完把结果告诉completableFuture即可 new Thread(() -> { // 模拟执行耗时任务 System.out.println("task doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } // 告诉completableFuture任务已经完成 并且把结果告诉completableFuture completableFuture.complete("ok"); //这里把你信任的结果set进去后,所有阻塞的get()方法都能立马苏醒,获得到结果 }).start(); // 获取任务结果,如果没有完成会一直阻塞等待 System.out.println("准备打印结果..."); String result = completableFuture.get(); System.out.println("计算结果:" + result); } 输出: 准备打印结果... task doing... 计算结果:ok
如果没有意外,上面发的代码工作得很正常。但是,如果任务执行过程中产生了异常会怎样呢?如下:只加一句1/0的代码
//自己开个线程去执行 执行完把结果告诉completableFuture即可 new Thread(() -> { // 模拟执行耗时任务 System.out.println("task doing..."); try { Thread.sleep(3000); System.out.println(1 / 0); } catch (InterruptedException e) { e.printStackTrace(); } // 告诉completableFuture任务已经完成 并且把结果告诉completableFuture completableFuture.complete("ok"); }).start();
这种情况下会得到一个相当糟糕的结果:异常会被限制在执行任务的线程的范围内,最终会杀死该守护线程,而主线程,将永远永远阻塞了。
怎么解决呢?
使用get(long timeout, TimeUnit unit)代替get()方法,它使用一个超时参数来避免发生这样的情况。这是一种值得推荐的做法,我们应该尽量在你的代码中添加超时判断的逻辑,避免发生类似的问题。
使用这种方法至少能防止程序永久地等待下去,超时发生时,程序会得到通知发生了TimeoutException 。不过,也因为如此,你不能确定执行任务的线程内到底发生了什么问题(因此自己要做好权衡)。
更好的解决方案是:为了能获取任务线程内发生的异常,你需要使用
CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问题的异常抛出。这样,当执行任务发生异常时,调用get()方法的线程将会收到一个 ExecutionException异常,该异常接收了一个包含失败原因的Exception 参数。
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = new CompletableFuture(); //自己开个线程去执行 执行完把结果告诉completableFuture即可 new Thread(() -> { // 模拟执行耗时任务 System.out.println("task doing..."); try { Thread.sleep(3000); System.out.println(1 / 0); //} catch (InterruptedException e) { } catch (Exception e) { // 告诉completableFuture任务发生异常了 completableFuture.completeExceptionally(e); e.printStackTrace(); } // 告诉completableFuture任务已经完成 并且把结果告诉completableFuture completableFuture.complete("ok"); }).start(); // 获取任务结果,如果没有完成会一直阻塞等待 System.out.println("准备打印结果..."); String result = completableFuture.get(); System.out.println("计算结果:" + result); }
这样子,如果内部发生了异常,调用get方法的时候就能得到这个Exception,进而能拿到抛异常的原因了。
说明:若执行了completableFuture.completeExceptionally(e);,那么completableFuture.get()它最终不是把异常获取出来了,而是给throw出来了,阻断主线程。(get()方法是用于获取正常返回值的)
使用案例
在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
创建CompletableFuture
四个静态方法(如上),一个空构造函数
whenComplete计算结果完成时的处理
当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
可以看到Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
注意这几个方法都会返回CompletableFuture。
CompletableFuture.supplyAsync(() -> 100) .thenApplyAsync(i -> i * 10) .thenApply(i -> i.toString()) .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //1000_____null //若有异常 CompletableFuture.supplyAsync(() -> 1 / 0) .thenApplyAsync(i -> i * 10) .thenApply(i -> i.toString()) .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //null_____java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Main方法本地测试小细节:
假若我们直接通过这种方式,sleep个几秒:
public static void main(String[] args) { CompletableFuture.supplyAsync(() -> 100) .thenApplyAsync(i -> { try { TimeUnit.SECONDS.sleep(5); System.out.println("this my sleep time"); } catch (InterruptedException e) { e.printStackTrace(); } return i + 1; }) .thenApply(i -> i.toString()) .whenComplete((r, e) -> System.out.println(r + "_____" + e)); }
发现控制台木有任何打印:
很多小伙伴就就开始打断点,发现断点都进不去,囧。
其实这个小细节是干扰众多小伙伴的的地方~
根本原因:因为是异步执行的,所以主线程会线结束,JVM都退出了,自然异步线程也会死掉喽
解决方案:让main线程不这么快退出就行,自己测试的时候我们加上这么一句话就Ok了:
public static void main(String[] args) throws InterruptedException { ... // 让主线程等待 只要这个时间大于你异步线程的时间就成~~~ TimeUnit.SECONDS.sleep(20); }
Tips:对于web环境是不会存在此问题的,因为它的主线程一般情况下永远不会退出~~~~这里只是小伙伴在本地测试上需要注意的一个小细节~