概述
Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。
FutureTask的三种运行状态
根据FutureTask.run()方法被执行的时机,FutureTask可以处于下面3种状态
未启动。FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。
已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
已完成。FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel(…)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态。
FutureTask的三种运行状态下的get/cancel操作及结果
当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞
当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常
当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行
当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务
当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成)
当FutureTask处于已完成状态时,执行FutureTask.cancel(…)方法将返回false。
FutureTask的实现
FutureTask的实现基于AbstractQueuedSynchronizer(AQS)。
AQS是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。
基于AQS实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch和FutureTask
并发编程-15并发容器(J.U.C)核心 AbstractQueuedSynchronizer 抽象队列同步器AQS介绍
每一个基于AQS实现的同步器都会包含两种类型的操作
至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。FutureTask的acquire操作为get()/get(long timeout,TimeUnit unit)方法调用
至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法
基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类
AQS被作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只需要实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。具体来说,Sync实现了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通过这两个方法来检查和更新同步状态。
FutureTask的使用
- 可以把FutureTask交给Executor执行
- 也可以通过
ExecutorService.submit(…)
方法返回一个FutureTask,然后执行FutureTask.get()
方法或FutureTask.cancel(…)
方法 - 除此以外,还可以单独使用FutureTask
当一个线程需要等待另一个线程把某个任务执行完后它才能继续执行,此时可以使用FutureTask.
示例
Future
package com.artisan.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @Slf4j public class FutureExample { static class MyCallable implements Callable<String> { @Override public String call() throws Exception { // 耗时任务 log.info("do something in callable start"); Thread.sleep(5000); log.info("do something in callable end"); return "DONE"; } } public static void main(String[] args) throws Exception { // 创建一个newCachedThreadPool线程池 ExecutorService executorService = Executors.newCachedThreadPool(); // submit任务 Future<String> future = executorService.submit(new MyCallable()); // 主线程模拟一些业务操作,假设耗时一秒 log.info("do something in main begin"); Thread.sleep(1000); log.info("do something in main finish"); // 获取刚才提交的线程MyCallable的返回结果 log.info("获取MyCallable的返回结果,如果未返回,主线程将阻塞,处于等待状态"); String result = future.get(); log.info("result:{}", result); // 关闭线程池 executorService.shutdown(); } }
观察执行结果:
FutureTask
package com.artisan.example.aqs; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import lombok.extern.slf4j.Slf4j; @Slf4j public class FutureTaskExample { public static void main(String[] args) throws Exception { FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() { @Override public String call() throws Exception { // 耗时任务 log.info("do something in callable"); Thread.sleep(5000); return "DONE"; } }); //创建一个newCachedThreadPool线程池 ExecutorService executorService = Executors.newCachedThreadPool(); // execute futureTask任务 executorService.execute(futureTask); // 主线程模拟一些业务操作,假设耗时一秒 log.info("do something in main begin"); Thread.sleep(1000); log.info("do something in main finish"); // 获取刚才提交的线程MyCallable的返回结果 log.info("获取futureTask的返回结果,如果未返回,主线程将阻塞,处于等待状态"); String result = futureTask.get(); log.info("result:{}", result); // 关闭线程池 executorService.shutdown(); } }