Java新特性:异步编排CompletableFuture

简介: CompletableFuture由Java 8提供,是实现异步化的工具类,上手难度较低,且功能强大,支持通过函数式编程的方式对各类操作进行组合编排。 CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步[回调](https://so.csdn.net/so/search?q=回调&spm=1001.2101.3001.7020)、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

CompletableFuture由Java 8提供,是实现异步化的工具类,上手难度较低,且功能强大,支持通过函数式编程的方式对各类操作进行组合编排。 CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

1.背景

随着业务项目数量的增大,系统服务面临的压力也越来越大,这时候系统吞吐量会下降,但是一些核心功能的接口必须保证高吞吐量,低延迟。这时候我们就需要对接口进行优化,提升性能,从而保证高吞吐量。这时候CompletableFuture就用很大的用武之地了,我们一些核心接口处理数据都是串行执行的,但是其实接口的某些数据获取、处理封装并没有前后依赖关系,我们大可并行处理,这样就可以充分利用cpu资源。

一般我们的接口调用执行分为同步或者异步:

1.1 同步执行

通常我们的接口数据查询多次数据库获取数据然后进行处理,封装返回,或者是多次rpc调用其他服务获取数据,但是无论什么获取数据的操作,都是串行执行的,也就是操作2必须要等操作1完成之后在执行,即使操作1和操作2之间没有任何联系

在同步调用的场景下,接口耗时长、性能差,接口响应时长T = T1+T2+T3+……+Tn,这时为了缩短接口的响应时间,一般会使用线程池的方式并行获取数据

1.2 异步执行

使用并行获取数据,大大降低了接口对数据获取,处理的时间

项目推荐:基于SpringBoot2.x、SpringCloud和SpringCloudAlibaba企业级系统架构底层框架封装,解决业务开发时常见的非功能性需求,防止重复造轮子,方便业务快速开发和企业技术栈框架统一管理。引入组件化的思想实现高内聚低耦合并且高度可配置化,做到可插拔。严格控制包依赖和统一版本管理,做到最少化依赖。注重代码规范和注释,非常适合个人学习和企业使用

Github地址https://github.com/plasticene/plasticene-boot-starter-parent

Gitee地址https://gitee.com/plasticene3/plasticene-boot-starter-parent

微信公众号Shepherd进阶笔记

交流探讨群:Shepherd_126

2.CompletableFuture使用

下面我们通过一个例子来讲解CompletableFuture如何使用,商品详情接口返回数据使用CompletableFuture进行数据封装任务进行异步编排:

    private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("product-pool-%d").build();

    private static ExecutorService fixedThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,
            Runtime.getRuntime().availableProcessors() * 40,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors() * 20),
            namedThreadFactory);

/**
     * 使用completableFuture执行多线程任务安排,提高速度,completableFuture可以让某些异步线程任务串行化顺序执行
     * 如果不要求某些异步任务串行化顺序执行,那么也可以JUC里面另一个countDownLatch实现
     *
     * @param skuId
     * @return
     */
    @Override
    public SkuInfo getSkuDetail(Long skuId) {
   
   
        SkuInfo skuInfo = new SkuInfo();
        // 获取sku信息
        CompletableFuture<ProductSku> skuFuture = CompletableFuture.supplyAsync(() -> {
   
   
            ProductSku sku = productSkuDAO.selectById(skuId);
            skuInfo.setSku(sku);
            return sku;
        }, fixedThreadPool);
        // 异步获取spu信息
        CompletableFuture<ProductSpu> spuFuture = skuFuture.thenApplyAsync(sku -> {
   
   
            ProductSpu spu = productSpuDAO.selectById(sku.getSpuId());
            skuInfo.setSpu(spu);
            return spu;
        }, fixedThreadPool);
        // 异步获取品牌信息
        CompletableFuture<BrandDTO> brandFuture = skuFuture.thenApplyAsync(sku -> {
   
   
            BrandDTO brandDTO = brandService.getBrandDetail(sku.getBrandId());
            skuInfo.setBrand(brandDTO);
            return brandDTO;
        }, fixedThreadPool);
       // 异步获取分类信息
        CompletableFuture<CategoryDTO> categoryFuture = skuFuture.thenApplyAsync(sku -> {
   
   
            CategoryDTO categoryDTO = categoryService.getCategoryDetail(sku.getCategoryId());
            skuInfo.setCategory(categoryDTO);
            return categoryDTO;
        }, fixedThreadPool);
        try {
   
   
            // 最后等待所有异步任务执行完成返回封装结果
            CompletableFuture.allOf(skuFuture, spuFuture, brandFuture, categoryFuture).get();
        } catch (Exception e) {
   
   
            log.error("<=======等候所有任务执行过程报错:======>", e);
        }
        return skuInfo;
    }

