Java8新的异步编程方式 CompletableFuture(二)

简介: Java8新的异步编程方式 CompletableFuture(二)

上一篇文章,讲述了Future模式的机制、缺点,CompletableFuture产生的由来、静态工厂方法、complete()方法等等。


本文将继续整理CompletableFuture的特性。


3.3 转换



我们可以通过CompletableFuture来异步获取一组数据,并对数据进行一些转换,类似RxJava、Scala的map、flatMap操作。


3.3.1 map


方法名 描述
thenApply(Function<? super T,? extends U> fn) 接受一个Function<? super T,? extends U>参数用来转换CompletableFuture
thenApplyAsync(Function<? super T,? extends U> fn) 接受一个Function<? super T,? extends U>参数用来转换CompletableFuture,使用ForkJoinPool
thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 接受一个Function<? super T,? extends U>参数用来转换CompletableFuture,使用指定的线程池


thenApply的功能相当于将CompletableFuture<T>转换成CompletableFuture<U>。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
        future = future.thenApply(new Function<String, String>() {
            @Override
            public String apply(String s) {
                return s + " World";
            }
        }).thenApply(new Function<String, String>() {
            @Override
            public String apply(String s) {
                return s.toUpperCase();
            }
        });
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


再用lambda表达式简化一下

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(s -> s + " World").thenApply(String::toUpperCase);
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


执行结果:

HELLO WORLD


下面的例子,展示了数据流的类型经历了如下的转换:String -> Integer -> Double。

CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "10")
                .thenApply(Integer::parseInt)
                .thenApply(i->i*10.0);
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


执行结果:

100.0


3.3.2 flatMap


方法名 描述
thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) 在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型。
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) 在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型。使用ForkJoinPool。
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) 在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型。使用指定的线程池。


thenCompose可以用于组合多个CompletableFuture,将前一个结果作为下一个计算的参数,它们之间存在着先后顺序。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


执行结果:

Hello World


下面的例子展示了多次调用thenCompose()

CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "100")
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "100"))
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> Double.parseDouble(s)));
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


执行结果:

100100.0


3.4 组合



方法名 描述
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。使用ForkJoinPool。
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) 当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。使用指定的线程池。


现在有CompletableFuture<T>、CompletableFuture<U>和一个函数(T,U)->V,thenCompose就是将CompletableFuture<T>和CompletableFuture<U>变为CompletableFuture<V>。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100");
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Double> future = future1.thenCombine(future2, (s, i) -> Double.parseDouble(s + i));
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


执行结果:

100100.0


使用thenCombine()之后future1、future2之间是并行执行的,最后再将结果汇总。这一点跟thenCompose()不同。


thenAcceptBoth跟thenCombine类似,但是返回CompletableFuture<Void>类型。


方法名 描述
thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) 当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) 当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。使用ForkJoinPool。
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor) 当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。使用指定的线程池。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100");
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Void> future = future1.thenAcceptBoth(future2, (s, i) -> System.out.println(Double.parseDouble(s + i)));
        try {
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


执行结果:

100100.0


3.5 计算结果完成时的处理



当CompletableFuture完成计算结果后,我们可能需要对结果进行一些处理。


3.5.1 执行特定的Action


方法名 描述
whenComplete(BiConsumer<? super T,? super Throwable> action) 当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) 当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。使用ForkJoinPool。
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) 当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。使用指定的线程池。

CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(s->s+" World")
                .thenApply(s->s+ "\nThis is CompletableFuture demo")
                .thenApply(String::toLowerCase)
                .whenComplete((result, throwable) -> System.out.println(result));


执行结果:

hello world
this is completablefuture demo


3.5.2 执行完Action可以做转换


方法名 描述
handle(BiFunction<? super T, Throwable, ? extends U> fn) 当CompletableFuture完成计算结果或者抛出异常的时候,执行提供的fn
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) 当CompletableFuture完成计算结果或者抛出异常的时候,执行提供的fn,使用ForkJoinPool。
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) 当CompletableFuture完成计算结果或者抛出异常的时候,执行提供的fn,使用指定的线程池。

CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "100")
                .thenApply(s->s+"100")
                .handle((s, t) -> s != null ? Double.parseDouble(s) : 0);
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


执行结果:

100100.0


在这里,handle()的参数是BiFunction,apply()方法返回R,相当于转换的操作。

@FunctionalInterface
public interface BiFunction<T, U, R> {
    /**
     * Applies this function to the given arguments.
     *
     * @param t the first function argument
     * @param u the second function argument
     * @return the function result
     */
    R apply(T t, U u);
    /**
     * Returns a composed function that first applies this function to
     * its input, and then applies the {@code after} function to the result.
     * If evaluation of either function throws an exception, it is relayed to
     * the caller of the composed function.
     *
     * @param <V> the type of output of the {@code after} function, and of the
     *           composed function
     * @param after the function to apply after this function is applied
     * @return a composed function that first applies this function and then
     * applies the {@code after} function
     * @throws NullPointerException if after is null
     */
    default <V> BiFunction<T, U, V> andThen(Function<? super R, ? extends V> after) {
        Objects.requireNonNull(after);
        return (T t, U u) -> after.apply(apply(t, u));
    }
}


