JUC并发编程之CompletableFuture详解

简介: JUC并发编程中的Future接口是Java 5中引入的一种异步编程机制,用于表示一个可能在未来完成的计算结果。它允许我们提交一个任务给线程池或其他执行器执行,并且可以通过Future对象获取任务执行的结果或者判断任务是否已经完成。

1.Future接口


1.1 Future介绍


JUC并发编程中的Future接口是Java 5中引入的一种异步编程机制,用于表示一个可能在未来完成的计算结果。它允许我们提交一个任务给线程池或其他执行器执行,并且可以通过Future对象获取任务执行的结果或者判断任务是否已经完成。


Future接口的基本方法如下:


isDone(): 判断任务是否已经完成。

get(): 获取任务的计算结果,如果任务还未完成,调用该方法将会阻塞,直到任务完成并返回结果。

get(long timeout, TimeUnit unit): 获取任务的计算结果,但最多等待指定时间,如果任务在指定时间内没有完成,则会抛出超时异常。

cancel(boolean mayInterruptIfRunning): 尝试取消任务的执行,如果任务已经开始执行,参数mayInterruptIfRunning决定是否中断任务。

isCancelled(): 判断任务是否已经被取消。


1.1.1 FutureTask


FutureTask是Java并发编程中的一个类,它实现了Future接口,并且是Runnable的实现类。FutureTask可以用来包装一个Callable或Runnable任务,使得我们可以在另一个线程中执行该任务,并且在未来的某个时间点获取任务的结果。FutureTask提供了一些方法来获取任务的执行结果,以及判断任务是否已经完成或取消。


如下图所示:


FutureTask实现了Future接口和Runnable接口,因此可以调用Future和Runnable的方法。


1.1.2 代码示例

import java.util.concurrent.*;
public class FutureExample {
    public static void main(String[] args) {
        // 创建一个线程池
        ExecutorService executor = Executors.newFixedThreadPool(1);
        // 提交一个异步任务
        Future<String> future = executor.submit(() -> {
            try {
                Thread.sleep(2000); // 模拟耗时任务
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "异步任务执行完成";
        });
        System.out.println("异步任务已经提交");
        // 等待任务完成,并获取结果
        try {
            String result = future.get(); // 这里会阻塞,直到任务完成并返回结果
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        // 关闭线程池
        executor.shutdown();
    }
}


在上面的示例中,我们使用ExecutorService创建一个拥有一个线程的线程池,然后通过submit()方法提交一个简单的异步任务,该任务在执行过程中睡眠2秒钟,然后返回一个字符串。接着,我们使用Future的get()方法获取任务的执行结果,这里get()方法将会阻塞直到任务完成并返回结果。


需要注意的是,Future的get()方法在获取结果时会阻塞,这可能会导致程序在等待过程中无法执行其他任务。为了更好地处理异步任务,Java 8引入了CompletableFuture,它提供了更多灵活的方法来处理异步任务和任务组合,可以更好地解决Future在并发编程中的一些限制。


2. CompletableFuture


2.1 基本概念


CompletableFuture是Java并发工具包(Java Util Concurrent,JUC)中的一部分,自Java 8版本引入,用于简化异步编程和并发任务的处理。它是java.util.concurrent包中的一个类,提供了强大的功能来处理异步操作和任务之间的依赖关系。


CompletableFuture是一个可完成或可编程的Future。它可以被手动完成,也可以用于串联多个异步操作,并在操作完成时触发后续的操作。它提供了一系列方法来组合和转换异步任务,例如thenApply(), thenAccept(), thenCombine(), thenCompose(), thenRun()等,CompletableFuture提供了一种类似观察者模式的机制,可以让任务执行完成后通知监听的一方。


在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法。

它可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。它实现了Future和CompletionStage接口。


如下图:



2.2 代码示例


2.2.1 创建CompletableFuture


CompletableFuture.runAsync(): 创建一个没有返回值的CompletableFuture,用于执行异步任务。

ExecutorService threadPool = Executors.newFixedThreadPool(1);
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
        },threadPool);


CompletableFuture.supplyAsync(): 创建一个具有返回值的CompletableFuture,用于执行异步计算。

ExecutorService threadPool = Executors.newFixedThreadPool(1);
  CompletableFuture<String> stringCompletableFuture = 
  CompletableFuture.supplyAsync(() -> {
            return "hello";
        }, threadPool);
        System.out.println(stringCompletableFuture.get());
        threadPool.shutdown();


CompletableFuture是Future的功能加强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成后或发生异常时,自动调用回调对象的回调方法。代码如下:

CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            return "hello";
        }).whenComplete((v,e)->{ //v表示返回的结果,e为异常,没有异常的时候e为null
            if(e==null){
                System.out.println("异步线程完成任务返回结果为 "+v);
            }
        }).exceptionally(e->{
            e.printStackTrace();
            System.out.println("goods");
            return null;
        });


在这里我们未使用我们自定义的线程池,这里会自动使用默认的线程池。


注意:当我们在异步线程的逻辑处理中加上Thread.sleep(1000)代码后再次运行会发现如下面的结果:



因为主线程是用户线程处理速度太快,CompletableFuture默认使用的线程池会立刻关闭。所以推荐使用自定义线程池。


2.2.2 函数式接口(补充)

在这里用到不少函数式接口,故在这里进行简单介绍一下。


Java函数式接口是Java 8版本中引入的一个特性,它是一种只包含一个抽象方法的接口。函数式接口可以被认为是用来支持Lambda表达式的接口,Lambda表达式可以被转换为函数式接口的实例。


Java函数式接口有以下特点:


只包含一个抽象方法:函数式接口只能有一个未实现的抽象方法。可以有默认方法和静态方法,但只有一个抽象方法。


@FunctionalInterface注解:为了确保一个接口是函数式接口,可以使用@FunctionalInterface注解进行标记。如果接口不符合函数式接口的定义,编译器会给出错误提示。


如下图:



2.2.3 异步任务组合

thenApply(): 对前一个阶段的计算结果进行转换

CompletableFuture.supplyAsync(() -> "Hello")
    .thenAccept(s -> System.out.println(s + " World"));


thenRun(): 在上一个阶段的任务完成后执行指定的动作,没有返回值。

CompletableFuture<Void> hello = CompletableFuture.supplyAsync(() -> "Hello")
                .thenRun(() -> {
                    System.out.println("hello");
                });

 

thenCombine(): 组合两个独立的CompletableFuture的结果。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> " World");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> s1 + s2);

handle():  因为thenApply方法是需要存在依赖关系,所以当前步骤错了,就不会继续往下一步执行,而handle可以

CompletableFuture.supplyAsync(()->{
            System.out.println("执行第一步");
            Integer i=1/0;
            return 1;
        }).handle((v,e)->{
            System.out.println("执行第二步");
            return v+2;
        }).handle((v,e)->{
            System.out.println("执行第三步");
            return v+3;
        }).whenComplete((v,e)->{
            if (e == null) {
                System.out.println("最后结果:"+v);
            }
        }).exceptionally(e->{
            e.printStackTrace();
            return null;
        });
    }

 


applyToEither(): 是CompletableFuture类中的一个方法,用于处理两个CompletableFuture的结果,只要其中任意一个完成就可以执行指定的转换函数。applyToEither方法在异步编程中常用于多个任务竞争的场景,我们希望只要其中一个任务先完成就可以对其结果进行处理,而不需要等待其他任务的完成。


CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 1";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 2";
        });
        CompletableFuture<String> resultFuture = future1.applyToEither(future2, s -> s + " wins!");
        resultFuture.thenAccept(System.out::println); // 输出较快的任务的结果


总结(摘录网图)




注意:


1. 没有传入自定义线程池,都用默认线程池ForkJoinPool;

2. 如果你执行第一个任务的时候,传入了一个自定义线程池:

调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。

调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoinPool线程池l


备注

有可能处理太快,系统优化切换原则,直接使用main线程处理


其它如: thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是同理。


 


相关文章
|
7月前
|
安全 Java 编译器
高并发编程之什么是 JUC
高并发编程之什么是 JUC
63 1
|
7月前
|
存储 Java
JUC并发编程之深入理解ThreadLocal
ThreadLocal是Java标准库提供的一个工具类,位于java.lang包下。它允许你创建一个线程局部变量,每个线程都可以独立地访问自己的变量副本,互不干扰。这在某些场景下非常有用,比如在多线程环境下,每个线程需要维护自己的状态信息,但又不想通过方法参数传递的方式来实现。
|
监控 Java API
并发编程 - CompletableFuture
并发编程 - CompletableFuture
81 0
|
资源调度
JUC并发编程之同步器(Semaphore、CountDownLatch、CyclicBarrier、Exchanger、CompletableFuture)附带相关面试题
1.Semaphore(资源调度) 2.CountDownLatch(子线程优先) 3.CyclicBarrier(栅栏) 4.Exchanger(公共交换区) 5.CompletableFuture(异步编程)
184 0
|
6月前
|
安全 算法 Java
|
安全 Java 调度
JUC并发编程(上)
JUC并发编程(上)
75 0
|
并行计算 Java 应用服务中间件
JUC并发编程超详细详解篇(一)
JUC并发编程超详细详解篇
1673 1
JUC并发编程超详细详解篇(一)
|
存储 缓存 监控
JUC并发编程(下)
JUC并发编程(下)
42 0
|
Java 调度
并发编程——Future & CompletableFuture
Java创建线程的方式,一般常用的是Thread,Runnable。如果需要当前处理的任务有返回结果的话,需要使用Callable。Callable运行需要配合Future。
63 0
|
并行计算 Java
JUC并发编程之CompletableFuture
future是java5新加的一个接口,他提供了一种异步并行计算的功能 接口定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务是否执行完毕 目的:异步多线程执行且有返回结果,特点:多线程/有返回/异步任务 补充:Runnable实现的是run方法,没有返回值,没有异常,Callable实现的是call方法,有返回值,需要处理异常
92 0