2.1 supplyAsync / runAsync

supplyAsync表示创建带返回值的异步任务的,相当于ExecutorService submit(Callable task) 方法,runAsync表示创建无返回值的异步任务,相当于ExecutorService submit(Runnable task)方法,这两方法的效果跟submit是一样的,测试用例如下:

    /**
     * 测试方法CompletableFuture.runAsync:无返回值,
     */
    private static void testRunAsync() {
   
   
        CompletableFuture.runAsync(() ->{
   
   
            System.out.println("<======当前线程:" + Thread.currentThread().getName() + "=====线程id: " + Thread.currentThread().getId());
            System.out.println("supplyAsync 是否为守护线程 " + Thread.currentThread().isDaemon());
            int result = 10/2;
            System.out.println("计算结果为:"+ result);
        }, fixedThreadPool);
    }

    /**
     * 测试方法CompletableFuture.supplyAsync:有返回值
     * @throws ExecutionException
     * @throws InterruptedException
     */
    private static void testSupplyAsync() throws ExecutionException, InterruptedException {
   
   
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
   
   
            System.out.println("<======当前线程:" + Thread.currentThread().getName() + "=====线程id: " + Thread.currentThread().getId());
            int result = 10 / 2;
            return result;
        }, fixedThreadPool);
        Integer res = future.get();
        System.out.println("返回结果值为:"+res);
    }

这两方法各有一个重载版本,可以指定执行异步任务的Executor实现,如果不指定,默认使用ForkJoinPool.commonPool(),如果机器是单核的,则默认使用ThreadPerTaskExecutor,该类是一个内部类,每次执行execute都会创建一个新线程,具体可以看CompletableFuture源码。

2.2 thenApply / thenApplyAsync

thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,

 /**
     * 线程串行化
     * 1、thenRun:不能获取上一步的执行结果
     * 2、thenAcceptAsync:能接受上一步结果,但是无返回值
     * 3、thenApplyAsync:能接受上一步结果,有返回值
     *
     */
    private static void testThenApplyAsync() throws ExecutionException, InterruptedException {
   
   
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
   
   
            System.out.println("<======当前线程:" + Thread.currentThread().getName() + "=====线程id: " + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            try {
   
   
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
   
   
                e.printStackTrace();
            }
            return i;
        }, executor);
        CompletableFuture<String> future2 = future1.thenApplyAsync(res -> {
   
   
            System.out.println("======任务2启动了..." + res*20);
            return "Hello" + res;
        }, executor);

        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
   
   
            System.out.println("======任务3执行了");
        }, executor);

        CompletableFuture.allOf(future1, future2, future3).get();
        System.out.println("=======测试结束");

    }

thenApplyAsync与thenApply的区别在于,前者是将job2提交到线程池中异步执行,实际执行future2的线程可能是另外一个线程,后者是由执行future1的线程立即执行future2,即两个future都是同一个线程执行的

2.3 exceptionally/whenComplete/handle

