一网打尽:异步神器 CompletableFuture 万字详解!

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 一网打尽:异步神器 CompletableFuture 万字详解!


来源:blog.csdn.net/zsx_xiaoxin/article/details/123898171


CompletableFuture是jdk8的新特性。CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步会点、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

一、创建异步任务

1. supplyAsync

supplyAsync是创建带有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法

// 带返回值异步请求,默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 带返回值的异步请求,可以自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            System.out.println("do something....");
            return "result";
        });
        //等待任务执行完成
        System.out.println("结果->" + cf.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 自定义线程池
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            System.out.println("do something....");
            return "result";
        }, executorService);
        //等待子任务执行完成
        System.out.println("结果->" + cf.get());
}

测试结果:

2. runAsync

runAsync是创建没有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法

// 不带返回值的异步请求,默认线程池
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 不带返回值的异步请求,可以自定义线程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
            System.out.println("do something....");
        });
        //等待任务执行完成
        System.out.println("结果->" + cf.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 自定义线程池
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
            System.out.println("do something....");
        }, executorService);
        //等待任务执行完成
        System.out.println("结果->" + cf.get());
}

测试结果:

image.png

3.获取任务结果的方法

// 如果完成则返回结果,否则就抛出具体的异常
public T get() throws InterruptedException, ExecutionException 
// 最大时间等待返回结果,否则就抛出具体异常
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
// 完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因
public T join()
// 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent。
public T getNow(T valueIfAbsent)
// 如果任务没有完成,返回的值设置为给定值
public boolean complete(T value)
// 如果任务没有完成,就抛出给定异常
public boolean completeExceptionally(Throwable ex) 

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

二、异步回调处理

1. thenApply和thenApplyAsync

thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,带有返回值。

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
        CompletableFuture<Integer> cf2 = cf1.thenApplyAsync((result) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            result += 2;
            return result;
        });
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
        CompletableFuture<Integer> cf2 = cf1.thenApply((result) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            result += 2;
            return result;
        });
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
}

测试结果:

从上面代码和测试结果我们发现thenApply和thenApplyAsync区别在于,使用thenApply方法时子任务与父任务使用的是同一个线程,而thenApplyAsync在子任务中是另起一个线程执行任务,并且thenApplyAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

2. thenAccept和thenAcceptAsync

thenAccep表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,无返回值。

测试代码

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
        CompletableFuture<Void> cf2 = cf1.thenAccept((result) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
        });
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
        CompletableFuture<Void> cf2 = cf1.thenAcceptAsync((result) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
        });
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
}

测试结果:

image.png

image.png

从上面代码和测试结果我们发现thenAccep和thenAccepAsync区别在于,使用thenAccep方法时子任务与父任务使用的是同一个线程,而thenAccepAsync在子任务中可能是另起一个线程执行任务,并且thenAccepAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

3.thenRun和thenRunAsync

thenRun表示某个任务执行完成后执行的动作,即回调方法,无入参,无返回值。

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
        CompletableFuture<Void> cf2 = cf1.thenRun(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
        });
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
        CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
        });
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
}

测试结果:

从上面代码和测试结果我们发现thenRun和thenRunAsync区别在于,使用thenRun方法时子任务与父任务使用的是同一个线程,而thenRunAsync在子任务中可能是另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

4.whenComplete和whenCompleteAsync

whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。

测试代码:

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            int a = 1/0;
            return 1;
        });
        CompletableFuture<Integer> cf2 = cf1.whenComplete((result, e) -> {
            System.out.println("上个任务结果:" + result);
            System.out.println("上个任务抛出异常:" + e);
            System.out.println(Thread.currentThread() + " cf2 do something....");
        });
//        //等待任务1执行完成
//        System.out.println("cf1结果->" + cf1.get());
//        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
    }

测试结果:

whenCompleteAsync和whenComplete区别也是whenCompleteAsync可能会另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

5.handle和handleAsync

