搞定 CompletableFuture,并发异步编程和编写串行程序还有什么区别?你们要的多图长文(下)

简介: 搞定 CompletableFuture,并发异步编程和编写串行程序还有什么区别?你们要的多图长文(下)
CompletableFuture<String> comboText = CompletableFuture.supplyAsync(() -> {
          //可以注释掉做快速返回 start
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            log.info("👍");
          //可以注释掉做快速返回 end
            return "赞";
        })
                .thenApply(first -> {
                    log.info("在看");
                    return first + ", 在看";
                })
                .thenApply(second -> second + ", 转发");
        log.info("三连有没有?");
        log.info(comboText.get());


微信图片_20220511124502.png


对 thenApply 的调用并没有阻塞程序打印log,也就是前面说的通过回调通知机制, 这里你看到 thenApply 使用的是supplyAsync所用的线程,如果将supplyAsync 做快速返回,我们再来看一下运行结果:


微信图片_20220511124524.png


thenApply 此时使用的是主线程,所以:


串行的后续操作并不一定会和前序操作使用同一个线程


thenAccept


如果你不想从回调函数中返回任何结果,那可以使用 thenAccept


        final CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(
                // 模拟远端API调用,这里只返回了一个构造的对象
                () -> Product.builder().id(12345L).name("颈椎/腰椎治疗仪").build())
                .thenAccept(product -> {
                    log.info("获取到远程API产品名称 " + product.getName());
                });
        voidCompletableFuture.get();


thenRun


thenAccept 可以从回调函数中获取前序执行的结果,但thenRun 却不可以,因为它的回调函数式表达式定义中没有任何参数


CompletableFuture.supplyAsync(() -> {
    //前序操作
}).thenRun(() -> {
    //串行的后需操作,无参数也无返回值
});


我们前面同样说过了,每个提供回调方法的函数都有两个异步(Async)变体,异步就是另外起一个线程


        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            log.info("前序操作");
            return "前需操作结果";
        }).thenApplyAsync(result -> {
            log.info("后续操作");
            return "后续操作结果";
        });


微信图片_20220511124646.png


到这里,相信你串行的操作你已经非常熟练了


thenCompose


日常的任务中,通常定义的方法都会返回 CompletableFuture 类型,这样会给后续操作留有更多的余地,假如有这样的业务(X呗是不是都有这样的业务呢?):


//获取用户信息详情
    CompletableFuture<User> getUsersDetail(String userId) {
        return CompletableFuture.supplyAsync(() -> User.builder().id(12345L).name("日拱一兵").build());
    }
    //获取用户信用评级
    CompletableFuture<Double> getCreditRating(User user) {
        return CompletableFuture.supplyAsync(() -> CreditRating.builder().rating(7.5).build().getRating());
    }


这时,如果我们还是使用 thenApply() 方法来描述串行关系,返回的结果就会发生 CompletableFuture 的嵌套


        CompletableFuture<CompletableFuture<Double>> result = completableFutureCompose.getUsersDetail(12345L)
                .thenApply(user -> completableFutureCompose.getCreditRating(user));


显然这不是我们想要的,如果想“拍平” 返回结果,thenCompose 方法就派上用场了


CompletableFuture<Double> result = completableFutureCompose.getUsersDetail(12345L)
                .thenCompose(user -> completableFutureCompose.getCreditRating(user));


这个和 Lambda 的map 和 flatMap 的道理是一样一样滴


thenCombine


如果要聚合两个独立 Future 的结果,那么 thenCombine 就会派上用场了


        CompletableFuture<Double> weightFuture = CompletableFuture.supplyAsync(() -> 65.0);
        CompletableFuture<Double> heightFuture = CompletableFuture.supplyAsync(() -> 183.8);
        CompletableFuture<Double> combinedFuture = weightFuture
                .thenCombine(heightFuture, (weight, height) -> {
                    Double heightInMeter = height/100;
                    return weight/(heightInMeter*heightInMeter);
                });
        log.info("身体BMI指标 - " + combinedFuture.get());


微信图片_20220511124850.png


当然这里多数时处理两个 Future 的关系,如果超过两个Future,如何处理他们的一些聚合关系呢?


allOf | anyOf


相信你看到方法的签名,你已经明白他的用处了,这里就不再介绍了


        Integer age = -1;
        CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
            if( age < 0 ) {
                throw new IllegalArgumentException("何方神圣?");
            }
            if(age > 18) {
                return "大家都是成年人";
            } else {
                return "未成年禁止入内";
            }
        }).thenApply((str) -> {
            log.info("游戏开始");
            return str;
        }).exceptionally(ex -> {
            log.info("必有蹊跷,来者" + ex.getMessage());
            return "Unknown!";
        });
        log.info(maturityFuture.get());


微信图片_20220511124952.png


exceptionally 就相当于 catch,出现异常,将会跳过 thenApply 的后续操作,直接捕获异常,进行一场处理


handle


用多线程,良好的习惯是使用 try/finally 范式,handle 就可以起到 finally 的作用,对上述程序做一个小小的更改, handle 接受两个参数,一个是正常返回值,一个是异常