exceptionally方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中,如果该任务正常执行则会exceptionally方法返回的CompletionStage的result就是该任务正常执行的结果;whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常

 /**
     * 测试whenComplete和exceptionally: 异步方法执行完的处理
     */
    private static void testWhenCompleteAndExceptionally() throws ExecutionException, InterruptedException {
   
   
         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
   
   
             System.out.println("<======当前线程:" + Thread.currentThread().getName() + "=====线程id: " + Thread.currentThread().getId());
             Integer num = 10;
             int i = num / 2;
             String s = String.valueOf(null);
             System.out.println("运行结果:" + i);
             return i;
         }, executor).whenComplete((res,exception) -> {
   
   
             //虽然能得到异常信息,但是没法修改返回数据
             System.out.println("<=====异步任务成功完成了=====结果是:" + res + "=======异常是:" + exception);
         }).exceptionally(throwable -> {
   
   
             //可以感知异常,同时返回默认值
             System.out.println("<=====异步任务成功发生异常了======"+throwable);
             return 10;
         });
        Integer result = future.get();
        System.out.println("<=====最终返回结果result=" + result + "======>");

    }

    /**
     * 测试handle方法:它是whenComplete和exceptionally的结合
     */
    private static void testHandle() {
   
   
         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
   
   
             System.out.println("<======当前线程:" + Thread.currentThread().getName() + "=====线程id: " + Thread.currentThread().getId());
             int i = 10 / 2;
             System.out.println("运行结果:" + i);
             return i;
         }, executor).handle((result,thr) -> {
   
   
             if (result != null) {
   
   
                 return result * 2;
             }
             if (thr != null) {
   
   
                 System.out.println("异步任务成功完成了...结果是:" + result + "异常是:" + thr);
                 return 0;
             }
             return 0;
         });
    }

2.4 组合处理 thenCombine / thenAcceptBoth / runAfterBoth

这三个方法都是将两个CompletableFuture组合起来,只有这两个都正常执行完了才会执行某个任务,区别在于,thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果

    private static void thenCombine() throws Exception {
   
   
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "hello1", fixedThreadPool);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "hello2", fixedThreadPool);
        CompletableFuture<String> result = future1.thenCombine(future2, (t, u) -> t+" "+u);
        System.out.println(result.get());
    }

        private static void thenAcceptBoth() throws Exception {
   
   
        CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
   
   
            int t = new Random().nextInt(3);
            try {
   
   
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
   
   
                e.printStackTrace();
            }
            System.out.println("f1="+t);
            return t;
        },fixedThreadPool);

        CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
   
   
            int t = new Random().nextInt(3);
            try {
   
   
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
   
   
                e.printStackTrace();
            }
            System.out.println("f2="+t);
            return t;
        },fixedThreadPool);
    }

2.5 applyToEither / acceptEither / runAfterEither

这三个方法都是将两个CompletableFuture组合起来,只要其中一个执行完了就会执行某个任务,其区别在于applyToEither会将已经执行完成的任务的执行结果作为方法入参,并有返回值;acceptEither同样将已经执行完成的任务的执行结果作为方法入参,但是没有返回值;runAfterEither没有方法入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果

    private static void applyToEither() throws Exception {
   
   
        CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
   
   
            int t = new Random().nextInt(3);
            try {
   
   
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
   
   
                e.printStackTrace();
            }
            System.out.println("f1="+t);
            return t;
        },fixedThreadPool);
        CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
   
   
            int t = new Random().nextInt(3);
            try {
   
   
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
   
   
                e.printStackTrace();
            }
            System.out.println("f2="+t);
            return t;
        },fixedThreadPool);

        CompletableFuture<Integer> result = f1.applyToEither(f2, t -> {
   
   
            System.out.println("applyEither:"+t);
            return t * 2;
        });

    }

    private static void acceptEither() throws Exception {
   
   
        CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
   
   
            int t = new Random().nextInt(3);
            try {
   
   
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
   
   
                e.printStackTrace();
            }
            System.out.println("f1="+t);
            return t;
        },fixedThreadPool);
        CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
   
   
            int t = new Random().nextInt(3);
            try {
   
   
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
   
   
                e.printStackTrace();
            }
            System.out.println("f2="+t);
            return t;
        },fixedThreadPool);

        CompletableFuture<Void> result = f1.acceptEither(f2, t -> {
   
   
            System.out.println("acceptEither:"+t);
        });

    }

