CompletableFuture异步编排
什么是CompletableFuture#
CompletableFuture是JDK8提供的Future增强类。CompletableFuture异步任务执行线程池,默认是把异步任务都放在ForkJoinPool中执行。
在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。
Future存在的问题#
Future实际采用FutureTask实现,该对象相当于是消费者和生产者的桥梁,消费者通过 FutureTask 存储任务的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的 FutureTask 被转型为 Future 接口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。
使用#
runAsync 和 supplyAsync方法#
CompletableFuture 提供了四个静态方法来创建一个异步操作。
Copy
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
public static CompletableFuture supplyAsync(Supplier supplier)
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
runAsync方法不支持返回值。
supplyAsync可以支持返回值。
计算完成时回调方法#
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:
Copy
public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);
public CompletableFuture exceptionally(Function fn);
whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。BiConsumer<? super T,? super Throwable>可以定义处理业务
whenComplete 和 whenCompleteAsync 的区别:
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
代码示例:
Copy
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {
@Override
public Object get() {
System.out.println(Thread.currentThread().getName() + "\t completableFuture");
int i = 10 / 0;
return 1024;
}
}).whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(Object o, Throwable throwable) {
System.out.println("-------o=" + o.toString());
System.out.println("-------throwable=" + throwable);
}
}).exceptionally(new Function<Throwable, Object>() {
@Override
public Object apply(Throwable throwable) {
System.out.println("throwable=" + throwable);
return 6666;
}
});
System.out.println(future.get());
}
}
handle 方法#
handle 是执行任务完成时对结果的处理。
handle 是在任务完成后再执行,还可以处理异常的任务。
Copy
public CompletionStage handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
线程串行化方法#
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作
带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。
Copy
public CompletableFuture thenApply(Function<? super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public CompletionStage thenAccept(Consumer<? super T> action);
public CompletionStage thenAcceptAsync(Consumer<? super T> action);
public CompletionStage thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage thenRun(Runnable action);
public CompletionStage thenRunAsync(Runnable action);
public CompletionStage thenRunAsync(Runnable action,Executor executor);
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
代码演示:
Copy
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
System.out.println(Thread.currentThread().getName() + "\t completableFuture");
//int i = 10 / 0;
return 1024;
}
}).thenApply(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer o) {
System.out.println("thenApply方法,上次返回结果:" + o);
return o * 2;
}
}).whenComplete(new BiConsumer<Integer, Throwable>() {
@Override
public void accept(Integer o, Throwable throwable) {
System.out.println("-------o=" + o);
System.out.println("-------throwable=" + throwable);
}
}).exceptionally(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) {
System.out.println("throwable=" + throwable);
return 6666;
}
}).handle(new BiFunction<Integer, Throwable, Integer>() {
@Override
public Integer apply(Integer integer, Throwable throwable) {
System.out.println("handle o=" + integer);
System.out.println("handle throwable=" + throwable);
return 8888;
}
});
System.out.println(future.get());
}
两任务组合 - 都要完成#
两个任务必须都完成,触发该任务。
thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。
runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。
Copy
public CompletableFuture thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public CompletableFuture thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public CompletableFuture thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor);
public CompletableFuture thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public CompletableFuture thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public CompletableFuture thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor);
public CompletableFuture runAfterBoth(CompletionStage<?> other,
Runnable action);
public CompletableFuture runAfterBothAsync(CompletionStage<?> other,
Runnable action);
public CompletableFuture runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor);
测试案例:
Copy
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
return "hello";
}).thenApplyAsync(t -> {
return t + " world!";
}).thenCombineAsync(CompletableFuture.completedFuture(" CompletableFuture"), (t, u) -> {
return t + u;
}).whenComplete((t, u) -> {
System.out.println(t);
});
}
输出:hello world! CompletableFuture
两任务组合 - 一个完成#
当两个任务中,任意一个future任务完成的时候,执行任务。
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。
Copy
public CompletableFuture applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn);
public CompletableFuture applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn);
public CompletableFuture applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor);
public CompletableFuture acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor);
public CompletableFuture runAfterEither(CompletionStage<?> other,
Runnable action);
public CompletableFuture runAfterEitherAsync(CompletionStage<?> other,
Runnable action);
public CompletableFuture runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor);
多任务组合#
Copy
public static CompletableFuture allOf(CompletableFuture<?>... cfs);
public static CompletableFuture
anyOf:只要有一个任务完成
Copy
public static void main(String[] args) {
List<CompletableFuture> futures = Arrays.asList(CompletableFuture.completedFuture("hello"),
CompletableFuture.completedFuture(" world!"),
CompletableFuture.completedFuture(" hello"),
CompletableFuture.completedFuture("java!"));
final CompletableFuture<Void> allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
allCompleted.thenRun(() -> {
futures.stream().forEach(future -> {
try {
System.out.println("get future at:"+System.currentTimeMillis()+", result:"+future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
});
}
作者: ingxx