【小家java】Java8新特性之---CompletableFuture的系统讲解和实例演示(使用CompletableFuture构建异步应用)(上)

简介: 【小家java】Java8新特性之---CompletableFuture的系统讲解和实例演示(使用CompletableFuture构建异步应用)(上)

异步


传统单线程环境下,调用函数是同步的,必须等待程序返回结果后,才可进行其他处理。因此为了提高系统整体的并发性能,引入了异步执行~


jdk中已经内置future模式的实现。Future是Java5添加的类,用来描述一个异步计算的结果。可以用isDone方法来检查计算是否完成,或者使用get阻塞住调用线程,直至计算完成返回结果,也可以用cancel方法来停止任务的执行。

Futrue异步模式存在的问题


Future以及相关使用方法提供了异步执行任务的能力,但对于结果的获取却是不方便,只能通过阻塞或轮询的方式得到任务结果。


阻塞的方式与我们理解的异步编程其实是相违背的,而轮询又会耗无谓的CPU资源。而且还不能及时得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?


很多语言像Node.js,采用回调的方式实现异步编程。Java的一些框架像Netty,自己扩展Java的Future接口,提供了addListener等多个扩展方法。


guava里面也提供了通用的扩展Future: ListenableFuture\SettableFuture以及辅助类Futures等,方便异步编程


Spring4.0也扩展了Futrue,提供了ListenableFuture来addCallback()采用回调函数的形式来提高整体异步性能~


作为正统Java类库,是不是应该加点什么特性,可以加强一下自身库的功能?

JDK8引入中重磅类库:CompletableFuture


Java8里面新增加了一个包含50个方法左右的类:CompletableFuture. 提供了非常强大的Future的扩展功能,可以帮助简化异步编程的复杂性,提供了函数式编程能力,可以通过回调的方式计算处理结果,并且提供了转换和组织CompletableFuture的方法。


JDK1.8才新加入的一个实现类CompletableFuture,实现了Future, CompletionStage两个接口。


CompletableFuture实现了CompletionStage接口的如下策略:


1.为了完成当前的CompletableFuture接口或者其他完成方法的回调函数的线程,提供了非异步的完成操作。


2.没有显式入参Executor的所有async方法都使用ForkJoinPool.commonPool()为了简化监视、调试和跟踪,所有生成的异步任务都是标记接口AsynchronousCompletionTask的实例。


3.所有的CompletionStage方法都是独立于其他共有方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖


CompletableFuture实现了Futurre接口的如下策略:


1.CompletableFuture无法直接控制完成,所以cancel操作被视为是另一种异常完成形式。方法isCompletedExceptionally可以用来确定一个CompletableFuture是否以任何异常的方式完成。


2.以一个CompletionException为例,方法get()和get(long,TimeUnit)抛出一个ExecutionException,对应CompletionException。为了在大多数上下文中简化用法,这个类还定义了方法join()和getNow,而不是直接在这些情况中直接抛出CompletionException。


CompletableFuture中4个异步执行任务静态方法:


public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}


其中supplyAsync用于有返回值的任务,runAsync则用于没有返回值的任务。Executor参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()系统级公共线程池,

注意:这些线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM关闭时,生命周期终止


主动完成计算


CompletableFuture 类实现了CompletionStage和Future接口,所以还是可以像以前一样通过阻塞或轮询的方式获得结果。尽管这种方式不推荐使用。


如下四个方法都可以获取结果:


public T  get()  //Futrue的方法 阻塞
public T  get(long timeout, TimeUnit unit) //Futrue的方法 阻塞
// 新提供的方法
public T  getNow(T valueIfAbsent) //getNow有点特殊,如果结果已经计算完则返回结果或抛异常,否则返回给定的valueIfAbsent的值(此方法有点反人类有木有)
public T  join() // 返回计算的结果或抛出一个uncheckd异常。 推荐使用


上面4个方法,推荐使用join,还有带超时时间的get方法


