Future & CompleteFuture 实践总结

简介: Future & CompleteFuture 实践总结

1 Future介绍


1.1 Future的主要功能


JDK5新增了Future接口,用于描述一个异步计算的结果。

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果等操作。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。


Future类位于java.util.concurrent包下,它是一个接口:

public interface Future<V> { 
 /** 
  * 方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。 * 
  * @param mayInterruptIfRunning 表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。 
  * @return 如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false; 
  * 如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false; 
  * 如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。 
  */ 
  boolean cancel(boolean mayInterruptIfRunning); 
   /** 
    * 方法表示任务是否被取消成功 
    * @return 如果在任务正常完成前被取消成功,则返回 true 
    */ 
   boolean isCancelled(); 
   /** 
    * 方法表示任务是否已经完成 
    * @return 若任务完成,则返回true 
    */ 
   boolean isDone(); 
   /** 
    * 方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回 
    * @return 任务执行的结果值 
    * @throws InterruptedException 线程被中断异常
    * @throws ExecutionException 任务执行异常,如果任务被取消,还会抛出CancellationException
    */ 
   V get() throws InterruptedException, ExecutionException; 
   /** 
    * 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null(并不是抛出异常,需要注意)。 
    * @param timeout 超时时间 
    * @param unit 超时单位 
    * @return 
    * @throws InterruptedException 
    * @throws ExecutionException 
    * @throws TimeoutException 如果计算超时,将抛出TimeoutException(待确认)
    */ 
   V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; 
}


从上面方法的注释可以看出,Futrue提供了三种功能:


1)判断任务是否完成;


2)能够中断任务;


3)能够获取任务执行结果。(最为常用的)


1.2 Future的局限性


从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:


  • 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;当 for 循环批量获取 Future 的结果时容易 block,因此get 方法调用时应使用 timeout 限制。


  • 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;


  • 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;


  • 没有异常处理:Future接口中没有关于异常处理的方法;


2 CompletableFuture介绍


虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。如果遇到前面的task执行较慢时需要阻塞等待前面的task执行完后面task才能取得结果。


阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。而CompletableFuture的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。


2.1 CompletableFuture原理


内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。


2.2 应用场景


当需要批量提交异步任务的时候建议使用CompletableFuture。CompletableFuture将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。


CompletableFuture能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待。

线程池隔离。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。


2.3 CompletableFuture使用详解


简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务,往往并不是独立的,很多时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。如果要我们手动用 Fueture 实现,是非常麻烦的。


CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。

更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。


3 CompletableFuture应用梳理


网络异常,图片无法展示
|


按照使用功能分类


网络异常,图片无法展示
|


按应用场景分类


  • 很多方法上,可以指定线程池,而没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。


  • 默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。


  • 如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议要根据不同的业务类型创建不同的线程池,以避免互相干扰。


  • 等我们使用的时候,会注意到CompletableFuture的方法命名规则:
  • xxx():表示该方法将继续在已有的线程中执行;


  • xxxAsync():表示可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。


4 使用案例


4.1 基础使用案例


串行执行:


定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下:

CompletableFuture.supplyAsync():创建一个包含返回值的异步任务;
thenApplyAsync():获取前一个线程的结果进行转换,有返回值;
thenAccept():获取前一个线程的结果进行消费,无返回值。

public class Demo {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws Exception {
        // 第一个任务:创建一个包含返回值的CompletableFuture
        CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
            return queryCode("中国石油");
        });
        // cfQuery成功后继续执行下一个任务:
        CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice(code);
        });
        // cfFetch成功后打印结果:
        cfFetch.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 主线程不要结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        countDownLatch.await();
    }
    public static void main2(String[] args) throws Exception {
        CompletableFuture.supplyAsync(() -> queryCode("中国石油"))
                .thenApplyAsync((code) -> fetchPrice(code))
                .thenAccept((result) -> System.out.println("price: " + result));
        countDownLatch.await();
    }
    static String queryCode(String name) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        String code = "601857";
        System.out.println("查询证券编码,name:" + name + ",code:" + code);
        return code;
    }
    static Double fetchPrice(String code) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        Double price = 5 + Math.random() * 20;
        System.out.println("根据证券编码查询价格,code:" + code + ";price:" + price);
        return price;
    }
}


并行执行:


除了串行执行外,多个CompletableFuture还可以并行执行。例如,我们考虑这样的场景:


同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:

CompletableFuture.supplyAsync():创建一个包含返回值的异步任务;
CompletableFuture.anyOf(cf1,cf2,cf3).join():多个异步线程任一执行完即返回,有返回值Object;
thenAccept():获取前一个线程的结果进行消费,无返回值。

