搞定 CompletableFuture,并发异步编程和编写串行程序还有什么区别?你们要的多图长文(下)

简介: 搞定 CompletableFuture,并发异步编程和编写串行程序还有什么区别?你们要的多图长文(下)
CompletableFuture<String> comboText = CompletableFuture.supplyAsync(() -> {
          //可以注释掉做快速返回 start
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            log.info("👍");
          //可以注释掉做快速返回 end
            return "赞";
        })
                .thenApply(first -> {
                    log.info("在看");
                    return first + ", 在看";
                })
                .thenApply(second -> second + ", 转发");
        log.info("三连有没有?");
        log.info(comboText.get());


微信图片_20220511124502.png


对 thenApply 的调用并没有阻塞程序打印log,也就是前面说的通过回调通知机制, 这里你看到 thenApply 使用的是supplyAsync所用的线程,如果将supplyAsync 做快速返回,我们再来看一下运行结果:


微信图片_20220511124524.png


thenApply 此时使用的是主线程,所以:


串行的后续操作并不一定会和前序操作使用同一个线程


thenAccept


如果你不想从回调函数中返回任何结果,那可以使用 thenAccept


        final CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(
                // 模拟远端API调用,这里只返回了一个构造的对象
                () -> Product.builder().id(12345L).name("颈椎/腰椎治疗仪").build())
                .thenAccept(product -> {
                    log.info("获取到远程API产品名称 " + product.getName());
                });
        voidCompletableFuture.get();


thenRun


thenAccept 可以从回调函数中获取前序执行的结果,但thenRun 却不可以,因为它的回调函数式表达式定义中没有任何参数


CompletableFuture.supplyAsync(() -> {
    //前序操作
}).thenRun(() -> {
    //串行的后需操作,无参数也无返回值
});


我们前面同样说过了,每个提供回调方法的函数都有两个异步(Async)变体,异步就是另外起一个线程


        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            log.info("前序操作");
            return "前需操作结果";
        }).thenApplyAsync(result -> {
            log.info("后续操作");
            return "后续操作结果";
        });


微信图片_20220511124646.png


到这里,相信你串行的操作你已经非常熟练了


thenCompose


日常的任务中,通常定义的方法都会返回 CompletableFuture 类型,这样会给后续操作留有更多的余地,假如有这样的业务(X呗是不是都有这样的业务呢?):


//获取用户信息详情
    CompletableFuture<User> getUsersDetail(String userId) {
        return CompletableFuture.supplyAsync(() -> User.builder().id(12345L).name("日拱一兵").build());
    }
    //获取用户信用评级
    CompletableFuture<Double> getCreditRating(User user) {
        return CompletableFuture.supplyAsync(() -> CreditRating.builder().rating(7.5).build().getRating());
    }


这时,如果我们还是使用 thenApply() 方法来描述串行关系,返回的结果就会发生 CompletableFuture 的嵌套


        CompletableFuture<CompletableFuture<Double>> result = completableFutureCompose.getUsersDetail(12345L)
                .thenApply(user -> completableFutureCompose.getCreditRating(user));


显然这不是我们想要的,如果想“拍平” 返回结果,thenCompose 方法就派上用场了


CompletableFuture<Double> result = completableFutureCompose.getUsersDetail(12345L)
                .thenCompose(user -> completableFutureCompose.getCreditRating(user));


这个和 Lambda 的map 和 flatMap 的道理是一样一样滴


thenCombine


如果要聚合两个独立 Future 的结果,那么 thenCombine 就会派上用场了


        CompletableFuture<Double> weightFuture = CompletableFuture.supplyAsync(() -> 65.0);
        CompletableFuture<Double> heightFuture = CompletableFuture.supplyAsync(() -> 183.8);
        CompletableFuture<Double> combinedFuture = weightFuture
                .thenCombine(heightFuture, (weight, height) -> {
                    Double heightInMeter = height/100;
                    return weight/(heightInMeter*heightInMeter);
                });
        log.info("身体BMI指标 - " + combinedFuture.get());


微信图片_20220511124850.png


当然这里多数时处理两个 Future 的关系,如果超过两个Future,如何处理他们的一些聚合关系呢?


allOf | anyOf


相信你看到方法的签名,你已经明白他的用处了,这里就不再介绍了


        Integer age = -1;
        CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
            if( age < 0 ) {
                throw new IllegalArgumentException("何方神圣?");
            }
            if(age > 18) {
                return "大家都是成年人";
            } else {
                return "未成年禁止入内";
            }
        }).thenApply((str) -> {
            log.info("游戏开始");
            return str;
        }).exceptionally(ex -> {
            log.info("必有蹊跷,来者" + ex.getMessage());
            return "Unknown!";
        });
        log.info(maturityFuture.get());


微信图片_20220511124952.png


exceptionally 就相当于 catch,出现异常,将会跳过 thenApply 的后续操作,直接捕获异常,进行一场处理


handle


用多线程,良好的习惯是使用 try/finally 范式,handle 就可以起到 finally 的作用,对上述程序做一个小小的更改, handle 接受两个参数,一个是正常返回值,一个是异常


