Java8 CompletableFuture异步编程-进阶篇

简介: Java8 CompletableFuture异步编程-进阶篇

前言

我们在前面文章讲解了CompletableFuture这个异步编程类的基本用法,这节我们继续学习CompletableFuture相关的进阶知识,上文入口:Java8 CompletableFuture异步编程-入门篇


1、异步任务的交互

异步任务交互指 将异步任务获取结果的速度相比较,按一定的规则( 先到先用 )进行下一步处理。


1.1 applyToEither

applyToEither() 把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步的操作。

CompletableFuture<R> applyToEither(CompletableFuture<T> other, Function<T,R> func)


演示案例:使用最先完成的异步任务的结果

public class ApplyToEitherDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 开启异步任务1
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            int x = new Random().nextInt(3);
            CommonUtils.sleepSecond(x);
            CommonUtils.printThreadLog("任务1耗时:" + x + "秒");
            return x;
        });
        // 开启异步任务2
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            int y = new Random().nextInt(3);
            CommonUtils.sleepSecond(y);
            CommonUtils.printThreadLog("任务2耗时:" + y + "秒");
            return y;
        });
        // 哪些异步任务的结果先到达,就使用哪个异步任务的结果
        CompletableFuture<Integer> future = future1.applyToEither(future2, (result -> {
            CommonUtils.printThreadLog("最先到达的结果:" + result);
            return result;
        }));
        // 主线程休眠4秒,等待所有异步任务完成
        CommonUtils.sleepSecond(4);
        Integer ret = future.get();
        CommonUtils.printThreadLog("ret = " + ret);
    }
}


速记心法:任务1、任务2就像两辆公交,哪路公交先到,就乘坐(使用)哪路公交。

以下是applyToEither 和其对应的异步回调版本

CompletableFuture<R> applyToEither(CompletableFuture<T> other, Function<T,R> func)
CompletableFuture<R> applyToEitherAsync(CompletableFuture<T> other, Function<T,R> func)
CompletableFuture<R> applyToEitherAsync(CompletableFuture<T> other, Function<T,R> func,Executor executor)


1.2 acceptEither

acceptEither() 把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步操作 ( 消费使用 )。

CompletableFuture<Void> acceptEither(CompletableFuture<T> other, Consumer<T> action)
CompletableFuture<Void> acceptEitherAsync(CompletableFuture<T> other, Consumer<T> action)  
CompletableFuture<Void> acceptEitherAsync(CompletableFuture<T> other, Consumer<T> action,Executor executor)

演示案例:使用最先完成的异步任务的结果

public class AcceptEitherDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 异步任务交互
        CommonUtils.printThreadLog("main start");
        // 开启异步任务1
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            int x = new Random().nextInt(3);
            CommonUtils.sleepSecond(x);
            CommonUtils.printThreadLog("任务1耗时:" + x + "秒");
            return x;
        });
        // 开启异步任务2
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            int y = new Random().nextInt(3);
            CommonUtils.sleepSecond(y);
            CommonUtils.printThreadLog("任务2耗时:" + y + "秒");
            return y;
        });
        // 哪些异步任务的结果先到达,就使用哪个异步任务的结果
        future1.acceptEither(future2,result -> {
            CommonUtils.printThreadLog("最先到达的结果:" + result);
        });
        // 主线程休眠4秒,等待所有异步任务完成
        CommonUtils.sleepSecond(4);
        CommonUtils.printThreadLog("main end");
    }
}


1.3 runAfterEither

如果不关心最先到达的结果,只想在有一个异步任务先完成时得到完成的通知,可以使用 runAfterEither() ,以下是它的相关方法:

CompletableFuture<Void> runAfterEither(CompletableFuture<T> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<T> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<T> other, Runnable action, Executor executor)

提示

异步任务交互的三个方法和之前学习的异步的回调方法 thenApply、thenAccept、thenRun 有异曲同工之妙。

2、get() 和 join() 区别

get() 和 join() 都是CompletableFuture提供的以阻塞方式获取结果的方法。

那么该如何选用呢?请看如下案例:

public class GetOrJoinDemo {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "hello";
        });
        String ret = null;
        // 抛出检查时异常,必须处理
        try {
            ret = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("ret = " + ret);
        // 抛出运行时异常,可以不处理
        ret = future.join();
        System.out.println("ret = " + ret);
    }
}


使用时,我们发现,get() 抛出检查时异常 ,需要程序必须处理;而join() 方法抛出运行时异常,程序可以不处理。所以,join() 更适合用在流式编程中。

3、ParallelStream VS CompletableFuture

CompletableFuture 虽然提高了任务并行处理的能力,如果它和 Stream API 结合使用,能否进一步多个任务的并行处理能力呢?


同时,对于 Stream API 本身就提供了并行流ParallelStream,它们有什么不同呢?


我们将通过一个耗时的任务来体现它们的不同,更重要地是,我们能进一步加强 CompletableFuture 和 Stream API 的结合使用,同时搞清楚CompletableFuture 在流式操作的优势


