Java 8 的异步利器:CompletableFuture源码解析(建议精读)

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 实现了俩接口,本身是个class。这个是Future的实现类,使用 completionStage 接口去支持完成时触发的函数和操作。

completableFuture 是JDK1.8版本新引入的类。下面是这个类:

实现了俩接口,本身是个class。这个是Future的实现类,使用 completionStage 接口去支持完成时触发的函数和操作。

一个 completetableFuture 就代表了一个任务,他能用Future的方法,还能做一些之前说的 executorService 配合 futures 做不了的。

之前future需要等待isDone为true才能知道任务跑完了,或者就是用get方法调用的时候会出现阻塞,而使用 completableFuture 的使用就可以用then,when等等操作来防止以上的阻塞和轮询isDone的现象出现。

1.创建CompletableFuture直接new对象。

一个 completableFuture 对象代表着一个任务,这个对象能跟这个任务产生联系。

下面用的 complete 方法意思就是这个任务完成了需要返回的结果,然后用 get() 方法可以获取到。

2.JDK1.8使用的接口类。

在本文的 CompletableFuture 中大量地使用了这些函数式接口。

注:这些声明大量应用于方法的入参中,像 thenApplythenAccept 这俩就是一个用Function一个用Consumer

而lambda函数正好是可以作为这些接口的实现。例如 s->{return 1;} 这个就相当于一个Function。因为有入参和返回结果。

(1)Function

(2)Consumer

对于前面有Bi的就是这样的,BiConsumer就是两个参数的。

(3)Predicate这个接口声明是一个入参,返回一个boolean。

(4)supplier

3.下面是这个类的静态方法

带有Async就是异步执行的意思、也是一个 completableFuture 对象代表着一个任务这个原则。

这种异步方法都可以指定一个线程池作为任务的运行环境,如果没有指定就会使用 ForkJoinPool 线程池来执行

(1) supplyAsync&runAsync 的使用例子。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            System.out.println("executorService 是否为守护线程 :" + Thread.currentThread().isDaemon());
            return null;
        }
    });
    final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("this is lambda supplyAsync");
        System.out.println("supplyAsync 是否为守护线程 " + Thread.currentThread().isDaemon());
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("this lambda is executed by forkJoinPool");
        return "result1";
    });
    final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("this is task with executor");
System.out.println("supplyAsync 使用executorService 时是否为守护线程 : " + Thread.currentThread().isDaemon());
        return "result2";
    }, executorService);
    System.out.println(completableFuture.get());
    System.out.println(future.get());
    executorService.shutdown();
}

这些任务中带有supply是持有返回值的,run是void返回值的,在玩supply时发现一个问题:如果使用supplyAsync任务时不使用任务的返回值,即 不用get方法阻塞主线程会导致任务执行中断。

注:跟get方法无关,后面有答案

然后我开始探索是否是只有 supplyAsync 是这样。我测试了 runAsync 发现也是这样。

下图为与 supplyAsync 任务执行不全面一样的问题,我甚至测试了将lambda换成runnable发现无济于事。

答案:

造成这个原因是因为Daemon。因为 completableFuture 这套使用异步任务的操作都是创建成了守护线程,那么我们没有调用get方法不阻塞这个主线程的时候。主线程执行完毕,所有线程执行完毕就会导致一个问题,就是守护线程退出。

那么我们没有执行的代码就是因为主线程不再跑任务而关闭导致的,可能这个不叫问题,因为在开发中我们主线程常常是一直开着的。但是这个小问题同样让我想了好久。

下面我们开一个非守护线程,可以看到程序执行顺利。

下面证实守护线程在其他非守护线程全部退出的情况下不继续执行。

final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("this is lambda supplyAsync");
    System.out.println("supplyAsync 是否为守护线程 " + Thread.currentThread().isDaemon());
    try {
        TimeUnit.SECONDS.sleep(1);
        try(BufferedWriter writer = new BufferedWriter
                (new OutputStreamWriter(new FileOutputStream(new File("/Users/zhangyong/Desktop/temp/out.txt"))))){
            writer.write("this is completableFuture daemon test");
        }catch (Exception e){
            System.out.println("exception find");
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("this lambda is executed by forkJoinPool");
    return "result1";
});