CompletableFuture并非一定要交给线程池执行才能实现异步,你可以像下面这样实现异步运行:


     public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture();
        //自己开个线程去执行 执行完把结果告诉completableFuture即可
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成 并且把结果告诉completableFuture
            completableFuture.complete("ok"); //这里把你信任的结果set进去后,所有阻塞的get()方法都能立马苏醒,获得到结果
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        System.out.println("准备打印结果...");
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }
输出:
准备打印结果...
task doing...
计算结果:ok


如果没有意外,上面发的代码工作得很正常。但是,如果任务执行过程中产生了异常会怎样呢?如下:只加一句1/0的代码


 //自己开个线程去执行 执行完把结果告诉completableFuture即可
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                System.out.println(1 / 0);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成 并且把结果告诉completableFuture
            completableFuture.complete("ok");
        }).start();


这种情况下会得到一个相当糟糕的结果:异常会被限制在执行任务的线程的范围内,最终会杀死该守护线程,而主线程,将永远永远阻塞了。


怎么解决呢?


   使用get(long timeout, TimeUnit unit)代替get()方法,它使用一个超时参数来避免发生这样的情况。这是一种值得推荐的做法,我们应该尽量在你的代码中添加超时判断的逻辑,避免发生类似的问题。


使用这种方法至少能防止程序永久地等待下去,超时发生时,程序会得到通知发生了TimeoutException 。不过,也因为如此,你不能确定执行任务的线程内到底发生了什么问题(因此自己要做好权衡)。


   更好的解决方案是:为了能获取任务线程内发生的异常,你需要使用

CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问题的异常抛出。这样,当执行任务发生异常时,调用get()方法的线程将会收到一个 ExecutionException异常,该异常接收了一个包含失败原因的Exception 参数。


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture();
        //自己开个线程去执行 执行完把结果告诉completableFuture即可
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                System.out.println(1 / 0);
            //} catch (InterruptedException e) {
            } catch (Exception e) {
                // 告诉completableFuture任务发生异常了
                completableFuture.completeExceptionally(e);
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成 并且把结果告诉completableFuture
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        System.out.println("准备打印结果...");
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }


这样子,如果内部发生了异常,调用get方法的时候就能得到这个Exception,进而能拿到抛异常的原因了。


说明:若执行了completableFuture.completeExceptionally(e);,那么completableFuture.get()它最终不是把异常获取出来了,而是给throw出来了,阻断主线程。(get()方法是用于获取正常返回值的)

使用案例


在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。


它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。


创建CompletableFuture


四个静态方法(如上),一个空构造函数


whenComplete计算结果完成时的处理


当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:


public CompletableFuture<T>   whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>   whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>   whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)


可以看到Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。

方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。


注意这几个方法都会返回CompletableFuture。


CompletableFuture.supplyAsync(() -> 100)
                .thenApplyAsync(i -> i * 10)
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //1000_____null
        //若有异常
        CompletableFuture.supplyAsync(() -> 1 / 0)
                .thenApplyAsync(i -> i * 10)
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //null_____java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero


Main方法本地测试小细节:


假若我们直接通过这种方式,sleep个几秒:


    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> 100)
                .thenApplyAsync(i -> {
                    try {
                        TimeUnit.SECONDS.sleep(5);
                        System.out.println("this my sleep time");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i + 1;
                })
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e));
    }


发现控制台木有任何打印:


image.png


很多小伙伴就就开始打断点,发现断点都进不去,囧。

其实这个小细节是干扰众多小伙伴的的地方~


根本原因:因为是异步执行的,所以主线程会线结束,JVM都退出了,自然异步线程也会死掉喽


解决方案:让main线程不这么快退出就行,自己测试的时候我们加上这么一句话就Ok了:


    public static void main(String[] args) throws InterruptedException {
      ...
      // 让主线程等待 只要这个时间大于你异步线程的时间就成~~~
      TimeUnit.SECONDS.sleep(20);
    }