注意:handle的写法也算是范式的一种


        Integer age = -1;
        CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
            if( age < 0 ) {
                throw new IllegalArgumentException("何方神圣?");
            }
            if(age > 18) {
                return "大家都是成年人";
            } else {
                return "未成年禁止入内";
            }
        }).thenApply((str) -> {
            log.info("游戏开始");
            return str;
        }).handle((res, ex) -> {
            if(ex != null) {
                log.info("必有蹊跷,来者" + ex.getMessage());
                return "Unknown!";
            }
            return res;
        });
        log.info(maturityFuture.get());


到这里,关于 CompletableFuture 的基本使用你已经了解的差不多了,不知道你是否注意,我们前面说的带有 Sync 的方法是单独起一个线程来执行,但是我们并没有创建线程,这是怎么实现的呢?


微信图片_20220511125046.png


细心的朋友如果仔细看每个变种函数的第三个方法也许会发现里面都有一个 Executor 类型的参数,用于指定线程池,因为实际业务中我们是严谨手动创建线程的,这在 我会手动创建线程,为什么要使用线程池?文章中明确说明过;如果没有指定线程池,那自然就会有一个默认的线程池,也就是 ForkJoinPool


private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();


ForkJoinPool 的线程数默认是 CPU 的核心数。但是,在前序文章中明确说明过:


不要所有业务共用一个线程池,因为,一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能


总结


CompletableFuture 的方法并没有全部介绍完全,也没必要全部介绍,相信大家按照这个思路来理解 CompletableFuture 也不会有什么大问题了,剩下的就交给实践/时间以及自己的体会了


后记


你以为 JDK1.8 CompletableFuture 已经很完美了是不是,但追去完美的道路上永无止境,Java 9 对CompletableFuture 又做了部分升级和改造


  1. 添加了新的工厂方法


  1. 支持延迟和超时处理
orTimeout()
completeOnTimeout()


  1. 改进了对子类的支持


详情可以查看: Java 9 CompletableFuture API Improvements. 怎样快速的切换不同 Java 版本来尝鲜?SDKMAN 统一灵活管理多版本Java 这篇文章的方法送给你

最后咱们再泡一壶茶,感受一下新变化吧


灵魂追问


  1. 听说 ForkJoinPool 线程池效率更高,为什么呢?


  1. 如果批量处理异步程序,有什么可用的方案吗?
相关文章
|
22天前
|
前端开发 编译器 Android开发
构建高效Android应用:探究Kotlin协程的异步处理机制
【4月更文挑战第2天】在现代移动应用开发中,提供流畅且响应迅速的用户体验是至关重要的。随着Android平台的发展,Kotlin语言凭借其简洁性和功能性编程的特点成为了主流选择之一。特别地,Kotlin协程作为一种新型的轻量级线程管理机制,为开发者提供了强大的异步处理能力,从而显著提升了应用程序的性能和响应速度。本文将深入探讨Kotlin协程在Android中的应用,分析其原理、实现以及如何通过协程优化应用性能。
23 2
|
1月前
|
数据采集 存储 Java
「多线程大杀器」Python并发编程利器:ThreadPoolExecutor,让你一次性轻松开启多个线程,秒杀大量任务!
「多线程大杀器」Python并发编程利器:ThreadPoolExecutor,让你一次性轻松开启多个线程,秒杀大量任务!
|
2天前
|
Java
并发编程之线程池的应用以及一些小细节的详细解析
并发编程之线程池的应用以及一些小细节的详细解析
14 0
|
2天前
|
JavaScript 前端开发
js开发:请解释同步和异步编程的区别。
同步编程按顺序执行,易阻塞;异步编程不阻塞,提高效率。同步适合简单操作,异步适合并发场景。示例展示了JavaScript中同步和异步函数的使用。
12 0
|
17天前
|
C++
C++语言多线程学习应用案例
使用C++ `std::thread`和`std::mutex`实现多线程同步。示例创建两个线程`t1`、`t2`,共享资源`shared_resource`,每个线程调用`increase`函数递增资源值。互斥锁确保在任何时候只有一个线程访问资源,防止数据竞争。最后输出资源总值。
9 1
|
3月前
|
iOS开发
多线程和异步编程:解释 iOS 中的同步和异步任务的概念。
多线程和异步编程:解释 iOS 中的同步和异步任务的概念。
37 1
|
9月前
|
缓存 并行计算 算法
Python多线程与多进程教程:全面解析、代码案例与优化技巧
Python多线程与多进程教程:全面解析、代码案例与优化技巧
299 0
|
10月前
|
数据采集 程序员 调度
python爬虫中多线程的实现方式
python爬虫中多线程的实现方式
|
11月前
|
Java
高并发编程-自定义简易的线程池(1),体会原理
高并发编程-自定义简易的线程池(1),体会原理
57 0
|
11月前
|
Java
高并发编程-自定义简易的线程池(2),体会原理
高并发编程-自定义简易的线程池(2),体会原理
49 0