异步&线程池 CompletableFuture 异步编排 【下篇】

简介: 这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。

1、创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

在这里插入图片描述

提示

  • 1、runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
  • 2、可以传入自定义的线程池,否则就用默认的线程池

1.1 不存在返回结果

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {

        System.out.println("main.....start......");

        CompletableFuture.runAsync(()->{
            System.out.println("当前线程是:"+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运算结果是:"+i);
        },executor);

        System.out.println("main.....end......");
    }
  }

测试结果

在这里插入图片描述


1.2 存在返回结果

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main.....start......");

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor);
        Integer integer = future.get();

        System.out.println("main.....end......"+integer);
    }

测试结果

在这里插入图片描述

2、计算完成时回调方法

whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。
whenComplete 和 whenCompleteAsync 的区别:

  • whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
  • whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执

在这里插入图片描述

2.1 whenComplete

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor).whenComplete((res,exception)->{
            System.out.println("异步任务完成了...结果是..."+res+";异常是:"+exception);
        });

        System.out.println("main.....end......");
    }
 }

在这里插入图片描述

如果出现了异常情况
在这里插入图片描述

2.2 出现异常、处理返回值

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor).whenComplete((res,exception)->{
            //可以得到异常信息,但是不能返回修改数据
            System.out.println("异步任务完成了...结果是..."+res+";异常是:"+exception);
        }).exceptionally(throwable -> {
            //可以感知异常,同时返回默认值
            return 666;

        });
        Integer integer = future.get();

        System.out.println("main.....end......"+integer);
    }
}

结果
在这里插入图片描述

3、handle 方法

在这里插入图片描述
和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。

3.1 出现异常情况

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor).handle((res,thr)->{
            if(res != null){
                return res*2;
            }

            if(thr != null){
                return 666;
            }

            return 0;
        });
        Integer integer = future.get();

        System.out.println("main.....end......"+integer);
    }
 }

结果
在这里插入图片描述

3.2 正常情况

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor).handle((res,thr)->{
            if(res != null){
                return res*2;
            }

            if(thr != null){
                return 666;
            }

            return 0;
        });
        Integer integer = future.get();

        System.out.println("main.....end......"+integer);
    }
 }

结果
在这里插入图片描述

4、线程串行化方法

在这里插入图片描述

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行thenRun 的后续操作

带有 Async 默认是异步执行的。同之前。
以上都要前置任务成功完成。

Function<? super T,? extends U>
    T:上一个任务返回结果的类型
    U:当前任务的返回值类型

4.1 thenRun

不能获取到上一步的执行结果

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor).thenRunAsync(()->{
            System.out.println("任务二启动了");
        },executor);

        System.out.println("main.....end......");
    }
   }

结果
在这里插入图片描述

4.2 thenAccept

能接收上一步的返回值,但是无返回值

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor).thenAcceptAsync(res->{
            System.out.println("任务二启动了,获取上一步的结果:"+res);
        },executor);

        System.out.println("main.....end......");
    }
}

结果
在这里插入图片描述

4.3 thenApply

获取上一步的返回结果,同时返回结果

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor).thenApplyAsync(res->{
            System.out.println("任务二启动了,上一步的结果是:"+res);
            return "Hello" + res;
        },executor);

        System.out.println("main.....end......"+future1.get());
    }
}

结果
在这里插入图片描述

5、两任务组合 - 都要完成

在这里插入图片描述
在这里插入图片描述

两个任务必须都完成,触发该任务。

thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值

thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。

runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务。

5.1 runAfterBoth

组合两个future,不接收值,也不返回值

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程开始..." + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务1线程结束..." + i);
            return i;

        }, executor);

        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程开始..." + Thread.currentThread().getId());
            System.out.println("任务2线程结束..." );
            return "hello";

        }, executor);

        future01.runAfterBothAsync(future02,()->{
            System.out.println("任务三开始");
        },executor);

        System.out.println("main.....end......"+future01.get());
    }
}

结果

在这里插入图片描述

5.2 thenAcceptBoth

接收前两个的值,不返回值

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程开始..." + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务1线程结束..." + i);
            return i;

        }, executor);

        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程开始..." + Thread.currentThread().getId());
            System.out.println("任务2线程结束..." );
            return "hello";

        }, executor);

        future01.thenAcceptBothAsync(future02,(f1,f2)->{
            System.out.println("任务三开始...之前的结果是:"+f1 +"......"+f2);
        },executor);

        System.out.println("main.....end......"+future01.get());
    }

结果
在这里插入图片描述

5.3 thenCombine

接收值同时返回值

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程开始..." + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务1线程结束..." + i);
            return i;

        }, executor);

        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程开始..." + Thread.currentThread().getId());
            System.out.println("任务2线程结束..." );
            return "hello";

        }, executor);

        CompletableFuture<Object> future = future01.thenCombineAsync(future02, (f1, f2) -> {
            return "任务三开始:"+f1+"..."+f2+"---->";
        }, executor);

        System.out.println("main.....end......"+future.get());
    }

结果
在这里插入图片描述

6、两任务组合 - 一个完成

在这里插入图片描述
在这里插入图片描述
当两个任务中,任意一个 future 任务完成的时候,执行任务。

applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。

runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值。

6.1 runAfterEither

两个有一个执行结束,就执行。不接收返回值,本身没有返回值。休息几秒钟,看效果

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程开始..." + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务1线程结束..." + i);
            return i;

        }, executor);

        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程开始..." + Thread.currentThread().getId());
            try {
                Thread.sleep(6000);
                System.out.println("任务2线程结束..." );
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";

        }, executor);

        future01.runAfterEitherAsync(future02, () -> {
            System.out.println("任务三开始");
        }, executor);

        System.out.println("main.....end......");
    }
   }

