CompletableFuture使用详解

简介: CompletableFuture是jdk8的新特性。CompletableFuture实现了CompletionStage接口和Future接口

CompletableFuture是jdk8的新特性。CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步会点、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

一、创建异步任务

  1. supplyAsync

supplyAsync是创建带有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法

// 带返回值异步请求,默认线程池
public static CompletableFuture supplyAsync(Supplier supplier)

// 带返回值的异步请求,可以自定义线程池
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        System.out.println("do something....");
        return "result";
    });

    //等待任务执行完成
    System.out.println("结果->" + cf.get());

}

public static void main(String[] args) throws ExecutionException, InterruptedException {

    // 自定义线程池
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        System.out.println("do something....");
        return "result";
    }, executorService);

    //等待子任务执行完成
    System.out.println("结果->" + cf.get());

}
测试结果:

image.png

  1. runAsync

runAsync是创建没有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法

// 不带返回值的异步请求,默认线程池
public static CompletableFuture runAsync(Runnable runnable)

// 不带返回值的异步请求,可以自定义线程池
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
        System.out.println("do something....");
    });

    //等待任务执行完成
    System.out.println("结果->" + cf.get());

}

public static void main(String[] args) throws ExecutionException, InterruptedException {

    // 自定义线程池
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
        System.out.println("do something....");
    }, executorService);

    //等待任务执行完成
    System.out.println("结果->" + cf.get());

}
测试结果:

image.png

3.获取任务结果的方法
// 如果完成则返回结果,否则就抛出具体的异常
public T get() throws InterruptedException, ExecutionException

// 最大时间等待返回结果,否则就抛出具体异常
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

// 完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因
public T join()

// 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent。
public T getNow(T valueIfAbsent)

// 如果任务没有完成,返回的值设置为给定值
public boolean complete(T value)

// 如果任务没有完成,就抛出给定异常
public boolean completeExceptionally(Throwable ex)

二、异步回调处理
1.thenApply和thenApplyAsync
thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,带有返回值。

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " cf1 do something....");
        return 1;
    });

    CompletableFuture<Integer> cf2 = cf1.thenApplyAsync((result) -> {
        System.out.println(Thread.currentThread() + " cf2 do something....");
        result += 2;
        return result;
    });
    //等待任务1执行完成
    System.out.println("cf1结果->" + cf1.get());
    //等待任务2执行完成
    System.out.println("cf2结果->" + cf2.get());

}

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " cf1 do something....");
        return 1;
    });

    CompletableFuture<Integer> cf2 = cf1.thenApply((result) -> {
        System.out.println(Thread.currentThread() + " cf2 do something....");
        result += 2;
        return result;
    });
    //等待任务1执行完成
    System.out.println("cf1结果->" + cf1.get());
    //等待任务2执行完成
    System.out.println("cf2结果->" + cf2.get());

}
测试结果:

image.png
image.png

从上面代码和测试结果我们发现thenApply和thenApplyAsync区别在于,使用thenApply方法时子任务与父任务使用的是同一个线程,而thenApplyAsync在子任务中是另起一个线程执行任务,并且thenApplyAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

2.thenAccept和thenAcceptAsync
thenAccep表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,无返回值。

测试代码

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " cf1 do something....");
        return 1;
    });

    CompletableFuture<Void> cf2 = cf1.thenAccept((result) -> {
        System.out.println(Thread.currentThread() + " cf2 do something....");
    });

    //等待任务1执行完成
    System.out.println("cf1结果->" + cf1.get());
    //等待任务2执行完成
    System.out.println("cf2结果->" + cf2.get());

}

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " cf1 do something....");
        return 1;
    });

    CompletableFuture<Void> cf2 = cf1.thenAcceptAsync((result) -> {
        System.out.println(Thread.currentThread() + " cf2 do something....");
    });

    //等待任务1执行完成
    System.out.println("cf1结果->" + cf1.get());
    //等待任务2执行完成
    System.out.println("cf2结果->" + cf2.get());

}
测试结果:
image.png

从上面代码和测试结果我们发现thenAccep和thenAccepAsync区别在于,使用thenAccep方法时子任务与父任务使用的是同一个线程,而thenAccepAsync在子任务中可能是另起一个线程执行任务,并且thenAccepAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

2.thenRun和thenRunAsync
thenRun表示某个任务执行完成后执行的动作,即回调方法,无入参,无返回值。

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " cf1 do something....");
        return 1;
    });

    CompletableFuture<Void> cf2 = cf1.thenRun(() -> {
        System.out.println(Thread.currentThread() + " cf2 do something....");
    });

    //等待任务1执行完成
    System.out.println("cf1结果->" + cf1.get());
    //等待任务2执行完成
    System.out.println("cf2结果->" + cf2.get());

}

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " cf1 do something....");
        return 1;
    });

    CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {
        System.out.println(Thread.currentThread() + " cf2 do something....");
    });

    //等待任务1执行完成
    System.out.println("cf1结果->" + cf1.get());
    //等待任务2执行完成
    System.out.println("cf2结果->" + cf2.get());

}
测试结果:

image.png

从上面代码和测试结果我们发现thenRun和thenRunAsync区别在于,使用thenRun方法时子任务与父任务使用的是同一个线程,而thenRunAsync在子任务中可能是另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

3.whenComplete和whenCompleteAsync
whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " cf1 do something....");
        int a = 1/0;
        return 1;
    });

    CompletableFuture<Integer> cf2 = cf1.whenComplete((result, e) -> {
        System.out.println("上个任务结果:" + result);
        System.out.println("上个任务抛出异常:" + e);
        System.out.println(Thread.currentThread() + " cf2 do something....");
    });

// //等待任务1执行完成
// System.out.println("cf1结果->" + cf1.get());
// //等待任务2执行完成

    System.out.println("cf2结果->" + cf2.get());
}

测试结果:

image.png

whenCompleteAsync和whenComplete区别也是whenCompleteAsync可能会另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

4.handle和handleAsync
跟whenComplete基本一致,区别在于handle的回调方法有返回值。

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " cf1 do something....");
        // int a = 1/0;
        return 1;
    });

    CompletableFuture<Integer> cf2 = cf1.handle((result, e) -> {
        System.out.println(Thread.currentThread() + " cf2 do something....");
        System.out.println("上个任务结果:" + result);
        System.out.println("上个任务抛出异常:" + e);
        return result+2;
    });

    //等待任务2执行完成
    System.out.println("cf2结果->" + cf2.get());

}
测试结果 :
image.png

三、多任务组合处理
1.thenCombine、thenAcceptBoth 和runAfterBoth
这三个方法都是将两个CompletableFuture组合起来处理,只有两个任务都正常完成时,才进行下阶段任务。

区别:thenCombine会将两个任务的执行结果作为所提供函数的参数,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " cf1 do something....");
        return 1;
    });

    CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " cf2 do something....");
        return 2;
    });

    CompletableFuture<Integer> cf3 = cf1.thenCombine(cf2, (a, b) -> {
        System.out.println(Thread.currentThread() + " cf3 do something....");
        return a + b;
    });

    System.out.println("cf3结果->" + cf3.get());

}

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " cf1 do something....");
        return 1;
    });

    CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " cf2 do something....");
        return 2;
    });
    
    CompletableFuture<Void> cf3 = cf1.thenAcceptBoth(cf2, (a, b) -> {
        System.out.println(Thread.currentThread() + " cf3 do something....");
        System.out.println(a + b);
    });

    System.out.println("cf3结果->" + cf3.get());

}

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " cf1 do something....");
        return 1;
    });

    CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread() + " cf2 do something....");
        return 2;
    });

    CompletableFuture<Void> cf3 = cf1.runAfterBoth(cf2, () -> {
        System.out.println(Thread.currentThread() + " cf3 do something....");
    });

    System.out.println("cf3结果->" + cf3.get());

}
测试结果:

image.png

2.applyToEither、acceptEither和runAfterEither
这三个方法和上面一样也是将两个CompletableFuture组合起来处理,当有一个任务正常完成时,就会进行下阶段任务。

区别:applyToEither会将已经完成任务的执行结果作为所提供函数的参数,且该方法有返回值;acceptEither同样将已经完成任务的执行结果作为方法入参,但是无返回值;runAfterEither没有入参,也没有返回值。

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        try {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "cf1 任务完成";
    });

    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        try {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "cf2 任务完成";
    });

    CompletableFuture<String> cf3 = cf1.applyToEither(cf2, (result) -> {
        System.out.println("接收到" + result);
        System.out.println(Thread.currentThread() + " cf3 do something....");
        return "cf3 任务完成";
    });

    System.out.println("cf3结果->" + cf3.get());

}

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        try {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "cf1 任务完成";
    });

    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        try {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "cf2 任务完成";
    });

    CompletableFuture<Void> cf3 = cf1.acceptEither(cf2, (result) -> {
        System.out.println("接收到" + result);
        System.out.println(Thread.currentThread() + " cf3 do something....");
    });

    System.out.println("cf3结果->" + cf3.get());

}

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        try {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("cf1 任务完成");
        return "cf1 任务完成";
    });

    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        try {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("cf2 任务完成");
        return "cf2 任务完成";
    });

    CompletableFuture<Void> cf3 = cf1.runAfterEither(cf2, () -> {
        System.out.println(Thread.currentThread() + " cf3 do something....");
        System.out.println("cf3 任务完成");
    });

    System.out.println("cf3结果->" + cf3.get());

}
测试结果:

image.png

从上面可以看出cf1任务完成需要2秒,cf2任务完成需要5秒,使用applyToEither组合两个任务时,只要有其中一个任务完成时,就会执行cf3任务,显然cf1任务先完成了并且将自己任务的结果传值给了cf3任务,cf3任务中打印了接收到cf1任务完成,接着完成自己的任务,并返回cf3任务完成;acceptEither和runAfterEither类似,acceptEither会将cf1任务的结果作为cf3任务的入参,但cf3任务完成时并无返回值;runAfterEither不会将cf1任务的结果作为cf3任务的入参,它是没有任务入参,执行完自己的任务后也并无返回值。

3.allOf / anyOf
allOf:CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。

anyOf :CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果。

测试代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        try {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("cf1 任务完成");
        return "cf1 任务完成";
    });

    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        try {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            int a = 1/0;
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("cf2 任务完成");
        return "cf2 任务完成";
    });

    CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
        try {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("cf3 任务完成");
        return "cf3 任务完成";
    });

    CompletableFuture<Void> cfAll = CompletableFuture.allOf(cf1, cf2, cf3);
    System.out.println("cfAll结果->" + cfAll.get());

}

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        try {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("cf1 任务完成");
        return "cf1 任务完成";
    });

    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        try {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("cf2 任务完成");
        return "cf2 任务完成";
    });

    CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
        try {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("cf3 任务完成");
        return "cf3 任务完成";
    });

    CompletableFuture<Object> cfAll = CompletableFuture.anyOf(cf1, cf2, cf3);
    System.out.println("cfAll结果->" + cfAll.get());

}
测试结果:
image.png

相关文章
|
小程序 前端开发 JavaScript
微信小程序框架---详细教程
微信小程序框架---详细教程
630 0
|
2月前
|
缓存 安全 Java
如何在Java中实现多线程编程
Java多线程编程有三种主要方式:继承Thread类、实现Runnable接口、实现Callable接口(结合Future获取结果),推荐使用Runnable避免单继承限制。通过线程池(如ExecutorService)可高效管理线程,提升性能。多线程共享资源时需注意线程安全,使用synchronized或Lock机制保证数据一致性。适用于并发执行、异步计算等场景。
239 1
|
6月前
|
XML 缓存 Java
一文讲透程序编排的核心方式:从表达式语言到并行化实践
高德的poi数据来源多种多样,处理流程也多种多样,但因流程相对固定,因此使用了流程化配置简化开发,使用表达式语言保证灵活性。为了加深对平台的理解,并帮助大家对编排有一定的了解,本文会以影响范围的视角去总结当前编排的方案。
357 13
一文讲透程序编排的核心方式:从表达式语言到并行化实践
|
6月前
|
人工智能 API 定位技术
MCP全方位扫盲
MCP(Model Context Protocol)是由Anthropic提出的协议,旨在标准化大模型与外部数据源和工具的通信方式。其核心架构包括MCP Client(客户端)和MCP Server(服务端),通过标准化接口实现解耦,支持不同LLM无缝调用工具。相比传统方法,MCP简化了Prompt工程,减少定制代码,提升复用性。实际场景中,如天气查询或支付处理,MCP可智能调用对应工具,优化用户体验。MCP的核心价值在于标准化通信、统一工具描述及动态兼容性,成为大模型与外部服务的智能桥梁。
|
Web App开发 前端开发 Java
SpringBoot配置HTTPS及开发调试
在实际开发过程中,如果后端需要启用https访问,通常项目启动后配置nginx代理再配置https,前端调用时高版本的chrome还会因为证书未信任导致调用失败,通过摸索整理一套开发调试下的https方案,特此分享
388 0
SpringBoot配置HTTPS及开发调试
|
运维 监控 测试技术
如何确保微服务架构的高可用性?
如何确保微服务架构的高可用性?
489 57
|
消息中间件 存储 监控
RocketMQ消息重试机制解析!
RocketMQ消息重试机制解析!
1168 1
RocketMQ消息重试机制解析!
|
负载均衡 Java Nacos
SpringCloud基础2——Nacos配置、Feign、Gateway
nacos配置管理、Feign远程调用、Gateway服务网关
SpringCloud基础2——Nacos配置、Feign、Gateway
|
Kubernetes 负载均衡 网络协议
在K8S中,svc底层是如何实现的?
在K8S中,svc底层是如何实现的?
|
缓存 安全 应用服务中间件
nginx配置proxy_set_header
nginx配置proxy_set_header

热门文章

最新文章