Future解析
- Future是什么?
- 类如如下:
- Future提供了任务检索、任务取消的高层规范
- 表示异步计算的结果
- Future的Get需要产生结果才能获取;否则会阻塞
- FutureTask结构图
- 如下:
- FutureTask是基于Future的基本实现;
- FutureTask提供了启动和取消计算、查询计算是否完成以及检索计算结果的方法
- FutureTask内部支持Callable、Runnable
- FutureTask实现了Runnable,可以交给Executor执行
- FutureTask的内部状态
- NEW:构造方法赋予的初始状态
- COMPLETING:Future计算完成的状态
- NORMAL:Future计算完成并且正确给出结果的状态
- EXCEPTIONAL:Future计算完成但是抛出异常的状态
- CANCELLED:Future中止线程的状态;未中止正在运行的线程
- INTERRUPTING:中间状态
- INTERRUPTED:中止正在运行的线程;强制中断正在运行的线程
- Future的内部状态转换序列
- NEW -> COMPLETING -> NORMAL(正常执行)
- NEW -> COMPLETING -> EXCEPTIONAL(执行时发生异常)
- NEW -> CANCELLED(中断任务,但是不中断正在执行的线程)
- NEW -> INTERRUPTING -> INTERRUPTED(中断任务,且中断正在执行的任务)
FutureTask源码分析
• FutureTask#run() • 执行Thread public void run() { //通过CAS获取runner属性并且设置为当前线程的ThreadID; //保证在计算出结果之前,不会重复计算; if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { //callable在初始化的时候传递进来的 Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //执行callable的结果通过result字段保存 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; //保存callable发生错误的情况 setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() //初始化自身状态;在CAS时需要使用runnerOffset; //只有runner为null,才会执行任务 runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) //在后面 handlePossibleCancellationInterrupt(s); } }
- FutureTask#setException() and FutureTask#set()
- 保存Thread执行结果
//Future执行发生异常时 //此时的状态流为NEW -> COMPLETING -> EXCEPTIONAL protected void setException(Throwable t) { //CAS修改状态;保证计算结果的正确性;并且修改状态为COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; //修改状态为EXCEPTIONAL UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } //Future正常执行 //此时的状态流为NEW -> COMPLETING -> NORMAL protected void set(V v) { //CAS修改状态; if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; //修改状态为NORMAL UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
- FutureTask#finishCompletion()
- 清除Get操纵导致阻塞的队列;重置Callable
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { //清除等待队列 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; //唤醒等待的线程 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //执行done done(); //清除callable callable = null; // to reduce footprint }
- FutureTask#get()
- 返回结果、或者抛出异常
//不包含超时的Get public V get() throws InterruptedException, ExecutionException { int s = state; //如果当前Thread的state没有转化为完成状态 if (s <= COMPLETING) //则等待; s = awaitDone(false, 0L); //返回执行的结果或者异常信息 return report(s); } //包含超时时间的Get public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; //如果在规定时间内没有做出响应且没有完成计算;则抛出TimeOut异常 if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
- Future#awaitDone()
- 等待线程执行完成;等待的结果不确定
private int awaitDone(boolean timed, long nanos) throws InterruptedException { //计算等待结束时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //如果线程以及中止 if (Thread.interrupted()) { //将WaitNode q移除等待队列 removeWaiter(q); //抛出异常 throw new InterruptedException(); } int s = state; //如果线程线程以及执行完成,则直接返回结果 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } //如果线程状态即将转化为完成;则让CPU当前执行线程让出CPU else if (s == COMPLETING) // cannot time out yet Thread.yield(); //新建队列节点 else if (q == null) q = new WaitNode(); //将当前节点置于等待队列 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //指定线程在nanos时间后被唤醒 else if (timed) { nanos = deadline - System.nanoTime(); //如果时间不合法 if (nanos <= 0L) { //则将超时的等待线程移除等待队列 removeWaiter(q); return state; } //线程等待单位时间后被唤醒 LockSupport.parkNanos(this, nanos); } //线程一直阻塞,直到被其余线程唤醒 else LockSupport.park(this); } }
- FutureTask#cancel(boolean mayInterruptIfRunning)
- 尝试取消此任务的执行
public boolean cancel(boolean mayInterruptIfRunning) { //mayInterruptIfRunning变量声明对于Thread是否执行强制中断 //基于mayInterruptIfRunning变量;通过CAS修改状态 //NEW -> INTERRUPTING或者NEW -> CANCELLED //INTERRUPTING只是中间状态 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception //如果允许强制中断 if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) //中断当前线程 t.interrupt(); } finally { // final state //修改线程状态 //NEW -> INTERRUPTING -> INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }