Java8新的异步编程方式 CompletableFuture(三)

简介: Java8新的异步编程方式 CompletableFuture(三)

前面两篇文章已经整理了CompletableFuture大部分的特性,本文会整理完CompletableFuture余下的特性,以及将它跟RxJava进行比较。


3.6  Either



Either 表示的是两个CompletableFuture,当其中任意一个CompletableFuture计算完成的时候就会执行。


方法名 描述
acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) 当任意一个CompletableFuture完成的时候,action这个消费者就会被执行。
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) 当任意一个CompletableFuture完成的时候,action这个消费者就会被执行。使用ForkJoinPool
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) 当任意一个CompletableFuture完成的时候,action这个消费者就会被执行。使用指定的线程池

Random random = new Random();
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future1";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future2";
        });
        CompletableFuture<Void> future =  future1.acceptEither(future2,str->System.out.println("The future is "+str));
        try {
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


执行结果:The future is from future1 或者 The future is from future2。


因为future1和future2,执行的顺序是随机的。


applyToEither 跟 acceptEither 类似。


方法名 描述
applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) 当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture<U>的计算结果。
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn) 当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture<U>的计算结果。使用ForkJoinPool
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor) 当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture<U>的计算结果。使用指定的线程池

Random random = new Random();
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future1";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future2";
        });
        CompletableFuture<String> future =  future1.applyToEither(future2,str->"The future is "+str);
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


执行结果也跟上面的程序类似。


3.7 其他方法



allOf、anyOf是CompletableFuture的静态方法。


3.7.1 allOf


方法名 描述
allOf(CompletableFuture<?>... cfs) 在所有Future对象完成后结束,并返回一个future。


allOf()方法所返回的CompletableFuture,并不能组合前面多个CompletableFuture的计算结果。于是我们借助Java 8的Stream来组合多个future的结果。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "tony");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "cafei");
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "aaron");
        CompletableFuture.allOf(future1, future2, future3)
                .thenApply(v ->
                Stream.of(future1, future2, future3)
                        .map(CompletableFuture::join)
                        .collect(Collectors.joining(" ")))
                .thenAccept(System.out::print);


执行结果:

tony cafei aaron


3.7.2 anyOf


方法名 描述
anyOf(CompletableFuture<?>... cfs) 在任何一个Future对象结束后结束,并返回一个future。

Random rand = new Random();
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future1";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future2";
        });
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future3";
        });
        CompletableFuture<Object> future =  CompletableFuture.anyOf(future1,future2,future3);
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


使用anyOf()时,只要某一个future完成,就结束了。所以执行结果可能是"from future1"、"from future2"、"from future3"中的任意一个。


anyOf 和 acceptEither、applyToEither的区别在于,后两者只能使用在两个future中,而anyOf可以使用在多个future中。


3.8 CompletableFuture异常处理



CompletableFuture在运行时如果遇到异常,可以使用get()并抛出异常进行处理,但这并不是一个最好的方法。CompletableFuture本身也提供了几种方式来处理异常。


3.8.1 exceptionally


方法名 描述
exceptionally(Function<Throwable,? extends T> fn) 只有当CompletableFuture抛出异常的时候,才会触发这个exceptionally的计算,调用function计算值。

CompletableFuture.supplyAsync(() -> "hello world")
                .thenApply(s -> {
                    s = null;
                    int length = s.length();
                    return length;
                }).thenAccept(i -> System.out.println(i))
                .exceptionally(t -> {
                    System.out.println("Unexpected error:" + t);
                    return null;
                });


执行结果:

Unexpected error:java.util.concurrent.CompletionException: java.lang.NullPointerException


对上面的代码稍微做了一下修改,修复了空指针的异常。

CompletableFuture.supplyAsync(() -> "hello world")
                .thenApply(s -> {
//                    s = null;
                    int length = s.length();
                    return length;
                }).thenAccept(i -> System.out.println(i))
                .exceptionally(t -> {
                    System.out.println("Unexpected error:" + t);
                    return null;
                });


执行结果:

11


3.8.2 whenComplete


whenComplete 在上一篇文章其实已经介绍过了,在这里跟exceptionally的作用差不多,可以捕获任意阶段的异常。如果没有异常的话,就执行action。

CompletableFuture.supplyAsync(() -> "hello world")
                .thenApply(s -> {
                    s = null;
                    int length = s.length();
                    return length;
                }).thenAccept(i -> System.out.println(i))
                .whenComplete((result, throwable) -> {
                    if (throwable != null) {
                       System.out.println("Unexpected error:"+throwable);
                    } else {
                        System.out.println(result);
                    }
                });


执行结果:

Unexpected error:java.util.concurrent.CompletionException: java.lang.NullPointerException


跟whenComplete相似的方法是handle,handle的用法在上一篇文章中也已经介绍过。


四. CompletableFuture VS Java8  Stream VS RxJava1 & RxJava2



CompletableFuture 有很多特性跟RxJava很像,所以将CompletableFuture、Java 8 Stream和RxJava做一个相互的比较。


composable lazy resuable async cached push back pressure
CompletableFuture 支持 不支持 支持 支持 支持 支持 不支持
Stream 支持 支持 不支持 不支持 不支持 不支持 不支持
Observable(RxJava1) 支持 支持 支持 支持 支持 支持 支持
Observable(RxJava2) 支持 支持 支持 支持 支持 支持 不支持
Flowable(RxJava2) 支持 支持 支持 支持 支持 支持 支持


五. 总结



Java 8提供了一种函数风格的异步和事件驱动编程模型CompletableFuture,它不会造成堵塞。CompletableFuture背后依靠的是fork/join框架来启动新的线程实现异步与并发。

当然,我们也能通过指定线程池来做这些事情。


CompletableFuture特别是对微服务架构而言,会有很大的作为。举一个具体的场景,电商的商品页面可能会涉及到商品详情服务、商品评论服务、相关商品推荐服务等等。获取商品的信息时(/productdetails?productid=xxx),需要调用多个服务来处理这一个请求并返回结果。这里可能会涉及到并发编程,我们完全可以使用Java 8的CompletableFuture或者RxJava来实现。

相关文章
|
2月前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
5月前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【7月更文挑战第1天】Java 8的CompletableFuture革新了异步编程,提供链式处理和优雅的错误处理。反应式编程,如Project Reactor,强调数据流和变化传播,擅长处理大规模并发和延迟敏感任务。两者结合,如通过Mono转换CompletableFuture,兼顾灵活性与资源管理,提升现代Java应用的并发性能和响应性。开发者可按需选择和融合这两种技术,以适应高并发环境。
58 1
|
6月前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【6月更文挑战第30天】Java 8的CompletableFuture革新了异步编程,提供如thenApply等流畅接口,而Java 9后的反应式编程(如Reactor)强调数据流和变化传播,以事件驱动应对高并发。两者并非竞争关系,而是互补,通过Flow API和第三方库结合,如将CompletableFuture转换为Mono进行反应式处理,实现更高效、响应式的系统设计。开发者可根据需求灵活选用,提升现代Java应用的并发性能。
93 1
|
27天前
|
SQL Rust Java
Java 8 异步编程利器:CompletableFuture
Java 8引入了CompletableFuture,这是一个强大的异步编程工具,增强了Future的功能,支持链式调用、任务组合与异常处理等特性,使异步编程更加直观和高效。本文详细介绍了CompletableFuture的基本概念、用法及高级功能,帮助开发者更好地掌握这一工具。
|
27天前
|
JavaScript Java 中间件
Java CompletableFuture 异步超时实现探索
本文探讨了在JDK 8中`CompletableFuture`缺乏超时中断任务能力的问题,提出了一种异步超时实现方案,通过自定义工具类模拟JDK 9中`orTimeout`方法的功能,解决了任务超时无法精确控制的问题,适用于多线程并行执行优化场景。
|
6月前
|
设计模式 Java API
实战分析Java的异步编程,并通过CompletableFuture进行高效调优
【6月更文挑战第7天】实战分析Java的异步编程,并通过CompletableFuture进行高效调优
103 2
|
5月前
|
并行计算 算法 Java
Java面试题:解释Java中的无锁编程的概念,Java中的Fork/Join框架的作用和使用场景,Java中的CompletableFuture的作用和使用场景
Java面试题:解释Java中的无锁编程的概念,Java中的Fork/Join框架的作用和使用场景,Java中的CompletableFuture的作用和使用场景
40 0
|
5月前
|
安全 Java 数据库连接
Java面试题:解释Java内存模型的无锁编程支持,并讨论其优势和局限性,解释Java中的CompletableFuture的工作原理,并讨论其在异步编程中的应用
Java面试题:解释Java内存模型的无锁编程支持,并讨论其优势和局限性,解释Java中的CompletableFuture的工作原理,并讨论其在异步编程中的应用
33 0
|
6月前
|
存储 算法 Java
Java8 CompletableFuture:异步编程的瑞士军刀
Java8 CompletableFuture:异步编程的瑞士军刀
141 2
|
6月前
|
并行计算 Java API
Java8实战-CompletableFuture:组合式异步编程
Java8实战-CompletableFuture:组合式异步编程
74 0