Java异步编程之Future、CompletableFuture

简介: 本文从后端面临的几个压力开始讲起,分析了异步为什么可以提高性能、提高资源利用率,然后通过代码示例,介绍了Java对异步的API级别的支持,如Java5就出现的Future,以及Java8增强的CompletableFuture,最后,我们结合了Java Stream,完成了一个综合的异步应用。

为什么需要异步?

随着应用越来越复杂,用户基数越来越大,后端服务将面临着极大的压力,这个压力主要体现在两个方面:

  1. 是否能为用户提供持续高效稳定的服务,提高可用性;
  2. 是否能充分利用服务器资源做更多的事情,降低成本;

我们先看第1点,大家作为资深互联网用户,其实都是知道自己不好伺候的,比如说你现在去某个电商平台购买物品,你几乎无法忍受任何延迟,哪怕对方客服告诉你:我们在你购买时,会同时帮你计算积分,所以会有所延迟...但真正“抢”购时,韭菜都更在意速度而不是积分的,对吧?

再看看第2点,后端服务的压力越来越大时,工程师们会非常上火,为了周末能顺利陪女朋友玩耍(假如有的话),打算来个暴力解决方案:升配!这个时候,运维站出来,给你贴了一张CPU和内存利用率的图,说道:代码写的烂,升配解决不了的,资源利用率也太低了。架构师老脸一红,站出来了:大家周末加加班,一起优化下代码吧~

好了,我们怎么通过优化代码,来解决这两个问题呢?答案是:异步。

我们大部分代码,都是同步调用的,所谓同步,是指调用方必须等到方法返回后才会进行下一步,而异步,是指调用方不必等到方法返回,只需要方法在完成任务后通知调用方即可。

以上面的电商为例,当用户购买商品时,系统会计算用户积分,而积分的计算,涉及到很多规则,也需要操作数据库或其他数据源,往往会比较耗时,最重要的,它并不属于购物主流程,假如做成同步顺序执行,用户会等待很长时间。对于这种业务,我们可以考虑将其异步化,让其不耽误主流程的继续进行。

那么,异步是怎么解决资源利用的问题的呢?当你的业务逻辑都是同步执行的,也就意味着一旦被耗时的操作阻塞,是没有执行其他业务逻辑的机会的,此时CPU几乎被闲置,利用率低,假如此时是通过多线程异步执行的,那么其他线程仍然能同时处理其他逻辑,CPU会一直跑。这里顺便提一下:异步和多线程并不是一个等价的概念,Java中的异步是依靠多线程实现的。

Java对异步编程的支持

使用Future编写异步代码

Java对异步API的直接支持是从JDK1.5开始的,在该版本的并发包下(JUG),提供了Future,它表示一个异步计算的结果,该结果只能在执行完成后,由其get方法获取到,在这之前,get方法会一直阻塞,除非设置了超时时间,下面先看个例子:

ExecutorService executorService=Executors.newFixedThreadPool(5);
Future<Integer> future=executorService.submit(()->{
            System.out.println("hello future");
            Thread.sleep(3000);
            //模拟计算错误
            //int i=10/0;
            return 1;
 });
 
System.out.println("...其他操作...");
 
try {
    Integer count = future.get(5,TimeUnit.SECONDS);
    System.out.println("count: "+count);
} catch (InterruptedException e) {
     e.printStackTrace();
} catch (ExecutionException e) {
     e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}

这段代码并不难,但是很多工程师用法不对,会导致很多不该出的问题。Future是靠线程池驱动的,以前线程池的使用经验仍然是基础,以前线程池出什么问题,现在依然会出什么问题。Future适用于把计算剥离在一个单独的线程环境中处理,然后期待它“未来”返回一个结果给你。【在任何时候】,这个“未来”都是要做超时控制的,否则会一直阻塞下去,你无法处理一个看不见的未来...

