@[TOC]
Future介绍
Java创建线程的方式,一般常用的是Thread,Runnable。如果需要当前处理的任务有返回结果的话,需要使用Callable。Callable运行需要配合Future。
Future是一个接口,一般会使用FutureTask实现类去接收Callable任务的返回结果。
FutureTask使用
下面示例使用FutureTask来执行一个可以返回结果的异步任务。Callable是要执行的任务,FutureTask是存放任务返回结果的位置。
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
System.out.println("任务执行");
Thread.sleep(2000);
return 123+764;
});
Thread t = new Thread(futureTask);
t.start();
System.out.println("main线程启动了t线程处理任务");
Integer result = futureTask.get();
System.out.println(result);
}
FutureTask 分析
首先看一下FutureTask的核心属性
/**
* NEW -> COMPLETING -> NORMAL 任务正常执行,返回结果是正常的结果
* NEW -> COMPLETING -> EXCEPTIONAL 任务正常执行,但是返回结果是异常
* NEW -> CANCELLED 任务直接被取消的流程
* NEW -> INTERRUPTING -> INTERRUPTED
*/
// 代表当前任务的状态
private volatile int state;
private static final int NEW = 0; // 任务的初始化状态
private static final int COMPLETING = 1; // Callable的结果(正常结果,异常结果)正在封装给当前的FutureTask
private static final int NORMAL = 2; // NORMAL任务正常结束
private static final int EXCEPTIONAL = 3; // 执行任务时,发生了异常
private static final int CANCELLED = 4; // 任务被取消了。
private static final int INTERRUPTING = 5; // 线程的中断状态,被设置为了true(现在还在运行)
private static final int INTERRUPTED = 6; // 线程被中断了。
// 当前要执行的任务
private Callable<V> callable;
// 存放任务返回结果的属性,也就是futureTask.get需要获取的结果
private Object outcome;
// 执行任务的线程。
private volatile Thread runner;
// 单向链表,存放通过get方法挂起等待的线程
private volatile WaitNode waiters;
t.start后,是通过run方法执行的Callable的call方法,该方法是同步的,然后将返回结果赋值给了outcome。
// run方法的执行流程,最终会执行Callable的call方法
public void run() {
// 保证任务的状态是NEW才可以运行
// 基于CAS的方式,将当前线程设置为runner。
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
return;
// 准备执行任务
try {
// 要执行任务 c
Callable<V> c = callable;
// 任务不为null,并且任务的状态还处于NEW
if (c != null && state == NEW) {
// 放返回结果
V result;
// 任务执行是否为正常结束
boolean ran;
try {
// 运行call方法,拿到返回结果封装到result中
result = c.call();
// 正常返回,ran设置为true
ran = true;
} catch (Throwable ex) {
// 结果为null
result = null;
// 异常返回,ran设置为false
ran = false;
// 设置异常信息
setException(ex);
}
if (ran)
// 正常执行结束,设置返回结果
set(result);
}
} finally {
// 将执行任务的runner设置空
runner = null;
// 拿到状态
int s = state;
// 中断要做一些后续处理
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
// 设置返回结果
protected void set(V v) {
// 首先要将任务状态从NEW设置为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将返回结果设置给outcome。
outcome = v;
// 将状态修改为NORMAL,代表正常技术
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
finishCompletion();
}
}
get方法获取返回结果时会查看当前线程状态,如果状态还未达成,也就是说call方法还未执行完未执行set方法,该线程就会被挂起阻塞LockSupport.park(this);。
public V get() throws InterruptedException, ExecutionException {
// 拿状态
int s = state;
// 满足找个状态就代表现在可能还没有返回结果
if (s <= COMPLETING)
// 尝试挂起线程,等待拿结果
s = awaitDone(false, 0L);
return report(s);
}
// 线程要等待任务执行结束,等待任务执行的状态变为大于COMPLETING状态
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
// 计算deadline,如果是get(),就是0, 如果是get(time,unit)那就追加当前系统时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 构建WaitNode
WaitNode q = null;
// queued = false
boolean queued = false;
// 死循环
for (;;) {
// 找个get的线程是否中断了。
if (Thread.interrupted()) {
// 将当前节点从waiters中移除。
removeWaiter(q);
// 并且抛出中断异常
throw new InterruptedException();
}
// 拿到现在任务的状态
int s = state;
// 判断任务是否已经执行结束了
if (s > COMPLETING) {
// 如果设置过WaitNode,直接移除WaitNode的线程
if (q != null)
q.thread = null;
// 返回当前任务的状态
return s;
}
// 如果任务的状态处于 COMPLETING ,
else if (s == COMPLETING)
// COMPLETING的持续时间非常短,只需要做一手现成的让步即可。
Thread.yield();
// 现在线程的状态是NEW,(call方法可能还没执行完呢,准备挂起线程)
else if (q == null)
// 封装WaitNode存放当前线程
q = new WaitNode();
else if (!queued)
// 如果WaitNode还没有排在waiters中,现在就排进来(头插法的效果)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) {
// get(time,unit)挂起线程的方式
// 计算挂起时间
nanos = deadline - System.nanoTime();
// 挂起的时间,是否小于等于0
if (nanos <= 0L) {
// 移除waiters中的当前Node
removeWaiter(q);
// 返回任务状态
return state;
}
// 正常指定挂起时间即可。(线程挂起)
LockSupport.parkNanos(this, nanos);
}
else {
// get()挂起线程的方式
LockSupport.park(this);
}
}
}
当任务执行完毕(set方法执行完成),由finishCompletion唤醒线程,LockSupport.unpark(t);
// 任务状态已经变为了NORMAL,做一些后续处理
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
// 拿到第一个节点后,直接用CAS的方式,将其设置为null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
// 基于q拿到线程信息
Thread t = q.thread;
// 线程不为null
if (t != null) {
// 将WaitNode的thread设置为null
q.thread = null;
// 唤醒这个线程
LockSupport.unpark(t);
}
// 往后遍历,接着唤醒
WaitNode next = q.next;
if (next == null)
break;
q.next = null;
// 指向next的WaitNode
q = next;
}
break;
}
}
// 扩展方法,没任何实现,你可以自己实现
done();
// 任务处理完了,可以拜拜了!
callable = null;
}
拿到返回结果的处理
// 任务结束。
private V report(int s) throws ExecutionException {
// 拿到结果
Object x = outcome;
// 判断是正常返回结束
if (s == NORMAL)
// 返回结果
return (V)x;
// 任务状态是大于取消
if (s >= CANCELLED)
// 甩异常。
throw new CancellationException();
// 扔异常。
throw new ExecutionException((Throwable)x);
}
// 正常返回 report
// 异常返回 report
// 取消任务 report
// 中断任务 awaitDone
CompletableFuture
FutureTask存在的问题:
问题1:FutureTask获取线程执行的结果前,主线程需要通过get方法一直阻塞等待子线程执行完call方法,才可以拿到返回结果。
问题2:如果不通过get去挂起线程,通过while循环,不停的判断任务的执行状态是否结束,结束后,再拿结果。如果任务长时间没执行完毕,CPU会一直调度查看任务状态的方法,会浪费CPU资源。
FutureTask是一个同步非阻塞处理任务的方式。需要一个异步非阻塞处理任务的方式。CompletableFuture在一定程度上就提供了各种异步非阻塞的处理方案。
CompletableFuture也是实现了Future接口实现的功能,可以不使用FutureTask,提供非常丰富的函数去执行各种异步操作,直接使用CompletableFuture即可。
CompletableFuture的应用
CompletableFuture最重要的就是解决了异步回调的问题
CompletableFuture就是执行一个异步任务,异步任务可以有返回结果,也可以没有返回结果,使用了函数式编程中三个最核心的接口
Supplier - 生产者,没有入参,但是有返回结果
Consumer - 消费者,有入参,但是没有返回结果
Function - 函数,有入参,并且有返回结果
提供了两个最基本运行的基本方法
supplyAsync(Supplier<U> supplier) 异步执行任务,有返回结果
runAsync(Runnable runnable) 异步执行任务,没有返回结果
在不指定线程池的前提下,这两个异步任务都是交给ForkJoinPool去执行的。
但是只是用这两个方法,无法实现异步回调的。如果需要在当前任务执行完毕后,拿着返回结果继续去执行后续任务操作的话,需要基于其他方法去实现。
thenApply(Function<prevResult,currResult>); 等待前一个任务处理结束后,拿着前置任务的返回结果,再做处理,并且返回当前结果
thenApplyAsync(Function<prevResult,currResult>,线程池) 采用全新的线程执行
thenAccept(Consumer<preResult>);等待前一个任务处理结束后,拿着前置任务的返回结果再做处理,没有返回结果
thenAcceptAsync(Consumer<preResult>,线程池);采用全新的线程执行
thenRun(Runnable) 等待前一个任务处理结束后,再做处理。不接收前置任务结果,也不返回结果
thenRunAsync(Runnable[,线程池]) 采用全新的线程执行
其次还有可以执行相对复杂的处理,在前一个任务执行的同时,执行后续任务。等待前置任务和后置任务都搞定之后,再执行最终任务
thenCombine(CompletionStage,Function<prevResult,nextResult,afterResult>) 让prevResult和nextResult一起执行,等待执行完成后,获取前两个任务的结果执行最终处理,最终处理也可以返回结果
thenCombineAsync(CompletionStage,Function<prevResult,nextResult,afterResult>[,线程池]) 采用全新的线程执行
thenAcceptBoth(CompletionStage,Consumer<prevResult,nextResult>);让前置任务和后续任务同时执行,都执行完毕后,拿到两个任务的结果,再做后续处理,但是没有返回结果
thenAcceptBothAsync(CompletionStage,Consumer<prevResult,nextResult>[,线程池])采用全新的线程执行
runAfterBoth(CompletionStage,Runnble) 让前置任务和后续任务同时执行,都执行完毕后再做后续处理
runAfterBothAsync(CompletionStage,Runnble[,线程池]) 采用全新的线程执行
还提供了可以让两个任务一起执行,但是有一个任务结束,有返回结果后,就做最终处理
applyToEither(CompletionStage,Function<firstResult,afterResult>) 前面两个任务同时执行,有一个任务执行完,获取返回结果,做最终处理,再返回结果
acceptEither(CompletionStage,Consumer<firstResult>) 前面两个任务同时执行,有一个任务执行完,获取返回结果,做最终处理,没有返回结果
runAfterEither(CompletionStage,Runnable) 前面两个任务同时执行,有一个任务执行完,做最终处理
还提供了等到前置任务处理完,再做后续处理,后续处理返回的结果为CompletionStage
thenCompose(Function<prevResult,CompletionStage>)
最后还有处理异常的各种姿势
exceptionally(Function<Throwable,currResult>)
whenComplete(Consumer<prevResult,Throwable>)
hanle(Function<prevResult,Throwable,currResult>)
CompletableFuture 示例
public static void main(String[] args) throws InterruptedException {
sout("我回家吃饭");
CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
sout("阿姨做饭!");
return "锅包肉!";
});
sout("我看电视!");
sout("我吃饭:" + task.join());
}
public static void main(String[] args) throws InterruptedException {
sout("我回家吃饭");
CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
sout("阿姨炒菜!");
return "锅包肉!";
},executor).thenCombineAsync(CompletableFuture.supplyAsync(() -> {
sout("小王焖饭");
return "大米饭!";
},executor),(food,rice) -> {
sout("大王端" + food + "," + rice);
return "饭菜好了!";
},executor);
sout("我看电视!");
sout("我吃饭:" + task.join());
}
总结
Future和CompletableFuture都是用于处理异步任务的接口和类,它们的主要区别在于功能复杂度和使用场景。Future比较简单,主要用于简单的异步任务处理,而CompletableFuture则更加灵活和强大,适用于复杂的异步任务处理场景。