Future原理解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 介绍了Java多线程中Future类的原理

Callable & Runnable

Java在java.util.concurrent包下提供了两个基础的线程任务接口CallableRunnable,线程池可以执行这这两个接口的实现类。这两个接口的最大的区别就是,Callable允许实现类包含一个返回值,而Runnable接口的返回值类型为void。

现在,我们知道了Runnable接口包含一个返回值,那么如何获取到这个返回值呢?

Future

为了获取到Runnable接口的返回值,java.util.concurrent提供了另外一个类Future,这个类的具体讲解后面再说,我们先看一下下面的这个例子:

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        // 创建一个线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // 执行一个任务
        Future<String> callResult = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                // 线程阻塞1s
                Thread.sleep(1000);
                // 返回具体的结果,Callable与Runnable区别为,Callable接口需要一个返回值
                return "hello world";
            }
        });
        // get方法会阻塞主线程,直到获取到真正的返回值
        String result = callResult.get();
        // 下面这段代码会抛异常java.util.concurrent.TimeoutException
        // String result = callResult.get(500, TimeUnit.MILLISECONDS);
        System.out.println(result);
        // 关闭线程池
        executor.shutdown();
    }
}

上面的例子中,我们看到了Future.get()的用法,即获取Callable接口的返回值。我们通过Future.get()方法获取线程返回值时,最大的特点就是其会阻塞主线程,直到子线程执行完毕将返回值返回,主线程才会继续执行;同时,为了避免线程响应时间过长影响主线程,其还提供了一个重载方法Future.get(long timeout, TimeUnit unit),当子线程执行时间超过了指定的时间后,则抛出java.util.concurrent.TimeoutException异常。

下面,我们通过debug来追寻一下源码,看看Future.get()是如何工作的。

创建Feature

第一步,我们先看一下Callable任务被提交到线程池之后发生了什么?

根据debug调用,我们进入到了以下代码处

package java.util.concurrent;