需求:创建10个MyTask耗时的任务,统计它们执行完的总耗时定义一个MyTask类,来模拟耗时的长任务

public class MyTask {
    private int duration;
    public MyTask(int duration) {
        this.duration = duration;
    }
    // 模拟耗时的长任务
    public int doWork() {
        CommonUtils.printThreadLog("doWork");
        CommonUtils.sleepSecond(duration);
        return duration;
    }
}


同时,我们创建10个任务,每个持续1秒。

IntStream intStream = IntStream.range(0, 10);
List<MyTask> tasks = intStream.mapToObj(item -> {
    return new MyTask(1);
}).collect(Collectors.toList());


3.1 并行流的局限

我们先使用串行执行,让所有的任务都在主线程 main 中执行。

public class SequenceDemo {
    public static void main(String[] args) {
        // 方案一:在主线程中使用串行执行
        // step 1: 创建10个MyTask对象,每个任务持续1s,存入list集合便于启动Stream操作
        IntStream intStream = IntStream.range(0, 10);
        List<MyTask> tasks = intStream.mapToObj(item -> {
            return new MyTask(1);
        }).collect(Collectors.toList());
 
        // step 2: 执行tasks集合中的每个任务,统计总耗时
        long start = System.currentTimeMillis();
        List<Integer> result = tasks.stream().map(myTask -> {
            return myTask.doWork();
        }).collect(Collectors.toList());
        long end = System.currentTimeMillis();
        double costTime = (end - start) / 1000.0;
 
        System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);
    }
}


它花费了10秒, 因为每个任务在主线程一个接一个的执行。

因为涉及 Stream API,而且存在耗时的长任务,所以,我们可以使用 parallelStream()

public class ParallelDemo {
    public static void main(String[] args) {
        // 方案二:使用并行流
        // step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合
        IntStream intStream = IntStream.range(0, 10);
        List<MyTask> tasks = intStream.mapToObj(item -> {
            return new MyTask(1);
        }).collect(Collectors.toList());
 
        // step 2: 执行10个MyTask,统计总耗时
        long start = System.currentTimeMillis();
        List<Integer> results = tasks.parallelStream().map(myTask -> {
            return myTask.doWork();
        }).collect(Collectors.toList());
        long end = System.currentTimeMillis();
 
        double costTime = (end - start) / 1000.0;
        System.out.printf("processed %d tasks %.2f second",tasks.size(),costTime);
    }
}


它花费了2秒多,因为此次并行执行使用了8个线程 (7个是ForkJoinPool线程池中的, 一个是 main 线程),需要注意是:运行结果由自己电脑CPU的核数决定。

3.2 CompletableFuture 在流式操作的优势

让我们看看使用CompletableFuture是否执行的更有效率

public class CompletableFutureDemo {
    public static void main(String[] args) {
        // 需求:创建10MyTask耗时的任务,统计它们执行完的总耗时
        // 方案三:使用CompletableFuture
        // step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合
        IntStream intStream = IntStream.range(0, 10);
        List<MyTask> tasks = intStream.mapToObj(item -> {
            return new MyTask(1);
        }).collect(Collectors.toList());
 
        // step 2: 根据MyTask对象构建10个耗时的异步任务
        long start = System.currentTimeMillis();
        List<CompletableFuture<Integer>> futures = tasks.stream().map(myTask -> {
            return CompletableFuture.supplyAsync(() -> {
                return myTask.doWork();
            });
        }).collect(Collectors.toList());
 
        // step 3: 当所有任务完成时,获取每个异步任务的执行结果,存入List集合中
        List<Integer> results = futures.stream().map(future -> {
            return future.join();
        }).collect(Collectors.toList());
        long end = System.currentTimeMillis();
 
        double costTime = (end - start) / 1000.0;
        System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);
    }
}

运行发现,两者使用的时间大致一样。能否进一步优化呢?


CompletableFutures 比 ParallelStream 优点之一是你可以指定Executor去处理任务。你能选择更合适数量的线程。我们可以选择大于Runtime.getRuntime().availableProcessors() 数量的线程,如下所示:

public class CompletableFutureDemo2 {
    public static void main(String[] args) {
        // 需求:创建10MyTask耗时的任务,统计它们执行完的总耗时
        // 方案三:使用CompletableFuture
        // step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合
        IntStream intStream = IntStream.range(0, 10);
        List<MyTask> tasks = intStream.mapToObj(item -> {
            return new MyTask(1);
        }).collect(Collectors.toList());
 
        // 准备线程池
        final int N_CPU = Runtime.getRuntime().availableProcessors();
        // 设置线程池的数量最少是10个,最大是16个
        ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), N_CPU * 2));
 
        // step 2: 根据MyTask对象构建10个耗时的异步任务
        long start = System.currentTimeMillis();
        List<CompletableFuture<Integer>> futures = tasks.stream().map(myTask -> {
            return CompletableFuture.supplyAsync(() -> {
                return myTask.doWork();
            },executor);
        }).collect(Collectors.toList());
 
        // step 3: 当所有任务完成时,获取每个异步任务的执行结果,存入List集合中
        List<Integer> results = futures.stream().map(future -> {
            return future.join();
        }).collect(Collectors.toList());
        long end = System.currentTimeMillis();
 
        double costTime = (end - start) / 1000.0;
        System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);
        // 关闭线程池
        executor.shutdown();
    }
}

