Future原理解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 介绍了Java多线程中Future类的原理

Callable & Runnable

Java在java.util.concurrent包下提供了两个基础的线程任务接口CallableRunnable,线程池可以执行这这两个接口的实现类。这两个接口的最大的区别就是,Callable允许实现类包含一个返回值,而Runnable接口的返回值类型为void。

现在,我们知道了Runnable接口包含一个返回值,那么如何获取到这个返回值呢?

Future

为了获取到Runnable接口的返回值,java.util.concurrent提供了另外一个类Future,这个类的具体讲解后面再说,我们先看一下下面的这个例子:

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        // 创建一个线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // 执行一个任务
        Future<String> callResult = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                // 线程阻塞1s
                Thread.sleep(1000);
                // 返回具体的结果,Callable与Runnable区别为,Callable接口需要一个返回值
                return "hello world";
            }
        });
        // get方法会阻塞主线程,直到获取到真正的返回值
        String result = callResult.get();
        // 下面这段代码会抛异常java.util.concurrent.TimeoutException
        // String result = callResult.get(500, TimeUnit.MILLISECONDS);
        System.out.println(result);
        // 关闭线程池
        executor.shutdown();
    }
}

上面的例子中,我们看到了Future.get()的用法,即获取Callable接口的返回值。我们通过Future.get()方法获取线程返回值时,最大的特点就是其会阻塞主线程,直到子线程执行完毕将返回值返回,主线程才会继续执行;同时,为了避免线程响应时间过长影响主线程,其还提供了一个重载方法Future.get(long timeout, TimeUnit unit),当子线程执行时间超过了指定的时间后,则抛出java.util.concurrent.TimeoutException异常。

下面,我们通过debug来追寻一下源码,看看Future.get()是如何工作的。

创建Feature

第一步,我们先看一下Callable任务被提交到线程池之后发生了什么?

根据debug调用,我们进入到了以下代码处

package java.util.concurrent;

