Future:异步任务结果获取

简介: Future:异步任务结果获取

我们之前说过如何正确创建线程池,我们详细介绍了怎么合理使用线程池,我们也只是介绍了 ThreadPoolExecutorvoid execute(Runnable command)方法,利用这个我们可以提交任务,让线程去消费处理,但是没有办法获取任务的执行结果。因为该方法没有返回值。而有一些场景我们需要获取任务的执行结果再判断逻辑。

1. FutureTask 介绍

Java 通过 ThreadPoolExecutor提供的 3 个 submit()方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。下面我们先来介绍这 3 个 submit()方法,这 3 个方法的方法签名如下。

// 提交Runnable任务
Future<?> submit(Runnable task);
// 提交Callable任务
<T> Future<T> submit(Callable<T> task);
// 提交Runnable任务及结果引用
<T> Future<T> submit(Runnable task, T result);

他们的返回值都是 Future接口,一共有 5 个方法, 取消任务的方法 cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone()**以及**2 个获得任务执行结果的 get()和 get(timeout, unit),其中最后一个 get(timeout, unit)支持超时机制。通过 Future 接口的这 5 个方法你会发现,我们提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是:这两个 get()方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get()方法的线程会阻塞,直到任务执行完才会被唤醒。

// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否已取消
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);

这 3 个 submit()方法之间的区别在于方法参数不同,下面我们简要介绍一下。

  1. 提交 Runnable 任务 submit(Runnable task) :这个方法的参数是一个 Runnable 接口,Runnable 接口的 run()方法是没有返回值的,所以 submit(Runnable task) 这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于Thread.join()
  2. 提交 Callable 任务 submit(Callable task):这个方法的参数是一个Callable接口,它只有一个call()方法,并且这个方法是有返回值的,所以这个方法返回的Future对象可以通过调用其get()方法来获取任务的执行结果。
  3. 提交 Runnable 任务及结果引用 submit(Runnable task, T result):这个方法很有意思,假设这个方法返回的Future对象是ff.get()的返回值就是传给submit()方法的参数result。result 相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。

2. 实现最优的“烧水泡茶”程序

记得以前初中语文课文里有一篇著名数学家华罗庚先生的文章《统筹方法》,这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这样:

img

下面我们用程序来模拟一下这个最优工序。我们专栏前面曾经提到,并发编程可以总结为三个核心问题:分工、同步和互斥。

编写并发程序,首先要做的就是分工,所谓分工指的是如何高效地拆解任务并分配给线程。

对于烧水泡茶这个程序,一种最优的分工方案可以是下图所示的这样:用两个线程 T1 和 T2 来完成烧水泡茶程序,T1 负责洗水壶、烧开水、泡茶这三道工序,T2 负责洗茶壶、洗茶杯、拿茶叶三道工序,其中 T1 在执行泡茶这道工序时需要等待 T2 完成拿茶叶的工序。对于 T1 的这个等待动作,你应该可以想出很多种办法,例如 Thread.join()、CountDownLatch,甚至阻塞队列都可以解决,不过今天我们用 Future 特性来实现。

img

2.1 烧水泡茶实战

首先我们要创建任务,也就是实现 Callable接口重写call()方法。

创建负责洗茶壶、烧开水,等待泡茶预备工作完成的泡茶任务

/**
 * 洗水壶、烧开水,并等待泡茶预备任务完成的结果执行泡茶
 */
public class BoilWaterTask implements Callable<String> {
    private Future<String> prepareTeaFuture;
    public BoilWaterTask(Future<String> prepareTeaFuture) {
        this.prepareTeaFuture = prepareTeaFuture;
    }
    @Override
    public String call() throws Exception {
        System.out.println("BoilWaterTask:洗水壶...");
        TimeUnit.SECONDS.sleep(1);
        System.out.println("BoilWaterTask:烧开水...");
        TimeUnit.SECONDS.sleep(15);
        // 获取T2线程的茶叶
        String prepareTeaResult = prepareTeaFuture.get();
        System.out.println("BoilWaterTask:拿到茶叶:" + prepareTeaResult);
        System.out.println("BoilWaterTask:泡茶...");
        return "上茶:" + prepareTeaResult;
    }
}

