【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理

简介: 【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理

FutureTask的基本介绍

FutureTask是Java中的一个类,它实现了Future接口和Runnable接口,并且被用作线程执行的任务。FutureTask可以在多线程环境下异步执行一个任务并获取其结果。

FutureTask的特点用法

  1. 异步执行:通过将耗时的任务交给FutureTask,在一个单独的线程中执行,当前线程可以继续执行其他任务,不会被阻塞,从而提高程序的并发性。
  2. 获取结果:FutureTask提供了get()方法,用于在任务完成后获取其执行结果。如果任务尚未完成,get()方法将阻塞当前线程,直到任务完成并返回结果。
  3. 取消任务:可以使用cancel()方法取消任务的执行。如果任务尚未开始执行,则会取消任务的执行;如果任务已经开始执行,则根据传入的参数决定是否中断正在执行的任务。
  4. 线程安全:FutureTask是线程安全的,可以在多个线程中共享使用。多个线程可以同时调用FutureTask的get()方法等待任务结果。

FutureTask常见的应用场景是在并发编程中,将计算密集型的任务交给FutureTask后台执行,并在需要时获取结果。这样可以充分利用系统资源,提高程序的性能和响应能力。

FutureTask的执行流程

任务提交

在之前的线程池分析中,我们介绍了AbstractExecutorService的实现。AbstractExecutorService是Java中的一个抽象类,它实现了ExecutorService接口,并提供了一些通用的逻辑和方法来简化线程池的实现。

AbstractExecutorService的主要特点和用法

  1. 线程池管理:AbstractExecutorService提供了默认的线程池管理逻辑。我们可以通过继承AbstractExecutorService并实现其中的抽象方法来定制自己的线程池。
  2. 任务执行:通过调用submit()或invokeAll()等方法,可以将任务提交给AbstractExecutorService来执行。它会在适当的时候自动创建线程并执行任务。
  3. 异常处理:AbstractExecutorService提供了一些默认的异常处理机制,比如可以捕获任务执行过程中抛出的异常,并进行相应的处理操作。我们也可以在继承AbstractExecutorService时自定义异常处理逻辑。
  4. 生命周期管理:AbstractExecutorService定义了线程池的生命周期方法,如shutdown()和isShutdown()等,用于管理线程池的启动和关闭。

对象模型转换传递流程

submit方法提交任务

AbstractExecutorService主要用于实现ExecutorService接口中的submit()和invokeAll()等方法。它提供了一些默认的实现,使得我们可以更方便地创建和管理线程池,并执行任务。

java

复制代码

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

RunnableFuture任务模型

通过使用RunnableFuture,我们可以更加灵活地管理线程池,处理任务的提交和执行,并在需要时进行异常处理和线程池的关闭。最终进行构建一个FutureTask对象。

java

复制代码

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

FutureTask

对于通过submit方法提交的任务,无论是Runnable还是Callable接口实现,都会被统一封装为FutureTask对象,并传递给execute方法。

java

复制代码

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // 确保 callable 的可见性
}
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;      // 确保可调用的可见性
}

注意,AbstractExecutorService是一个抽象类,不能直接实例化。我们可以继承AbstractExecutorService类并实现其中的抽象方法,或者使用Java提供的具体实现类如ThreadPoolExecutor来创建并使用线程池

RunnableAdapter适配器模型

RunnableAdapter的作用是将一个Runnable对象包装成一个Callable对象。在适配器的call()方法中,会调用任务的run()方法来执行任务,并返回预先指定的结果。

java

复制代码

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
 }

通过使用RunnableAdapter,我们可以将Runnable任务在提交给ExecutorService时,以Callable的方式进行处理和执行。

java

复制代码

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    
    public T call() {
        task.run();
        return result;
    }
}

该适配器的设计使得我们可以更加方便地将Runnable任务与Callable任务一同处理,充分利用ExecutorService的统一接口,并且无需对Runnable任务进行额外的修改。

任务状态

FutureTask的状态定义,说明了FutureTask对象可能处于的不同状态。初始状态为NEW,后续可能会演化为COMPLETING、NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTING、INTERRUPTED等状态。

java

复制代码

private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

这些状态描述了FutureTask在执行过程中的不同情况,包括正常完成、遇到异常、被取消、被中断等。了解FutureTask的状态有助于我们理解和处理FutureTask对象在多线程环境下的行为。

任务执行

首先,方法会检查FutureTask的状态是否为NEW,并尝试使用compareAndSwapObject方法将执行线程设置为当前线程。然后,方法将执行并处理相关任务,并根据任务是否成功完成设置相应的结果。最后,清除执行线程(runner)并重新获取FutureTask的状态,根据状态处理可能的取消操作。

java

复制代码

public void run() {
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) {
        return;
    }
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran) {
                set(result);
            }
        }
    } finally {
        // 让runner非空直到state被稳定设置,以防止并发调用run()
        runner = null;
        // 在将runner设置为空之后必须重新读取state以防止泄漏中断
        int s = state;
        if (s >= INTERRUPTING) {
            handlePossibleCancellationInterrupt(s);
        }
    }
}

设置线程任务数据结果

对于Callable对象,FutureTask会直接执行它的call方法。如果call方法执行成功,FutureTask会调用set方法来设置执行结果;如果遇到异常,FutureTask会调用setException方法来设置异常。

java

复制代码

protected void set(V v) {
    // 首先使用CAS(比较并交换)将状态设置为中间状态COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        // 将状态设置为正常状态
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终状态
        finishCompletion();
    }
}
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        // 将状态设置为异常状态
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 最终状态
        finishCompletion();
    }
}

下面是对于Callable直接执行其call方法的处理过程。

  1. 通过CAS操作将FutureTask的状态设置为中间状态COMPLETING。
  2. 根据执行的结果(成功或异常),将结果或异常存储在outcome变量中,并将状态设置为相应的最终状态(正常状态或异常状态)。
  3. 完成执行并处理相关的完成操作。

获取线程任务数据结果

下面的这两个方法都是用来对全局变量outcome进行赋值的。而当我们通过get方法在另一个线程中获取结果时,将会执行以下过程:

java

复制代码

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

如果任务尚未完成,则会等待任务完成:

java

复制代码

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 使用for循环阻塞当前线程
    for (;;) {
        // 响应中断
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        // 任务已经完成或者已经抛出异常,直接返回
        if (s > COMPLETING) {
            // WaitNode已经创建,当前可以设置为null了
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // 不能超时
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

下面是我们通过get方法在另一个线程中获取结果的过程。

  1. 首先,通过调用get方法获取FutureTask的状态值。
  2. 如果状态值小于或等于COMPLETING,则调用awaitDone方法来等待任务的完成。
  3. 在awaitDone方法中,根据是否设置了超时时间计算出等待的截止时间,并使用for循环来阻塞当前线程。
  4. 在循环中,对中断请求进行响应,并检查任务的状态。
  5. 如果设置了超时时间,则根据剩余的等待时间使用LockSupport.parkNanos进行阻塞。
  6. 如果没有设置超时时间,则使用LockSupport.park进行阻塞。
  7. 循环会一直执行,直到任务完成或超时,才会返回任务的状态值。
  8. 最后,通过调用report方法来返回相应的结果。 总之,该过程描述了在另一个线程中通过get方法等待任务完成的过程。它通过阻塞当前线程并监测任务状态来实现等待,直到任务完成后才返回结果。

上报线程任务数据结果

如果任务已完成或者等待任务直到完成后,将会调用report方法来返回结果, report 方法,发现状态为异常的话将包装成 ExecutionException((Throwable)x); 这个异常就是我们在使用 get 的时候需要捕获的异常。

java

复制代码

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V) x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable) x);
}

如果状态s等于NORMAL,表示任务正常完成,将会返回实际的结果。如果状态s大于等于CANCELLED,则抛出CancellationException异常。否则,抛出ExecutionException异常。通过这样的设计,无论是任务正常完成还是抛出异常,都能够在线程池中执行的任务中被感知到。

任务取消

通过调用cancel方法可以取消FutureTask的执行,并在任务完成后通过finishCompletion方法来唤醒等待的线程。被唤醒的线程在awaitDone方法中继续循环,检查任务的状态以获取结果。

java

复制代码