2.6 allOf / anyOf

allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。

    private static void testThenApplyAsync() throws ExecutionException, InterruptedException {
   
   
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
   
   
            System.out.println("<======当前线程:" + Thread.currentThread().getName() + "=====线程id: " + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            try {
   
   
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
   
   
                e.printStackTrace();
            }
            return i;
        }, executor);
        CompletableFuture<String> future2 = future1.thenApplyAsync(res -> {
   
   
            System.out.println("======任务2启动了..." + res*20);
            return "Hello" + res;
        }, executor);

        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
   
   
            System.out.println("======任务3执行了");
        }, executor);

        CompletableFuture.allOf(future1, future2, future3).get();
        System.out.println("=======测试结束");

    }

注意,使用CompletableFuture可能有某些异步任务不执行,示例如下:

    private static void testNotExecute() {
   
   
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
   
   
            System.out.println("<======当前线程:" + Thread.currentThread().getName() + "=====线程id: " + Thread.currentThread().getId());
            System.out.println("supplyAsync 是否为守护线程 " + Thread.currentThread().isDaemon());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            try {
   
   
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
   
   
                e.printStackTrace();
            }
            // 下面不打印
            System.out.println("return之前的打印");
            return i;
        });
    }

造成这个原因是因为Daemon。因为completableFuture这套使用异步任务的操作都是创建成了守护线程。那么我们没有调用get方法不阻塞这个主线程的时候。主线程执行完毕。所有线程执行完毕就会导致一个问题,就是守护线程退出。那么我们没有执行的代码就是因为主线程不再跑任务而关闭导致的。

3.CompletableFuture的实现原理

CompletableFuture源码可知,CompletableFuture中包含两个字段:resultstack。result用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)的形式存储,stack表示栈顶元素。具体原理实现细节,请参考美团技术团队的CompletableFuture原理与实践

