任务串行执行
CompletableFuture也支持任务串行执行,后面的任务依赖前面任务的执行结果。我们再举一个做果汁的例子,我们分串行三步: 洗水果 —> 切水果 -> 榨汁,看如下代码
public static void main(String[] args) throws InterruptedException { ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor(); CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> washFruit(), executor); CompletableFuture<String> future2 = future1.thenApplyAsync(r -> StringUtils.isBlank(r) ? null : cutFruit()); future2.thenApplyAsync(r -> StringUtils.isBlank(r) ? null : juicing()); //为了主线程不立刻退出,以便查看结果 Thread.sleep(100); } private static String washFruit() { System.out.println(Thread.currentThread().getName()); System.out.println("洗水果"); return "洗水果"; } private static String cutFruit() { System.out.println(Thread.currentThread().getName()); System.out.println("切水果"); return "切水果"; } private static String juicing() { System.out.println(Thread.currentThread().getName()); System.out.println("榨汁"); return "榨汁"; }
上面main函数执行后输出:
pool-1-thread-1 洗水果 ForkJoinPool.commonPool-worker-1 切水果 ForkJoinPool.commonPool-worker-1 榨汁
上面的代码就是一个串行执行的任务,这儿除了thenApply,还有thenAccept(Consumer<? super T> action)和thenRun(Runnable action),这2个方法都不返回执行结果。
注意:上面方法中,thenApply、thenAccept、thenRun都有一个对应的Async方法,区别在于Async方法会从线程池中拿线程执行,而不带Async的方法在当前线程执行。所以如果上面代码中thenApplyAsync换成thenApply,执行结果如下:
pool-1-thread-1 洗水果 pool-1-thread-1 切水果 pool-1-thread-1 榨汁
结果组合运算
thenCombine和thenCompose
thenCombine用于组合2个CompletableFuture,对结果进行运算,下面2段代码都是输出110
ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor(); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10, executor); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100, executor); CompletableFuture<Integer> future3 = future1.thenCombineAsync(future2, (x, y) -> x + y); System.out.println(future3.get());
thenCompose把第一个CompletableFuture的结果放到第二个CompletableFuture中进行运算
ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor(); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10, executor); CompletableFuture<Integer> future2 = future1.thenCompose(r -> CompletableFuture.supplyAsync(() -> r + 100)); System.out.println(future2.get());
上面的组合方法其实用上一节讲的串行执行也可以完成,见如下代码
ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor(); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10, executor); CompletableFuture<Integer> future2 = future1.thenApplyAsync(r -> r + 100); System.out.println(future2.get());
thenAcceptBoth和runAfterBoth
thenAcceptBoth用于对前面2个线程的结果进行组合运算,下面代码输出110
ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor(); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10, executor); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100, executor); CompletableFuture<Void> future3 = future1.thenAcceptBoth(future2, (x, y) -> System.out.println(x + y));
runAfterBoth用于等待前面2个线程之后执行第三个线程
ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor(); CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("线程1"), executor); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> System.out.println("线程2"), executor); CompletableFuture<Void> future3 = future1.runAfterBothAsync(future2, () -> System.out.println("线程3"));
上面代码输出:
线程1 线程2 线程3
acceptEither、runAfterEither和applyToEither
这三个方法只取组合线程中执行最快的一个结果,看下面代码:
public static void main(String[] args) { CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()-> getTask1()); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> getTask2()); CompletableFuture<String> f3 = f1.applyToEither(f2,s -> s); System.out.println(f3.join());//输出task2 CompletableFuture<String> f4 = CompletableFuture.supplyAsync(()-> getTask1()); CompletableFuture<String> f5 = CompletableFuture.supplyAsync(() -> getTask2()); CompletableFuture<Void> f6 = f4.runAfterEither(f5, () -> System.out.println("task3")); f6.join();//输出task3 CompletableFuture<String> f7 = CompletableFuture.supplyAsync(()-> getTask1()); CompletableFuture<String> f8 = CompletableFuture.supplyAsync(() -> getTask2()); CompletableFuture<Void> f9 = f8.acceptEither(f7, s -> System.out.println(s)); f9.join();//输出task2 } private static String getTask1(){ try { Thread.currentThread().sleep(2000); return "task1"; } catch (InterruptedException e) { e.printStackTrace(); return null; } } private static String getTask2(){ try { Thread.currentThread().sleep(1000); return "task2"; } catch (InterruptedException e) { e.printStackTrace(); return null; } }
同样注意:上面7个方法都存在对应的Async方法,会从线程池中取线程来执行。
java9的改进
1.可以设置超时时间,超时后给一个默认值,比如下面代码输出100
ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor(); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> getNum(), executor); future1.completeOnTimeout(100, 3000, TimeUnit.MILLISECONDS); System.out.println(future1.get());
2.增加failedFuture跟之前的completedFuture配对,前者创建一个指定异常的CompletableFuture,后者创建一个指定给定值的CompletableFuture。
public static <U> CompletableFuture<U> failedFuture(Throwable ex) public static <U> CompletableFuture<U> completedFuture(U ,value)
3.增加了completedStage和failedStage,这2个方法返回CompletableFuture的继承类MinimalStage
public static <U> CompletionStage<U> completedStage(U value) public static <U> CompletionStage<U> failedStage(Throwable ex)
4.增加了defaultExecutor和newIncompleteFuture,可以让子类自己去实现
public Executor defaultExecutor() public <U> CompletableFuture<U> newIncompleteFuture()
总结
CompletableFuture类对多线程调度的支持还是挺强大的,本文主要介绍了一些常用的方法,对于其他方法,大家可以查看api或者CompletionStage接口中定义的方法选择使用。