异步编程好帮手之CompletableFuture详解

简介: 异步编程好帮手之CompletableFuture详解

我们异步执行一个任务时,一般是用线程池Executor去创建。如果不需要有返回值, 任务实现Runnable接口;如果需要有返回值,任务实现Callable接口,调用Executor的submit方法,再使用Future获取即可。如果多个线程存在依赖组合的话,我们怎么处理呢?可使用同步组件CountDownLatch、CyclicBarrier等,但是比较麻烦。其实有简单的方法,就是用CompeletableFuture。最近刚好使用CompeletableFuture优化了项目中的代码,所以跟大家一起学习CompletableFuture。

一个例子回顾 Future

因为CompletableFuture实现了Future接口,我们先来回顾Future吧。

Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务,处理完成后,再通过Future获取计算结果。

来看个简单例子吧,假设我们有两个任务服务,一个查询用户基本信息,一个是查询用户勋章信息。如下,

public class UserInfoService {
    public UserInfo getUserInfo(Long userId) throws InterruptedException {
        Thread.sleep(300);//模拟调用耗时
        return new UserInfo("666", "翎野君", 27); //一般是查数据库,或者远程调用返回的
    }
}
public class MedalService {
    public MedalInfo getMedalInfo(long userId) throws InterruptedException {
        Thread.sleep(500); //模拟调用耗时
        return new MedalInfo("666", "守护勋章");
    }
}

接下来,我们来演示下,在主线程中是如何使用Future来进行异步调用的。

public class FutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        UserInfoService userInfoService = new UserInfoService();
        MedalService medalService = new MedalService();
        long userId =666L;
        long startTime = System.currentTimeMillis();
        //调用用户服务获取用户基本信息
        FutureTask<UserInfo> userInfoFutureTask = new FutureTask<>(new Callable<UserInfo>() {
            @Override
            public UserInfo call() throws Exception {
                return userInfoService.getUserInfo(userId);
            }
        });
        executorService.submit(userInfoFutureTask);
        Thread.sleep(300); //模拟主线程其它操作耗时
        FutureTask<MedalInfo> medalInfoFutureTask = new FutureTask<>(new Callable<MedalInfo>() {
            @Override
            public MedalInfo call() throws Exception {
                return medalService.getMedalInfo(userId);
            }
        });
        executorService.submit(medalInfoFutureTask);
        UserInfo userInfo = userInfoFutureTask.get();//获取个人信息结果
        MedalInfo medalInfo = medalInfoFutureTask.get();//获取勋章信息结果
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
    }
}

运行结果:

总共用时806ms

如果我们不使用Future进行并行异步调用,而是在主线程串行进行的话,耗时大约为300+500+300 = 1100 ms。可以发现,future+线程池异步配合,提高了程序的执行效率。

但是Future对于结果的获取,不是很友好,只能通过阻塞或者轮询的方式得到任务的结果。

  • Future.get() 就是阻塞调用,在线程获取结果之前get方法会一直阻塞
  • Future提供了一个isDone方法,可以在程序中轮询这个方法查询执行结果。

阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。因此,JDK8设计出CompletableFuture。CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

一个例子走进CompletableFuture

我们还是基于以上Future的例子,改用CompletableFuture 来实现

public class FutureTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        UserInfoService userInfoService = new UserInfoService();
        MedalService medalService = new MedalService();
        long userId =666L;
        long startTime = System.currentTimeMillis();
        //调用用户服务获取用户基本信息
        CompletableFuture<UserInfo> completableUserInfoFuture = CompletableFuture.supplyAsync(() -> userInfoService.getUserInfo(userId));
        Thread.sleep(300); //模拟主线程其它操作耗时
        CompletableFuture<MedalInfo> completableMedalInfoFuture = CompletableFuture.supplyAsync(() -> medalService.getMedalInfo(userId)); 
        UserInfo userInfo = completableUserInfoFuture.get(2,TimeUnit.SECONDS);//获取个人信息结果
        MedalInfo medalInfo = completableMedalInfoFuture.get();//获取勋章信息结果
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
    }
}

可以发现,使用CompletableFuture,代码简洁了很多。CompletableFuture的supplyAsync方法,提供了异步执行的功能,线程池也不用单独创建了。实际上,它CompletableFuture使用了默认线程池是ForkJoinPool.commonPool

CompletableFuture提供了几十种方法,辅助我们的异步任务场景。这些方法包括创建异步任务、任务异步回调、多个任务组合处理等方面。我们一起来学习吧

创建异步任务

CompletableFuture创建异步任务,一般有supplyAsync和runAsync两个方法

  • supplyAsync执行CompletableFuture任务,支持返回值
  • runAsync执行CompletableFuture任务,没有返回值。

supplyAsync方法

//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//自定义线程,根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

runAsync方法

//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable) 
//自定义线程,根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)

实例代码如下:

public class FutureTest {
    public static void main(String[] args) {
        //可以自定义线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        //runAsync的使用
        CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("run,关注公众号:翎野君"), executor);
        //supplyAsync的使用
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
                    System.out.print("supply,关注公众号:翎野君");
                    return "翎野君"; }, executor);
        //runAsync的future没有返回值,输出null
        System.out.println(runFuture.join());
        //supplyAsync的future,有返回值
        System.out.println(supplyFuture.join());
        executor.shutdown(); // 线程池需要关闭
    }
}
//输出
run,关注公众号:翎野君
null
supply,关注公众号:翎野君

注意避坑

1. Future需要获取返回值,才能获取异常信息

ExecutorService executorService = new ThreadPoolExecutor(5, 10, 5L,
    TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
      int a = 0;
      int b = 666;
      int c = b / a;
      return true;
   },executorService).thenAccept(System.out::println);
 //如果不加 get()方法这一行,看不到异常信息
 //future.get();

Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。小伙伴们使用的时候,注意一下哈,考虑是否加try...catch...或者使用exceptionally方法。

2. CompletableFuture的get()方法是阻塞的。

CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间~

//反例
 CompletableFuture.get();
//正例
CompletableFuture.get(5, TimeUnit.SECONDS);

3. 默认线程池的注意点

CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。

4. 自定义线程池时,注意饱和策略

CompletableFuture的get()方法是阻塞的,我们一般建议使用future.get(3, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。

但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离哈。

本篇文章如有帮助到您,请给「翎野君」点个赞,感谢您的支持。


目录
相关文章
|
15天前
|
消息中间件 缓存 NoSQL
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
|
7月前
|
Java
异步编程 - 07 基于JDK中的Future实现异步编程(下)_当Stream遇见CompletableFuture
异步编程 - 07 基于JDK中的Future实现异步编程(下)_当Stream遇见CompletableFuture
36 0
|
8月前
|
消息中间件 存储 Java
一网打尽异步神器CompletableFuture
最近一直畅游在RocketMQ的源码中,发现在RocketMQ中很多地方都使用到了CompletableFuture,所以今天就跟大家来聊一聊JDK1.8提供的异步神器CompletableFuture,并且最后会结合RocketMQ源码分析一下CompletableFuture的使用。
|
10月前
|
消息中间件 Java 数据库
实现异步编程的方式
实现异步编程的方式
|
12月前
|
Java API 调度
【并发编程】异步编程CompletableFuture实战
【并发编程】异步编程CompletableFuture实战
【并发编程】异步编程CompletableFuture实战
|
12月前
|
Java
异步利刃CompletableFuture
异步利刃CompletableFuture
69 0
|
机器学习/深度学习 Java 编译器
2.2异步编程
.net core异步编程
异步神器CompletableFuture
异步神器CompletableFuture
|
存储 算法 前端开发
一文了解异步编程基础
异步编程是指并发编程的范式,其中除了单个主应用程序线程之外,工作可以委托给一个或多个并行工作线程。这被称为非阻塞系统,其中整体系统速度不受订单执行的影响,并且多个进程可以同时发生。