目录
相关文章
|
13天前
|
Java
探索Java新境界!异步+事件驱动,打造响应式编程热潮,未来已来!
【8月更文挑战第30天】在现代软件开发中,系统响应性和可扩展性至关重要。Java作为主流编程语言,提供了多种机制如Future、CompletableFuture及事件驱动编程,有效提升应用性能。本文探讨Java异步编程模型与事件驱动编程,并介绍响应式模式,助您构建高效、灵活的应用程序。
31 3
|
24天前
|
安全 前端开发 Java
随着企业应用复杂度提升,Java Spring框架以其强大与灵活特性简化开发流程,成为构建高效、可维护应用的理想选择
随着企业应用复杂度提升,Java Spring框架以其强大与灵活特性简化开发流程,成为构建高效、可维护应用的理想选择。依赖注入使对象管理交由Spring容器处理,实现低耦合高内聚;AOP则分离横切关注点如事务管理,增强代码模块化。Spring还提供MVC、Data、Security等模块满足多样需求,并通过Spring Boot简化配置与部署,加速微服务架构构建。掌握这些核心概念与工具,开发者能更从容应对挑战,打造卓越应用。
32 1
|
14天前
|
安全 Java API
告别繁琐编码,拥抱Java 8新特性:Stream API与Optional类助你高效编程,成就卓越开发者!
【8月更文挑战第29天】Java 8为开发者引入了多项新特性,其中Stream API和Optional类尤其值得关注。Stream API对集合操作进行了高级抽象,支持声明式的数据处理,避免了显式循环代码的编写;而Optional类则作为非空值的容器,有效减少了空指针异常的风险。通过几个实战示例,我们展示了如何利用Stream API进行过滤与转换操作,以及如何借助Optional类安全地处理可能为null的数据,从而使代码更加简洁和健壮。
44 0
|
3天前
|
安全 Java API
Java 18 概述:新特性一览
Java 18 作为 Java 平台的最新版本,引入了多项令人振奋的新特性和改进,包括模式匹配、记录类型、流库改进、外部函数与内存 API 以及并发处理增强。这些新功能不仅提升了开发者的生产力,还显著增强了 Java 的性能和安全性。本文将详细介绍 Java 18 的主要新特性,并通过代码示例帮助读者更好地理解和应用这些功能。
|
13天前
|
Java API
Java 8新特性:Lambda表达式与Stream API的深度解析
【7月更文挑战第61天】本文将深入探讨Java 8中的两个重要特性:Lambda表达式和Stream API。我们将首先介绍Lambda表达式的基本概念和语法,然后详细解析Stream API的使用和优势。最后,我们将通过实例代码演示如何结合使用Lambda表达式和Stream API,以提高Java编程的效率和可读性。
|
15天前
|
Java 开发者
Java 8新特性之Lambda表达式与函数式接口
【7月更文挑战第59天】本文将介绍Java 8中的一个重要新特性——Lambda表达式,以及与之密切相关的函数式接口。通过对比传统的匿名内部类,我们将探讨Lambda表达式的语法、使用方法和优势。同时,我们还将了解函数式接口的定义和用途,以及如何将Lambda表达式应用于函数式编程。
|
23天前
|
前端开发 JavaScript Java
Ajax进行异步交互:提升Java Web应用的用户体验
Ajax 技术允许在不重载整个页面的情况下与服务器异步交换数据,通过局部更新页面内容,极大提升了 Java Web 应用的响应速度和用户体验。本文介绍 Ajax 的基本原理及其实现方式,包括使用 XMLHttpRequest 对象发送请求、处理响应数据,并在 Java Web 应用中集成 Ajax。此外,还探讨了 Ajax 如何通过减少页面刷新、实时数据更新等功能改善用户体验。
39 3
|
24天前
|
分布式计算 Java API
Java 8带来了流处理与函数式编程等新特性,极大提升了开发效率
Java 8带来了流处理与函数式编程等新特性,极大提升了开发效率。流处理采用声明式编程模型,通过filter、map等操作简化数据集处理,提高代码可读性。Lambda表达式支持轻量级函数定义,配合Predicate、Function等接口,使函数式编程无缝融入Java。此外,Optional类及新日期时间API等增强功能,让开发者能更优雅地处理潜在错误,编写出更健壮的应用程序。
24 1
|
11天前
|
Java 数据库连接 数据库
AI 时代风起云涌,Hibernate 实体映射引领数据库高效之路,最佳实践与陷阱全解析!
【8月更文挑战第31天】Hibernate 是一款强大的 Java 持久化框架,可将 Java 对象映射到关系数据库表中。本文通过代码示例详细介绍了 Hibernate 实体映射的最佳实践,包括合理使用关联映射(如 `@OneToMany` 和 `@ManyToOne`)以及正确处理继承关系(如单表继承)。此外,还探讨了常见陷阱,例如循环依赖可能导致的无限递归问题,并提供了使用 `@JsonIgnore` 等注解来避免此类问题的方法。通过遵循这些最佳实践,可以显著提升开发效率和数据库操作性能。
29 0
|
27天前
|
前端开发 JavaScript Java
java实现异步回调返回给前端
综上,Java中实现异步回调并将结果返回给前端是一项涉及后端异步处理和前端交互的综合任务。在实际项目中,开发人员需要根据应用需求和性能预期选择合适的异步模型与工具,并进行适当的配置和优化。
62 3