CompletableFuture<String> comboText = CompletableFuture.supplyAsync(() -> { //可以注释掉做快速返回 start try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new IllegalStateException(e); } log.info("👍"); //可以注释掉做快速返回 end return "赞"; }) .thenApply(first -> { log.info("在看"); return first + ", 在看"; }) .thenApply(second -> second + ", 转发"); log.info("三连有没有?"); log.info(comboText.get());
对 thenApply 的调用并没有阻塞程序打印log,也就是前面说的通过回调通知机制, 这里你看到 thenApply 使用的是supplyAsync所用的线程,如果将supplyAsync 做快速返回,我们再来看一下运行结果:
thenApply 此时使用的是主线程,所以:
串行的后续操作并不一定会和前序操作使用同一个线程
thenAccept
如果你不想从回调函数中返回任何结果,那可以使用 thenAccept
final CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync( // 模拟远端API调用,这里只返回了一个构造的对象 () -> Product.builder().id(12345L).name("颈椎/腰椎治疗仪").build()) .thenAccept(product -> { log.info("获取到远程API产品名称 " + product.getName()); }); voidCompletableFuture.get();
thenRun
thenAccept
可以从回调函数中获取前序执行的结果,但thenRun 却不可以,因为它的回调函数式表达式定义中没有任何参数
CompletableFuture.supplyAsync(() -> { //前序操作 }).thenRun(() -> { //串行的后需操作,无参数也无返回值 });
我们前面同样说过了,每个提供回调方法的函数都有两个异步(Async)变体,异步就是另外起一个线程
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> { log.info("前序操作"); return "前需操作结果"; }).thenApplyAsync(result -> { log.info("后续操作"); return "后续操作结果"; });
到这里,相信你串行的操作你已经非常熟练了
thenCompose
日常的任务中,通常定义的方法都会返回 CompletableFuture 类型,这样会给后续操作留有更多的余地,假如有这样的业务(X呗是不是都有这样的业务呢?):
//获取用户信息详情 CompletableFuture<User> getUsersDetail(String userId) { return CompletableFuture.supplyAsync(() -> User.builder().id(12345L).name("日拱一兵").build()); } //获取用户信用评级 CompletableFuture<Double> getCreditRating(User user) { return CompletableFuture.supplyAsync(() -> CreditRating.builder().rating(7.5).build().getRating()); }
这时,如果我们还是使用 thenApply() 方法来描述串行关系,返回的结果就会发生 CompletableFuture 的嵌套
CompletableFuture<CompletableFuture<Double>> result = completableFutureCompose.getUsersDetail(12345L) .thenApply(user -> completableFutureCompose.getCreditRating(user));
显然这不是我们想要的,如果想“拍平” 返回结果,thenCompose 方法就派上用场了
CompletableFuture<Double> result = completableFutureCompose.getUsersDetail(12345L) .thenCompose(user -> completableFutureCompose.getCreditRating(user));
这个和 Lambda 的map 和 flatMap 的道理是一样一样滴
thenCombine
如果要聚合两个独立 Future 的结果,那么 thenCombine 就会派上用场了
CompletableFuture<Double> weightFuture = CompletableFuture.supplyAsync(() -> 65.0); CompletableFuture<Double> heightFuture = CompletableFuture.supplyAsync(() -> 183.8); CompletableFuture<Double> combinedFuture = weightFuture .thenCombine(heightFuture, (weight, height) -> { Double heightInMeter = height/100; return weight/(heightInMeter*heightInMeter); }); log.info("身体BMI指标 - " + combinedFuture.get());
当然这里多数时处理两个 Future 的关系,如果超过两个Future,如何处理他们的一些聚合关系呢?
allOf | anyOf
相信你看到方法的签名,你已经明白他的用处了,这里就不再介绍了
Integer age = -1; CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> { if( age < 0 ) { throw new IllegalArgumentException("何方神圣?"); } if(age > 18) { return "大家都是成年人"; } else { return "未成年禁止入内"; } }).thenApply((str) -> { log.info("游戏开始"); return str; }).exceptionally(ex -> { log.info("必有蹊跷,来者" + ex.getMessage()); return "Unknown!"; }); log.info(maturityFuture.get());
exceptionally 就相当于 catch,出现异常,将会跳过 thenApply 的后续操作,直接捕获异常,进行一场处理
handle
用多线程,良好的习惯是使用 try/finally 范式,handle 就可以起到 finally 的作用,对上述程序做一个小小的更改, handle 接受两个参数,一个是正常返回值,一个是异常
注意:handle的写法也算是范式的一种
Integer age = -1; CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> { if( age < 0 ) { throw new IllegalArgumentException("何方神圣?"); } if(age > 18) { return "大家都是成年人"; } else { return "未成年禁止入内"; } }).thenApply((str) -> { log.info("游戏开始"); return str; }).handle((res, ex) -> { if(ex != null) { log.info("必有蹊跷,来者" + ex.getMessage()); return "Unknown!"; } return res; }); log.info(maturityFuture.get());
到这里,关于 CompletableFuture
的基本使用你已经了解的差不多了,不知道你是否注意,我们前面说的带有 Sync 的方法是单独起一个线程来执行,但是我们并没有创建线程,这是怎么实现的呢?
细心的朋友如果仔细看每个变种函数的第三个方法也许会发现里面都有一个 Executor 类型的参数,用于指定线程池,因为实际业务中我们是严谨手动创建线程的,这在 我会手动创建线程,为什么要使用线程池?文章中明确说明过;如果没有指定线程池,那自然就会有一个默认的线程池,也就是 ForkJoinPool
private static final Executor ASYNC_POOL = USE_COMMON_POOL ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
ForkJoinPool 的线程数默认是 CPU 的核心数。但是,在前序文章中明确说明过:
不要所有业务共用一个线程池,因为,一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能
总结
CompletableFuture
的方法并没有全部介绍完全,也没必要全部介绍,相信大家按照这个思路来理解 CompletableFuture
也不会有什么大问题了,剩下的就交给实践/时间
以及自己的体会了
后记
你以为 JDK1.8 CompletableFuture 已经很完美了是不是,但追去完美的道路上永无止境,Java 9 对CompletableFuture 又做了部分升级和改造
- 添加了新的工厂方法
- 支持延迟和超时处理
orTimeout() completeOnTimeout()
- 改进了对子类的支持
详情可以查看: Java 9 CompletableFuture API Improvements. 怎样快速的切换不同 Java 版本来尝鲜?SDKMAN 统一灵活管理多版本Java 这篇文章的方法送给你
最后咱们再泡一壶茶,感受一下新变化吧
灵魂追问
- 听说 ForkJoinPool 线程池效率更高,为什么呢?
- 如果批量处理异步程序,有什么可用的方案吗?