JUC之FutureTask源码深度剖析 ✨ 每日积累

简介: JUC之FutureTask源码深度剖析 ✨ 每日积累

API解释如下


可取消的异步计算。该类提供了一个Future的基本实现 ,具有启动和取消计算的方法,查询计算是否完整,并检索计算结果。结果只能在计算完成后才能检索;如果计算尚未完成,则get方法将阻止。一旦计算完成,则无法重新启动或取消计算(除非使用runAndReset()调用计算 )。A FutureTask可用于包装Callable或Runnable对象。 因为FutureTask实现Runnable ,一个FutureTask可以提交到一个Executor执行。 除了作为独立类之外,此类还提供了protected功能,在创建自定义任务类时可能很有用。

boolean

cancel(boolean mayInterruptIfRunning) 尝试取消执行此任务。

protected void

done() 此任务转换到状态 isDone (无论是正常还是通过取消)调用的受保护方法。

V

get() 等待计算完成,然后检索其结果。

V

get(long timeout, TimeUnit unit) 如果需要等待最多在给定的时间计算完成,然后检索其结果(如果可用)。

boolean

isCancelled() 如果此任务在正常完成之前取消,则返回 true 。

boolean

isDone() 返回 true如果任务已完成。

void

run() 将此未来设置为其计算结果,除非已被取消。

protected boolean

runAndReset() 执行计算而不设置其结果,然后将此将来重置为初始状态,如果计算遇到异常或被取消,则不执行此操作。

protected void

set(V v) 将此未来的结果设置为给定值,除非此未来已被设置或已被取消。

protected void

setException(Throwable t) 导致这个未来报告一个ExecutionException与给定的可抛弃的原因,除非这个未来已经被设置或被取消。

FutureTask状态流转图


image.png

FetureTask的get()和cancel()的执行示意图



1.png

代码示例

import com.sun.org.apache.xpath.internal.operations.String;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureTaskTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask futureTask = new FutureTask<>(() -> {
            System.out.println("-----");
            return 1024;
        });
        new Thread(futureTask).start();
        Object o = futureTask.get();
    }
}

(FutureTask.java)中的方法


run()

//Thread.start()最后会执行的地方
  public void run() {
        //非NEW状态线程或者通过CAS设置当前线程运行当前任务失败(表示被其他线程抢占了当前任务)直接返回
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        //执行到这里,当前线程为NEW状态,且抢占TASK任务成功,进入任务执行的流程
        try {
            //自己封装的Callable或者装饰后的Runnable
            Callable<V> c = callable;
            //防止空指针   或者  当前线程被中断的情况(小概率)
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //调用我们实现的call方法
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }


get()

  /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    /**
    *线程状态几种类型
    */
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    /** The underlying callable; nulled out after running */
  //Callable或者Runnable使用装饰者模式伪装成Callable
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
  //执行线程的引用
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        //正常执行结束,outcome保存执行的结果
        //非正常outcome结束保存抛出的异常
        Object x = outcome;
        if (s == NORMAL)
            //返回运算结果
            return (V)x;
        //调用cancel()被取消的状态
        if (s >= CANCELLED)
            throw new CancellationException();
        //如果走到这一步说明实现有问题,需要check自己实现的代码。。。
        throw new ExecutionException((Throwable)x);
    }
    public V get() throws InterruptedException, ExecutionException {
        //获取当前线程状态
        int s = state;
        //当前线程状态为未执行、正在执行、正在完成状态时,调用get()的外部线程被阻塞(这里指的线程不是执行当前任务的线程)
        if (s <= COMPLETING)
            //返回当前线程状态,如果此方法中抛出异常会往上抛出
            s = awaitDone(false, 0L);
        return report(s);
    }
  /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        //FutureTask.get()这里的调用时不带超时属性的
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        //当前线程会被封装为WaitNode对象
        WaitNode q = null;
        //判断当前线程的 WaitNode有没有在队列中   
        boolean queued = false;
        //自旋操作
        for (;;) {
            //被中断的线程进入,被中断的线程线程调用interrupted(),返回true,之后将Thread的中断标记设置为false
            if (Thread.interrupted()) {
                //当前线程移出队列操作
                removeWaiter(q);
                throw new InterruptedException();
            }
            //被挂起的线程使用unpark(thread)唤醒或者新生线程走下面逻辑
            //获取任务最新状态
            int s = state;
            if (s > COMPLETING) {
                //已经创建WaitNode对象,q.thread = null;(help gc)
                if (q != null)
                    q.thread = null;
                //返回当前状态
                return s;
            }
            //当前任务正在完成任务中,让当前线程释放1cpu,继续下一次抢占cpu 
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            //自旋第一步:当前线程还未创建WaitNode对象,为其创建
            else if (q == null)
                q = new WaitNode();
            //相同线程再次自旋之后,已转变为WaitNode对象,放入队列中
            else if (!queued)
                //通过cas方式设置waiters指向当前线程node,设置成功返回true
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                    //将当前node节点的next指向等待队列的头节点
                                                    //waiters一直指向队列的头节点
                                                    q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            //FutureTask.get()调用会走这里,当前get()操作的线程被park挂起,线程状态变为WAITING状态
            //等待被中断或者被唤醒,继续自旋
            else
                LockSupport.park(this);
        }
    }
    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     */
  //当有很多节点时,这是很慢的,但我们不希望列表足够长,超过更高的开销方案。
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            //node.thread = null的node会出队
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                // (pred)前一个结点,(q)当前节点,(s)当前节点下个节点 
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    //非头节点进入
                    else if (pred != null) {
            //【1】-》【2】-》【3】-》【4】
                        //例如当前节点为3,将【2】-》【4】,【3】出队
                        pred.next = s;
                        //判断前一个节点是否出队
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    //当前节点为头节点的处理,通过cas指向头节点的下一个节点
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    //waiters一直指向队列的头节点
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

cancel()

public boolean cancel(boolean mayInterruptIfRunning) {
        //state == NEW表示任务运行中或者处于线程池任务工作队列中
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        //任务运行中或者处于线程池任务工作队列中且通过CAS设置当前线程状态成功执行下边逻辑
        try {    // in case call to interrupt throws exception
            //mayInterruptIfRunning为true执行逻辑
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    //当前线程很有可能为null,表明目前还没有线程获取到这个TASK任务
                    if (t != null)
                        //调用中断
                        t.interrupt();
                } finally { // final state
                    //通过CAS操作将线程状态改为中断完成
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            //唤醒当前所有阻塞的外部调用get()的线程
            finishCompletion();
        }
        return true;
    }
    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        //q指向waiters节点,waiters表示当前链表头节点
        for (WaitNode q; (q = waiters) != null;) {
            //使用cas设置waiters为null为了防止外部线程的调用cancel()【小概率事件】
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    //获取当前WaitNode节点中的thread
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null; // help gc
                        //唤醒当前节点对应的线程
                        LockSupport.unpark(t);
                    }
                    //当前节点的下一个节点
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        //自己可以扩展的方法
        done();
        callable = null;        // to reduce footprint
    }

