辅助方法 allOf 和 anyOf
前面我们已经介绍了几个静态方法:completedFuture、runAsync、supplyAsync,下面介绍的这两个方法用来组合多个CompletableFuture。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf方法是当所有的CompletableFuture都执行完后执行计算。
anyOf方法是当任意一个CompletableFuture执行完后就会执行计算,计算的结果返回
但是anyOf和applyToEither不同。anyOf接受任意多的CompletableFuture但是applyToEither只是判断两个CompletableFuture,anyOf返回值的计算结果是参数中其中一个CompletableFuture的计算结果,applyToEither返回值的计算结果却是要经过fn处理的。当然还有静态方法的区别,线程池的选择等
public static void main(String[] args) { Random rand = new Random(); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(10000 + rand.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } return 100; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(10000 + rand.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } return "abc"; }); //CompletableFuture<Void> f = CompletableFuture.allOf(future1,future2); CompletableFuture<Object> f = CompletableFuture.anyOf(future1, future2); System.out.println(f.join()); }
whenComplete
略
我想通过上面的介绍,应该把CompletableFuture的方法和功能介绍完了(cancel、isCompletedExceptionally()、isDone()以及继承于Object的方法无需介绍了, toCompletableFuture()返回CompletableFuture本身)
希望你能全面了解CompletableFuture强大的功能,并将它应用到Java的异步编程中。如果你有使用它的开源项目,可以留言分享一下。
CompletableFuture的同步回调和异步回调
CompletableFuture 根据任务的主从关系为:
提交任务的方法,如静态方法 supplyAsync(supplier[, executor]), runAsync(runnable[, executor])
回调函数,即对任务执行后所作出回应的方法,多数方法了,如 thenRun(action), thenRunAsync(action[, executor]), whenComplete(action), whenCompleteAsync(action[, executor]) 等
- 注意:每个方法都有带Async后缀和不带此后缀的方法(下面会说区别)
根据执行方法可分为同步与异步方法,任务都是要被异步执行(请注意:任务的执行都是异步的)。所以提交任务的方法都是异步的。而对任务作出回应的方法很多分为两个版本,如
同步回应,如 thenRun(action), whenComplete(action)
异步回应,如 thenRunAsync(action[, executor]), whenCompleteAsync(action[, executor]), 异步方法可以传入线程池,否则用默认的。可参考:理解 CompletableFuture 的任务与回调函数的线程 和 CompletableFuture的async后缀函数与不带async的函数的区别
更上一层楼(手写sequence方法)
如果你用过Guava的Future类,你就会知道它的Futures辅助类提供了很多便利方法,用来处理多个Future,而不像Java的CompletableFuture,只提供了allOf、anyOf两个方法。
比如有这样一个需求,将多个CompletableFuture组合成一个CompletableFuture,这个组合后的CompletableFuture的计算结果是个List,它包含前面所有的CompletableFuture的计算结果,guava的Futures.allAsList可以实现这样的功能,但是对于java CompletableFuture,我们需要一些辅助方法:
/** * 可以把多个futures序列化起来 最终返回一个装载有结果的CompletableFuture即可 调用join方法就够了 * 当然只能是同一类型哦(返回的结果) * * @param <T> the type parameter * @param futures the futures * @return the completable future */ public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) { //通过allOf方法把所有的futures放到一起 返回Void CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); //遍历把每一个futures通过join方法把结果拿到 最终给返回出去 并且是用CompletableFuture包装着的 return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList())); }
或者Java Future转CompletableFuture:
public static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor executor) { return CompletableFuture.supplyAsync(() -> { try { return future.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }, executor); }
使用案例:
public static void main(String[] args) { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return 100; }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return 200; }); CompletableFuture<List<Integer>> resultList = sequence(Arrays.asList(future1, future2)); System.out.println(resultList.join()); //[100, 200] }
事实上,如果每个操作都很简单的话(比如:我们仅仅是getById()这种查询)没有必要用这种多线程异步的方式,因为创建线程还需要时间,还不如直接同步执行来得快。
事实证明,只有当每个操作很复杂需要花费相对很长的时间(比如,调用多个其它的系统的接口;比如,商品详情页面这种需要从多个系统中查数据显示的)的时候用CompletableFuture才合适,不然区别真的不大,还不如顺序同步执行。