跟whenComplete基本一致,区别在于handle的回调方法有返回值。

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            // int a = 1/0;
            return 1;
        });
        CompletableFuture<Integer> cf2 = cf1.handle((result, e) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            System.out.println("上个任务结果:" + result);
            System.out.println("上个任务抛出异常:" + e);
            return result+2;
        });
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
}

测试结果 :

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

三、多任务组合处理

1. thenCombine、thenAcceptBoth 和runAfterBoth

这三个方法都是将两个CompletableFuture组合起来处理,只有两个任务都正常完成时,才进行下阶段任务。

区别:thenCombine会将两个任务的执行结果作为所提供函数的参数,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
        CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            return 2;
        });
        CompletableFuture<Integer> cf3 = cf1.thenCombine(cf2, (a, b) -> {
            System.out.println(Thread.currentThread() + " cf3 do something....");
            return a + b;
        });
        System.out.println("cf3结果->" + cf3.get());
}
 public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
        CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            return 2;
        });
        CompletableFuture<Void> cf3 = cf1.thenAcceptBoth(cf2, (a, b) -> {
            System.out.println(Thread.currentThread() + " cf3 do something....");
            System.out.println(a + b);
        });
        System.out.println("cf3结果->" + cf3.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
        CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            return 2;
        });
        CompletableFuture<Void> cf3 = cf1.runAfterBoth(cf2, () -> {
            System.out.println(Thread.currentThread() + " cf3 do something....");
        });
        System.out.println("cf3结果->" + cf3.get());
}

测试结果:

2.applyToEither、acceptEither和runAfterEither

这三个方法和上面一样也是将两个CompletableFuture组合起来处理,当有一个任务正常完成时,就会进行下阶段任务。

区别:applyToEither会将已经完成任务的执行结果作为所提供函数的参数,且该方法有返回值;acceptEither同样将已经完成任务的执行结果作为方法入参,但是无返回值;runAfterEither没有入参,也没有返回值。

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf1 do something....");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "cf1 任务完成";
        });
        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "cf2 任务完成";
        });
        CompletableFuture<String> cf3 = cf1.applyToEither(cf2, (result) -> {
            System.out.println("接收到" + result);
            System.out.println(Thread.currentThread() + " cf3 do something....");
            return "cf3 任务完成";
        });
        System.out.println("cf3结果->" + cf3.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf1 do something....");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "cf1 任务完成";
        });
        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "cf2 任务完成";
        });
        CompletableFuture<Void> cf3 = cf1.acceptEither(cf2, (result) -> {
            System.out.println("接收到" + result);
            System.out.println(Thread.currentThread() + " cf3 do something....");
        });
        System.out.println("cf3结果->" + cf3.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf1 do something....");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf1 任务完成");
            return "cf1 任务完成";
        });
        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf2 任务完成");
            return "cf2 任务完成";
        });
        CompletableFuture<Void> cf3 = cf1.runAfterEither(cf2, () -> {
            System.out.println(Thread.currentThread() + " cf3 do something....");
            System.out.println("cf3 任务完成");
        });
        System.out.println("cf3结果->" + cf3.get());
}

测试结果:

从上面可以看出cf1任务完成需要2秒,cf2任务完成需要5秒,使用applyToEither组合两个任务时,只要有其中一个任务完成时,就会执行cf3任务,显然cf1任务先完成了并且将自己任务的结果传值给了cf3任务,cf3任务中打印了接收到cf1任务完成,接着完成自己的任务,并返回cf3任务完成;acceptEither和runAfterEither类似,acceptEither会将cf1任务的结果作为cf3任务的入参,但cf3任务完成时并无返回值;runAfterEither不会将cf1任务的结果作为cf3任务的入参,它是没有任务入参,执行完自己的任务后也并无返回值。

2. allOf / anyOf

allOf:CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。

