【小家java】Java8新特性之---CompletableFuture的系统讲解和实例演示(使用CompletableFuture构建异步应用)(上)

简介: 【小家java】Java8新特性之---CompletableFuture的系统讲解和实例演示(使用CompletableFuture构建异步应用)(上)

异步


传统单线程环境下,调用函数是同步的,必须等待程序返回结果后,才可进行其他处理。因此为了提高系统整体的并发性能,引入了异步执行~


jdk中已经内置future模式的实现。Future是Java5添加的类,用来描述一个异步计算的结果。可以用isDone方法来检查计算是否完成,或者使用get阻塞住调用线程,直至计算完成返回结果,也可以用cancel方法来停止任务的执行。

Futrue异步模式存在的问题


Future以及相关使用方法提供了异步执行任务的能力,但对于结果的获取却是不方便,只能通过阻塞或轮询的方式得到任务结果。


阻塞的方式与我们理解的异步编程其实是相违背的,而轮询又会耗无谓的CPU资源。而且还不能及时得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?


很多语言像Node.js,采用回调的方式实现异步编程。Java的一些框架像Netty,自己扩展Java的Future接口,提供了addListener等多个扩展方法。


guava里面也提供了通用的扩展Future: ListenableFuture\SettableFuture以及辅助类Futures等,方便异步编程


Spring4.0也扩展了Futrue,提供了ListenableFuture来addCallback()采用回调函数的形式来提高整体异步性能~


作为正统Java类库,是不是应该加点什么特性,可以加强一下自身库的功能?

JDK8引入中重磅类库:CompletableFuture


Java8里面新增加了一个包含50个方法左右的类:CompletableFuture. 提供了非常强大的Future的扩展功能,可以帮助简化异步编程的复杂性,提供了函数式编程能力,可以通过回调的方式计算处理结果,并且提供了转换和组织CompletableFuture的方法。


JDK1.8才新加入的一个实现类CompletableFuture,实现了Future, CompletionStage两个接口。


CompletableFuture实现了CompletionStage接口的如下策略:


1.为了完成当前的CompletableFuture接口或者其他完成方法的回调函数的线程,提供了非异步的完成操作。


2.没有显式入参Executor的所有async方法都使用ForkJoinPool.commonPool()为了简化监视、调试和跟踪,所有生成的异步任务都是标记接口AsynchronousCompletionTask的实例。


3.所有的CompletionStage方法都是独立于其他共有方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖


CompletableFuture实现了Futurre接口的如下策略:


1.CompletableFuture无法直接控制完成,所以cancel操作被视为是另一种异常完成形式。方法isCompletedExceptionally可以用来确定一个CompletableFuture是否以任何异常的方式完成。


2.以一个CompletionException为例,方法get()和get(long,TimeUnit)抛出一个ExecutionException,对应CompletionException。为了在大多数上下文中简化用法,这个类还定义了方法join()和getNow,而不是直接在这些情况中直接抛出CompletionException。


CompletableFuture中4个异步执行任务静态方法:


public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}


其中supplyAsync用于有返回值的任务,runAsync则用于没有返回值的任务。Executor参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()系统级公共线程池,

注意:这些线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM关闭时,生命周期终止


主动完成计算


CompletableFuture 类实现了CompletionStage和Future接口,所以还是可以像以前一样通过阻塞或轮询的方式获得结果。尽管这种方式不推荐使用。


如下四个方法都可以获取结果:


public T  get()  //Futrue的方法 阻塞
public T  get(long timeout, TimeUnit unit) //Futrue的方法 阻塞
// 新提供的方法
public T  getNow(T valueIfAbsent) //getNow有点特殊,如果结果已经计算完则返回结果或抛异常,否则返回给定的valueIfAbsent的值(此方法有点反人类有木有)
public T  join() // 返回计算的结果或抛出一个uncheckd异常。 推荐使用


上面4个方法,推荐使用join,还有带超时时间的get方法


CompletableFuture并非一定要交给线程池执行才能实现异步,你可以像下面这样实现异步运行:


     public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture();
        //自己开个线程去执行 执行完把结果告诉completableFuture即可
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成 并且把结果告诉completableFuture
            completableFuture.complete("ok"); //这里把你信任的结果set进去后,所有阻塞的get()方法都能立马苏醒,获得到结果
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        System.out.println("准备打印结果...");
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }
输出:
准备打印结果...
task doing...
计算结果:ok


如果没有意外,上面发的代码工作得很正常。但是,如果任务执行过程中产生了异常会怎样呢?如下:只加一句1/0的代码


 //自己开个线程去执行 执行完把结果告诉completableFuture即可
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                System.out.println(1 / 0);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成 并且把结果告诉completableFuture
            completableFuture.complete("ok");
        }).start();


这种情况下会得到一个相当糟糕的结果:异常会被限制在执行任务的线程的范围内,最终会杀死该守护线程,而主线程,将永远永远阻塞了。


怎么解决呢?


   使用get(long timeout, TimeUnit unit)代替get()方法,它使用一个超时参数来避免发生这样的情况。这是一种值得推荐的做法,我们应该尽量在你的代码中添加超时判断的逻辑,避免发生类似的问题。