public abstract class AbstractExecutorService implements ExecutorService {
    // ... 上文代码省略
    public <T> Future<T> submit(Callable<T> task) {
        // 任务判空
        if (task == null) throw new NullPointerException();
        // 将task封装到Feature中
        RunnableFuture<T> ftask = newTaskFor(task);
        // 执行任务
        execute(ftask);
        return ftask;
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    // ... 下文代码省略
}

当我们调用线程池的submit方法后,首先对任务进行判空,不为空则将其封装在RunnableFuture中,通过名字可知,这个RunnableFuture就是Feature接口的一个子类或这子接口,再通过第14行的方法可知,我们最终得到的一个对象是FutureTask。当前我们得到的依赖关系如下:

继续向下执行,则到了execute(ftask)这行代码,注意看,代码调用的类是Executors.newSingleThreadExecutor()的返回值,所以其实这行代码就是调用线程池的execute(Runnable command)方法,此方法详解后面写线程池的时候再细看,此处只需要直到其最终目的就是调用Runnable.run()方法即可。

执行任务

根据上面的类图关系,我们大概可以猜出来Runnable的run应该是在FutureTask类中进行过重写了,因为我们传入的是一个Callable接口,但是线程池的execute接口却只支持传入Runnable接口。

事实就是如此,通过看源码,我们得到了重写后的run方法,代码如下:

package java.util.concurrent;

public class FutureTask<V> implements RunnableFuture<V> {
    // ... 上文代码省略
    public void run() {
        // 判断线程状态
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return;
        try {
            // 获取到callable任务
            Callable<V> c = callable;
            // callable任务不为空且状态为NEW
            if (c != null && state == NEW) {
                // 定义结果和标记位
                V result;
                boolean ran;
                try {
                    // 执行call方法获得返回值,正常拿到返回值后,将标记位设置为true
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    // 执行call方法抛出异常后,将返回值设置为null,将标记位设置为false
                    result = null;
                    ran = false;
                    setException(ex);
                }
                // 如果标记位为true,则设置返回值
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    protected void set(V v) {
        // 判断线程状态
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            // 设置返回值
            outcome = v;
            // 更新状态
            STATE.setRelease(this, NORMAL); // final state
            finishCompletion();
        }
    }
    protected void setException(Throwable t) {
        // 判断线程状态
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            // 设置返回值
            outcome = t;
            // 更新状态
            STATE.setRelease(this, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    // ... 下文代码省略
}

上面的代码就是FutureTask类对run()方法的重写,包括其代码里面的两个重要调用。通过源码可知,其实run()方法内部会根据线程正常结束和异常结束分成两部分,每部分分别设置了返回值,其中正常线程正常结束时,返回值设置为真正的结果,且状态更新为NORMAL;线程执行异常时,返回值设置为异常,且状态更新为EXCEPTIONAL。

上面这一步只是任务执行的调用处,线程执行结果的返回值及异常处理,我们还需要看Future.get()方法

获取结果

package java.util.concurrent;

public class FutureTask<V> implements RunnableFuture<V> {
    // ... 上文代码省略
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 判断状态
        if (s <= COMPLETING)
            // 等待执行结果
            s = awaitDone(false, 0L);
        // 返回结果
        return report(s);
    }
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        // 根据状态判断是返回结果还是抛出异常
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        // 注意,这里是一个死循环,只有满足某些特定条件才能结束该循环
        for (;;) {
            int s = state;
            // 判断是否已经完成该任务
            if (s > COMPLETING) {
                // 手动清理q所指向的线程
                if (q != null)
                    q.thread = null;
                // 返回结果
                return s;
            }
            // 如果正在赋值,则释放该线程的CPU资源
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            // 如果线程被中断,则移除该线程任务,并抛出异常
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            // 如果q为空
            else if (q == null) {
                // 如果设置了超时时间,且已经超过了该超时时间,则直接返回该当前状态
                if (timed && nanos <= 0L)
                    return s;
                // 如果没有超时,则初始化q
                q = new WaitNode();
            }
            // 如果没有在队列中,则将该任务放到此队列中,注意,该队列为一个链表
            else if (!queued)
                queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
            // 下面时判断超时的操作
            else if (timed) {
                final long parkNanos;
                // 初始化开始时间
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    // 判断是否已经超时
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                // 挂起线程,即阻塞主线程
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            // 挂起线程,即阻塞主线程
            else
                LockSupport.park(this);
        }
    }
    // ... 下文代码省略
}

上面就是Future.get()方法的源码,通过report()方法,我们很好理解,其异常处理就是根据state的当前值进行判断,如果这个状态是NORMAL,那么久返回正常的返回值,否则就抛出异常。

那么为什么get()方法会阻塞主线程呢?核心代码awateDone()方法源码中。

为了看懂awateDone的源码,那就必须要了解一下FutureTask里的一个状态流转,下面是注释的原文

The run state of this task, initially NEW. The run state transitions to a terminal state only in methods set, setException, and cancel. During completion, state may take on transient values of COMPLETING (while outcome is being set) or INTERRUPTING (only while interrupting the runner to satisfy a cancel(true)). Transitions from these intermediate to final states use cheaper ordered/lazy writes because values are unique and cannot be further modified. Possible state transitions: NEW -> COMPLETING -> NORMAL NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED NEW -> INTERRUPTING -> INTERRUPTED

其实翻译过来就是,FutureTask里的状态一共包含以下几个

  • NEW(0):初始化状态,只有在执行构建方法时才会设置为此状态
  • COMPLETING(1):赋值过程中状态,只有执行set()/setException()方法时,会设置为此状态,以表示该任务的子线程已经执行完成并拿到了返回结果,正在执行赋值操作
  • NORMAL(2):正常状态,正常赋值完成后的状态,只有在执行set()方法时会设置为此状态
  • EXCEPTIONAL(3):异常状态,当线程执行异常时的状态,只有在执行setException()方法时,会设置为此状态,表示子线程执行过程中出现了异常
  • CANCELLED(4)、INTERRUPTING(5)、INTERRUPTED(6):三个状态分别表示已取消,正在中断,已中断,由于上文应用不多,此处不做过多解读。

现在我们再来看上面的代码以及注释,我们就可以理解FutureTask是如何做到阻塞主线程,并等待子线程取得返回值后再继续执行主线程操作了,大概操作如下:

  1. 初始化等待任务节点
  2. 将等待任务节点添加到队列中
  3. 挂起主线程
  4. 如果子线程已经完成,则释放其CPU资源
  5. 如果子线程已经完成且结果已经赋值成功,则返回结果
  6. 过程中如果线程中断或者超时,则抛出异常
相关文章
|
2月前
|
存储 缓存 算法
HashMap深度解析:从原理到实战
HashMap,作为Java集合框架中的一个核心组件,以其高效的键值对存储和检索机制,在软件开发中扮演着举足轻重的角色。作为一名资深的AI工程师,深入理解HashMap的原理、历史、业务场景以及实战应用,对于提升数据处理和算法实现的效率至关重要。本文将通过手绘结构图、流程图,结合Java代码示例,全方位解析HashMap,帮助读者从理论到实践全面掌握这一关键技术。
113 14
|
3月前
|
运维 持续交付 云计算
深入解析云计算中的微服务架构:原理、优势与实践
深入解析云计算中的微服务架构:原理、优势与实践
134 3
|
6天前
|
机器学习/深度学习 算法 数据挖掘
解析静态代理IP改善游戏体验的原理
静态代理IP通过提高网络稳定性和降低延迟,优化游戏体验。具体表现在加快游戏网络速度、实时玩家数据分析、优化游戏设计、简化更新流程、维护网络稳定性、提高连接可靠性、支持地区特性及提升访问速度等方面,确保更流畅、高效的游戏体验。
52 22
解析静态代理IP改善游戏体验的原理
|
3天前
|
编解码 缓存 Prometheus
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
本期内容为「ximagine」频道《显示器测试流程》的规范及标准,我们主要使用Calman、DisplayCAL、i1Profiler等软件及CA410、Spyder X、i1Pro 2等设备,是我们目前制作内容数据的重要来源,我们深知所做的仍是比较表面的活儿,和工程师、科研人员相比有着不小的差距,测试并不复杂,但是相当繁琐,收集整理测试无不花费大量时间精力,内容不完善或者有错误的地方,希望大佬指出我们好改进!
46 16
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
|
4月前
|
存储 算法 Java
解析HashSet的工作原理,揭示Set如何利用哈希算法和equals()方法确保元素唯一性,并通过示例代码展示了其“无重复”特性的具体应用
在Java中,Set接口以其独特的“无重复”特性脱颖而出。本文通过解析HashSet的工作原理,揭示Set如何利用哈希算法和equals()方法确保元素唯一性,并通过示例代码展示了其“无重复”特性的具体应用。
85 3
|
1月前
|
机器学习/深度学习 自然语言处理 搜索推荐
自注意力机制全解析:从原理到计算细节,一文尽览!
自注意力机制(Self-Attention)最早可追溯至20世纪70年代的神经网络研究,但直到2017年Google Brain团队提出Transformer架构后才广泛应用于深度学习。它通过计算序列内部元素间的相关性,捕捉复杂依赖关系,并支持并行化训练,显著提升了处理长文本和序列数据的能力。相比传统的RNN、LSTM和GRU,自注意力机制在自然语言处理(NLP)、计算机视觉、语音识别及推荐系统等领域展现出卓越性能。其核心步骤包括生成查询(Q)、键(K)和值(V)向量,计算缩放点积注意力得分,应用Softmax归一化,以及加权求和生成输出。自注意力机制提高了模型的表达能力,带来了更精准的服务。
|
2月前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
120 16
|
2月前
|
网络协议 安全 网络安全
探索网络模型与协议:从OSI到HTTPs的原理解析
OSI七层网络模型和TCP/IP四层模型是理解和设计计算机网络的框架。OSI模型包括物理层、数据链路层、网络层、传输层、会话层、表示层和应用层,而TCP/IP模型则简化为链路层、网络层、传输层和 HTTPS协议基于HTTP并通过TLS/SSL加密数据,确保安全传输。其连接过程涉及TCP三次握手、SSL证书验证、对称密钥交换等步骤,以保障通信的安全性和完整性。数字信封技术使用非对称加密和数字证书确保数据的机密性和身份认证。 浏览器通过Https访问网站的过程包括输入网址、DNS解析、建立TCP连接、发送HTTPS请求、接收响应、验证证书和解析网页内容等步骤,确保用户与服务器之间的安全通信。
172 3
|
3月前
|
JavaScript 前端开发 API
Vue.js响应式原理深度解析:从Vue 2到Vue 3的演进
Vue.js响应式原理深度解析:从Vue 2到Vue 3的演进
117 17
|
3月前
|
运维 持续交付 虚拟化
深入解析Docker容器化技术的核心原理
深入解析Docker容器化技术的核心原理
83 1

热门文章

最新文章

推荐镜像

更多