【小家java】Java8新特性之---CompletableFuture的系统讲解和实例演示(使用CompletableFuture构建异步应用)(上)

简介: 【小家java】Java8新特性之---CompletableFuture的系统讲解和实例演示(使用CompletableFuture构建异步应用)(上)

异步


传统单线程环境下,调用函数是同步的,必须等待程序返回结果后,才可进行其他处理。因此为了提高系统整体的并发性能,引入了异步执行~


jdk中已经内置future模式的实现。Future是Java5添加的类,用来描述一个异步计算的结果。可以用isDone方法来检查计算是否完成,或者使用get阻塞住调用线程,直至计算完成返回结果,也可以用cancel方法来停止任务的执行。

Futrue异步模式存在的问题


Future以及相关使用方法提供了异步执行任务的能力,但对于结果的获取却是不方便,只能通过阻塞或轮询的方式得到任务结果。


阻塞的方式与我们理解的异步编程其实是相违背的,而轮询又会耗无谓的CPU资源。而且还不能及时得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?


很多语言像Node.js,采用回调的方式实现异步编程。Java的一些框架像Netty,自己扩展Java的Future接口,提供了addListener等多个扩展方法。


guava里面也提供了通用的扩展Future: ListenableFuture\SettableFuture以及辅助类Futures等,方便异步编程


Spring4.0也扩展了Futrue,提供了ListenableFuture来addCallback()采用回调函数的形式来提高整体异步性能~


作为正统Java类库,是不是应该加点什么特性,可以加强一下自身库的功能?

JDK8引入中重磅类库:CompletableFuture


Java8里面新增加了一个包含50个方法左右的类:CompletableFuture. 提供了非常强大的Future的扩展功能,可以帮助简化异步编程的复杂性,提供了函数式编程能力,可以通过回调的方式计算处理结果,并且提供了转换和组织CompletableFuture的方法。


JDK1.8才新加入的一个实现类CompletableFuture,实现了Future, CompletionStage两个接口。


CompletableFuture实现了CompletionStage接口的如下策略:


1.为了完成当前的CompletableFuture接口或者其他完成方法的回调函数的线程,提供了非异步的完成操作。


2.没有显式入参Executor的所有async方法都使用ForkJoinPool.commonPool()为了简化监视、调试和跟踪,所有生成的异步任务都是标记接口AsynchronousCompletionTask的实例。


3.所有的CompletionStage方法都是独立于其他共有方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖


CompletableFuture实现了Futurre接口的如下策略:


1.CompletableFuture无法直接控制完成,所以cancel操作被视为是另一种异常完成形式。方法isCompletedExceptionally可以用来确定一个CompletableFuture是否以任何异常的方式完成。


2.以一个CompletionException为例,方法get()和get(long,TimeUnit)抛出一个ExecutionException,对应CompletionException。为了在大多数上下文中简化用法,这个类还定义了方法join()和getNow,而不是直接在这些情况中直接抛出CompletionException。


CompletableFuture中4个异步执行任务静态方法:


public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}


其中supplyAsync用于有返回值的任务,runAsync则用于没有返回值的任务。Executor参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()系统级公共线程池,

注意:这些线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM关闭时,生命周期终止


主动完成计算


CompletableFuture 类实现了CompletionStage和Future接口,所以还是可以像以前一样通过阻塞或轮询的方式获得结果。尽管这种方式不推荐使用。


如下四个方法都可以获取结果:


public T  get()  //Futrue的方法 阻塞
public T  get(long timeout, TimeUnit unit) //Futrue的方法 阻塞
// 新提供的方法
public T  getNow(T valueIfAbsent) //getNow有点特殊,如果结果已经计算完则返回结果或抛异常,否则返回给定的valueIfAbsent的值(此方法有点反人类有木有)
public T  join() // 返回计算的结果或抛出一个uncheckd异常。 推荐使用


上面4个方法,推荐使用join,还有带超时时间的get方法


CompletableFuture并非一定要交给线程池执行才能实现异步,你可以像下面这样实现异步运行:


     public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture();
        //自己开个线程去执行 执行完把结果告诉completableFuture即可
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成 并且把结果告诉completableFuture
            completableFuture.complete("ok"); //这里把你信任的结果set进去后,所有阻塞的get()方法都能立马苏醒,获得到结果
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        System.out.println("准备打印结果...");
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }
输出:
准备打印结果...
task doing...
计算结果:ok


如果没有意外,上面发的代码工作得很正常。但是,如果任务执行过程中产生了异常会怎样呢?如下:只加一句1/0的代码


 //自己开个线程去执行 执行完把结果告诉completableFuture即可
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                System.out.println(1 / 0);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成 并且把结果告诉completableFuture
            completableFuture.complete("ok");
        }).start();


这种情况下会得到一个相当糟糕的结果:异常会被限制在执行任务的线程的范围内,最终会杀死该守护线程,而主线程,将永远永远阻塞了。


怎么解决呢?


   使用get(long timeout, TimeUnit unit)代替get()方法,它使用一个超时参数来避免发生这样的情况。这是一种值得推荐的做法,我们应该尽量在你的代码中添加超时判断的逻辑,避免发生类似的问题。


使用这种方法至少能防止程序永久地等待下去,超时发生时,程序会得到通知发生了TimeoutException 。不过,也因为如此,你不能确定执行任务的线程内到底发生了什么问题(因此自己要做好权衡)。


   更好的解决方案是:为了能获取任务线程内发生的异常,你需要使用

CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问题的异常抛出。这样,当执行任务发生异常时,调用get()方法的线程将会收到一个 ExecutionException异常,该异常接收了一个包含失败原因的Exception 参数。


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture();
        //自己开个线程去执行 执行完把结果告诉completableFuture即可
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                System.out.println(1 / 0);
            //} catch (InterruptedException e) {
            } catch (Exception e) {
                // 告诉completableFuture任务发生异常了
                completableFuture.completeExceptionally(e);
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成 并且把结果告诉completableFuture
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        System.out.println("准备打印结果...");
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }


这样子,如果内部发生了异常,调用get方法的时候就能得到这个Exception,进而能拿到抛异常的原因了。


说明:若执行了completableFuture.completeExceptionally(e);,那么completableFuture.get()它最终不是把异常获取出来了,而是给throw出来了,阻断主线程。(get()方法是用于获取正常返回值的)

使用案例


在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。


它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。


创建CompletableFuture


四个静态方法(如上),一个空构造函数


whenComplete计算结果完成时的处理


当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:


public CompletableFuture<T>   whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>   whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>   whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)


可以看到Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。

方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。


注意这几个方法都会返回CompletableFuture。


CompletableFuture.supplyAsync(() -> 100)
                .thenApplyAsync(i -> i * 10)
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //1000_____null
        //若有异常
        CompletableFuture.supplyAsync(() -> 1 / 0)
                .thenApplyAsync(i -> i * 10)
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //null_____java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero


Main方法本地测试小细节:


假若我们直接通过这种方式,sleep个几秒:


    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> 100)
                .thenApplyAsync(i -> {
                    try {
                        TimeUnit.SECONDS.sleep(5);
                        System.out.println("this my sleep time");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i + 1;
                })
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e));
    }


发现控制台木有任何打印:


image.png


很多小伙伴就就开始打断点,发现断点都进不去,囧。

其实这个小细节是干扰众多小伙伴的的地方~


根本原因:因为是异步执行的,所以主线程会线结束,JVM都退出了,自然异步线程也会死掉喽


解决方案:让main线程不这么快退出就行,自己测试的时候我们加上这么一句话就Ok了:


    public static void main(String[] args) throws InterruptedException {
      ...
      // 让主线程等待 只要这个时间大于你异步线程的时间就成~~~
      TimeUnit.SECONDS.sleep(20);
    }


Tips:对于web环境是不会存在此问题的,因为它的主线程一般情况下永远不会退出~~~~这里只是小伙伴在本地测试上需要注意的一个小细节~

目录
打赏
0
0
0
0
37
分享
相关文章
Java也能快速搭建AI应用?一文带你玩转Spring AI可落地性
Java语言凭借其成熟的生态与解决方案,特别是通过 Spring AI 框架,正迅速成为 AI 应用开发的新选择。本文将探讨如何利用 Spring AI Alibaba 构建在线聊天 AI 应用,并实现对其性能的全面可观测性。
|
25天前
|
Java静态代码块深度剖析:机制、特性与最佳实践
在Java中,静态代码块(或称静态初始化块)是指类中定义的一个或多个`static { ... }`结构。其主要功能在于初始化类级别的数据,例如静态变量的初始化或执行仅需运行一次的初始化逻辑。
39 4
构建高效Java后端与前端交互的定时任务调度系统
通过以上步骤,我们构建了一个高效的Java后端与前端交互的定时任务调度系统。该系统使用Spring Boot作为后端框架,Quartz作为任务调度器,并通过前端界面实现用户交互。此系统可以应用于各种需要定时任务调度的业务场景,如数据同步、报告生成和系统监控等。
57 9
Java 也能快速搭建 AI 应用?一文带你玩转 Spring AI 可观测性
Java 也能快速搭建 AI 应用?一文带你玩转 Spring AI 可观测性
CRaC技术助力ACS上的Java应用启动加速
容器计算服务借助ACS的柔性算力特性并搭配CRaC技术极致地提升Java类应用的启动速度。
Java 也能快速搭建 AI 应用?一文带你玩转 Spring AI 可观测性
Java 也能快速搭建 AI 应用?一文带你玩转 Spring AI 可观测性
Java14发布,16大新特性,代码更加简洁明快
Java14发布,16大新特性,代码更加简洁明快
342 0
Java14发布,16大新特性,代码更加简洁明快
|
2月前
|
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
173 60
【Java并发】【线程池】带你从0-1入门线程池
Java网络编程,多线程,IO流综合小项目一一ChatBoxes
**项目介绍**:本项目实现了一个基于TCP协议的C/S架构控制台聊天室,支持局域网内多客户端同时聊天。用户需注册并登录,用户名唯一,密码格式为字母开头加纯数字。登录后可实时聊天,服务端负责验证用户信息并转发消息。 **项目亮点**: - **C/S架构**:客户端与服务端通过TCP连接通信。 - **多线程**:采用多线程处理多个客户端的并发请求,确保实时交互。 - **IO流**:使用BufferedReader和BufferedWriter进行数据传输,确保高效稳定的通信。 - **线程安全**:通过同步代码块和锁机制保证共享数据的安全性。
73 23
|
30天前
|
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
当我们创建一个`ThreadPoolExecutor`的时候,你是否会好奇🤔,它到底发生了什么?比如:我传的拒绝策略、线程工厂是啥时候被使用的? 核心线程数是个啥?最大线程数和它又有什么关系?线程池,它是怎么调度,我们传入的线程?...不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界...
101 0
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码