ExecutorService、Callable、Future实现有返回结果的多线程原理解析

简介: ExecutorService、Callable、Future实现有返回结果的多线程原理解析

原创/朱季谦

在并发多线程场景下,存在需要获取各线程的异步执行结果,这时,就可以通过ExecutorService线程池结合Callable、Future来实现。

我们先来写一个简单的例子——

public class ExecutorTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable callable = new MyCallable();
        Future future = executor.submit(callable);
        System.out.println("打印线程池返回值:" + future.get());
    }
}
class MyCallable implements Callable<String>{
    @Override
    public String call() throws Exception {
        return "测试返回值";
    }
}

执行完成后,会打印出以下结果:

打印线程池返回值:测试返回值

可见,线程池执行完异步线程任务,我们是可以获取到异步线程里的返回值。

那么,ExecutorService、Callable、Future实现有返回结果的多线程是如何实现的呢?

首先,我们需要创建一个实现函数式接口Callable的类,该Callable接口只定义了一个被泛型修饰的call方法,这意味着,需要返回什么类型的值可以由具体实现类来定义——

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

因此,我自定义了一个实现Callable接口的类,该类的重写了call方法,我们在执行多线程时希望返回什么样的结果,就可以在该重写的call方法定义。

class MyCallable implements Callable<String>{
    @Override
    public String call() throws Exception {
        return "测试返回值";
    }
}

在自定义的MyCallable类中,我在call方法里设置一个很简单的String返回值 “测试返回值”,这意味着,我是希望在线程池执行完异步线程任务时,可以返回“测试返回值”这个字符串给我。

接下来,我们就可以创建该MyCallable类的对象,然后通过executor.submit(callable)丢到线程池里,线程池里会利用空闲线程来帮我们执行一个异步线程任务。

ExecutorService executor = Executors.newSingleThreadExecutor();
Callable callable = new MyCallable();
Future future = executor.submit(callable);

值得注意一点是,若需要实现获取线程返回值的效果,只能通过executor.submit(callable)去执行,而不能通过executor.execute(Runnable command)执行,因为executor.execute(Runnable command)只能传入实现Runnable 接口的对象,但这类对象是不具备返回线程效果的功能。

进入到executor.submit(callable)底层,具体实现在AbstractExecutorService类中。可以看到,执行到submit方法内部时,会将我们传进来的new MyCallable()对象作为参数传入到newTaskFor(task)方法里——

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

这个newTaskFor(task)方法内部具体实现,是将new MyCallable()对象传入构造器中,生成了一个FutureTask对象。

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

这个FutureTask对象实现RunableFuture接口,这个RunableFuture接口又继承了Runnable,说明FutureTask类内部会实现一个run方法,然后本身就可以当做一个Runnable线程任务,借助线程Thread(new FutureTask(.....)).start()方式开启一个新线程,去异步执行其内部实现的run方法逻辑。

public class FutureTask<V> implements RunnableFuture<V>{.....}
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

分析到这里,可以知道FutureTask的核心方法一定是run方法,线程执行start方法后,最后会去调用FutureTask的run方法。在讲解这个run方法前,我们先去看一下创建FutureTask的初始化构造方法底层逻辑new FutureTask(callable) ——

public class FutureTask<V> implements RunnableFuture<V> {
private Callable<V> callable;
......//省略其余源码
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    //通过构造方法初始化Callable<V> callable赋值
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
......//省略其余源码
}

可以看到,FutureTask(Callable callable)构造器,主要是将我们先前创建的new MyCallable()对象传进来,赋值给FutureTask内部定义的Callable callable引用,实现子类对象指向父类引用。这一点很关键,这就意味着,在初始化创建FutureTask对象后,我们是可以通过callable.call()来调用我们自定义设置可以返回“测试返回值”的call方法,这不就是我们希望在异步线程执行完后能够返回的值吗?

我们不妨猜测一下整体返数主流程,在Thread(new FutureTask(.....)).start()开启一个线程后,当线程获得了CPU时间片,就会去执行FutureTask对象里的run方法,这时run方法里可以通过callable.call()调用到我们自定义的MyCallable#call()方法,进而得到方法返回值 “测试返回值”——到这一步,只需要将这个返回值赋值给FutureTask里某个定义的对象属性,那么,在主线程在通过获取FutureTask里被赋值的X对象属性值,不就可以拿到返回字符串值 “测试返回值”了吗?

实现上,主体流程确实是这样,只不过忽略了一些细节而已。

接下来,让我们看一下FutureTask的run方法——

