Java多线程——FutureTask源码解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介:

一个很常见的多线程案例是,我们安排主线程作为分配任务和汇总的一方,然后将计算工作切分为多个子任务,安排多个线程去计算,最后所有的计算结果由主线程进行汇总。比如,归并排序,字符频率的统计等等。

我们知道Runnable是不返回计算结果的,如果想利用多线程的话,只能存储到一个实例的内部变量里面进行交互,但存在一个问题,如何判断是否已经计算完成了。用Thread.join是一个方案,但是我们只能依次等待一个线程结束后处理一个线程,如果线程1恰好特别慢,则后续已经完成的线程不能被及时处理。我们希望能够获知线程的执行状态,发现哪个线程处理完就先统计它的计算结果。可以考虑使用Callable和FutureTask来完成。

先说Callable它是一个功能接口,它只有一个方法V call(),计算一个结果,失败的话抛出一个异常。和Runnable不同的是,它不能直接交给Thread来执行,所以需要一个别的类来封装它与Runnable,这个类就是FutureTask。FutureTask是一个类,继承了RunnableFuture,而RunnableFuture是一个多继承接口,它继承了Runnable和 Future,所以FutureTask是可以作为实现了Runnable的实例交给Thread执行。

从内部变量来看,含有一个下层Callable实例,一个状态表示,一个返回结果,以及对运行线程的记录

    /**
     * 任务的运行状态,最初是NEW。运行状态只在set, setException和cancel方法中过度到最终状态。
     * 在完成过程中,状态可能发生转移到COMPLETING(在设置结果时)或者INTERRUPTING(仅当中断运行来满足cancel(true)时)。
     * 从这些中间状态转移到最终状态使用成本更低有序/懒惰写入,因为值是唯一的且之后不能再修改。
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** 下层的callable,运行后为null */
    private Callable<V> callable;
    /** get()操作返回的结果或者抛出的异常*/
    private Object outcome; // 不是volatile,由reads/writes状态来保护
    /** 运行callable的线程,在run()通过CAS修改*/
    private volatile Thread runner;
    /** 等待线程的Treiber堆栈 */
    private volatile WaitNode waiters;

构造函数

构造函数总共有两种重载,第一种直接给出Callable实例

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // 确保callable的可见性
    }

第二种,给出Runnable实例和期望的返回结果,如果Runnable实例运行成功则返回的是result

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);//创建一个callable
        this.state = NEW;       // 确保callable的可见性
    }

run

run方法是Thread运行FutureTask内任务的接口。首先,根据最上方的进入条件可以看出,只有成功竞争到修改runnerOffset成功的线程才能执行后续方法,而搜索整个类文件,可以发现只有在run结束后才会重置为null,所以同一时间只能有一个线程执行run方法成功。然后要检查state和callable的状态,因为run会将它们修改。调用callable.call方法获取返回结果,成功的话设置结果,失败的话设置返回结果为异常。无论是否执行成功,runner会被重置为null。

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;//状态必须是NEW且修改执行线程成功,否则直接返回,避免被多个线程同时执行
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();//调用callable.call方法获取返回结果
                    ran = true;//执行成功
                } catch (Throwable ex) {
                    result = null;
                    ran = false;//执行失败
                    setException(ex);//设置返回异常并唤醒等待线程解除阻塞
                }
                if (ran)
                    set(result);//设置结果
            }
        } finally {
            //runner直到状态设置完成不能为null来避免并发调用run()
            runner = null;
            //在将runner设置为null后需要重新读取state避免漏掉中断
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

set方法先修改state为COMPLETING,然后将outcome设置为刚才计算出来的结果,最后设置state为NORMAL,并调用finishCompletion。这个方法移除并通知所有等待的线程解除阻塞,调用done(),并将callable设为null。done方法默认是什么也不做。

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终状态
            finishCompletion();//唤醒等待线程,将callable设为null
        }
    }

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//如果线程被park阻塞,解除阻塞
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // 取消连接帮助gc
                    q = next;
                }
                break;
            }
        }

        done();//未重写时什么也不做

        callable = null;        // to reduce footprint减少覆盖区
    }

setException跟set逻辑上基本一样,除了设置返回结果是Throwable对象

    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//修改状态
            outcome = t;//结果为Throwable
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 最终状态
            finishCompletion();
        }
    }

get

get方法如果FutureTask已经执行完成则返回结果,否则会等待并阻止线程调度。等待时长可以输入,单位为纳秒,不输入为不限时等待,限时等待超时仍然没有完成会抛出异常。

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);//等待完成
        return report(s);//检查时返回结果还是抛出异常
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//限时等待完成
            throw new TimeoutException();
        return report(s);
    }

awaitDone这个方法会阻塞当前线程(get方法的调用线程)的调度并增加等待结点,阻塞时长根据输入的时间长度决定。如果执行Callable任务的线程完成了运行或者被中断,则会解除栈中等待结点对应线程的阻塞。然后会根据执行结果决定是否要抛出异常还是返回执行完成的结果。

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;//是否完成入栈
        for (;;) {
            if (Thread.interrupted()) {//检查线程是否已经被中断
                removeWaiter(q);//移除被中断的等待结点
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {//已经完成
                if (q != null)
                    q.thread = null;//移除等待
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet还没有超时
                Thread.yield();//已经在赋值,所以只需让出时间片等待赋值完成
            //下方都是还在没有完成call方法的情况
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);//q加入到栈的最前方
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {//超时了
                    removeWaiter(q);//移除超时的等待结点
                    return state;
                }
                LockSupport.parkNanos(this, nanos);//阻塞当前线程nanos纳秒
            }
            else
                LockSupport.park(this);//阻塞当前线程
        }
    }

cancel

cancel输入的参数表示如果当前还在运行中是否要中断执行线程,如果输入参数是false则只有线程已经执行完成或者抛出异常或者已经被中断时可以把状态修改为CANCELLED,如果是true则会中断线程并将状态改为INTERRUPTED。所以,cancel在该任务已经结束或者已被取消,或者竞争修改状态失败时都会失败。如果中断成功,会释放所有被阻塞的等待线程。

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;//已经完成或者被取消或者竞争取消失败返回false
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

状态检查

非常简单的两个方法。因为CANCELLED是state中最大的,所以只有cancel方法成功才会是这种状态。而isDone只要不是还在运行或者还没有被执行就是返回true。

    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

简单的使用示例

public class CallableTest implements Callable<Integer>{
    private int start;
    
    public CallableTest(int start) {
        this.start = start;
    }
    
    @Override
    public Integer call() throws Exception {
        Thread.sleep(500);
        return start + 1;
    }
    
    public static void main(String args[]) throws InterruptedException, ExecutionException{
        long start = System.currentTimeMillis();
        FutureTask<Integer> task1 = new FutureTask<>(new CallableTest(2));
        new Thread(task1).start();
        FutureTask<Integer> task2 = new FutureTask<>(new CallableTest(4));
        new Thread(task2).start();
        System.out.println(task1.get() + task2.get());//8
        long end = System.currentTimeMillis();
        System.out.println(end - start);//506
    }
}
相关文章
|
6天前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)
|
2天前
|
Java
深入理解Java中的多线程编程
本文将探讨Java多线程编程的核心概念和技术,包括线程的创建与管理、同步机制以及并发工具类的应用。我们将通过实例分析,帮助读者更好地理解和应用Java多线程编程,提高程序的性能和响应能力。
15 4
|
1天前
|
安全 Java 调度
Java 并发编程中的线程安全和性能优化
本文将深入探讨Java并发编程中的关键概念,包括线程安全、同步机制以及性能优化。我们将从基础入手,逐步解析高级技术,并通过实例展示如何在实际开发中应用这些知识。阅读完本文后,读者将对如何在多线程环境中编写高效且安全的Java代码有一个全面的了解。
|
6天前
|
Java 调度 开发者
Java中的多线程基础及其应用
【9月更文挑战第13天】本文将深入探讨Java中的多线程概念,从基本理论到实际应用,带你一步步了解如何有效使用多线程来提升程序的性能。我们将通过实际代码示例,展示如何在Java中创建和管理线程,以及如何利用线程池优化资源管理。无论你是初学者还是有经验的开发者,这篇文章都将为你提供有价值的见解和技巧,帮助你更好地理解和应用多线程编程。
|
21天前
|
监控 网络协议 Java
Tomcat源码解析】整体架构组成及核心组件
Tomcat,原名Catalina,是一款优雅轻盈的Web服务器,自4.x版本起扩展了JSP、EL等功能,超越了单纯的Servlet容器范畴。Servlet是Sun公司为Java编程Web应用制定的规范,Tomcat作为Servlet容器,负责构建Request与Response对象,并执行业务逻辑。
Tomcat源码解析】整体架构组成及核心组件
|
1月前
|
存储 NoSQL Redis
redis 6源码解析之 object
redis 6源码解析之 object
54 6
|
6天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
6天前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
|
10天前
|
开发工具
Flutter-AnimatedWidget组件源码解析
Flutter-AnimatedWidget组件源码解析
|
28天前
|
测试技术 Python
python自动化测试中装饰器@ddt与@data源码深入解析
综上所述,使用 `@ddt`和 `@data`可以大大简化写作测试用例的过程,让我们能专注于测试逻辑的本身,而无需编写重复的测试方法。通过讲解了 `@ddt`和 `@data`源码的关键部分,我们可以更深入地理解其背后的工作原理。
23 1

热门文章

最新文章

推荐镜像

更多