聊聊Java中CompletableFuture的使用(下)

简介: 聊聊Java中CompletableFuture的使用

任务串行执行


CompletableFuture也支持任务串行执行,后面的任务依赖前面任务的执行结果。我们再举一个做果汁的例子,我们分串行三步: 洗水果 —> 切水果 -> 榨汁,看如下代码

public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> washFruit(), executor);
    CompletableFuture<String> future2 = future1.thenApplyAsync(r -> StringUtils.isBlank(r) ? null : cutFruit());
    future2.thenApplyAsync(r -> StringUtils.isBlank(r) ? null : juicing());
    //为了主线程不立刻退出,以便查看结果
    Thread.sleep(100);
}
private static String washFruit() {
    System.out.println(Thread.currentThread().getName());
    System.out.println("洗水果");
    return "洗水果";
}
private static String cutFruit() {
    System.out.println(Thread.currentThread().getName());
    System.out.println("切水果");
    return "切水果";
}
private static String juicing() {
    System.out.println(Thread.currentThread().getName());
    System.out.println("榨汁");
    return "榨汁";
}

上面main函数执行后输出:

pool-1-thread-1
洗水果
ForkJoinPool.commonPool-worker-1
切水果
ForkJoinPool.commonPool-worker-1
榨汁

上面的代码就是一个串行执行的任务,这儿除了thenApply,还有thenAccept(Consumer<? super T> action)和thenRun(Runnable action),这2个方法都不返回执行结果。


注意:上面方法中,thenApply、thenAccept、thenRun都有一个对应的Async方法,区别在于Async方法会从线程池中拿线程执行,而不带Async的方法在当前线程执行。所以如果上面代码中thenApplyAsync换成thenApply,执行结果如下:

pool-1-thread-1
洗水果
pool-1-thread-1
切水果
pool-1-thread-1
榨汁

结果组合运算


thenCombine和thenCompose


thenCombine用于组合2个CompletableFuture,对结果进行运算,下面2段代码都是输出110

ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100, executor);
CompletableFuture<Integer> future3 = future1.thenCombineAsync(future2, (x, y) -> x + y);
System.out.println(future3.get());

thenCompose把第一个CompletableFuture的结果放到第二个CompletableFuture中进行运算

ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10, executor);
CompletableFuture<Integer> future2 = future1.thenCompose(r -> CompletableFuture.supplyAsync(() -> r + 100));
System.out.println(future2.get());

上面的组合方法其实用上一节讲的串行执行也可以完成,见如下代码

ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10, executor);
CompletableFuture<Integer> future2 = future1.thenApplyAsync(r -> r + 100);
System.out.println(future2.get());

thenAcceptBoth和runAfterBoth


thenAcceptBoth用于对前面2个线程的结果进行组合运算,下面代码输出110

ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100, executor);
CompletableFuture<Void> future3 = future1.thenAcceptBoth(future2, (x, y) -> System.out.println(x + y));

runAfterBoth用于等待前面2个线程之后执行第三个线程

ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("线程1"), executor);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> System.out.println("线程2"), executor);
CompletableFuture<Void> future3 = future1.runAfterBothAsync(future2, () -> System.out.println("线程3"));

上面代码输出:

线程1
线程2
线程3

acceptEither、runAfterEither和applyToEither


这三个方法只取组合线程中执行最快的一个结果,看下面代码:

public static void main(String[] args) {
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()-> getTask1());
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> getTask2());
    CompletableFuture<String> f3 = f1.applyToEither(f2,s -> s);
    System.out.println(f3.join());//输出task2
    CompletableFuture<String> f4 = CompletableFuture.supplyAsync(()-> getTask1());
    CompletableFuture<String> f5 = CompletableFuture.supplyAsync(() -> getTask2());
    CompletableFuture<Void> f6 = f4.runAfterEither(f5, () -> System.out.println("task3"));
    f6.join();//输出task3
    CompletableFuture<String> f7 = CompletableFuture.supplyAsync(()-> getTask1());
    CompletableFuture<String> f8 = CompletableFuture.supplyAsync(() -> getTask2());
    CompletableFuture<Void> f9 = f8.acceptEither(f7, s -> System.out.println(s));
    f9.join();//输出task2
}
private static String getTask1(){
    try {
        Thread.currentThread().sleep(2000);
        return "task1";
    } catch (InterruptedException e) {
        e.printStackTrace();
        return null;
    }
}
private static String getTask2(){
    try {
        Thread.currentThread().sleep(1000);
        return "task2";
    } catch (InterruptedException e) {
        e.printStackTrace();
        return null;
    }
}