接着创建洗茶壶、洗茶杯、拿茶叶任务的任务

/**
 * 洗茶壶、洗茶杯、拿茶叶任务
 */
public class PrepareTeaTask implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("PrepareTeaTask:洗茶壶...");
        TimeUnit.SECONDS.sleep(1);
        System.out.println("PrepareTeaTask:洗茶杯...");
        TimeUnit.SECONDS.sleep(2);
        System.out.println("PrepareTeaTask:拿茶叶...");
        TimeUnit.SECONDS.sleep(1);
        return "龙井";
    }
}

最后我们定义一个 TeaService 提供makeTea()泡茶方法。内部使用线程池执行任务。我们使用FutureTask 的构造方法创建任务,然后将任务提交到线程池中。boilWaterFutureTask 任务内部会依赖等prepareTeaFutureTask完成后执行泡茶动作。

public class TeaService {
    private ExecutorService executorService = Executors.newFixedThreadPool(2);
    public void makeTea() throws ExecutionException, InterruptedException {
        //构造任务
        FutureTask<String> prepareTeaFutureTask = new FutureTask<>(new PrepareTeaTask());
        FutureTask<String> boilWaterFutureTask = new FutureTask<>(new BoilWaterTask(prepareTeaFutureTask));
//提交任务
        executorService.submit(prepareTeaFutureTask);
        executorService.submit(boilWaterFutureTask);
// 获取任务执行结果,阻塞等待
        System.out.println(boilWaterFutureTask.get());
        executorService.shutdown();
    }
}
// 一次执行结果
BoilWaterTask:洗水壶...
PrepareTeaTask:洗茶壶...
PrepareTeaTask:洗茶杯...
BoilWaterTask:烧开水...
PrepareTeaTask:拿茶叶...
BoilWaterTask:拿到茶叶:龙井
BoilWaterTask:泡茶...
上茶:龙井

Future 可以类比为现实世界里的提货单,比如去蛋糕店订生日蛋糕,蛋糕店都是先给你一张提货单,你拿到提货单之后,没有必要一直在店里等着,可以先去干点其他事,比如看场电影;等看完电影后,基本上蛋糕也做好了,然后你就可以凭提货单领蛋糕了。

3. ExecutorCompletionService 优化 Future

上文泡茶的程序我们是通过 FutureTask 提供的 get 方法获取执行结果, 假设现在有一大批需要进行计算的任务,为了提高整批任务的执行效率,你可能会使用线程池,向线程池中不断 submit 异步计算任务,同时你需要保留与每个任务关联的 Future,最后遍历这些 Future,通过调用 Future 接口实现类的 get 方法获取整批计算任务的各个结果。

虽然使用了线程池提高了整体的执行效率,但遍历这些 Future,调用 Future 接口实现类的 get 方法是阻塞的,也就是和当前这个 Future 关联的计算任务真正执行完成的时候,get 方法才返回结果,如果当前计算任务没有执行完成,而有其它 Future 关联的计算任务已经执行完成了,就会白白浪费很多等待的时间,所以最好是遍历的时候谁先执行完成就先获取哪个结果,这样就节省了很多持续等待的时间。

而 ExecutorCompletionService 可以实现这样的效果,它的内部有一个先进先出的阻塞队列,用于保存已经执行完成的 Future,通过调用它的 take 方法或 poll 方法可以获取到一个已经执行完成的 Future,进而通过调用 Future 接口实现类的 get 方法获取最终的结果。

定义一个计算任务,最终我们需要获取到执行结果。

public class ComputeTask implements Callable<String> {
    @Override
    public String call() throws Exception {
        int sleepTime = new Random().nextInt(1000);
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 返回给调用者的值
        String name = Thread.currentThread().getName();
        String result = name + " sleep time:" + sleepTime;
        System.out.println(name + " finished...");
        return result;
    }
}
public class FutureTest {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    @After
    public void after() {
        pool.shutdown();
    }
    @Test
    public void testComputeTask() throws InterruptedException, ExecutionException {
        ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(pool);
        int taskSize = 4;
        for (int i = 0; i < taskSize; i++) {
            completionService.submit(new ComputeTask());
        }
        for (int i = 0; i < taskSize; i++) {
            System.out.println(completionService.take().get());
        }
    }
}