public void run() {
    //如果状态不是NEW或者设置runner为当前线程时,说明FutureTask任务已经取消,无法继续执行
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        //在该文中,callable被赋值为指向我们定义的new MyCallable()对象引用
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //c.call最后会调用new MyCallable()的call()方法,得到字符串返回值“测试返回值”给result
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            //正常执行完c.call()方法时,ran值为true,说明会执行set(result)方法。
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

根据以上源码简单分析,可以看到run方法当中,最终确实会执行new MyCallable()的call()方法,得到字符串返回值“测试返回值”给result,然后执行set(result)方法,根据set方法名就不难猜出,这是一个会赋值给某个字段的方法。

这里分析会忽略一些状态值的讲解,这块会包括线程的取消、终止等内容,后面我会出一片专门针对FutureTask源码分析的文章再介绍,本文主要还是介绍异步线程返回结果的主要原理。

沿着以上分析,追踪至set(result)方法里——

protected void set(V v) {
    //通过CAS原子操作,将运行的线程设置为COMPLETING,说明线程已经执行完成中
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        //若CAS原子比较赋值成功,说明线程可以被正常执行完成的话,然后将result结果值赋值给outcome
        outcome = v;
        //线程正常完成结束
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

这个方法的主要是,若该线程执行能够正常完成话,就将得到的返回值赋值给outcome,这个outcome是FutureTask的一个Object变量——

private Object outcome;

至此,就完成了流程的这一步——

最后,就是执行主线程的根据ftask.get()获取执行完成的值,这个get可以设置超时时间,例如 ftask.get(2,TimeUnit.SECONDS)表示超过2秒还没有获取到线程返回值的话,就直接结束该get方法,继续主线程往下执行。

System.out.println("打印线程池返回值:" + ftask.get(2,TimeUnit.SECONDS));

进入到get方法,可以看到当状态在s <= COMPLETING时,表示任务还没有执行完,就会去执行awaitDone(false, 0L)方法,这个方法表示,将一直做死循环等待线程执行完成,才会跳出等待循环继续往下走。若设置了超时时间,例如ftask.get(2,TimeUnit.SECONDS)),就会在awaitDone方法循环至2秒,在2秒内发现线程状态被设置为正常完成时,就会跳出循环,若2秒后线程没有执行完成,也会强制跳出循环了,但这种情况将无法获取到线程结果值。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        //循环等待线程执行状态
        s = awaitDone(false, 0L);
    return report(s);
}

最后就是report(s)方法,可以看到outcome值最终赋值给Object x,若s==NORMAL表示线程任务已经正常完成结束,就可以根据我们定义的类型进行泛型转换返回,我们定义的是String字符串类型,故而会返回字符串值,也就是 “测试返回值”。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        //返回线程任务执行结果
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

你看,最后就能获取到了异步线程执行的结果返回给main主线程——

以上就是执行线程任务run方法后,如何将线程任务结果返回给主线程,其实,还少一个地方补充,就是如何将FutureTask任务丢给线程执行,我们这里用到了线程池, 但是execute(ftask)底层同样是使用一个了线程通过执行start方法开启一个线程,这个新运行的线程最终会执行FutureTask的run方法。

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

可以简单优化下,直接用一个线程演示该案例,这样看着更好理解些,当时,生产上是不会有这样直接用一个线程来执行的,更多是通过原生线程池——