public abstract class AbstractExecutorService implements ExecutorService {
    // ... 上文代码省略
    public <T> Future<T> submit(Callable<T> task) {
        // 任务判空
        if (task == null) throw new NullPointerException();
        // 将task封装到Feature中
        RunnableFuture<T> ftask = newTaskFor(task);
        // 执行任务
        execute(ftask);
        return ftask;
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    // ... 下文代码省略
}

当我们调用线程池的submit方法后,首先对任务进行判空,不为空则将其封装在RunnableFuture中,通过名字可知,这个RunnableFuture就是Feature接口的一个子类或这子接口,再通过第14行的方法可知,我们最终得到的一个对象是FutureTask。当前我们得到的依赖关系如下:

继续向下执行,则到了execute(ftask)这行代码,注意看,代码调用的类是Executors.newSingleThreadExecutor()的返回值,所以其实这行代码就是调用线程池的execute(Runnable command)方法,此方法详解后面写线程池的时候再细看,此处只需要直到其最终目的就是调用Runnable.run()方法即可。

执行任务

根据上面的类图关系,我们大概可以猜出来Runnable的run应该是在FutureTask类中进行过重写了,因为我们传入的是一个Callable接口,但是线程池的execute接口却只支持传入Runnable接口。

事实就是如此,通过看源码,我们得到了重写后的run方法,代码如下:

package java.util.concurrent;

public class FutureTask<V> implements RunnableFuture<V> {
    // ... 上文代码省略
    public void run() {
        // 判断线程状态
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return;
        try {
            // 获取到callable任务
            Callable<V> c = callable;
            // callable任务不为空且状态为NEW
            if (c != null && state == NEW) {
                // 定义结果和标记位
                V result;
                boolean ran;
                try {
                    // 执行call方法获得返回值,正常拿到返回值后,将标记位设置为true
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    // 执行call方法抛出异常后,将返回值设置为null,将标记位设置为false
                    result = null;
                    ran = false;
                    setException(ex);
                }
                // 如果标记位为true,则设置返回值
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    protected void set(V v) {
        // 判断线程状态
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            // 设置返回值
            outcome = v;
            // 更新状态
            STATE.setRelease(this, NORMAL); // final state
            finishCompletion();
        }
    }
    protected void setException(Throwable t) {
        // 判断线程状态
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            // 设置返回值
            outcome = t;
            // 更新状态
            STATE.setRelease(this, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    // ... 下文代码省略
}

上面的代码就是FutureTask类对run()方法的重写,包括其代码里面的两个重要调用。通过源码可知,其实run()方法内部会根据线程正常结束和异常结束分成两部分,每部分分别设置了返回值,其中正常线程正常结束时,返回值设置为真正的结果,且状态更新为NORMAL;线程执行异常时,返回值设置为异常,且状态更新为EXCEPTIONAL。

上面这一步只是任务执行的调用处,线程执行结果的返回值及异常处理,我们还需要看Future.get()方法

获取结果

package java.util.concurrent;

public class FutureTask<V> implements RunnableFuture<V> {
    // ... 上文代码省略
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 判断状态
        if (s <= COMPLETING)
            // 等待执行结果
            s = awaitDone(false, 0L);
        // 返回结果
        return report(s);
    }
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        // 根据状态判断是返回结果还是抛出异常
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        // 注意,这里是一个死循环,只有满足某些特定条件才能结束该循环
        for (;;) {
            int s = state;
            // 判断是否已经完成该任务
            if (s > COMPLETING) {
                // 手动清理q所指向的线程
                if (q != null)
                    q.thread = null;
                // 返回结果
                return s;
            }
            // 如果正在赋值,则释放该线程的CPU资源
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            // 如果线程被中断,则移除该线程任务,并抛出异常
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            // 如果q为空
            else if (q == null) {
                // 如果设置了超时时间,且已经超过了该超时时间,则直接返回该当前状态
                if (timed && nanos <= 0L)
                    return s;
                // 如果没有超时,则初始化q
                q = new WaitNode();
            }
            // 如果没有在队列中,则将该任务放到此队列中,注意,该队列为一个链表
            else if (!queued)
                queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
            // 下面时判断超时的操作
            else if (timed) {
                final long parkNanos;
                // 初始化开始时间
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    // 判断是否已经超时
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                // 挂起线程,即阻塞主线程
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            // 挂起线程,即阻塞主线程
            else
                LockSupport.park(this);
        }
    }
    // ... 下文代码省略
}

上面就是Future.get()方法的源码,通过report()方法,我们很好理解,其异常处理就是根据state的当前值进行判断,如果这个状态是NORMAL,那么久返回正常的返回值,否则就抛出异常。

那么为什么get()方法会阻塞主线程呢?核心代码awateDone()方法源码中。

为了看懂awateDone的源码,那就必须要了解一下FutureTask里的一个状态流转,下面是注释的原文

The run state of this task, initially NEW. The run state transitions to a terminal state only in methods set, setException, and cancel. During completion, state may take on transient values of COMPLETING (while outcome is being set) or INTERRUPTING (only while interrupting the runner to satisfy a cancel(true)). Transitions from these intermediate to final states use cheaper ordered/lazy writes because values are unique and cannot be further modified. Possible state transitions: NEW -> COMPLETING -> NORMAL NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED NEW -> INTERRUPTING -> INTERRUPTED

其实翻译过来就是,FutureTask里的状态一共包含以下几个

  • NEW(0):初始化状态,只有在执行构建方法时才会设置为此状态
  • COMPLETING(1):赋值过程中状态,只有执行set()/setException()方法时,会设置为此状态,以表示该任务的子线程已经执行完成并拿到了返回结果,正在执行赋值操作
  • NORMAL(2):正常状态,正常赋值完成后的状态,只有在执行set()方法时会设置为此状态
  • EXCEPTIONAL(3):异常状态,当线程执行异常时的状态,只有在执行setException()方法时,会设置为此状态,表示子线程执行过程中出现了异常
  • CANCELLED(4)、INTERRUPTING(5)、INTERRUPTED(6):三个状态分别表示已取消,正在中断,已中断,由于上文应用不多,此处不做过多解读。

现在我们再来看上面的代码以及注释,我们就可以理解FutureTask是如何做到阻塞主线程,并等待子线程取得返回值后再继续执行主线程操作了,大概操作如下:

  1. 初始化等待任务节点
  2. 将等待任务节点添加到队列中
  3. 挂起主线程
  4. 如果子线程已经完成,则释放其CPU资源
  5. 如果子线程已经完成且结果已经赋值成功,则返回结果
  6. 过程中如果线程中断或者超时,则抛出异常
相关文章
|
14天前
|
vr&ar
简单易懂的 全景图高清下载方法以及原理简要解析(支持下载建E、720yun、酷雷曼、景站、酷家乐、百度街景原图)
这篇文章介绍了一种简单易懂的全景图高清下载方法,使用在线网站全景管家,支持下载包括建E、720yun、酷雷曼等多个平台的全景图原图,并简要解析了全景图的原理和制作方法。
简单易懂的 全景图高清下载方法以及原理简要解析(支持下载建E、720yun、酷雷曼、景站、酷家乐、百度街景原图)
|
8天前
|
域名解析 网络协议
DNS服务工作原理
文章详细介绍了DNS服务的工作原理,包括FQDN的概念、名称解析过程、DNS域名分级策略、根服务器的作用、DNS解析流程中的递归查询和迭代查询,以及为何有时基于IP能访问而基于域名不能访问的原因。
21 2
|
17天前
|
JavaScript 前端开发 安全
JS 混淆解析:JS 压缩混淆原理、OB 混淆特性、OB 混淆JS、混淆突破实战
JS 混淆解析:JS 压缩混淆原理、OB 混淆特性、OB 混淆JS、混淆突破实战
25 2
|
17天前
|
缓存 前端开发 JavaScript
Webpack 模块解析:打包原理、构造形式、扣代码补参数和全局导出
Webpack 模块解析:打包原理、构造形式、扣代码补参数和全局导出
21 1
|
19天前
|
设计模式 JavaScript 前端开发
Vue响应式原理全解析
Vue的响应式系统是其核心特性之一,它使得Vue能够以高效的方式响应数据的变化。通过对对象属性的getter和setter进行劫持,Vue实现了对数据变化的侦测和依赖收集,当数据变化时能够自动派发更新。Vue3中,响应式系统得到了进一步的加强和优化,使用Proxy替代了 `Object.defineProperty`,带来了更好的性能和更强大的拦截能力。理解Vue的响应式原理,对于深入理解Vue的工作机制和进行高效的Vue开发都具有重要意义。
32 1
|
22天前
|
缓存 监控 网络协议
DNS缓存中毒原理
【8月更文挑战第17天】
55 1
|
5天前
|
负载均衡 网络协议 安全
DNS解析中的Anycast技术:原理与优势
【9月更文挑战第7天】在互联网体系中,域名系统(DNS)将域名转换为IP地址,但网络规模的扩张使DNS解析面临高效、稳定与安全挑战。Anycast技术应运而生,通过将同一IP地址分配给多个地理分布的服务器,并依据网络状况自动选择最近且负载低的服务器响应查询请求,提升了DNS解析速度与效率,实现负载均衡,缓解DDoS攻击,增强系统高可用性。此技术利用动态路由协议如BGP实现,未来在网络发展中将扮演重要角色。
25 0
|
26天前
|
域名解析 缓存 网络协议
DNS解析过程原理!
DNS解析过程原理!
|
11天前
|
前端开发 Java UED
瞬间变身高手!JSF 与 Ajax 强强联手,打造极致用户体验的富客户端应用,让你的应用焕然一新!
【8月更文挑战第31天】JavaServer Faces (JSF) 是 Java EE 标准的一部分,常用于构建企业级 Web 应用。传统 JSF 应用采用全页面刷新方式,可能影响用户体验。通过集成 Ajax 技术,可以显著提升应用的响应速度和交互性。本文详细介绍如何在 JSF 应用中使用 Ajax 构建富客户端应用,并通过具体示例展示 Ajax 在 JSF 中的应用。首先,确保安装 JDK 和支持 Java EE 的应用服务器(如 Apache Tomcat 或 WildFly)。
23 0
|
11天前
|
Java Spring
🔥JSF 与 Spring 强强联手:打造高效、灵活的 Web 应用新标杆!💪 你还不知道吗?
【8月更文挑战第31天】JavaServer Faces(JSF)与 Spring 框架是常用的 Java Web 技术。本文介绍如何整合两者,发挥各自优势,构建高效灵活的 Web 应用。首先通过 `web.xml` 和 `ContextLoaderListener` 配置 Spring 上下文,在 `applicationContext.xml` 定义 Bean。接着使用 `@Autowired` 将 Spring 管理的 Bean 注入到 JSF 管理的 Bean 中。
25 0

热门文章

最新文章

推荐镜像

更多