3.1 方法解析

ExecutorCompletionService 实现了 CompletionService 接口,在 CompletionService 接口中定义了如下这些方法:

  • Futuresubmit(Callabletask):提交一个 Callable 类型任务,并返回该任务执行结果关联的 Future;
  • Futuresubmit(Runnable task,V result):提交一个 Runnable 类型任务,并返回该任务执行结果关联的 Future;
  • Futuretake():从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞,直到有任务完成;
  • Futurepoll():从内部阻塞队列中获取并移除第一个执行完成的任务,获取不到则返回 null,不阻塞;
  • Futurepoll(long timeout, TimeUnit unit):从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞时间为 timeout,获取不到则返回 null;


相关文章
Gradle安装配置下载
Gradle安装配置下载
897 1
|
前端开发 Java 关系型数据库
【SpringBoot】微服务学习笔记七:微服务中异步调用数据提交数据库的问题
【SpringBoot】微服务学习笔记七:微服务中异步调用数据提交数据库的问题
1014 0
【SpringBoot】微服务学习笔记七:微服务中异步调用数据提交数据库的问题
|
5月前
|
监控 关系型数据库 MySQL
MySQL 联合索引
联合索引是MySQL中提升多列查询性能的关键技术,由多个列组成,遵循最左前缀原则。合理设计索引顺序可显著加速查询,避免全表扫描。需结合实际查询需求创建,避免冗余,配合EXPLAIN分析执行计划,优化数据库性能。(238字)
|
存储 XML JSON
Activiti 7 核心数据库表概览及流程生命周期中的作用
Activiti 7 工作流引擎通过约25张核心数据库表实现流程定义、运行时状态、历史记录与身份数据的存储。表名以ACT_开头,后跟标识用途的字母组合(如RE表示Repository静态信息,RU表示Runtime动态数据)。流程启动时在运行时表登记数据,任务执行中更新关联信息,结束时清理运行时记录并完善历史记录。各表分工明确且逻辑紧密关联,确保高效运行与完整留痕的平衡。掌握这些表的作用和关联有助于深入理解Activiti底层原理及进行高级应用开发。
1045 0
|
11月前
|
XML 人工智能 Java
Spring IOC 到底是什么?
IOC(控制反转)是一种设计思想,主要用于解耦代码,简化依赖管理。其核心是将对象的创建和管理交给容器处理,而非由程序直接硬编码实现。通过IOC,开发者无需手动new对象,而是由框架负责实例化、装配和管理依赖对象。常见应用如Spring框架中的BeanFactory和ApplicationContext,它们实现了依赖注入和动态管理功能,提升了代码的灵活性与可维护性。
255 1
|
设计模式 Java
Java中的finally一定会被执行吗
在Java中,`finally`块通常会在正常情况下执行,但在特定异常情况下(如调用`System.exit()`、`Runtime.getRuntime().halt()`、死锁、掉电或JVM崩溃)则不会执行。此外,`System.exit()`会触发JVM关闭钩子,而`Runtime.getRuntime().halt()`则不会。面试时遇到疑问句,答案往往是“否定”的,符合这一逻辑。
289 0
Java中的finally一定会被执行吗
|
人工智能 小程序 Java
【Java】throw异常后代码还执行吗?80%小伙伴竟然不知道
本文通过具体的Java代码示例,探讨了Java异常处理机制下的程序流程变化,包括未使用try-catch时异常导致流程中断、使用try-catch捕获异常后的不同执行路径、循环中的异常处理以及throw抛出异常后的代码执行情况。总结了异常处理的关键点,强调了finally块的重要性。
393 4
【Java】throw异常后代码还执行吗?80%小伙伴竟然不知道
什么是死锁?产生死锁的原因?产生死锁的四个必要条件?死锁的避免与预防?
什么是死锁?产生死锁的原因?产生死锁的四个必要条件?死锁的避免与预防?
1678 0
|
存储 负载均衡 算法
1. Etcd 介绍
1. Etcd 介绍