我们异步执行一个任务时,一般是用线程池Executor去创建。如果不需要有返回值, 任务实现Runnable接口;如果需要有返回值,任务实现Callable接口,调用Executor的submit方法,再使用Future获取即可。如果多个线程存在依赖组合的话,我们怎么处理呢?可使用同步组件CountDownLatch、CyclicBarrier等,但是比较麻烦。其实有简单的方法,就是用CompeletableFuture。最近刚好使用CompeletableFuture优化了项目中的代码,所以跟大家一起学习CompletableFuture。
一个例子回顾 Future
因为CompletableFuture实现了Future
接口,我们先来回顾Future吧。
Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务,处理完成后,再通过Future获取计算结果。
来看个简单例子吧,假设我们有两个任务服务,一个查询用户基本信息,一个是查询用户勋章信息。如下,
public class UserInfoService { public UserInfo getUserInfo(Long userId) throws InterruptedException { Thread.sleep(300);//模拟调用耗时 return new UserInfo("666", "翎野君", 27); //一般是查数据库,或者远程调用返回的 } } public class MedalService { public MedalInfo getMedalInfo(long userId) throws InterruptedException { Thread.sleep(500); //模拟调用耗时 return new MedalInfo("666", "守护勋章"); } }
接下来,我们来演示下,在主线程中是如何使用Future来进行异步调用的。
public class FutureTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); UserInfoService userInfoService = new UserInfoService(); MedalService medalService = new MedalService(); long userId =666L; long startTime = System.currentTimeMillis(); //调用用户服务获取用户基本信息 FutureTask<UserInfo> userInfoFutureTask = new FutureTask<>(new Callable<UserInfo>() { @Override public UserInfo call() throws Exception { return userInfoService.getUserInfo(userId); } }); executorService.submit(userInfoFutureTask); Thread.sleep(300); //模拟主线程其它操作耗时 FutureTask<MedalInfo> medalInfoFutureTask = new FutureTask<>(new Callable<MedalInfo>() { @Override public MedalInfo call() throws Exception { return medalService.getMedalInfo(userId); } }); executorService.submit(medalInfoFutureTask); UserInfo userInfo = userInfoFutureTask.get();//获取个人信息结果 MedalInfo medalInfo = medalInfoFutureTask.get();//获取勋章信息结果 System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); } }
运行结果:
总共用时806ms
如果我们不使用Future进行并行异步调用,而是在主线程串行进行的话,耗时大约为300+500+300 = 1100 ms。可以发现,future+线程池异步配合,提高了程序的执行效率。
但是Future对于结果的获取,不是很友好,只能通过阻塞或者轮询的方式得到任务的结果。
- Future.get() 就是阻塞调用,在线程获取结果之前get方法会一直阻塞。
- Future提供了一个isDone方法,可以在程序中轮询这个方法查询执行结果。
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。因此,JDK8设计出CompletableFuture。CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
一个例子走进CompletableFuture
我们还是基于以上Future的例子,改用CompletableFuture 来实现
public class FutureTest { public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { UserInfoService userInfoService = new UserInfoService(); MedalService medalService = new MedalService(); long userId =666L; long startTime = System.currentTimeMillis(); //调用用户服务获取用户基本信息 CompletableFuture<UserInfo> completableUserInfoFuture = CompletableFuture.supplyAsync(() -> userInfoService.getUserInfo(userId)); Thread.sleep(300); //模拟主线程其它操作耗时 CompletableFuture<MedalInfo> completableMedalInfoFuture = CompletableFuture.supplyAsync(() -> medalService.getMedalInfo(userId)); UserInfo userInfo = completableUserInfoFuture.get(2,TimeUnit.SECONDS);//获取个人信息结果 MedalInfo medalInfo = completableMedalInfoFuture.get();//获取勋章信息结果 System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); } }
可以发现,使用CompletableFuture,代码简洁了很多。CompletableFuture的supplyAsync方法,提供了异步执行的功能,线程池也不用单独创建了。实际上,它CompletableFuture使用了默认线程池是ForkJoinPool.commonPool。
CompletableFuture提供了几十种方法,辅助我们的异步任务场景。这些方法包括创建异步任务、任务异步回调、多个任务组合处理等方面。我们一起来学习吧
创建异步任务
CompletableFuture创建异步任务,一般有supplyAsync和runAsync两个方法
- supplyAsync执行CompletableFuture任务,支持返回值
- runAsync执行CompletableFuture任务,没有返回值。
supplyAsync方法
//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) //自定义线程,根据supplier构建执行任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync方法
//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务 public static CompletableFuture<Void> runAsync(Runnable runnable) //自定义线程,根据runnable构建执行任务 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
实例代码如下:
public class FutureTest { public static void main(String[] args) { //可以自定义线程池 ExecutorService executor = Executors.newCachedThreadPool(); //runAsync的使用 CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("run,关注公众号:翎野君"), executor); //supplyAsync的使用 CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> { System.out.print("supply,关注公众号:翎野君"); return "翎野君"; }, executor); //runAsync的future没有返回值,输出null System.out.println(runFuture.join()); //supplyAsync的future,有返回值 System.out.println(supplyFuture.join()); executor.shutdown(); // 线程池需要关闭 } } //输出 run,关注公众号:翎野君 null supply,关注公众号:翎野君
注意避坑
1. Future需要获取返回值,才能获取异常信息
ExecutorService executorService = new ThreadPoolExecutor(5, 10, 5L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { int a = 0; int b = 666; int c = b / a; return true; },executorService).thenAccept(System.out::println); //如果不加 get()方法这一行,看不到异常信息 //future.get();
Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。小伙伴们使用的时候,注意一下哈,考虑是否加try...catch...或者使用exceptionally方法。
2. CompletableFuture的get()方法是阻塞的。
CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间~
//反例 CompletableFuture.get(); //正例 CompletableFuture.get(5, TimeUnit.SECONDS);
3. 默认线程池的注意点
CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。
4. 自定义线程池时,注意饱和策略
CompletableFuture的get()方法是阻塞的,我们一般建议使用future.get(3, TimeUnit.SECONDS)
。并且一般建议使用自定义线程池。
但是如果线程池拒绝策略是DiscardPolicy
或者DiscardOldestPolicy
,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离哈。
本篇文章如有帮助到您,请给「翎野君」点个赞,感谢您的支持。