哪个线程执行 CompletableFuture’s tasks 和 callbacks?

简介:

CompletableFuture尽管在2014年的三月随着Java8被提出来,但它现在仍然是一种相对较新潮的概念。但也许这个类不为人所熟知是好事,因为它很容易被滥用,特别是涉及到使用线程和线程池的时候。而这篇文章的目的就是要描述线程是怎样使用CompletableFuture的。

Running tasks

这是API的基础部分,它有一个很实用的supplyAsync()方法,这个方法和ExecutorService.submit()很像,但不同的是返回CompletableFuture:

1 CompletableFuture.supplyAsync(() -> {
2             try (InputStream is = new URL("http://www.nurkiewicz.com").openStream()) {
3                 log.info("Downloading");
4                 return IOUtils.toString(is, StandardCharsets.UTF_8);
5             catch (IOException e) {
6                 throw new RuntimeException(e);
7             }
8         });

问题是supplyAsync()默认使用 ForkJoinPool.commonPool(),线程池由所有的CompletableFutures分享,所有的并行流和所有的应用都部署在同一个虚拟机上(如果你很不幸的仍在使用有很多人工部署的应用服务器)。这种硬编码的,不可配置的线程池完全超出了我们的控制,很难去监测和度量。因此你应该指定你自己的Executor,就像这里(也可以看看这里几种创造这样Exetutor的方法):

1 ExecutorService pool = Executors.newFixedThreadPool(10);
2  
3 final CompletableFuture future =
4         CompletableFuture.supplyAsync(() -> {
5             //...
6         }, pool);

这仅仅是开始…

Callbacks and transformations

假如你想转换给定的CompletableFuture,例如提取String的长度:

1 CompletableFuture intFuture =
2     future.thenApply(s -> s.length());

那么是谁调用了s.length()?坦白点,我一点也不在乎。只要涉及到lambda表达式,那么所有的执行者像thenApply这样的就是廉价的,我们并不关心是谁调用了lambda表达式。但如果这样的表达式会占用一点点的CPU来完成阻塞的网络通信那又会如何呢?

首先默认情况下会发生什么?试想一下:我们有一个返回String类型的后台任务,当结果完成时我们想要异步地去执行特定的变换。最容易的实现方法是通过包装一个原始的任务(返回String),任务完成时截获它。当内部的task结束后,回调就开始执行,执行变换和返回改进的值。就像有一个面介于我们的代码和初始的计算结果之间(个人看法:这里指的是下面的future里面包含的task执行完毕返回结果s,然后立马执行callback也就是thenApply里面的lambda表达式,这也就是为什么作者说有一个面位于初始计算结果和回调执行代码之间)。那就是说这应该相当明显了,s.length()的变换会在和执行原始任务相同的线程里完成,哈?并不完全是这样!(这里指的是有时候变换的线程和执行原始任务的线程不是同一个线程,看下面就知道)

01 CompletableFuture future =
02         CompletableFuture.supplyAsync(() -> {
03             sleepSeconds(2);
04             return "ABC";
05         }, pool);
06  
07 future.thenApply(s -> {
08     log.info("First transformation");
09     return s.length();
10 });
11  
12 future.get();
13 pool.shutdownNow();
14 pool.awaitTermination(1, TimeUnit.MINUTES);
15  
16 future.thenApply(s -> {
17     log.info("Second transformation");
18     return s.length();
19 });

如果future里面的task还在运行,那么包含first transformation的 thenApply()就会一直处于挂起状态。而这个task完成后thenApply()会立即执行,执行的线程和执行task的线程是同一个。然而在注册第二次变换之前(也就是执行第二个thenApply()),我们将一直等待直到task完成(和第一个变换是一样的,都需要等待)。更坏的情况是,我们完全地关闭了线程池,保证其他的代码将不会执行。那么哪个线程将要执行二次变换呢?我们都知道当注册了callback的future完成时,二次变换必定会立刻执行。这就是说它是使用默认的主线程(来完成callback),上面的代码输出如下:

pool-1-thread-1 | First transformation      main | Second transformation

二次变换在注册的时候就意识到CompletableFuture已经完成了(指的是future里面的task已经返回结果,其实在第一次调用thenApply()之前就已经返回了,所以这一次不用等待task),因此它立刻执行了变换。由于此时已经没有其他的线程,所以thenApply()就只能在当前的main线程环境中被调用。最主要的原因还是因为这种行为机制在实际的变换成本很高时(如很耗时)很容易出错。想象一下thenApply()内部的lambda表达式在进行一些繁重的计算或者阻塞的网络调用,突然我们的异步 CompletableFuture阻塞了调用者线程!

Controlling callback’s thread pool

有两种技术去控制执行回调和变换的线程,需要注意的是这些方法仅仅适用你的变换需要很高成本的时候,其他情况下可以忽略。那么第一个方法可以选择使用操作者的 *Async方法,例如:

1 future.thenApplyAsync(s -> {
2     log.info("Second transformation");
3     return s.length();
4 });

这一次second transformation被自动地卸载到了我们的老朋友线程ForkJoinPool.commonPool()中去了:

1 pool-1-thread-1                  | First transformation
2 ForkJoinPool.commonPool-worker-1 | Second transformation

但我们并不喜欢commonPool,所以我们提供自己的:

1 future.thenApplyAsync(s -> {
2     log.info("Second transformation");
3     return s.length();
4 }, pool2);

注意到这里使用的是不同的线程池(pool-1 vs. pool-2):

1 pool-1-thread-1 | First transformation
2 pool-2-thread-1 | Second transformation

Treating callback like another computation step

我相信如果你在处理一些长时间运行的callbacks和transformations上有些麻烦(记住这篇文章同样也适用于CompletableFuture的其他大部分方法),你应该简单地使用其他表意明确的CompletableFuture,就像这样:

01 //Imagine this is slow and costly
02 CompletableFuture<Integer> strLen(String s) {
03     return CompletableFuture.supplyAsync(
04             () -> s.length(),
05             pool2);
06 }
07  
08 //...
09  
10 CompletableFuture<Integer> intFuture =
11         future.thenCompose(s -> strLen(s));

这种方法更加明确,知道我们的变换有很大的开销,我们不会将它运行在一些随意的不可控的线程上。取而代之的是我们会将String到CompletableFuture<Integer>的变换封装为一个异步操作。然而,我们必须用thenCompose()取代thenApply(),否则的话我们会得到CompletableFuture<CompletableFuture<Integer>>.

但如果我们的transformation 没有一个能够很好地处理嵌套CompletableFuture的形式怎么办,如applyToEither()会等待第一个Future完成然后执行transformation.

1 CompletableFuture<CompletableFuture<Integer>> poor =
2         future1.applyToEither(future2, s -> strLen(s));

这里有个很实用的技巧,用来“展开”这类难以理解的数据结构,这种技巧叫flatten,通过使用flatMap(identity) (or flatMap(x -> x))。在我们的例子中flatMap()就叫做thenCompose:

1 CompletableFuture<Integer> good =
2         poor.thenCompose(x -> x);

我把它留给你,去弄懂它是怎样和为什么这样工作的。我想这篇文章已经尽量清楚地阐述了线程是如何参与到CompletableFuture中去的。

转载自 并发编程网 - ifeve.com

相关文章
|
6月前
|
Java API 网络架构
Java 线程中CompletableFuture的例子
Java 线程中CompletableFuture的例子
71 0
Java CompletableFuture:allOf等待所有异步线程任务结束(4)
Java CompletableFuture:allOf等待所有线程任务结束(4) private void method() throws ExecutionException, InterruptedExcept...
6387 0
|
3月前
|
Java 数据库
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
这篇文章通过一个电商商品详情页的实战案例,展示了如何使用`CompletableFuture`进行异步编排,以解决在不同数据库表中查询商品信息的问题,并提供了详细的代码实现和遇到问题(如图片未显示)的解决方案。
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
|
3月前
|
Java
异步&线程池 CompletableFuture 异步编排 【下篇】
这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。
异步&线程池 CompletableFuture 异步编排 【下篇】
|
6月前
|
消息中间件 缓存 NoSQL
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
|
6月前
|
Java API
java多线程之FutureTask、Future、CompletableFuture
java多线程之FutureTask、Future、CompletableFuture
277 0
|
存储 Dubbo Java
JUC第二十五讲:JUC线程池-CompletableFuture 实现原理与实践
JUC第二十五讲:JUC线程池-CompletableFuture 实现原理与实践
150 0
|
JavaScript 前端开发 Java
JUC-Java多线程Future,CompletableFuture
JUC-Java多线程Future,CompletableFuture
JUC-Java多线程Future,CompletableFuture
多线程之 completableFuture
Callable与Runnable的功能大致相似,但是call()函数有返回值. Callable一般是和ExecutorService配合来使用的
|
Java
Java多线程Future与CompletableFuture-异步获取接口返回结果
当调用一些耗时接口时,如果我们一直在原地等待方法返回,整体程序的运行效率会大大降低。可以把调用的过程放到子线程去执行,再通过 Future 去控制子线程的调用过程,最后获取到调用结果,来提高整个程序的运行效率。
2448 0
下一篇
无影云桌面