public class Demo2 {
    private static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
    public static void main(String[] args) throws Exception {
        // 两个CompletableFuture执行异步查询:
        CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
            return queryCode("中国石油", "https://finance.sina.com.cn/code/");
        });
        CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
            return queryCode("中国石油", "https://money.163.com/code/");
        });
        // 用anyOf合并为一个新的CompletableFuture:
        CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
        // 两个CompletableFuture执行异步查询:
        CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
        });
        CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice((String) code, "https://money.163.com/price/");
        });
        // 用anyOf合并为一个新的CompletableFuture:
        CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
        // 最终结果:
        cfFetch.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        COUNT_DOWN_LATCH.await();
    }
    public static void main2(String[] args) throws Exception {
        // 两个CompletableFuture执行异步查询:
        CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> queryCode("中国石油", "https://finance.sina.com.cn/code/"));
        CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> queryCode("中国石油", "https://money.163.com/code/"));
        // 用anyOf合并为一个新的CompletableFuture:
        CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
        // 两个CompletableFuture执行异步查询:
        CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> fetchPrice((String) code, "https://finance.sina.com.cn/price/"));
        CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> fetchPrice((String) code, "https://money.163.com/price/"));
        // 用anyOf合并为一个新的CompletableFuture:
        CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
        // 最终结果:
        cfFetch.thenAccept((result) -> System.out.println("price: " + result));
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        COUNT_DOWN_LATCH.await();
    }
    static String queryCode(String name, String url) {
        System.out.println(Thread.currentThread().getName() + " query code from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String code = "601857";
        System.out.println(Thread.currentThread().getName() + " 查询证券编码,name:" + name + ",code:" + code);
        return code;
    }
    static Double fetchPrice(String code, String url) {
        System.out.println(Thread.currentThread().getName() + " query price from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Double price = 5 + Math.random() * 20;
        System.out.println(Thread.currentThread().getName() + " 根据证券编码查询价格,code:" + code + ";price:" + price);
        return price;
    }
}


上述逻辑实现的异步查询规则实际上是:


网络异常,图片无法展示
|


image


4.2 实现最优的“烧水泡茶”程序


网络异常,图片无法展示
|

public class Demo3 {
    public static void main(String[] args) {
        //任务1:洗水壶 -> 烧开水
        CompletableFuture<String> f11 = CompletableFuture.supplyAsync(() -> {
            System.out.println("T1:洗水壶...开始");
            sleep(1000);
            return "T1:洗水壶...完成";
        });
        CompletableFuture<String> f12 = f11.thenApply((f11Result) -> {
            System.out.println(f11Result);
            System.out.println("T1:烧开水...开始");
            sleep(3000);
            return "T1:烧开水...完成";
        });
        //任务2:洗茶壶->洗茶杯->拿茶叶
        CompletableFuture<Void> f21 = CompletableFuture.runAsync(() -> {
            System.out.println("==============T2:洗茶壶...开始");
            sleep(1000);
            System.out.println("==============T2:洗茶壶...完成");
        });
        CompletableFuture<Void> f22 = f21.thenRun(() -> {
            System.out.println("==============T2:洗茶杯...开始");
            sleep(2000);
            System.out.println("==============T2:洗茶杯...完成");
        });
        CompletableFuture<String> f23 = f22.thenApply(result -> {
            System.out.println("==============T2:拿茶叶...开始");
            sleep(1000);
            System.out.println("==============T2:拿茶叶...完成");
            return "龙井";
        });
        //任务3:任务1和任务2完成后执行:泡茶
        CompletableFuture<String> f3 = f12.thenCombine(f23, (f1Result, f2Result) -> {
            System.out.println(f1Result);
            System.out.println("************T2:拿到茶叶:result" + f2Result);
            System.out.println("************T3:泡茶...,什么茶:" + f2Result);
            return "上茶:" + f2Result;
        });
        //等待任务3执行结果
        System.out.println(f3.join());
    }
    static void sleep(int t) {
        try {
            Thread.sleep(t);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


相关文章
|
4月前
|
并行计算 Java API
|
5月前
|
Java
Future原理解析
介绍了Java多线程中Future类的原理
Future原理解析
|
6月前
|
并行计算 Java
Future、CompletableFuture概述
Future、CompletableFuture概述
120 0
|
8月前
|
Java
Java并发编程:理解并使用Future和Callable接口
【2月更文挑战第25天】 在Java中,多线程编程是一个重要的概念,它允许我们同时执行多个任务。然而,有时候我们需要等待一个或多个线程完成,然后才能继续执行其他任务。这就需要使用到Future和Callable接口。本文将深入探讨这两个接口的用法,以及它们如何帮助我们更好地管理多线程。
|
8月前
|
C++
C++Future简单的使用
C++Future简单的使用
57 0
|
Java
ExecutorService、Callable、Future实现有返回结果的多线程原理解析
ExecutorService、Callable、Future实现有返回结果的多线程原理解析
85 0
|
存储 Java
并发编程系列教程(09) - Callable与Future模式
并发编程系列教程(09) - Callable与Future模式
59 0
|
Java
异步编程 - 04 基于JDK中的Future实现异步编程(上)_Future & FutureTask 源码解析
异步编程 - 04 基于JDK中的Future实现异步编程(上)_Future & FutureTask 源码解析
85 0
|
存储 Java
JUC基础(二)—— Future接口 及其实现
JUC基础(二)—— Future接口 及其实现
172 1
【并发技术11】Callable与Future的应用
【并发技术11】Callable与Future的应用