public static void main(String[] args) throws Exception{
    Callable callable = new MyCallable();
    RunnableFuture<String> ftask = new FutureTask<String>(callable);
    new Thread(ftask).start();
    System.out.println("打印线程池返回值:" + ftask.get());
}
目录
相关文章
|
8月前
|
安全 算法 网络协议
解析:HTTPS通过SSL/TLS证书加密的原理与逻辑
HTTPS通过SSL/TLS证书加密,结合对称与非对称加密及数字证书验证实现安全通信。首先,服务器发送含公钥的数字证书,客户端验证其合法性后生成随机数并用公钥加密发送给服务器,双方据此生成相同的对称密钥。后续通信使用对称加密确保高效性和安全性。同时,数字证书验证服务器身份,防止中间人攻击;哈希算法和数字签名确保数据完整性,防止篡改。整个流程保障了身份认证、数据加密和完整性保护。
|
10月前
|
存储 缓存 算法
HashMap深度解析:从原理到实战
HashMap,作为Java集合框架中的一个核心组件,以其高效的键值对存储和检索机制,在软件开发中扮演着举足轻重的角色。作为一名资深的AI工程师,深入理解HashMap的原理、历史、业务场景以及实战应用,对于提升数据处理和算法实现的效率至关重要。本文将通过手绘结构图、流程图,结合Java代码示例,全方位解析HashMap,帮助读者从理论到实践全面掌握这一关键技术。
297 14
|
7月前
|
机器学习/深度学习 数据可视化 PyTorch
深入解析图神经网络注意力机制:数学原理与可视化实现
本文深入解析了图神经网络(GNNs)中自注意力机制的内部运作原理,通过可视化和数学推导揭示其工作机制。文章采用“位置-转移图”概念框架,并使用NumPy实现代码示例,逐步拆解自注意力层的计算过程。文中详细展示了从节点特征矩阵、邻接矩阵到生成注意力权重的具体步骤,并通过四个类(GAL1至GAL4)模拟了整个计算流程。最终,结合实际PyTorch Geometric库中的代码,对比分析了核心逻辑,为理解GNN自注意力机制提供了清晰的学习路径。
490 7
深入解析图神经网络注意力机制:数学原理与可视化实现
|
7月前
|
机器学习/深度学习 缓存 自然语言处理
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
Tiktokenizer 是一款现代分词工具,旨在高效、智能地将文本转换为机器可处理的离散单元(token)。它不仅超越了传统的空格分割和正则表达式匹配方法,还结合了上下文感知能力,适应复杂语言结构。Tiktokenizer 的核心特性包括自适应 token 分割、高效编码能力和出色的可扩展性,使其适用于从聊天机器人到大规模文本分析等多种应用场景。通过模块化设计,Tiktokenizer 确保了代码的可重用性和维护性,并在分词精度、处理效率和灵活性方面表现出色。此外,它支持多语言处理、表情符号识别和领域特定文本处理,能够应对各种复杂的文本输入需求。
840 6
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
|
8月前
|
机器学习/深度学习 算法 数据挖掘
解析静态代理IP改善游戏体验的原理
静态代理IP通过提高网络稳定性和降低延迟,优化游戏体验。具体表现在加快游戏网络速度、实时玩家数据分析、优化游戏设计、简化更新流程、维护网络稳定性、提高连接可靠性、支持地区特性及提升访问速度等方面,确保更流畅、高效的游戏体验。
192 22
解析静态代理IP改善游戏体验的原理
|
9月前
|
Java 程序员
Java社招面试中的高频考点:Callable、Future与FutureTask详解
大家好,我是小米。本文主要讲解Java多线程编程中的三个重要概念:Callable、Future和FutureTask。它们在实际开发中帮助我们更灵活、高效地处理多线程任务,尤其适合社招面试场景。通过 Callable 可以定义有返回值且可能抛出异常的任务;Future 用于获取任务结果并提供取消和检查状态的功能;FutureTask 则结合了两者的优势,既可执行任务又可获取结果。掌握这些知识不仅能提升你的编程能力,还能让你在面试中脱颖而出。文中结合实例详细介绍了这三个概念的使用方法及其区别与联系。希望对大家有所帮助!
422 60
|
8月前
|
编解码 缓存 Prometheus
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
本期内容为「ximagine」频道《显示器测试流程》的规范及标准,我们主要使用Calman、DisplayCAL、i1Profiler等软件及CA410、Spyder X、i1Pro 2等设备,是我们目前制作内容数据的重要来源,我们深知所做的仍是比较表面的活儿,和工程师、科研人员相比有着不小的差距,测试并不复杂,但是相当繁琐,收集整理测试无不花费大量时间精力,内容不完善或者有错误的地方,希望大佬指出我们好改进!
489 16
「ximagine」业余爱好者的非专业显示器测试流程规范,同时也是本账号输出内容的数据来源!如何测试显示器?荒岛整理总结出多种测试方法和注意事项,以及粗浅的原理解析!
|
7月前
|
传感器 人工智能 监控
反向寻车系统怎么做?基本原理与系统组成解析
本文通过反向寻车系统的核心组成部分与技术分析,阐述反向寻车系统的工作原理,适用于适用于商场停车场、医院停车场及火车站停车场等。如需获取智慧停车场反向寻车技术方案前往文章最下方获取,如有项目合作及技术交流欢迎私信作者。
455 2
|
9月前
|
机器学习/深度学习 自然语言处理 搜索推荐
自注意力机制全解析:从原理到计算细节,一文尽览!
自注意力机制(Self-Attention)最早可追溯至20世纪70年代的神经网络研究,但直到2017年Google Brain团队提出Transformer架构后才广泛应用于深度学习。它通过计算序列内部元素间的相关性,捕捉复杂依赖关系,并支持并行化训练,显著提升了处理长文本和序列数据的能力。相比传统的RNN、LSTM和GRU,自注意力机制在自然语言处理(NLP)、计算机视觉、语音识别及推荐系统等领域展现出卓越性能。其核心步骤包括生成查询(Q)、键(K)和值(V)向量,计算缩放点积注意力得分,应用Softmax归一化,以及加权求和生成输出。自注意力机制提高了模型的表达能力,带来了更精准的服务。
10371 46
|
8月前
|
Java 数据库 开发者
详细介绍SpringBoot启动流程及配置类解析原理
通过对 Spring Boot 启动流程及配置类解析原理的深入分析,我们可以看到 Spring Boot 在启动时的灵活性和可扩展性。理解这些机制不仅有助于开发者更好地使用 Spring Boot 进行应用开发,还能够在面对问题时,迅速定位和解决问题。希望本文能为您在 Spring Boot 开发过程中提供有效的指导和帮助。
817 12

推荐镜像

更多
  • DNS