在上面的代码中,我们调用了Future的get方法来获取返回值,假如计算还未结束(模拟了3秒),那么此时会阻塞当前线程。在这个过程中,可能会遇到三种异常,InterruptedException表示线程被中断,ExecutionException表示计算异常,TimeoutException表示超时异常。很多工程师习惯用一个Exception来catch所有异常,这是会出现很多问题的。比如说ExecutionException,是计算体内抛出的异常包装的(可以解开模拟异常代码的注释进行测试),在发生异常时,一般情况下内部计算是失败的,而TimeoutException是计算体执行太长,外面等不了导致的,并不一定代表内部的计算是失败的(会继续执行下去),假如说把这两种异常放一起处理,是有可能出现问题的,除非能完全保证计算的幂等性,然后统一进行重试。

Future的增强版:CompletableFuture

Future使用起来比较简单,但是也能看出它的一些局限性。比如说,我们要获取计算结果,仍然只能通过阻塞调用(get),并且,它不太便于我们做异步计算的编排组合。而JDK8出现的CompletableFuture弥补了它的这些短板。

CompletableFuture实现了Future接口,既可以完成Future所有能做的事。最关键是,它还实现了CompletionStage接口,该接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线。

CompletableFuture基本用法

CompletableFuture主要提供两套工具函数来创建异步处理对象:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) 

public static CompletableFuture<Void> runAsync(Runnable runnable)

public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

其中supplyAsync主要用于需要返回计算结果的异步处理;runAsync主要用于不需要返回计算结果的异步处理,下面我们看看supplyAsync的用法:

CompletableFuture<String> completableFuture=CompletableFuture.supplyAsync(()->{
            //模拟耗时操作,sleep 3秒
            timeConsuming(3000);
            System.out.println(Thread.currentThread().getName());
            return "Hello CompletableFuture";
 });
 
    /**
     * 模拟耗时处理
     * @param millis
     */
    public static void timeConsuming(long millis){
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

这段代码中,我们在模拟耗时3秒后,返回了计算结果,那么怎么获取该结果呢?前面我们提到,CompletableFuture实现了Future接口,所以通过get方法肯定是能获取到的,但是这样做并不好,更好的做法是发挥其另外一个实现接口CompletionStage的作用,让结果处理也异步化,CompletionStage提供很多链式函数来做各种结果处理:

completableFuture.thenApply(word->{
        System.out.println(Thread.currentThread().getName());
        //模拟异常
        //int i=10/0;
        return word+" stage";
}).exceptionally(ex->{
    System.out.println("stage1: "+ex.getMessage());
    return "hello";
}).thenApply(word->{
        System.out.println(Thread.currentThread().getName());
        return word.length();
}).thenAccept(System.out::println);

thenApply方法会沿用之前的线程上下文来执行计算,这段代码中,我们连续调用两次thenApplay,每次都是将上次转换的结果作为本次计算的入参,最后,调用thenAccept完成最终处理(消费)。thenApply方法还有个对应的thenApplyAsync方法,它与前者的区别在于:会将任务重新提交到线程池异步执行,而不是沿用之前的线程上下文,说白了就是另外一个异步执行,thenAccept同理。exceptionally用于异常处理,当发生异常时,返回一个默认或者修正后的结果,后面的thenApply会继续将此结果作为入参进行计算。

线程池解惑

不知道大家是否会有疑问,之前我们讲过,异步是依赖于线程池实现的,但是我们在使用CompletableFuture的过程中,并没有看到任何线程池的设置,这是什么原因呢?实际上,JDK8中内置了一个公共线程池ForkJoinPool.commonPool(),在没有显示设置线程池的时候,就使用的该公共线程池。大家可以运行上述代码,看下当前线程名称的打印,会出现“ForkJoinPool.commonPool-worker”的字样。在实际上项目中,笔者还是建议大家手动设置一下线程池,线程隔离会让计算更加高效、安全(比如不会因为线程池的不可用,导致所有依赖它的任务都得不到执行),我们可以调用supplyAsync或runAsync带有Executor的重载方法进行设置。这里顺便提一下,Java8中的并行流parallelStream,内部也是通过ForkJoinPool.commonPool()执行的,我们是无法为它指定线程池的。

组合CompletableFuture

CompletableFuture提供的另外一个超强功能就是可以让多个异步处理组合成新的CompletableFuture,被合并的异步处理,可以有依赖关系,也可以是两个完全独立的个体,工程师完全可以把精力放在业务开发上,而无需像以前一样,自己管理异步流水线。下面我们分别以thenCompose和thenCombine为例进行说明。

CompletableFuture可以通过调用thenCompose,将处理结果传递给下一个CompletableFuture去处理,然后返回一个新的CompletableFuture。设想一个虚拟场景:在购物应用里,我们要在处理订单之后,通过订单金额来给用户算积分,下面看看示例代码:

CompletableFuture<Integer> resultFuture=CompletableFuture.supplyAsync(()->{
            //模拟耗时2秒处理订单
            timeConsuming(2000);
            //返回订单金额
           return 30;
}).thenCompose(value->
            CompletableFuture.supplyAsync(()->{
                //根据订单金额计算积分
                if(value<50){
                    return 1;
                }else{
                    return 2;
                }
            })
);

try {
        Integer result=resultFuture.get(5,TimeUnit.SECONDS);
            System.out.println("result= "+result);//result=1
} catch (InterruptedException e) {
        e.printStackTrace();
} catch (ExecutionException e) {
        e.printStackTrace();
} catch (TimeoutException e) {
        e.printStackTrace();
}

在这段代码中,我们仍然首先通过supplyAsync构建了订单异步处理的CompletableFuture对象,并将计算好的结果通过thenCompose传给了积分异步处理的CompletableFuture对象,最终通过get方法获取到积分结果。

CompletableFuture的thenCombine方法更有趣,它不需要关心多个异步之间是否有数据依赖关系,只要我们需要,就可以并行的将各个异步处理的结果合并起来。比如说,我们想获取某本书在A、B两家书店的价格并计算均值,那么可以这么做:

CompletableFuture<Integer> resultFuture=CompletableFuture.supplyAsync(()->{
  //模拟耗时1秒获取A书店价格
  timeConsuming(1000);
  //返回价格
  return 30;
}).thenCombine(
  CompletableFuture.supplyAsync(()->{
  //模拟耗时1秒获取B书店价格
  timeConsuming(1000);
  //返回价格
  return 50;
}),(a,b)->{
        System.out.println("a="+a);//30
        System.out.println("b="+b);//50
        return (a+b)/2;
}
);

这两个操作A、B是完全异步执行的,理论上,获取两个价格的最大时间=max(A、B),而不是A、B两个时间叠加,在实际应用中,性能提升会比较明显。

结合Stream

接着上面那个场景来说,假如现在有多本书,都需要知道在书店A、B中的价格和均价,并且以列表形式存储起来,这个如何做呢?这种场景下,结合Stream来做是非常爽的。


/*********准备【书籍_书店_价格】数据 begin***********/
Map<String,Integer> priceMap=new HashMap<>();
priceMap.put("001_A",30);
priceMap.put("001_B",50);

priceMap.put("002_A",40);
priceMap.put("002_B",60);

priceMap.put("003_A",100);
priceMap.put("003_B",120);

/*********准备【书籍_书店_价格】数据 end***********/

//待查书籍ID列表
Stream<String> bookStreams=Arrays.stream(new String[]{"001","002","003"});

List<CompletableFuture<Integer>> futureList=bookStreams.map(book->CompletableFuture.supplyAsync(()->{
        //模拟耗时1秒获取A书店价格
        timeConsuming(2000);
        //返回价格
        return priceMap.get(book+"_A");
     }).thenCombine(
        CompletableFuture.supplyAsync(()->{
        //模拟耗时1秒获取A书店价格
        timeConsuming(2000);
        //返回价格
        return priceMap.get(book+"_B");
     }),(a,b)->{
        //计算均值
        return (a+b)/2;
      }
)).collect(Collectors.toList());

List<Integer> list=futureList.stream().map(CompletableFuture::join).collect(Collectors.toList());

这段代码中,我们首先准备了一些基础数据,为了演示方便,我这里直接用Map预存储了【书籍_书店_价格】的关系,其中key=书籍ID_书店标识,value=价格。最核心的代码就是Stream.map方法,这个方法在之前文章里面讲过,它主要用于将Stream中的每个元素迭代出来,然后转换成新的元素。在这里,我们是把之前的异步处理逻辑直接放在map里,即:让每个元素都去异步计算两个书店的价格并取均值。注意:map里面得到的元素并不是最终的均值,而是包含均值的CompletableFuture对象。我们在获取到CompletableFuture列表后,最终通过join方法,将每个对象中的值提取出来,形成最终的价格列表。

总结

本文从后端面临的几个压力开始讲起,分析了异步为什么可以提高性能、提高资源利用率,然后通过代码示例,介绍了Java对异步的API级别的支持,如Java5就出现的Future,以及Java8增强的CompletableFuture,最后,我们结合了Java Stream,完成了一个综合的异步应用,大家不妨自行尝试一下。同时也提醒大家:在使用这些API的过程中,有一些非常重要的细节要处理好,比如异常处理、线程池定义(隔离)等,生产级代码,全靠细节的把控!

目录
相关文章
|
1月前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
4月前
|
缓存 Java 调度
Java并发编程:深入解析线程池与Future任务
【7月更文挑战第9天】线程池和Future任务是Java并发编程中非常重要的概念。线程池通过重用线程减少了线程创建和销毁的开销,提高了资源利用率。而Future接口则提供了检查异步任务状态和获取任务结果的能力,使得异步编程更加灵活和强大。掌握这些概念,将有助于我们编写出更高效、更可靠的并发程序。
|
4月前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【7月更文挑战第1天】Java 8的CompletableFuture革新了异步编程,提供链式处理和优雅的错误处理。反应式编程,如Project Reactor,强调数据流和变化传播,擅长处理大规模并发和延迟敏感任务。两者结合,如通过Mono转换CompletableFuture,兼顾灵活性与资源管理,提升现代Java应用的并发性能和响应性。开发者可按需选择和融合这两种技术,以适应高并发环境。
54 1
|
5月前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【6月更文挑战第30天】Java 8的CompletableFuture革新了异步编程,提供如thenApply等流畅接口,而Java 9后的反应式编程(如Reactor)强调数据流和变化传播,以事件驱动应对高并发。两者并非竞争关系,而是互补,通过Flow API和第三方库结合,如将CompletableFuture转换为Mono进行反应式处理,实现更高效、响应式的系统设计。开发者可根据需求灵活选用,提升现代Java应用的并发性能。
79 1
|
2月前
|
Java
JAVA并发编程系列(13)Future、FutureTask异步小王子
本文详细解析了Future及其相关类FutureTask的工作原理与应用场景。首先介绍了Future的基本概念和接口方法,强调其异步计算特性。接着通过FutureTask实现了一个模拟外卖订单处理的示例,展示了如何并发查询外卖信息并汇总结果。最后深入分析了FutureTask的源码,包括其内部状态转换机制及关键方法的实现原理。通过本文,读者可以全面理解Future在并发编程中的作用及其实现细节。
|
5月前
|
设计模式 Java API
实战分析Java的异步编程,并通过CompletableFuture进行高效调优
【6月更文挑战第7天】实战分析Java的异步编程,并通过CompletableFuture进行高效调优
94 2
|
4月前
|
并行计算 算法 Java
Java面试题:解释Java中的无锁编程的概念,Java中的Fork/Join框架的作用和使用场景,Java中的CompletableFuture的作用和使用场景
Java面试题:解释Java中的无锁编程的概念,Java中的Fork/Join框架的作用和使用场景,Java中的CompletableFuture的作用和使用场景
34 0
|
4月前
|
安全 Java 数据库连接
Java面试题:解释Java内存模型的无锁编程支持,并讨论其优势和局限性,解释Java中的CompletableFuture的工作原理,并讨论其在异步编程中的应用
Java面试题:解释Java内存模型的无锁编程支持,并讨论其优势和局限性,解释Java中的CompletableFuture的工作原理,并讨论其在异步编程中的应用
30 0
|
5月前
|
并行计算 Java API
Java8实战-CompletableFuture:组合式异步编程
Java8实战-CompletableFuture:组合式异步编程
67 0
|
Java API 网络架构
20个使用 Java CompletableFuture的示例(下)
20个使用 Java CompletableFuture的示例(下)
272 1