而whenComplete()的参数是BiConsumer,accept()方法返回void。

@FunctionalInterface
public interface BiConsumer<T, U> {
    /**
     * Performs this operation on the given arguments.
     *
     * @param t the first input argument
     * @param u the second input argument
     */
    void accept(T t, U u);
    /**
     * Returns a composed {@code BiConsumer} that performs, in sequence, this
     * operation followed by the {@code after} operation. If performing either
     * operation throws an exception, it is relayed to the caller of the
     * composed operation.  If performing this operation throws an exception,
     * the {@code after} operation will not be performed.
     *
     * @param after the operation to perform after this operation
     * @return a composed {@code BiConsumer} that performs in sequence this
     * operation followed by the {@code after} operation
     * @throws NullPointerException if {@code after} is null
     */
    default BiConsumer<T, U> andThen(BiConsumer<? super T, ? super U> after) {
        Objects.requireNonNull(after);
        return (l, r) -> {
            accept(l, r);
            after.accept(l, r);
        };
    }
}


所以,handle()相当于whenComplete()+转换。


3.5.3 纯消费(执行Action)


方法名 描述
thenAccept(Consumer<? super T> action) 当CompletableFuture完成计算结果,只对结果执行Action,而不返回新的计算值
thenAcceptAsync(Consumer<? super T> action) 当CompletableFuture完成计算结果,只对结果执行Action,而不返回新的计算值,使用ForkJoinPool。
thenAcceptAsync(Consumer<? super T> action, Executor executor) 当CompletableFuture完成计算结果,只对结果执行Action,而不返回新的计算值


thenAccept()是只会对计算结果进行消费而不会返回任何结果的方法。

CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(s->s+" World")
                .thenApply(s->s+ "\nThis is CompletableFuture demo")
                .thenApply(String::toLowerCase)
                .thenAccept(System.out::print);


执行结果:

hello world
this is completablefuture demo


相关文章
|
28天前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
4月前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【7月更文挑战第1天】Java 8的CompletableFuture革新了异步编程,提供链式处理和优雅的错误处理。反应式编程,如Project Reactor,强调数据流和变化传播,擅长处理大规模并发和延迟敏感任务。两者结合,如通过Mono转换CompletableFuture,兼顾灵活性与资源管理,提升现代Java应用的并发性能和响应性。开发者可按需选择和融合这两种技术,以适应高并发环境。
50 1
|
5月前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【6月更文挑战第30天】Java 8的CompletableFuture革新了异步编程,提供如thenApply等流畅接口,而Java 9后的反应式编程(如Reactor)强调数据流和变化传播,以事件驱动应对高并发。两者并非竞争关系,而是互补,通过Flow API和第三方库结合,如将CompletableFuture转换为Mono进行反应式处理,实现更高效、响应式的系统设计。开发者可根据需求灵活选用,提升现代Java应用的并发性能。
67 1
|
4月前
|
并行计算 算法 Java
Java面试题:解释Java中的无锁编程的概念,Java中的Fork/Join框架的作用和使用场景,Java中的CompletableFuture的作用和使用场景
Java面试题:解释Java中的无锁编程的概念,Java中的Fork/Join框架的作用和使用场景,Java中的CompletableFuture的作用和使用场景
33 0
|
4月前
|
安全 Java 数据库连接
Java面试题:解释Java内存模型的无锁编程支持,并讨论其优势和局限性,解释Java中的CompletableFuture的工作原理,并讨论其在异步编程中的应用
Java面试题:解释Java内存模型的无锁编程支持,并讨论其优势和局限性,解释Java中的CompletableFuture的工作原理,并讨论其在异步编程中的应用
29 0
|
5月前
|
设计模式 Java API
实战分析Java的异步编程,并通过CompletableFuture进行高效调优
【6月更文挑战第7天】实战分析Java的异步编程,并通过CompletableFuture进行高效调优
84 2
|
5月前
|
存储 算法 Java
Java8 CompletableFuture:异步编程的瑞士军刀
Java8 CompletableFuture:异步编程的瑞士军刀
107 2
|
5月前
|
并行计算 Java API
Java8实战-CompletableFuture:组合式异步编程
Java8实战-CompletableFuture:组合式异步编程
58 0
|
6月前
|
Java API C++
Java8 CompletableFuture异步编程-进阶篇
Java8 CompletableFuture异步编程-进阶篇
|
6月前
|
消息中间件 缓存 NoSQL
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度