这个代码就是操作本地文件,并且sleep了一秒。其他线程就一句控制台输出的代码,最终的结果是文件没有任何变化。

当我把主线程 sleep 5 秒时,本地文件会写入一句 this is completableFuture daemon test 验证成功。

(2)allOf&anyOf

这两个方法的入参是一个 completableFuture 组、allOf就是所有任务都完成时返回,但是是个Void的返回值。

anyOf是当入参的 completableFuture 组中有一个任务执行完毕就返回,返回结果是第一个完成的任务的结果。

public static void otherStaticMethod() throws ExecutionException, InterruptedException {
        final CompletableFuture<String> futureOne = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                System.out.println("futureOne InterruptedException");
            }
            return "futureOneResult";
        });
        final CompletableFuture<String> futureTwo = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(6000);
            } catch (InterruptedException e) {
                System.out.println("futureTwo InterruptedException");
            }
            return "futureTwoResult";
        });
        CompletableFuture future = CompletableFuture.allOf(futureOne, futureTwo);
        System.out.println(future.get());
//        CompletableFuture completableFuture = CompletableFuture.anyOf(futureOne, futureTwo);
//        System.out.println(completableFuture.get());
    }

(3) completedFuture 这个方法我没懂他是干啥的,源码就是返回一个值。感觉没啥意义。

(4)取值方法,除了get还有一个 getNow(); 这个就比较特殊了。

这个方法是执行这个方法的时候任务执行完了就返回任务的结果,如果任务没有执行完就返回你的入参。

(5)join方法跟线程的join用法差不多。

(6) whenXXX ,在一个任务执行完成之后调用的方法。

这个有三个名差不多的方法: whenCompletewhenCompleteAsync 、还有一个是 whenCompleteAsync 用自定义 Executor

首先看一下这个 whenComplete 实例方法。这个就是任务执行完毕调用的,传入一个action,这个方法的执行线程是当前线程,意味着会阻塞当前线程。

下面图中test的输出跟 whenComplete 方法运行的线程有关,运行到main线程就会阻塞test的输出,运行的是 completableFuture 线程则不会阻塞住test的输出。

下面是任务执行的线程的探索。

根据测试得出的结论是:如果调用 whenComplete 的中途,还发生了其他事情,图中的主线程的 sleep(400); 导致 completableFuture 这个任务执行完毕了,那么就使用主线程调用。

如果调用的中途没有发生其他任务且在触碰到 whenComplete 方法时 completableFuture 这个任务还没有彻底执行完毕那么就会用 completableFuture 这个任务所使用的线程。

下面是 whenCompleteAsync 方法。这个方法就是新创建一个异步线程执行。所以不会阻塞。

(7) then方法瞅着挺多的,实际上就是异不异步和加不加自定义 Executor

注: whenComplete 中出现的问题在then中测试不存在、使用的就是上一个任务的线程。这个 thenCompose 就是一个任务执行完之后可以用它的返回结果接着执行的方法,方法返回的是另一个你期盼泛型的结果。

compose 理解就是上一个任务结果是then的一部分。

下面介绍一下 thenCombine

这个 combine 的理解就是结合两个任务的结果。

综上:这个线程的问题并不是大问题,只要你不用线程来做判断条件,他并不会影响你的效率。试想pool线程都执行完了就用主线程跑呗。没跑完,而使你等了那你就用pool线程呗。

thenRun就是这个任务运行完,再运行下一个任务,感觉像是join了一下。

其余不再介绍,大同小异。

thenApply(Function); 这样的就是有入参有返回值类型的。

thenAccept(Consumer); 这样的就是有入参,但是没有返回值的。详情在上文中有过关于函数式接口的叙述。

本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。

相关文章
|
4天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
3天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
3天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
3天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
2天前
|
Java 数据库连接 Spring
反射-----浅解析(Java)
在java中,我们可以通过反射机制,知道任何一个类的成员变量(成员属性)和成员方法,也可以堆任何一个对象,调用这个对象的任何属性和方法,更进一步我们还可以修改部分信息和。
|
4天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
1月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
77 2
|
2月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
81 0
|
2月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
67 0
|
2月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
71 0

热门文章

最新文章

推荐镜像

更多