CompletableFuture 类图结构
CompletionStage接口
CompletableFuture实现了CompletionStage接口 。
1)一个CompletionStage代表着一个异步计算节点,当另外一个CompletionStage计算节点完成后,当前CompletionStage会执行或者计算一个值;一个节点在计算终止时完成,可能反过来触发其他依赖其结果的节点开始计算。
2)一个节点(CompletionStage)的计算执行可以被表述为一个函数、消费者、可执行的Runable(例如使用apply、accept、run方法),
具体取决于这个节点是否需要参数或者产生结果。例如:
stage.thenApply(x -> square(x))//计算平方和 .thenAccept(x -> System.out.print(x))//输出计算结果 .thenRun(() -> System.out.println());//然后执行异步任务
3)CompletionStage节点可以使用3种模式来执行:默认执行、默认异步执行(使用async后缀的方法)和用户自定义的线程执行器执行(通过传递一个Executor方式)。
4)一个节点的执行可以通过一到两个节点的执行完成来触发。一个节点依赖的其他节点通常使用then前缀的方法来进行组织。
属性
result
volatile Object result; // Either the result or boxed AltResult
result字段用来存放任务执行的结果,如果不为null,则标识任务已经执行完成。而计算任务本身也可能需要返回null值,所以使用AltResult(如下代码)来包装计算任务返回null的情况(ex等于null的时候),AltResult也被用来存放当任务执行出现异常时候的异常信息(ex不为null的时候):
static final class AltResult { // See above final Throwable ex; // null only for NIL AltResult(Throwable x) { this.ex = x; } }
stack
volatile Completion stack; // Top of Treiber stack of dependent actions
stack字段是当前任务执行完毕后要触发的一系列行为的入口,由于一个任务执行后可以触发多个行为,所以所有行为被组织成一个链表结构,并且使用Treiber stack实现了无锁基于CAS的链式栈,其中stack存放栈顶行为节点,stack是Completion类型的,定义如下所示。
abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask { volatile Completion next; // Treiber stack下一个节点 ... }
asyncPool
/** * Default executor -- ForkJoinPool.commonPool() unless it cannot * support parallelism. */ private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
asyncPool是用来执行异步任务的线程池,如果支持并发则默认为Fork-JoinPool.commonPool()
,否则是ThreadPerTaskExecutor
。
方法
CompletableFuture<Void>runAsync(Runnable runnable)
该方法返回一个新的CompletableFuture对象,其结果值会在给定的runnable行为使用ForkJoinPool.commonPool()异步执行完毕后被设置为null,代码如下所示。
public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); }
如上代码中,默认情况下asyncPool为ForkJoinPool.commonPool(),其中asyncRunStage代码如下所示。
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) { //1.任务或者行为为null,则抛出NPE异常 if (f == null) throw new NullPointerException(); //2.创建一个future对象 CompletableFuture<Void> d = new CompletableFuture<Void>(); //3.包装f和d为异步任务后,投递到线程池执行 e.execute(new AsyncRun(d, f)); //4.返回创建的future对象 return d; }
代码1判断行为是否为null,如果是则抛出异常。
代码2创建一个CompletableFuture对象。
代码3首先创建一个AsyncRun任务,里面保存了创建的future对象和要执行的行为,然后投递到ForkJoinPool.commonPool()线程池执行。
代码4直接返回创建的CompletableFuture对象。
可知runAsync方法会马上返回一个CompletableFuture对象,并且当前线程不会被阻塞;代码3投递AsyncRun任务到线程池后,线程池线程会执行其run方法。
下面我们看看在AsyncRun中是如何执行我们设置的行为,并把结果设置到创建的future对象中的。
static final class AsyncRun extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask { CompletableFuture<Void> dep; Runnable fn; //保存创建的future和要执行的行为 AsyncRun(CompletableFuture<Void> dep, Runnable fn) { this.dep = dep; this.fn = fn; } ... public void run() { CompletableFuture<Void> d; Runnable f; if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; //5.如果future的result等于null,说明任务还没完成 if (d.result == null) { try { //5.1执行传递的行为 f.run(); //5.2设置future的结果为null d.completeNull(); } catch (Throwable ex) { d.completeThrowable(ex); } } //6弹出当前future中依赖当前结果的行为并执行 d.postComplete(); } } }
这里代码5如果发现future的result不为null,说明当前future还没开始执行,则代码5.1执行我们传递的runnable方法,然后执行代码5.2将future对象的结果设置为null,这时候其他因调用future的get()方法而被阻塞的线程就会从get()处返回null。
当代码6的future任务结束后,看看其stack栈里面是否有依赖其结果的行为,如果有则从栈中弹出来,并执行。
其实上面代码中的runAsync实现可以用我们自己编写的简单代码来模拟。
public static CompletableFuture runAsync(Runnable runnable) { CompletableFuture<String> future = new CompletableFuture<String>(); // 2.开启线程计算任务结果,并设置 POOL_EXECUTOR.execute(() -> { // 2.1模拟任务计算 try { runnable.run(); future.complete(null); } catch (Exception e) { future.completeExceptionally(e); } }); return future; }
CompletableFuture<U> supplyAsync(Supplier<U>supplier)
该方法返回一个新的CompletableFuture对象,其结果值为入参supplier行为执行的结果,代码如下所示。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); }
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) { if (f == null) throw new NullPointerException(); CompletableFuture<U> d = new CompletableFuture<U>(); e.execute(new AsyncSupply<U>(d, f)); return d; }
如上代码与runAsync类似,不同点在于,其提交到线程池的是AsyncSupply类型的任务,下面我们来看其代码。
static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask { CompletableFuture<T> dep; Supplier<T> fn; AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) { this.dep = dep; this.fn = fn; } ... public void run() { CompletableFuture<T> d; Supplier<T> f; if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; //1.如果future的result等于null,说明任务还没完成 if (d.result == null) { try { //1.1 f.get()执行行为f的方法,并获取结果 //1.2 把f.get()执行结果设置到future对象 d.completeValue(f.get()); } catch (Throwable ex) { d.completeThrowable(ex); } } //2.弹出当前future中依赖当前结果的行为并执行 d.postComplete(); } } }
如上代码与runAsync的不同点在于,这里的行为方法是Supplier,其get()方法有返回值,且返回值会被设置到future中,然后调用future的get()方法的线程就会获取到该值。
CompletableFuture<U> supplyAsync(Supplier<U>supplier,Executor executor)
该方法返回一个新的CompletableFuture对象,其结果值为入参supplier行为执行的结果,需要注意的是,supplier行为的执行不再是ForkJoinPool.commonPool(),而是业务自己传递的executor,其代码如下所示。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } static Executor screenExecutor(Executor e) { //如果使用commonpool并且传递的e本身就是commonpool if (!useCommonPool && e == ForkJoinPool.commonPool()) return asyncPool; //如果传递的线程池为null,则抛出NPE异常 if (e == null) throw new NullPointerException(); //返回业务传递的线程池e return e; }
如上代码通过使用screenExecutor方法来判断传入的线程池是否是一个可用的线程池,如果不是则抛出异常。