效果
在这里插入图片描述

6.2 acceptEither

其中一个执行完,就执行,得到值,本身无返回值。两个future的返回值类型要相同。

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程开始..." + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务1线程结束..." + i);
            return i;

        }, executor);

        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程开始..." + Thread.currentThread().getId());
            try {
                Thread.sleep(6000);
                System.out.println("任务2线程结束..." );
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";

        }, executor);

        future01.acceptEitherAsync(future02,(res)->{
            System.out.println("任务三开始...之前的结果:"+res);
        },executor);

        System.out.println("main.....end......");
    }

在这里插入图片描述

6.3 applyToEither

接收返回值,同时返回值

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程开始..." + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务1线程结束..." + i);
            return i;

        }, executor);

        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程开始..." + Thread.currentThread().getId());
            try {
                Thread.sleep(6000);
                System.out.println("任务2线程结束..." );
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";

        }, executor);

        CompletableFuture<String> future = future01.applyToEitherAsync(future02, res -> {
            System.out.println("任务三开始....之前的结果:" + res);
            return res.toString() + " -->哈喽";
        }, executor);

        System.out.println("main.....end......"+future.get());
    }

结果
在这里插入图片描述

7、多任务组合

在这里插入图片描述

allOf:等待所有任务完成
anyOf:只要有一个任务完成

7.1 allOf

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<String> futureImage = CompletableFuture.supplyAsync(() -> {
            System.out.println("商品图片查询");
            return "水杯.png";
        }, executor);

        CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
            System.out.println("商品属性查询");
            return "黑色+128G";
        }, executor);

        CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("商品介绍查询");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "华为手机好用呢";
        }, executor);

        CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImage, futureAttr, futureDesc);
        allOf.get();//阻塞等待

        System.out.println("main.....end......");
    }
}

在这里插入图片描述

如果没有allOf.get(); 查看结果

在这里插入图片描述

7.2 anyOf

任意一个执行成功,就返回

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<String> futureImage = CompletableFuture.supplyAsync(() -> {
            System.out.println("商品图片查询");
            return "水杯.png";
        }, executor);

        CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
            System.out.println("商品属性查询");
            return "黑色+128G";
        }, executor);

        CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("商品介绍查询");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "华为手机好用呢";
        }, executor);

        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImage, futureAttr, futureDesc);

        System.out.println("main.....end......查看哪个执行成功"+anyOf.get());
    }

结果
在这里插入图片描述

相关文章
|
6月前
|
消息中间件 前端开发 Java
美团面试:如何实现线程任务编排?
线程任务编排指的是对多个线程任务按照一定的逻辑顺序或条件进行组织和安排,以实现协同工作、顺序执行或并行执行的一种机制。 ## 1.线程任务编排 VS 线程通讯 有同学可能会想:那线程的任务编排是不是问的就是线程间通讯啊? 线程间通讯我知道了,它的实现方式总共有以下几种方式: 1. Object 类下的 wait()、notify() 和 notifyAll() 方法; 2. Condition 类下的 await()、signal() 和 signalAll() 方法; 3. LockSupport 类下的 park() 和 unpark() 方法。 但是,**线程通讯和线程的任务编排是
64 1
|
1月前
|
编解码 数据安全/隐私保护 计算机视觉
Opencv学习笔记(十):同步和异步(多线程)操作打开海康摄像头
如何使用OpenCV进行同步和异步操作来打开海康摄像头,并提供了相关的代码示例。
70 1
Opencv学习笔记(十):同步和异步(多线程)操作打开海康摄像头
|
29天前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
|
1月前
|
网络协议 安全 Java
难懂,误点!将多线程技术应用于Python的异步事件循环
难懂,误点!将多线程技术应用于Python的异步事件循环
56 0
|
2月前
|
设计模式 缓存 Java
谷粒商城笔记+踩坑(14)——异步和线程池
初始化线程的4种方式、线程池详解、异步编排 CompletableFuture
谷粒商城笔记+踩坑(14)——异步和线程池
|
3月前
|
缓存 Java
异步&线程池 线程池的七大参数 初始化线程的4种方式 【上篇】
这篇文章详细介绍了Java中线程的四种初始化方式,包括继承Thread类、实现Runnable接口、实现Callable接口与FutureTask结合使用,以及使用线程池。同时,还深入探讨了线程池的七大参数及其作用,解释了线程池的运行流程,并列举了四种常见的线程池类型。最后,阐述了在开发中使用线程池的原因,如降低资源消耗、提高响应速度和增强线程的可管理性。
异步&线程池 线程池的七大参数 初始化线程的4种方式 【上篇】
|
3月前
|
Java 数据库
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
这篇文章通过一个电商商品详情页的实战案例,展示了如何使用`CompletableFuture`进行异步编排,以解决在不同数据库表中查询商品信息的问题,并提供了详细的代码实现和遇到问题(如图片未显示)的解决方案。
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
|
3月前
|
数据采集 Python
多线程和异步
【8月更文挑战第12天】
39 3
|
4月前
|
Java Spring 容器
Spring boot 自定义ThreadPoolTaskExecutor 线程池并进行异步操作
Spring boot 自定义ThreadPoolTaskExecutor 线程池并进行异步操作
202 3
|
3月前
|
Dart API C语言
Dart ffi 使用问题之想在C/C++中创建异步线程来调用Dart方法,如何操作
Dart ffi 使用问题之想在C/C++中创建异步线程来调用Dart方法,如何操作