Future

简介: Future

Future解析

  • Future是什么?
  • 类如如下:
  • Future提供了任务检索、任务取消的高层规范
  • 表示异步计算的结果
  • Future的Get需要产生结果才能获取;否则会阻塞
  • FutureTask结构图
  • 如下:
  • FutureTask是基于Future的基本实现;
  • FutureTask提供了启动和取消计算、查询计算是否完成以及检索计算结果的方法
  • FutureTask内部支持Callable、Runnable
  • FutureTask实现了Runnable,可以交给Executor执行
  • FutureTask的内部状态
  • NEW:构造方法赋予的初始状态
  • COMPLETING:Future计算完成的状态
  • NORMAL:Future计算完成并且正确给出结果的状态
  • EXCEPTIONAL:Future计算完成但是抛出异常的状态
  • CANCELLED:Future中止线程的状态;未中止正在运行的线程
  • INTERRUPTING:中间状态
  • INTERRUPTED:中止正在运行的线程;强制中断正在运行的线程
  • Future的内部状态转换序列
  • NEW -> COMPLETING -> NORMAL(正常执行)
  • NEW -> COMPLETING -> EXCEPTIONAL(执行时发生异常)
  • NEW -> CANCELLED(中断任务,但是不中断正在执行的线程)
  • NEW -> INTERRUPTING -> INTERRUPTED(中断任务,且中断正在执行的任务)

FutureTask源码分析

• FutureTask#run()
• 执行Thread
    public void run() {
    //通过CAS获取runner属性并且设置为当前线程的ThreadID;
    //保证在计算出结果之前,不会重复计算;
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
        //callable在初始化的时候传递进来的
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                //执行callable的结果通过result字段保存
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //保存callable发生错误的情况
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            //初始化自身状态;在CAS时需要使用runnerOffset;
            //只有runner为null,才会执行任务
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                //在后面
                handlePossibleCancellationInterrupt(s);
        }
    }
  • FutureTask#setException() and FutureTask#set()
  • 保存Thread执行结果
//Future执行发生异常时
    //此时的状态流为NEW -> COMPLETING -> EXCEPTIONAL
    protected void setException(Throwable t) {
    //CAS修改状态;保证计算结果的正确性;并且修改状态为COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            //修改状态为EXCEPTIONAL
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    //Future正常执行
    //此时的状态流为NEW -> COMPLETING -> NORMAL
    protected void set(V v) {
    //CAS修改状态;
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            //修改状态为NORMAL
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
  • FutureTask#finishCompletion()
  • 清除Get操纵导致阻塞的队列;重置Callable
private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
        //清除等待队列
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        //唤醒等待的线程
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        //执行done
        done();
        //清除callable
        callable = null;        // to reduce footprint
    }
  • FutureTask#get()
  • 返回结果、或者抛出异常    
//不包含超时的Get
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //如果当前Thread的state没有转化为完成状态
        if (s <= COMPLETING)
        //则等待;
            s = awaitDone(false, 0L);
        //返回执行的结果或者异常信息
return report(s);
    }
    //包含超时时间的Get
    public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    //如果在规定时间内没有做出响应且没有完成计算;则抛出TimeOut异常
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}
  • Future#awaitDone()
  • 等待线程执行完成;等待的结果不确定
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        //计算等待结束时间
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            //如果线程以及中止
            if (Thread.interrupted()) {
                //将WaitNode q移除等待队列
                removeWaiter(q);
                //抛出异常
                throw new InterruptedException();
            }
            int s = state;
            //如果线程线程以及执行完成,则直接返回结果
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            //如果线程状态即将转化为完成;则让CPU当前执行线程让出CPU
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            //新建队列节点    
            else if (q == null)
                q = new WaitNode();
            //将当前节点置于等待队列    
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            //指定线程在nanos时间后被唤醒                                         
            else if (timed) {
                nanos = deadline - System.nanoTime();
                //如果时间不合法
                if (nanos <= 0L) {
                    //则将超时的等待线程移除等待队列
                    removeWaiter(q);
                    return state;
                }
                //线程等待单位时间后被唤醒
                LockSupport.parkNanos(this, nanos);
            }
            //线程一直阻塞,直到被其余线程唤醒
            else
                LockSupport.park(this);
        }
    }
  • FutureTask#cancel(boolean mayInterruptIfRunning)
  • 尝试取消此任务的执行

public boolean cancel(boolean mayInterruptIfRunning) {
    //mayInterruptIfRunning变量声明对于Thread是否执行强制中断
    //基于mayInterruptIfRunning变量;通过CAS修改状态
    //NEW -> INTERRUPTING或者NEW -> CANCELLED
    //INTERRUPTING只是中间状态
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            //如果允许强制中断
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        //中断当前线程
                        t.interrupt();
                } finally { // final state
                    //修改线程状态
                    //NEW -> INTERRUPTING -> INTERRUPTED
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }


目录
相关文章
|
3月前
|
并行计算 Java 大数据
Callable和Future
Callable和Future
|
2月前
|
并行计算 Java API
|
1月前
|
前端开发 Java API
解密 asyncio 的 Future 和 Task
解密 asyncio 的 Future 和 Task
58 2
|
6月前
|
Java
Future:异步任务结果获取
Future:异步任务结果获取
69 0
|
6月前
|
C++
C++Future简单的使用
C++Future简单的使用
53 0
|
消息中间件 Java 中间件
Future and CompletableFuture
Future代表异步执行的结果,也就是说异步执行完毕后,结果保存在Future里, 我们在使用线程池submit()时需要传入Callable接口,线程池的返回值为一个Future,而Future则保存了执行的结果 ,可通过Future的get()方法取出结果,如果线程池使用的是execute(),则传入的是Runnable接口 无返回值。
71 0
|
Java API
Java并发编程-Future系列之Future的介绍和基本用法
Java并发编程-Future系列之Future的介绍和基本用法
189 0
Java并发编程-Future系列之Future的介绍和基本用法
|
Java
Future和Callable学习
通常使用线程池+Runnable的时候,会发现Runnable不能返回值,也就执行的结果情况,同时对于出现异常,我们获取异常信息,进行相应的处理。如果需要返回结果,同时需要进一步加工的时候,就可以考虑使用Future+Callable了。同时接口Future的默认实现是FutureTask,因此对于其实现get()方法,会有一个问题,就是如果前面的任务一旦执行的时间耗时较长的时候,就会出现一直阻塞的状态,此时就会出现排队等待的状态,大大影响其性能。适用场景:当一个线程需要等待另一个线程把某个任务执行完成后它才能继续执行,此时可以使用FutureTask。因为FutureTask基于AQS实现,
94 0
Future和Callable学习
|
存储
多线程 - Callable、Future 和 FutureTask 简单应用(二)
多线程 - Callable、Future 和 FutureTask 简单应用(二)
119 0
多线程 - Callable、Future 和 FutureTask 简单应用(二)
Future & CompleteFuture 实践总结
Future & CompleteFuture 实践总结