前言
我们在日常的多线程编程中,为了充分的利用现在计算机多核的CPU资源,通常是需要开启多个线程来执行相对应的异步任务。在Java中,如果想新建一个线程,就必须要实现Runnable接口或者继承Thread。但是无论这两种方式如何实现,我们都无法获取任务执行的返回结果,那么有没有一种方式是可以获取异步线程返回的结果呢?
为什么要使用Future?
拿笔者早晨上班举例说明,笔者到公司需要50分钟,到公司附近KFC需要吃个帕尼尼,那么就会如下两种场景。相对于自己去下单排队领取早晨的情况,通过线上下单并由配送员送货到公司显得笔者的工作时间更为紧凑一些,这样省下来的时间我就能专注于本职工作,拉满价值观。而通过这两种场景,也不难看出,这也是同步调用跟异步调用的差别所在。
JavaFuture概述
上文说到不管是继承Thread类还是实现Runnable接口,都无法获取任务执行的结果。但是在JDK5中引入了Callable和Future,我们可以通过Callable可以来封装异步任务,而Callable的运行是依赖Future的,虽然我们可以通过它们执行异步任务可以获取执行结果,但是它的使用姿势是什么?它的执行流程又是什么?
JavaFuture定义
Future代表异步计算的结果。方法是提供给检查计算是否完成,等待其完成,检索的结果计算。检索的结果只能是使用get方法计算已经完成,必要时阻塞,直到它已经准备好了。取消由cancel执行方法。提供额外的方法来确定任务正常完成或被取消了,计算完成后,计算不能取消。
package java.util.concurrent;
/**
* @see FutureTask
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface Future<V> {
/** 试图取消执行这一任务。 */
boolean cancel(boolean mayInterruptIfRunning);
/** 返回 true如果这个任务被取消之前完成正常。 */
boolean isCancelled();
/** 返回 true如果这个任务完成。 */
boolean isDone();
/** 如果有必要等待计算完成,然后获取它的结果。 */
V get() throws InterruptedException, ExecutionException;
/** 如果有必要等待最多计算给定的时间完成,然后获取它的结果,如果可用。 */
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
JavaFutureTask使用
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
System.out.println("FutureTask任务执行");
return 0;
});
Thread t = new Thread(futureTask);
t.start();
System.out.println(futureTask.get());
}
JavaFutureTask解析
FutureTask核心属性
public class FutureTask<V> implements RunnableFuture<V> {
/**
* 可能的状态转换:
* NEW -> COMPLETING -> NORMAL 返回结果是正常的(任务正常结束)
* NEW -> COMPLETING -> EXCEPTIONAL 返回结果是异常的(任务正常结束)
* NEW -> CANCELLED 任务被取消
* NEW -> INTERRUPTING -> INTERRUPTED 任务被中断
*/
private volatile int state; // 当前任务状态
private static final int NEW = 0; // 任务初始化状态
private static final int COMPLETING = 1; // 正在将Callable封装给当前的FutureTask(正常结果、异常结果)
private static final int NORMAL = 2; // 正常结束状态
private static final int EXCEPTIONAL = 3; // 执行任务时发生的异常
private static final int CANCELLED = 4; // 任务被取消
private static final int INTERRUPTING = 5; // 线程的中断状态被设置为true
private static final int INTERRUPTED = 6; // 线程被中断
// 需要执行的任务。
private Callable<V> callable;
// 任务返回的结果,也就是.get()方法的获取的结果。
private Object outcome; // non-volatile, protected by state reads/writes
// 执行任务的线程。
private volatile Thread runner;
// 底层是单向链表,存放通过get方法挂起的等待线程,插入采用头插法。
private volatile WaitNode waiters;
....
}
FutureTask执行入口
通用使用示例可以看出在通过Thread.start();方法以后执行了Callable的内容,而FutureTask实现了RunnableFuture,但是RunnableFuture通过继承的方式实现了Runnable,也就是说最后FutureTask的执行还是通过run方法进行执行的,而这个run方法底层才会真正执行Callable的内容。
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
/**
* FutureTask构造器
*/
public FutureTask(Callable<V> callable) {
if (callable == null) {
throw new NullPointerException();
}
// 设置具体的Callable,可参照FutureTask核心属性
this.callable = callable;
this.state = NEW; // 初始化任务状态
}
public void run() {
// 保证任务的状态是NEW才可以运行,基于CAS的方式,将当前线程设置为runner。
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread())) {
return;
}
try {
// 需要执行的任务
Callable<V> c = callable;
// 二次校验,任务不为null,并且任务的状态还处于NEW
if (c != null && state == NEW) {
V result; // 返回结果预定义
boolean ran; // 任务是否正常结束
try {
result = c.call(); // 运行call方法,将返回结果封装到result中
ran = true; // 正常返回设置为true
} catch (Throwable ex) {
result = null; // 异常返回结果肯定为null
ran = false; // 异常返回设置为false
setException(ex); // 设置异常信息
}
if (ran) {
set(result); // 正常执行完毕,设置返回结果,详见下一小节
}
}
} finally {
runner = null; // 将执行任务的runner设置空
int s = state; // 拿到状态
if (s >= INTERRUPTING) {
// 中断后续处理
handlePossibleCancellationInterrupt(s);
}
}
}
/**
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
// 将任务状态从NEW设置为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将返回结果设置给outcome,可参照FutureTask核心属性
outcome = v;
// 将状态修改为NORMAL,代表当前任务正常结束
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
// 任务执行完毕,需要将挂起的线程唤醒,详见下一小节
finishCompletion();
}
}
FutureTask获取结果
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
// 获取当前任务状态
int s = state;
// 如果没在封装中的,就代表还没有返回结果
if (s <= COMPLETING) {
// 尝试挂起线程,等待拿结果,详见下一小节
s = awaitDone(false, 0L);
}
// 返回结果处理
return report(s);
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null) {
throw new NullPointerException();
}
// 获取当前任务状态
int s = state;
// 如果没在封装中的,就代表还没有返回结果。尝试挂起线程,等待拿结果,并判断超时,详见下一小节
if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) {
throw new TimeoutException();
}
// 返回结果处理
return report(s);
}
/**
* Awaits completion or aborts on interrupt or timeout.
* 线程要等待任务执行结束,等待任务执行的状态变为大于COMPLETING状态
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
// 如果是无限等待的get(),就为0。如果是get(time,unit)就追加当前的系统时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 构建WaitNode,可参照FutureTask核心属性
WaitNode q = null;
boolean queued = false;
for (;;) {
// 检查当前的get的线程是否中断了,如果中断了,将当前节点从waiters中移除,并且抛出中断异常。
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 获取任务的状态
int s = state;
// 判断了
if (s > COMPLETING) {
// 如果当前任务已经执行结束,并且设置过等待的线程,直接移除等待的线程
if (q != null) {
q.thread = null;
}
return s;
}
else if (s == COMPLETING)
// 如果当前任务正在封装结果中,让出时间片(COMPLETING的持续时间很短)
Thread.yield();
/*
* 如果到这里,说明不是在封装中,也不是执行完毕,也只有状态是NEW
*/
else if (q == null)
// 新增WaitNode准备存放当前线程
q = new WaitNode();
else if (!queued)
// 如果当前等待线程,还没有在waiters中排队,使用头插法插入链表中
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) {
/*
* 这是指定时间,get(time,unit)的挂起线程的方式
*/
nanos = deadline - System.nanoTime(); // 计算挂起时间
if (nanos <= 0L) {
// 挂起线程超时判断,如果超时,移除waiters中的当前Node
removeWaiter(q);
return state;
}
// 挂起指定时间的线程
LockSupport.parkNanos(this, nanos);
}
else {
// 无现挂起线程
LockSupport.park(this);
}
}
}
FutureTask唤醒线程
在上文"FutureTask执行入口"中有如下代码,在执行完毕以后,会调用finishCompletion唤起挂起的线程。
/**
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
// 将任务状态从NEW设置为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将返回结果设置给outcome,可参照FutureTask核心属性
outcome = v;
// 将状态修改为NORMAL,代表当前任务正常结束
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
// 任务执行完毕,需要将挂起的线程唤醒
finishCompletion();
}
}
/**
* 到这个方法的都是执行完毕的
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
// 拿到第一个节点后,直接用CAS的方式,将其设置为null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
// 获取线程信息
Thread t = q.thread;
if (t != null) {
// 如果等待线程不为null,将等待节点中的的线程设置为null
q.thread = null;
// 唤醒线程
LockSupport.unpark(t);
}
// 然后继续遍历,循环唤醒
WaitNode next = q.next;
if (next == null) {
break;
}
q.next = null;
// 指向next的WaitNode
q = next;
}
break;
}
}
done();// 扩展的执行完成方法,可以自己实现你想要的任何逻辑,比如日志摘要等等
callable = null; // 任务执行完毕,设置callable为null
}
总结
通过此上,你应该充分了解了FutureTask的执行入口、获取结果、唤醒线程的方式以及流程,尽管文章代码注释看起来可能比较晦涩,但你可以根据源码跟自己尝试写个Demo,配合文中信息断点调试跟踪一下,这样有助于充分理解文中所描述内容。
虽然尽管我们日常中大量的在使用FutureTask,但是它目前是存在一些问题的,其主要原因是因为它是同步非阻塞的执行任务的,用大白话说就是它不会主动通知你执行结果是什么,在获取的结果的时候,你可以需要不必要的等待,关于这个问题,java又引入了CompletableFuture,我们下个文章就肝一下看看这个Future高阶使用又是如何?它又是如何实现的?
梳理编写文章不易,还请各位看官点点关注。