Tips:对于web环境是不会存在此问题的,因为它的主线程一般情况下永远不会退出~~~~这里只是小伙伴在本地测试上需要注意的一个小细节~

相关文章
|
11小时前
|
Java
排课系统【JSP+Servlet+JavaBean】(Java课设)
排课系统【JSP+Servlet+JavaBean】(Java课设)
15 5
|
11小时前
|
监控 前端开发 Java
Java公立医院绩效考核管理系统 医院绩效考核系统的优势有哪些? 
医院绩效管理系统解决方案紧扣新医改形势下医院绩效管理的要求,以“工作量为基础的考核方案”为核心思想,结合患者满意度、服务质量、技术难度、工作效率、医德医风等管理发展目标的考核体系,形成医院的内部绩效考核与分配机制,通过信息化手段为绩效考评管理人员实施医院绩效考评工作提供了有效工具,扩展了信息管理范围,增加了信息分析的广度与深度。这不仅使绩效评价工作更加科学化、规范化和自动化,而且从根本上改变了绩效评估工作方式,实现了绩效评价数据网络化采集,评价结果透明化管理,奖金分配数据自动化生成,极大地提高了绩效评估的全面性、准确性、时效性、公正性。从而推进医院绩效管理的专业化、规范化和精细化管理,充分发挥
13 0
|
11小时前
|
存储 Java Maven
Maven 构建 Java 项目
使用 Maven 的 `maven-archetype-quickstart` 插件在 `C:\MVN` 创建 Java 应用项目 `consumerBanking`,命令行参数包括 `-DgroupId`, `-DartifactId` 和 `-DarchetypeArtifactId`。项目包含 src/main/java 和 src/test/java 目录,分别存放 Java 代码和测试代码,以及 src/main/resources 用于存储资源文件。默认生成的 `App.java` 和 `AppTest.java` 分别为应用主类和测试类。
|
11小时前
|
Java 程序员 API
Java 8新特性之Lambda表达式与Stream API的深度解析
【5月更文挑战第12天】本文将深入探讨Java 8中的两个重要新特性:Lambda表达式和Stream API。我们将从基本概念入手,逐步深入到实际应用场景,帮助读者更好地理解和掌握这两个新特性,提高Java编程效率。
41 2
|
11小时前
|
数据采集 前端开发 Java
Java医院绩效考核系统源码maven+Visual Studio Code一体化人力资源saas平台系统源码
医院绩效解决方案包括医院绩效管理(BSC)、综合奖金核算(RBRVS),涵盖从绩效方案的咨询与定制、数据采集、绩效考核及反馈、绩效奖金核算到科到组、分配到员工个人全流程绩效管理;将医院、科室、医护人员利益绑定;全面激活人才活力;兼顾质量和效益、长期与短期利益;助力医院降本增效,持续改善、优化收入、成本结构。
15 0
|
11小时前
|
移动开发 前端开发 NoSQL
ruoyi-nbcio从spring2.7.18升级springboot到3.1.7,java从java8升级到17(二)
ruoyi-nbcio从spring2.7.18升级springboot到3.1.7,java从java8升级到17(二)
44 0
|
11小时前
|
移动开发 Oracle 前端开发
本地开发环境安装java8、java17与java21
本地开发环境安装java8、java17与java21
16 0
|
11小时前
|
存储 Java API
java对接IPFS系统-以nft.storage为列
java对接IPFS系统-以nft.storage为列
14 2
|
11小时前
|
监控 前端开发 Java
Java基于B/S医院绩效考核管理平台系统源码 医院智慧绩效管理系统源码
医院绩效考核系统是一个关键的管理工具,旨在评估和优化医院内部各部门、科室和员工的绩效。一个有效的绩效考核系统不仅能帮助医院实现其战略目标,还能提升医疗服务质量,增强患者满意度,并促进员工的专业成长
18 0
|
SQL 存储 Java
Java 应用与数据库的关系| 学习笔记
快速学习 Java 应用与数据库的关系。
172 0
Java 应用与数据库的关系| 学习笔记