anyOf :CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果。

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf1 do something....");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf1 任务完成");
            return "cf1 任务完成";
        });
        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                int a = 1/0;
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf2 任务完成");
            return "cf2 任务完成";
        });
        CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf3 任务完成");
            return "cf3 任务完成";
        });
        CompletableFuture<Void> cfAll = CompletableFuture.allOf(cf1, cf2, cf3);
        System.out.println("cfAll结果->" + cfAll.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf1 do something....");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf1 任务完成");
            return "cf1 任务完成";
        });
        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf2 任务完成");
            return "cf2 任务完成";
        });
        CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf3 任务完成");
            return "cf3 任务完成";
        });
        CompletableFuture<Object> cfAll = CompletableFuture.anyOf(cf1, cf2, cf3);
        System.out.println("cfAll结果->" + cfAll.get());
}

测试结果:



相关文章
|
1天前
|
并行计算 算法 Java
ForkJoin并行计算神器(史上最全图文详解)
本文详细介绍ForkJoin框架的设计原理、工作窃取算法及使用案例,帮助你更好地利用多处理器并行运算能力提升应用性能。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
ForkJoin并行计算神器(史上最全图文详解)
|
5月前
|
消息中间件 缓存 中间件
【赠书活动 - 第1期】- 测试工程师Python开发实战(异步图书出品)| 文末送书
【赠书活动 - 第1期】- 测试工程师Python开发实战(异步图书出品)| 文末送书
|
消息中间件 缓存 Java
五个月,秒杀,38个大的篇章,126+篇文章、视频、小册,150+源码分支,完美收官!
经过四个多月的坚持,《Seckill秒杀系统》终于接近尾声了,也感谢大家这四个多月以来的坚持和陪伴,也相信大家在《Seckill秒杀系统》专栏中,学到了不少知识和技术。接下来,我们就一起对《Seckill秒杀系统》专栏做个总结。
189 3
五个月,秒杀,38个大的篇章,126+篇文章、视频、小册,150+源码分支,完美收官!
|
12月前
|
Linux 数据库 数据安全/隐私保护
C++实战-Linux多线程(入门到精通)(三)
C++实战-Linux多线程(入门到精通)(三)
51 0
|
消息中间件 存储 Java
一网打尽异步神器CompletableFuture
最近一直畅游在RocketMQ的源码中,发现在RocketMQ中很多地方都使用到了CompletableFuture,所以今天就跟大家来聊一聊JDK1.8提供的异步神器CompletableFuture,并且最后会结合RocketMQ源码分析一下CompletableFuture的使用。
|
JavaScript 前端开发 网络协议
直击灵魂!美团大牛手撸并发原理笔记,由浅入深剖析JDK源码
并发编程这四个字想必大家最近都在网上看到过有很多的帖子在讨论。我们都知道并发编程可选择的方式有多进程、多线程和多协程。在Java中,并发就是多线程模式。而多线程编程也一直是一个被广泛而深入讨论的领域。如果遇到复杂的多线程编程场景,大多数情况下我们就需要站在巨人的肩膀上利用并发编程框架——JDK Concurrent包来解决相关线程问题。
|
Java 开发者 容器
先到先学!Alibaba甩出第四次更新的JDK源码高级笔记(终极版)
作为Java开发者,面试肯定被问过多线程。对于它,大多数好兄弟面试前都是看看八股文背背面试题以为就OK了;殊不知现在的面试官都是针对一个点往深了问,你要是不懂其中原理,面试就挂了。可能你知道什么是进程什么是线程,但面试官要是问你进程之间是如何通讯的?ConcurrentHashMap 和 HashTable有什么区别?为什么wait和notify方法要在同步块代码中调用?你答不上来就只能等通知了。。。
|
Java
【Java原理探索】教你如何使用异步神器CompletableFuture| 8月更文挑战
【Java原理探索】教你如何使用异步神器CompletableFuture| 8月更文挑战
122 0
异步神器CompletableFuture
异步神器CompletableFuture