测试代码时,电脑配置是4核8线程,而我们创建的线程池中线程数最少也是10个,所以,每个线程负责一个任务( 耗时1s ),总体来说,处理10个任务总共需要约1秒。

3.3 合理配置线程池中的线程数

正如我们看到的,CompletableFuture 可以更好地控制线程池中线程的数量,而 ParallelStream 不能。


问题1:如何选用 CompletableFuture 和 ParallelStream ?


如果你的任务是IO密集型的,你应该使用CompletableFuture;


如果你的任务是CPU密集型的,使用比处理器更多的线程是没有意义的,所以选择ParallelStream ,因为它不需要创建线程池,更容易使用。


问题2:IO密集型任务和CPU密集型任务的区别?


CPU密集型也叫计算密集型,此时,系统运行时大部分的状况是CPU占用率近乎100%,I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU 使用率很高。比如说要计算1+2+3+…+ 10万亿、天文计算、圆周率后几十位等, 都是属于CPU密集型程序。


CPU密集型任务的特点:大量计算,CPU占用率一般都很高,I/O时间很短


IO密集型指大部分的状况是CPU在等I/O (硬盘/内存) 的读写操作,但CPU的使用率不高。


简单的说,就是需要大量的输入输出,例如读写文件、传输文件、网络请求。


IO密集型任务的特点:大量网络请求,文件操作,CPU运算少,很多时候CPU在等待资源才能进一步操作。


问题3:既然要控制线程池中线程的数量,多少合适呢?


如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 Ncpu+1


如果是IO密集型任务,参考值可以设置为 2 * Ncpu,其中Ncpu 表示 核心数。

总结


通过这两篇文章的讲解,我们基本学习了CompletableFuture这个异步编程类的基础用法和相关进阶玩法,不过总体上还是偏理论,我后续可以可能会开一篇新的专栏,专门讲解和分享Java高并发相关的代码片段,都是比较实用,请多多支持吧~


相关文章
|
1月前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
4月前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【7月更文挑战第1天】Java 8的CompletableFuture革新了异步编程,提供链式处理和优雅的错误处理。反应式编程,如Project Reactor,强调数据流和变化传播,擅长处理大规模并发和延迟敏感任务。两者结合,如通过Mono转换CompletableFuture,兼顾灵活性与资源管理,提升现代Java应用的并发性能和响应性。开发者可按需选择和融合这两种技术,以适应高并发环境。
54 1
|
5月前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【6月更文挑战第30天】Java 8的CompletableFuture革新了异步编程,提供如thenApply等流畅接口,而Java 9后的反应式编程(如Reactor)强调数据流和变化传播,以事件驱动应对高并发。两者并非竞争关系,而是互补,通过Flow API和第三方库结合,如将CompletableFuture转换为Mono进行反应式处理,实现更高效、响应式的系统设计。开发者可根据需求灵活选用,提升现代Java应用的并发性能。
73 1
|
5月前
|
设计模式 Java API
实战分析Java的异步编程,并通过CompletableFuture进行高效调优
【6月更文挑战第7天】实战分析Java的异步编程,并通过CompletableFuture进行高效调优
90 2
|
4月前
|
并行计算 算法 Java
Java面试题:解释Java中的无锁编程的概念,Java中的Fork/Join框架的作用和使用场景,Java中的CompletableFuture的作用和使用场景
Java面试题:解释Java中的无锁编程的概念,Java中的Fork/Join框架的作用和使用场景,Java中的CompletableFuture的作用和使用场景
34 0
|
4月前
|
安全 Java 数据库连接
Java面试题:解释Java内存模型的无锁编程支持,并讨论其优势和局限性,解释Java中的CompletableFuture的工作原理,并讨论其在异步编程中的应用
Java面试题:解释Java内存模型的无锁编程支持,并讨论其优势和局限性,解释Java中的CompletableFuture的工作原理,并讨论其在异步编程中的应用
29 0
|
5月前
|
存储 算法 Java
Java8 CompletableFuture:异步编程的瑞士军刀
Java8 CompletableFuture:异步编程的瑞士军刀
112 2
|
5月前
|
并行计算 Java API
Java8实战-CompletableFuture:组合式异步编程
Java8实战-CompletableFuture:组合式异步编程
62 0
|
6月前
|
消息中间件 缓存 NoSQL
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
|
Java API 网络架构
20个使用 Java CompletableFuture的示例(下)
20个使用 Java CompletableFuture的示例(下)
270 1