同样注意:上面7个方法都存在对应的Async方法,会从线程池中取线程来执行。


java9的改进


1.可以设置超时时间,超时后给一个默认值,比如下面代码输出100

ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> getNum(), executor);
future1.completeOnTimeout(100, 3000, TimeUnit.MILLISECONDS);
System.out.println(future1.get());

2.增加failedFuture跟之前的completedFuture配对,前者创建一个指定异常的CompletableFuture,后者创建一个指定给定值的CompletableFuture。

public static <U> CompletableFuture<U> failedFuture(Throwable ex)
public static <U> CompletableFuture<U> completedFuture(U ,value)

3.增加了completedStagefailedStage,这2个方法返回CompletableFuture的继承类MinimalStage

public static <U> CompletionStage<U> completedStage(U value)
public static <U> CompletionStage<U> failedStage(Throwable ex)

4.增加了defaultExecutornewIncompleteFuture,可以让子类自己去实现

public Executor defaultExecutor()
public <U> CompletableFuture<U> newIncompleteFuture()

总结


CompletableFuture类对多线程调度的支持还是挺强大的,本文主要介绍了一些常用的方法,对于其他方法,大家可以查看api或者CompletionStage接口中定义的方法选择使用。

相关文章
|
7月前
|
Java API 网络架构
Java 线程中CompletableFuture的例子
Java 线程中CompletableFuture的例子
71 0
|
2月前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
5月前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【7月更文挑战第1天】Java 8的CompletableFuture革新了异步编程,提供链式处理和优雅的错误处理。反应式编程,如Project Reactor,强调数据流和变化传播,擅长处理大规模并发和延迟敏感任务。两者结合,如通过Mono转换CompletableFuture,兼顾灵活性与资源管理,提升现代Java应用的并发性能和响应性。开发者可按需选择和融合这两种技术,以适应高并发环境。
54 1
|
6月前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【6月更文挑战第30天】Java 8的CompletableFuture革新了异步编程,提供如thenApply等流畅接口,而Java 9后的反应式编程(如Reactor)强调数据流和变化传播,以事件驱动应对高并发。两者并非竞争关系,而是互补,通过Flow API和第三方库结合,如将CompletableFuture转换为Mono进行反应式处理,实现更高效、响应式的系统设计。开发者可根据需求灵活选用,提升现代Java应用的并发性能。
90 1
|
15天前
|
SQL Rust Java
Java 8 异步编程利器:CompletableFuture
Java 8引入了CompletableFuture,这是一个强大的异步编程工具,增强了Future的功能,支持链式调用、任务组合与异常处理等特性,使异步编程更加直观和高效。本文详细介绍了CompletableFuture的基本概念、用法及高级功能,帮助开发者更好地掌握这一工具。
|
15天前
|
JavaScript Java 中间件
Java CompletableFuture 异步超时实现探索
本文探讨了在JDK 8中`CompletableFuture`缺乏超时中断任务能力的问题,提出了一种异步超时实现方案,通过自定义工具类模拟JDK 9中`orTimeout`方法的功能,解决了任务超时无法精确控制的问题,适用于多线程并行执行优化场景。
|
6月前
|
设计模式 Java API
实战分析Java的异步编程,并通过CompletableFuture进行高效调优
【6月更文挑战第7天】实战分析Java的异步编程,并通过CompletableFuture进行高效调优
98 2
|
5月前
|
并行计算 算法 Java
Java面试题:解释Java中的无锁编程的概念,Java中的Fork/Join框架的作用和使用场景,Java中的CompletableFuture的作用和使用场景
Java面试题:解释Java中的无锁编程的概念,Java中的Fork/Join框架的作用和使用场景,Java中的CompletableFuture的作用和使用场景
36 0
|
5月前
|
安全 Java 数据库连接
Java面试题:解释Java内存模型的无锁编程支持,并讨论其优势和局限性,解释Java中的CompletableFuture的工作原理,并讨论其在异步编程中的应用
Java面试题:解释Java内存模型的无锁编程支持,并讨论其优势和局限性,解释Java中的CompletableFuture的工作原理,并讨论其在异步编程中的应用
30 0
|
6月前
|
存储 算法 Java
Java8 CompletableFuture:异步编程的瑞士军刀
Java8 CompletableFuture:异步编程的瑞士军刀
126 2