API解释如下
可取消的异步计算。该类提供了一个Future的基本实现 ,具有启动和取消计算的方法,查询计算是否完整,并检索计算结果。结果只能在计算完成后才能检索;如果计算尚未完成,则get方法将阻止。一旦计算完成,则无法重新启动或取消计算(除非使用runAndReset()调用计算 )。A FutureTask可用于包装Callable或Runnable对象。 因为FutureTask实现Runnable ,一个FutureTask可以提交到一个Executor执行。 除了作为独立类之外,此类还提供了protected功能,在创建自定义任务类时可能很有用。
boolean |
|
protected void |
|
V |
|
V |
|
boolean |
|
boolean |
|
void |
|
protected boolean |
|
protected void |
set(V v) 将此未来的结果设置为给定值,除非此未来已被设置或已被取消。 |
protected void |
|
FutureTask状态流转图
FetureTask的get()和cancel()的执行示意图
代码示例
import com.sun.org.apache.xpath.internal.operations.String; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class FutureTaskTest { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask futureTask = new FutureTask<>(() -> { System.out.println("-----"); return 1024; }); new Thread(futureTask).start(); Object o = futureTask.get(); } }
(FutureTask.java)中的方法
run()
//Thread.start()最后会执行的地方 public void run() { //非NEW状态线程或者通过CAS设置当前线程运行当前任务失败(表示被其他线程抢占了当前任务)直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; //执行到这里,当前线程为NEW状态,且抢占TASK任务成功,进入任务执行的流程 try { //自己封装的Callable或者装饰后的Runnable Callable<V> c = callable; //防止空指针 或者 当前线程被中断的情况(小概率) if (c != null && state == NEW) { V result; boolean ran; try { //调用我们实现的call方法 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
get()
/** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; /** *线程状态几种类型 */ private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ //Callable或者Runnable使用装饰者模式伪装成Callable private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ //执行线程的引用 private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters; /** * Returns result or throws exception for completed task. * * @param s completed state value */ @SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { //正常执行结束,outcome保存执行的结果 //非正常outcome结束保存抛出的异常 Object x = outcome; if (s == NORMAL) //返回运算结果 return (V)x; //调用cancel()被取消的状态 if (s >= CANCELLED) throw new CancellationException(); //如果走到这一步说明实现有问题,需要check自己实现的代码。。。 throw new ExecutionException((Throwable)x); } public V get() throws InterruptedException, ExecutionException { //获取当前线程状态 int s = state; //当前线程状态为未执行、正在执行、正在完成状态时,调用get()的外部线程被阻塞(这里指的线程不是执行当前任务的线程) if (s <= COMPLETING) //返回当前线程状态,如果此方法中抛出异常会往上抛出 s = awaitDone(false, 0L); return report(s); } /** * Awaits completion or aborts on interrupt or timeout. * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { //FutureTask.get()这里的调用时不带超时属性的 final long deadline = timed ? System.nanoTime() + nanos : 0L; //当前线程会被封装为WaitNode对象 WaitNode q = null; //判断当前线程的 WaitNode有没有在队列中 boolean queued = false; //自旋操作 for (;;) { //被中断的线程进入,被中断的线程线程调用interrupted(),返回true,之后将Thread的中断标记设置为false if (Thread.interrupted()) { //当前线程移出队列操作 removeWaiter(q); throw new InterruptedException(); } //被挂起的线程使用unpark(thread)唤醒或者新生线程走下面逻辑 //获取任务最新状态 int s = state; if (s > COMPLETING) { //已经创建WaitNode对象,q.thread = null;(help gc) if (q != null) q.thread = null; //返回当前状态 return s; } //当前任务正在完成任务中,让当前线程释放1cpu,继续下一次抢占cpu else if (s == COMPLETING) // cannot time out yet Thread.yield(); //自旋第一步:当前线程还未创建WaitNode对象,为其创建 else if (q == null) q = new WaitNode(); //相同线程再次自旋之后,已转变为WaitNode对象,放入队列中 else if (!queued) //通过cas方式设置waiters指向当前线程node,设置成功返回true queued = UNSAFE.compareAndSwapObject(this, waitersOffset, //将当前node节点的next指向等待队列的头节点 //waiters一直指向队列的头节点 q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } //FutureTask.get()调用会走这里,当前get()操作的线程被park挂起,线程状态变为WAITING状态 //等待被中断或者被唤醒,继续自旋 else LockSupport.park(this); } } /** * Tries to unlink a timed-out or interrupted wait node to avoid * accumulating garbage. Internal nodes are simply unspliced * without CAS since it is harmless if they are traversed anyway * by releasers. To avoid effects of unsplicing from already * removed nodes, the list is retraversed in case of an apparent * race. This is slow when there are a lot of nodes, but we don't * expect lists to be long enough to outweigh higher-overhead * schemes. */ //当有很多节点时,这是很慢的,但我们不希望列表足够长,超过更高的开销方案。 private void removeWaiter(WaitNode node) { if (node != null) { //node.thread = null的node会出队 node.thread = null; retry: for (;;) { // restart on removeWaiter race // (pred)前一个结点,(q)当前节点,(s)当前节点下个节点 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; //非头节点进入 else if (pred != null) { //【1】-》【2】-》【3】-》【4】 //例如当前节点为3,将【2】-》【4】,【3】出队 pred.next = s; //判断前一个节点是否出队 if (pred.thread == null) // check for race continue retry; } //当前节点为头节点的处理,通过cas指向头节点的下一个节点 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; //waiters一直指向队列的头节点 private static final long waitersOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("state")); runnerOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("runner")); waitersOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiters")); } catch (Exception e) { throw new Error(e); } }
cancel()
public boolean cancel(boolean mayInterruptIfRunning) { //state == NEW表示任务运行中或者处于线程池任务工作队列中 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; //任务运行中或者处于线程池任务工作队列中且通过CAS设置当前线程状态成功执行下边逻辑 try { // in case call to interrupt throws exception //mayInterruptIfRunning为true执行逻辑 if (mayInterruptIfRunning) { try { Thread t = runner; //当前线程很有可能为null,表明目前还没有线程获取到这个TASK任务 if (t != null) //调用中断 t.interrupt(); } finally { // final state //通过CAS操作将线程状态改为中断完成 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { //唤醒当前所有阻塞的外部调用get()的线程 finishCompletion(); } return true; } /** * Removes and signals all waiting threads, invokes done(), and * nulls out callable. */ private void finishCompletion() { // assert state > COMPLETING; //q指向waiters节点,waiters表示当前链表头节点 for (WaitNode q; (q = waiters) != null;) { //使用cas设置waiters为null为了防止外部线程的调用cancel()【小概率事件】 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { //获取当前WaitNode节点中的thread Thread t = q.thread; if (t != null) { q.thread = null; // help gc //唤醒当前节点对应的线程 LockSupport.unpark(t); } //当前节点的下一个节点 WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //自己可以扩展的方法 done(); callable = null; // to reduce footprint }
小结
当外部线程调用get()之后,当前线程状态为正在执行、未执行或者正在完成的状态时,外部调用get()的线程被阻塞;之后进入一个自旋操作中,为其创建WaitNode对象,并在下一次自选成功之后通过cas操作将队列头节点设置为当前节点node对象,当前节点node对象的下一个节点指向原队列waiters(原队列的头节点),之后会被UNSAFE执行park挂起当前线程,等待被中断或者被唤醒,继续进入自旋。之后当线程被中断之后重新进入自旋操作,此时的线程节点会被执行出队操作。当被挂起的线程被唤醒之后也继续进入自旋操作中,将之前 创建的WaitNode对象中的thread属性设为null(help gc),之后返回当前线程状态。如果当先线程状态为完成中时,会在自旋中释放cpu资源,等待下一次cpu的抢占。
当外部线程调用cancel(),任务正在执行中或者任务在线程池中的工作队列中且通过cas将当前线程状态置为调用者自己传递的参数成功后,会调用线程的interrupt(),在finally 块中通过cas操作将线程状态改为已中断。之后唤醒外部调用get()被阻塞的线程。
外部线程调用Thread.start()或者线程池调度,最后执行到FutureTask.run(),只有当前线程为NEW状态并且通过cas抢占到当前TASK的运行锁之后,执行用户实现的call()