public boolean cancel(boolean mayInterruptIfRunning) {
    // 如果任务不是刚创建或者是刚创建但是更改为指定状态失败则返回 false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
  1. FutureTask类中有一个cancel方法,接受一个布尔类型参数mayInterruptIfRunning,用于表示在执行中的任务是否可以中断。
  2. cancel方法首先检查任务的状态,如果状态不是NEW或者尝试将状态更改为INTERRUPTING或CANCELLED失败,则返回false。
  3. 如果允许中断正在运行的任务(mayInterruptIfRunning为true),则调用Thread的interrupt方法来中断任务的线程,并将任务的状态设置为INTERRUPTED。
  4. 然后,调用finishCompletion方法来完成任务的完成操作。
  5. 在finishCompletion方法中,首先尝试将waiters设置为null,以便其他线程无法再加入等待队列。然后循环遍历等待队列,唤醒等待的线程,并将等待队列中节点的thread属性设置为null。
  6. 在唤醒线程后,被唤醒的线程会在awaitDone方法中继续循环,检查任务的状态。
  7. 如果任务的状态大于COMPLETING,则表示任务已完成或抛出异常,直接返回状态。

总结说明

  • FutureTask是一个用于异步执行任务并获取结果的Java类。它提供了获取结果、取消任务等功能,帮助我们更好地处理多线程编程中的任务执行和结果获取。
  • AbstractExecutorService是Java中用于简化线程池实现的一个抽象类。通过继承并实现其中的方法,我们可以更方便地管理线程池的创建、任务的提交和执行。它提供了默认的异常处理和生命周期管理,并可以根据需要进行定制和扩展。
相关文章
|
13天前
|
前端开发 JavaScript Java
[Java计算机毕设]基于ssm的OA办公管理系统的设计与实现,附源码+数据库+论文+开题,包安装调试
OA办公管理系统是一款基于Java和SSM框架开发的B/S架构应用,适用于Windows系统。项目包含管理员、项目管理人员和普通用户三种角色,分别负责系统管理、请假审批、图书借阅等日常办公事务。系统使用Vue、HTML、JavaScript、CSS和LayUI构建前端,后端采用SSM框架,数据库为MySQL,共24张表。提供完整演示视频和详细文档截图,支持远程安装调试,确保顺利运行。
55 17
|
1天前
|
缓存 Dubbo Java
理解的Java中SPI机制
本文深入解析了JDK提供的Java SPI(Service Provider Interface)机制,这是一种基于接口编程、策略模式与配置文件组合实现的动态加载机制,核心在于解耦。文章通过具体示例介绍了SPI的使用方法,包括定义接口、创建配置文件及加载实现类的过程,并分析了其原理与优缺点。SPI适用于框架扩展或替换场景,如JDBC驱动加载、SLF4J日志实现等,但存在加载效率低和线程安全问题。
理解的Java中SPI机制
|
15天前
|
缓存 运维 Java
Java静态代码块深度剖析:机制、特性与最佳实践
在Java中,静态代码块(或称静态初始化块)是指类中定义的一个或多个`static { ... }`结构。其主要功能在于初始化类级别的数据,例如静态变量的初始化或执行仅需运行一次的初始化逻辑。
32 4
|
20天前
|
Java 调度
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
当我们创建一个`ThreadPoolExecutor`的时候,你是否会好奇🤔,它到底发生了什么?比如:我传的拒绝策略、线程工厂是啥时候被使用的? 核心线程数是个啥?最大线程数和它又有什么关系?线程池,它是怎么调度,我们传入的线程?...不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界...
91 0
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
|
1月前
|
JavaScript 安全 Java
智慧产科一体化管理平台源码,基于Java,Vue,ElementUI技术开发,二开快捷
智慧产科一体化管理平台覆盖从备孕到产后42天的全流程管理,构建科室协同、医患沟通及智能设备互联平台。通过移动端扫码建卡、自助报道、智能采集数据等手段优化就诊流程,提升孕妇就诊体验,并实现高危孕产妇五色管理和孕妇学校三位一体化管理,全面提升妇幼健康宣教质量。
51 12
|
7天前
|
JavaScript Java Docker
干货含源码!如何用Java后端操作Docker(命令行篇)
只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
15天前
|
存储 监控 数据可视化
SaaS云计算技术的智慧工地源码,基于Java+Spring Cloud框架开发
智慧工地源码基于微服务+Java+Spring Cloud +UniApp +MySql架构,利用传感器、监控摄像头、AI、大数据等技术,实现施工现场的实时监测、数据分析与智能决策。平台涵盖人员、车辆、视频监控、施工质量、设备、环境和能耗管理七大维度,提供可视化管理、智能化报警、移动智能办公及分布计算存储等功能,全面提升工地的安全性、效率和质量。
|
9月前
|
Java C++
关于《Java并发编程之线程池十八问》的补充内容
【6月更文挑战第6天】关于《Java并发编程之线程池十八问》的补充内容
66 5
|
6月前
|
缓存 监控 Java
Java中的并发编程:理解并应用线程池
在Java的并发编程中,线程池是提高应用程序性能的关键工具。本文将深入探讨如何有效利用线程池来管理资源、提升效率和简化代码结构。我们将从基础概念出发,逐步介绍线程池的配置、使用场景以及最佳实践,帮助开发者更好地掌握并发编程的核心技巧。
|
8月前
|
安全 Java 开发者
Java中的并发编程:深入理解线程池
在Java的并发编程中,线程池是管理资源和任务执行的核心。本文将揭示线程池的内部机制,探讨如何高效利用这一工具来优化程序的性能与响应速度。通过具体案例分析,我们将学习如何根据不同的应用场景选择合适的线程池类型及其参数配置,以及如何避免常见的并发陷阱。
80 1