小结

当外部线程调用get()之后,当前线程状态为正在执行、未执行或者正在完成的状态时,外部调用get()的线程被阻塞;之后进入一个自旋操作中,为其创建WaitNode对象,并在下一次自选成功之后通过cas操作将队列头节点设置为当前节点node对象,当前节点node对象的下一个节点指向原队列waiters(原队列的头节点),之后会被UNSAFE执行park挂起当前线程,等待被中断或者被唤醒,继续进入自旋。之后当线程被中断之后重新进入自旋操作,此时的线程节点会被执行出队操作。当被挂起的线程被唤醒之后也继续进入自旋操作中,将之前 创建的WaitNode对象中的thread属性设为null(help gc),之后返回当前线程状态。如果当先线程状态为完成中时,会在自旋中释放cpu资源,等待下一次cpu的抢占。


当外部线程调用cancel(),任务正在执行中或者任务在线程池中的工作队列中且通过cas将当前线程状态置为调用者自己传递的参数成功后,会调用线程的interrupt(),在finally 块中通过cas操作将线程状态改为已中断。之后唤醒外部调用get()被阻塞的线程。


外部线程调用Thread.start()或者线程池调度,最后执行到FutureTask.run(),只有当前线程为NEW状态并且通过cas抢占到当前TASK的运行锁之后,执行用户实现的call()
相关文章
|
8月前
|
NoSQL 前端开发 Java
剑指JUC原理-20.并发编程实践(中)
剑指JUC原理-20.并发编程实践
75 0
|
8月前
|
消息中间件 存储 Java
剑指JUC原理-20.并发编程实践(下)
剑指JUC原理-20.并发编程实践
69 0
|
设计模式 监控 安全
JUC第一讲:Java并发知识体系详解 + 面试题汇总(P6熟练 P7精通)
JUC第一讲:Java并发知识体系详解 + 面试题汇总(P6熟练 P7精通)
2230 0
|
8月前
|
安全 算法 Java
Java并发编程的探索与实践
【5月更文挑战第25天】随着多核处理器的普及,并发编程变得越来越重要。Java语言提供了丰富的并发编程工具,本文将介绍Java并发编程的基本概念、原理以及实践经验,帮助读者更好地理解和应用Java并发编程。
|
8月前
|
设计模式 搜索推荐 Java
面试官不按套路出牌,上来就让聊一聊Java中的迭代器(Iterator ),夺命连环问,怎么办?
面试官不按套路出牌,上来就让聊一聊Java中的迭代器(Iterator ),夺命连环问,怎么办?
58 0
|
8月前
|
消息中间件 canal Java
剑指JUC原理-20.并发编程实践(上)
剑指JUC原理-20.并发编程实践
73 0
|
8月前
|
存储 缓存 安全
剑指JUC原理-11.不可变设计
剑指JUC原理-11.不可变设计
40 0
|
8月前
|
Java
剑指JUC原理-12.手写简易版线程池思路
剑指JUC原理-12.手写简易版线程池思路
48 0
|
Java C++
说一下 synchronized 底层实现原理?(高薪常问)
说一下 synchronized 底层实现原理?(高薪常问)
119 2
|
容灾 Java API
技术汇总:第二章:JUC
技术汇总:第二章:JUC