本篇文章将从实例和原理上介绍JDK8引入的CompletableFuture的使用方法,意在让未接触过CompletableFuture的同学能够读懂并正确的使用该功能。本文希望以浅显易懂的方式来帮助大家理解CompletableFuture的流式编程特点,只会涉及最常用类、方法和最核心的源码,感兴趣的同学可自行阅读完整源码来了解更多内容。另外文中代码会涉及JDK8的新特性,Lambda表达式和方法引用,这些内容需要先自行掌握。
研究CompletableFuture的动机源自一个跨团队合作项目,该项目中对方团队在过去大量的使用了CompletableStage来描述几乎所有的业务逻辑依赖关系,而本团队的同学之前都较少使用此特性,导致进入合作初期读写代码效率都不高,而按习惯的传统方法来写还会导致前后代码风格非常不一致,可能会使后续的开发维护工作更困难。随着越深入的了解CompletableFuture,越觉得通过此方式组织的代码在逻辑描述能力上相当的灵活优雅,故整理了以下内容供大家参考。
CompletableFuture类定义
public class CompletableFuture<T>
extends Object
implements java.util.concurrent.Future<T>, java.util.concurrent.CompletionStage<T>
CompletableFuture实现了Future和CompletionStage两个接口。
Future接口
其中Future大家应该都很熟悉了,在异步应用中也很常见,这里简单的回顾下普通模式和Future模式的区别:
可以看到当工作线程的结果我们并不急着需要的话,可以交给Future,然后主线程可以去做一些别的事情,当需要工作线程结果的时候,使用get()来尝试获取即可。注意get()方法是阻塞的,这也是Future常被吐槽的地方,另外Future无法表达任务间的依赖关系也是它的一个局限。
CompletableStage接口
CompletableStage用来表示异步过程中的一个阶段,它可以在另一个CompletableStage完成时做一些操作或计算,此接口中定义了一些基本的行为,通过这些行为组合可以简洁的描述非常复杂的任务。
常用的几个方法:
- thenApply 将上一个stage的结果转化成新的类型或值
- thenAccept 将上一个stage的结果进行消耗,无返回值
- thenRun 在上一个stage有结果后,执行一段新的操作
- thenCombine 结合两个CompletableStage的结果,转化成新的类型或值
- thenCompose 返回一个新的CompletableStage,并将上一个stage的结果作为新的stage的supplier
- exceptionally 当运算过程中遇到异常时的一个补偿处理
- handle 统一了对正常结果和异常结果的处理
大部分方法都有以Async结尾的方法,表示异步执行,后面会提到。更多信息可以参考jdk文档。
CompletableFuture的工作流
CompletableFuture初始化时可以处于completed和incompleted两种状态,先看两个最简单的例子。
初始化就completed
// base直接初始化成一个已完成的CompletableFuture,完成值是"completed"
CompletableFuture<String> base = CompletableFuture.completedFuture("completed");
log.info(base.get());
输出:
[INFO ] [2019-07-15 10:05:13] [main] completed
这里base对象是一个已完成的CompletableFuture,所以get()直接返回了"completed"。当然如果初始化时用了未完成的CompletableFuture,那么get()方法是会阻塞等待它完成,这个就成了Future模式,毕竟get()方法是在Future接口中定义的嘛。
初始化后主动complete
我们也可以在之后的代码中,或是其他线程中将它“完成”:
// 这是一个未完成的CompletableFuture
CompletableFuture<String> base = new CompletableFuture<>();
log.info("start another thread to complete it");
new Thread(
() -> {
log.info("will complete in 1 sec");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
base.complete("completed");
})
.start();
log.info(base.get());
输出:
[INFO ] [2019-07-15 14:32:26] [main] start another thread to complete it
[INFO ] [2019-07-15 14:32:26] [Thread-0] will complete in 1 sec
[INFO ] [2019-07-15 14:32:27] [main] completed
这个例子中主线程在调用get()方法时阻塞,Thread-0线程在sleep 1秒后调用complete()方法将base完成,主线程get()返回得到完成值completed。
异常complete
CompletableFuture<String> base = new CompletableFuture<>();
base.completeExceptionally(new RuntimeException(("runtime error")));
log.info(base.get());
输出:
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: runtime error
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at com.aliyun.completable.Main.main(Main.java:33)
Caused by: java.lang.RuntimeException: runtime error
at com.aliyun.completable.Main.main(Main.java:32)
在complete时发生异常,在base调用get()方法时抛出ExecutionException。
小结
我们可以得出最基本的一个流程,CompletableFuture是靠complete作为一个初始力来驱动的,虽然这不是它的全部,但至少得complete它才会去继续执行后面依赖它的一系列处理。
任务的依赖关系
同样,我们先跑一段代码,再来分析:
CompletableFuture<String> base = new CompletableFuture<>();
CompletableFuture<String> future = base.thenApply(s -> s + " 2").thenApply(s -> s + " 3");
base.complete("1");
log.info(future.get());
输出:
[INFO ] [2019-07-15 15:15:44] [main] 1 2 3
代码中用了2个thenApply的链式调用,在Lambda表达式助力下,显得非常优雅简洁。可能大家也觉得这个输出是完全可预期的,那么我再给出几段代码,大家先思考下输出是什么,稍后会从CompletableFuture相关源码来解读它的工作原理。
CompletableFuture<String> base = new CompletableFuture<>();
CompletableFuture<String> future = base.thenApply(s -> s + " 2").thenApply(s -> s + " 3");
future.complete("1");
log.info(future.get());
// base.complete("1");
// log.info(base.get());
// future.complete("1");
// log.info(base.get());
这里base和future对象,分别调用complete()和get()方法的排列组合,是不是开始有点懵了,算上前面的例子,这四种组合的结果是完全不一样的。
核心源码解读
本节通过对核心源码解读,来分析CompletableFuture是如何通过一系列链式方法调用来关联起来的。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
// ......
volatile Object result; // Either the result or boxed AltResult
volatile Completion stack; // Top of Treiber stack of dependent actions
}
CompletableFuture有两个关键成员属性,一个是Completion对象stack,这是一个CAS实现的无锁并发栈,每个链式调用的任务会被压入这个栈。另一个是Object对象result,这是当前CompletableFuture的结果。
abstract static class Completion extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
volatile Completion next; // Treiber stack link
/**
* Performs completion action if triggered, returning a
* dependent that may need propagation, if one exists.
*
* @param mode SYNC, ASYNC, or NESTED
*/
abstract CompletableFuture<?> tryFire(int mode);
/** Returns true if possibly still triggerable. Used by cleanStack. */
abstract boolean isLive();
public final void run() { tryFire(ASYNC); }
public final boolean exec() { tryFire(ASYNC); return true; }
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
}
Completion中的next保存了栈中下一个元素的引用,而CompletableFuture中的stack永远指向栈顶。
多个线程对同一个CompletableFuture对象complete时,只有一个会成功,所以CompletableFuture是线程安全且高效的。下面看下thenApply()方法做了什么。
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
thenApply提供了类似的三个方法,以Async结尾的表示异步执行,如果传入Executor则以指定线程池执行,否则默认使用的线程池是ForkJoinPool,而ForkJoinPool里面的线程都是daemon线程,非daemon线程都结束后,虚拟机也就退出了。如果需要执行较长时间或执行内容比较重要不希望被中断而导致数据不一致的话,那就自己传一个Executor吧。类似的Async结尾的方法在CompletionStage接口中非常常见,后面就不再解释了。
看下CompletableFuture里的实现:
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>(); // 1.新建了一个CompletableFuture
if (e != null || !d.uniApply(this, f, null)) {
// 2. 用d,this和f构造了一个UniApply对象c。
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
// 4. UniApply继承UniCompletion继承Completion,c其实就是Completion对象,被push到栈中
push(c);
// 5. 尝试执行c,
c.tryFire(SYNC);
}
// 注意这个d会一直返回到调用thenApply的地方,后续的链式调用会作用在这个d上面
return d;
}
static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src,
Function<? super T,? extends V> fn) {
// 3. UniCompletion中的dep和src分别就是第2步中的d和this,dep的执行依赖于src
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
// 6. 如果uniApply执行成功,则会进到下面的postFire调用,
// 否则return null,也就是tryFire失败了,就要等待以后的主动complete来再次触发
if ((d = dep) == null ||
!d.uniApply(a = src, fn, mode > 0 ? null : this))
return null;
// 9. tryFire成功后,会把以下几个属性设为null,代表此Completion已经完成任务,变成dead状态
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}
final <S> boolean uniApply(CompletableFuture<S> a,
Function<? super S,? extends T> f,
UniApply<S,T> c) {
Object r; Throwable x;
// 7. 如果a(也就是c中的src)还没有完成,那result是空,这里就会直接返回false
if (a == null || (r = a.result) == null || f == null)
return false;
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked") S s = (S) r;
// 8. 如果r不为空,则会作为f的输入参数,f的输出则成为当前CompletableFuture的完成值。
// 通常能走到这里的话,就会呈链式反应一直传递下去。
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
为方便阅读,关键调用的解释我都写在注释上了。
注释6提到如果tryFire失败则要等以后的主动complete来再次触发,我们来看下这个过程是怎样的。
public boolean complete(T value) {
boolean triggered = completeValue(value);
postComplete();
return triggered;
}
/**
* Pops and tries to trigger all reachable dependents. Call only
* when known to be done.
*/
final void postComplete() {
/*
* On each step, variable f holds current dependents to pop
* and run. It is extended along only one path at a time,
* pushing others to avoid unbounded recursion.
*/
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
postComplete()方法官方已经给了一些注释,代码还是比较清晰的,链式调用的核心就在postComplete()方法的while循环中。可以通过几个示意图来更好的理解这个过程。
调用与内部执行步骤拆解
常见的链式调用对象关系如下:
其实每次调用都会new一个Completion对象,并压入上一个CompletableFuture的stack中。所以,通常的base.thenApply(..).thenApply(..),每次调用产生的Completion并不在同一个stack中哦。
来个复杂一些的:
CompletableFuture<String> base = new CompletableFuture<>();
CompletableFuture<String> future =
base.thenApply(
s -> {
log.info("2");
return s + " 2";
});
base.thenAccept(s -> log.info(s+"a")).thenAccept(aVoid -> log.info("b"));
base.thenAccept(s -> log.info(s+"c")).thenAccept(aVoid -> log.info("d"));
base.complete("1");
log.info("base result: {}", base.get());
log.info("future result: {}", future.get());
执行到第7行后,对象关系如下图:
第8行后:
第9行后:
至此,整个对象关系如同一个执行计划,等待着base的complete那一刻。
我们再来分解下第10行的执行步骤:
- base.complete("1")后base里的result属性会变成1
- 取base中stack(对象1)执行,出栈
- 取对象1中dep属性的stack(对象2)执行,出栈
- 取base中stack(对象3)执行,出栈
- 取对象3中dep属性的stack(对象4)执行,出栈
- 取base中stack(对象5)执行,出栈
通用执行顺序示意图
base的stack(对象2、1、0)和它下面那些dep中的stack执行上顺序正好是相反的,暂且称base的stack为主stack吧,我们来画一张更通用的关系来重点看下stack:
先执行base的栈顶Completion 2,成功后出栈。然后会检查Completion 2中dep的stack,只要没到栈底,则会取出栈顶压入base的stack中,该图则把Completion 8、7分别压到base的stack中,然后执行栈底的Completion 6
重复这个过程,执行base的栈顶Completion 7,由于Completion 7的dep的stack为空,则直接出栈即可。接着Completion 8会被执行。
接下来处理Completion 1的过程和之前类似。
最终的执行顺序是base,2,6,7,8,1,3,4,5,0
更多实例
持久化后加钩子
public static void main(String[] args) {
Main m = new Main();
int id = 101;
String content = "string content.";
// String content = "illegal";
CompletableFuture<String> creation = m.create(id, content);
log.info(creation.get());
}
public <I, R> CompletableFuture<R> create(I id, R resource) {
ExecutorService listenersPool =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setNameFormat("listener-executor-%d")
// .setDaemon(true)
.build());
Map<I, R> map = new HashMap<>();
CompletableFuture<R> creation =
doCreate(id, resource)
.exceptionally(translateThrowablesIfPresent(id))
.thenApply(
ignored -> {
// 执行到这里表示前面的create已经完成了,可以做一些其他事,比如加到map缓存起来
map.put(id, resource);
return resource;
});
// 至此,creation对象已经可以返回用于其他业务场景,但我们有一些Listeners需要在create之后触发
creation.thenAcceptAsync(this::applyAfterCreationListeners, listenersPool);
return creation;
}
private <I, R> CompletableFuture<Void> doCreate(I id, R resource) {
// 这里通过resource内容模拟了2个异常,否则返回一个completedFuture
// 当然你也可以改成incompleted的,然后自己在以后主动complete
if (resource.equals("error")) {
throw new RuntimeException("error");
} else if (resource.equals("illegal")) {
throw new IllegalArgumentException("illegal args");
}
return CompletableFuture.completedFuture(null);
}
private <U extends Throwable, T, I> Function<U, T> translateThrowablesIfPresent(I id) {
return throwable -> {
Throwable cause = Throwables.getRootCause(throwable);
if (cause instanceof IllegalArgumentException) {
throw new IllegalArgumentException(throwable.getMessage());
}
throw Throwables.propagate(cause);
};
}
private <R> void applyAfterCreationListeners(R created) {
log.info("invoking a listener after creation, for resource {}", created);
}