使用这种方法至少能防止程序永久地等待下去,超时发生时,程序会得到通知发生了TimeoutException 。不过,也因为如此,你不能确定执行任务的线程内到底发生了什么问题(因此自己要做好权衡)。


   更好的解决方案是:为了能获取任务线程内发生的异常,你需要使用

CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问题的异常抛出。这样,当执行任务发生异常时,调用get()方法的线程将会收到一个 ExecutionException异常,该异常接收了一个包含失败原因的Exception 参数。


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture();
        //自己开个线程去执行 执行完把结果告诉completableFuture即可
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                System.out.println(1 / 0);
            //} catch (InterruptedException e) {
            } catch (Exception e) {
                // 告诉completableFuture任务发生异常了
                completableFuture.completeExceptionally(e);
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成 并且把结果告诉completableFuture
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        System.out.println("准备打印结果...");
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }


这样子,如果内部发生了异常,调用get方法的时候就能得到这个Exception,进而能拿到抛异常的原因了。


说明:若执行了completableFuture.completeExceptionally(e);,那么completableFuture.get()它最终不是把异常获取出来了,而是给throw出来了,阻断主线程。(get()方法是用于获取正常返回值的)

使用案例


在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。


它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。


创建CompletableFuture


四个静态方法(如上),一个空构造函数


whenComplete计算结果完成时的处理


当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:


public CompletableFuture<T>   whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>   whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>   whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)


可以看到Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。

方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。


注意这几个方法都会返回CompletableFuture。


CompletableFuture.supplyAsync(() -> 100)
                .thenApplyAsync(i -> i * 10)
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //1000_____null
        //若有异常
        CompletableFuture.supplyAsync(() -> 1 / 0)
                .thenApplyAsync(i -> i * 10)
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //null_____java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero


Main方法本地测试小细节:


假若我们直接通过这种方式,sleep个几秒:


    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> 100)
                .thenApplyAsync(i -> {
                    try {
                        TimeUnit.SECONDS.sleep(5);
                        System.out.println("this my sleep time");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i + 1;
                })
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e));
    }


发现控制台木有任何打印:


image.png


很多小伙伴就就开始打断点,发现断点都进不去,囧。

其实这个小细节是干扰众多小伙伴的的地方~


根本原因:因为是异步执行的,所以主线程会线结束,JVM都退出了,自然异步线程也会死掉喽


解决方案:让main线程不这么快退出就行,自己测试的时候我们加上这么一句话就Ok了:


    public static void main(String[] args) throws InterruptedException {
      ...
      // 让主线程等待 只要这个时间大于你异步线程的时间就成~~~
      TimeUnit.SECONDS.sleep(20);
    }


Tips:对于web环境是不会存在此问题的,因为它的主线程一般情况下永远不会退出~~~~这里只是小伙伴在本地测试上需要注意的一个小细节~

相关文章
|
2天前
|
Java 编译器 开发者
Java中的this关键字详解:深入理解与应用
本文深入解析了Java中`this`关键字的多种用法
28 9
|
2天前
|
Java 应用服务中间件 API
【潜意识Java】javaee中的SpringBoot在Java 开发中的应用与详细分析
本文介绍了 Spring Boot 的核心概念和使用场景,并通过一个实战项目演示了如何构建一个简单的 RESTful API。
21 5
|
2天前
|
人工智能 自然语言处理 搜索推荐
【潜意识Java】了解并详细分析Java与AIGC的结合应用和使用方式
本文介绍了如何将Java与AIGC(人工智能生成内容)技术结合,实现智能文本生成。
21 5
|
2天前
|
SQL Java 数据库连接
【潜意识Java】深入理解MyBatis,从基础到高级的深度细节应用
本文详细介绍了MyBatis,一个轻量级的Java持久化框架。内容涵盖MyBatis的基本概念、配置与环境搭建、基础操作(如创建实体类、Mapper接口及映射文件)以及CRUD操作的实现。此外,还深入探讨了高级特性,包括动态SQL和缓存机制。通过代码示例,帮助开发者更好地掌握MyBatis的使用技巧,提升数据库操作效率。总结部分强调了MyBatis的优势及其在实际开发中的应用价值。
11 1
|
搜索推荐 Java 程序员
Java 12都有哪些新特性?
Java 12都有哪些新特性?
166 0
Java 12都有哪些新特性?
|
Java API Apache
Java 9都有哪些新特性?
Java 9都有哪些新特性?
130 0
|
JavaScript 前端开发 Java
Java 10都有哪些新特性?
Java 10都有哪些新特性?
155 0
|
Java API 数据安全/隐私保护
Java 11都有哪些新特性?
Java 11都有哪些新特性?
128 0
|
前端开发 Java 编译器
Java 14 有哪些新特性?
记录为 Java 提供了一种正确实现数据类的能力,不再需要为实现数据类而编写冗长的代码。下面就来看看 Java 14 中的记录有哪些新特性。
|
16天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
74 17