注意:handle的写法也算是范式的一种


        Integer age = -1;
        CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
            if( age < 0 ) {
                throw new IllegalArgumentException("何方神圣?");
            }
            if(age > 18) {
                return "大家都是成年人";
            } else {
                return "未成年禁止入内";
            }
        }).thenApply((str) -> {
            log.info("游戏开始");
            return str;
        }).handle((res, ex) -> {
            if(ex != null) {
                log.info("必有蹊跷,来者" + ex.getMessage());
                return "Unknown!";
            }
            return res;
        });
        log.info(maturityFuture.get());


到这里,关于 CompletableFuture 的基本使用你已经了解的差不多了,不知道你是否注意,我们前面说的带有 Sync 的方法是单独起一个线程来执行,但是我们并没有创建线程,这是怎么实现的呢?


微信图片_20220511125046.png


细心的朋友如果仔细看每个变种函数的第三个方法也许会发现里面都有一个 Executor 类型的参数,用于指定线程池,因为实际业务中我们是严谨手动创建线程的,这在 我会手动创建线程,为什么要使用线程池?文章中明确说明过;如果没有指定线程池,那自然就会有一个默认的线程池,也就是 ForkJoinPool


private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();


ForkJoinPool 的线程数默认是 CPU 的核心数。但是,在前序文章中明确说明过:


不要所有业务共用一个线程池,因为,一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能


总结


CompletableFuture 的方法并没有全部介绍完全,也没必要全部介绍,相信大家按照这个思路来理解 CompletableFuture 也不会有什么大问题了,剩下的就交给实践/时间以及自己的体会了


后记


你以为 JDK1.8 CompletableFuture 已经很完美了是不是,但追去完美的道路上永无止境,Java 9 对CompletableFuture 又做了部分升级和改造


  1. 添加了新的工厂方法


  1. 支持延迟和超时处理
orTimeout()
completeOnTimeout()


  1. 改进了对子类的支持


详情可以查看: Java 9 CompletableFuture API Improvements. 怎样快速的切换不同 Java 版本来尝鲜?SDKMAN 统一灵活管理多版本Java 这篇文章的方法送给你

最后咱们再泡一壶茶,感受一下新变化吧


灵魂追问


  1. 听说 ForkJoinPool 线程池效率更高,为什么呢?


  1. 如果批量处理异步程序,有什么可用的方案吗?
目录
打赏
0
0
0
0
1
分享
相关文章
【专栏】Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理
【4月更文挑战第27天】本文探讨了Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理。通过案例分析展示了网络请求、图像处理和数据库操作的优化实践。同时,文章指出并发编程的挑战,如性能评估、调试及兼容性问题,并强调了多线程优化对提升应用性能的重要性。开发者应持续学习和探索新的优化策略,以适应移动应用市场的竞争需求。
242 5
|
9月前
|
并发编程之线程池的应用以及一些小细节的详细解析
并发编程之线程池的应用以及一些小细节的详细解析
49 0
|
7月前
|
告别低效编程!Python线程与进程并发技术详解,让你的代码飞起来!
【7月更文挑战第9天】Python并发编程提升效率:**理解并发与并行,线程借助`threading`模块处理IO密集型任务,受限于GIL;进程用`multiprocessing`实现并行,绕过GIL限制。示例展示线程和进程创建及同步。选择合适模型,注意线程安全,利用多核,优化性能,实现高效并发编程。
89 3
难懂,误点!将多线程技术应用于Python的异步事件循环
难懂,误点!将多线程技术应用于Python的异步事件循环
112 0
别再盲目编码!一文读懂Python线程与进程的使用场景与限制,助你成为并发编程高手!
【7月更文挑战第8天】Python并发编程提升效率,关键在于理解线程和进程的适用场景。I/O密集型任务如Web服务器适合用线程,示例展示了使用`threading`处理HTTP请求。CPU密集型任务则利用`multiprocessing`创建进程,绕过GIL限制,实现多核利用。注意线程的GIL限制和进程的开销,选择合适模型以优化并发性能。
53 0
|
9月前
|
C++
C++语言多线程学习应用案例
使用C++ `std::thread`和`std::mutex`实现多线程同步。示例创建两个线程`t1`、`t2`,共享资源`shared_resource`,每个线程调用`increase`函数递增资源值。互斥锁确保在任何时候只有一个线程访问资源,防止数据竞争。最后输出资源总值。
45 1
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
js开发:请解释同步和异步编程的区别。
同步编程按顺序执行,易阻塞;异步编程不阻塞,提高效率。同步适合简单操作,异步适合并发场景。示例展示了JavaScript中同步和异步函数的使用。
49 0
|
9月前
|
多线程和异步编程:解释 iOS 中的同步和异步任务的概念。
多线程和异步编程:解释 iOS 中的同步和异步任务的概念。
155 1
一网打尽异步神器CompletableFuture
最近一直畅游在RocketMQ的源码中,发现在RocketMQ中很多地方都使用到了CompletableFuture,所以今天就跟大家来聊一聊JDK1.8提供的异步神器CompletableFuture,并且最后会结合RocketMQ源码分析一下CompletableFuture的使用。