浅谈Java多线程之FutureTask

简介: 浅谈Java多线程之FutureTask

Runnable和Callable是多线程中的两个任务接口,实现接口的类将拥有多线程的功能,FutureTask类与这两个类是息息相关!


FutureTask继承体系


微信截图_20230504215421.png


看下这张图,原来FutureTask类实现了Runnable和Future,既然是Runnable的实现类,我们可以写如下的代码:


public static void main(String[] args) {
    FutureTask task = new FutureTask(new Callable() {
        @Override
        public Object call() throws Exception {
            System.out.println(Thread.currentThread().getName() + "========>正在执行!");
            return "SUCCESS";
        }
    });
    new Thread(task).start();
}


因为FutureTask是Runnable的实现类嘛,根据多态的特性,肯定可以传到Thread的构造器中。


FutureTask的构造方法  

 

构造方法1 接收Callable对象  


public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}


构造方法2 接收Runnable对象  和一个泛型的result


public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}


原来,FutureTask内部维护Callable类型的成员变量,对于Callable任务,直接赋值即可。而对于Runnable任务,需要先调用Executors#callable()把Runnable先包装成Callable。


Executors.callable(runnable, result);


这行代码用了适配器模式,你给我一个runnable对象,我还你一个callable对象。


public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new Executors.RunnableAdapter<T>(task, result);
}


RunnableAdapter是Executors中的静态内部类,上面代码意思是调用该静态内部类的构造方法,生成RunnableAdapter对象,而RunnableAdapter对象实现了Callable接口,根据多态也就相当于得到了一个Callable对象。


static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}


RunnableAdapter作为Callable的适配器,也拥有call方法,这就是适配器模式。


如果你是用第二种方式来构造FutureTask对象,因为传入的是Runnable,Runnable的run方法是没有返回值的,而Callable的call方法是有返回值的,所以这边就折中一下,返回值需要你在构建FutureTask对象时自己传进去,最后再原封不动地还给你。


如果你是用第一种方式来构造FutureTask对象,那就简单多了,直接传入一个Callable对象即可,返回值你自己决定。


总而言之,FutureTask的构造方法就为了做一件事,即统一Callable和Runnable!


为什么FutureTask要花这么大的精力去搞定Callable和Runnable呢?就是因为统一了好办事啊,以后在线程池的章节中,你还会频繁看到这个类。


捋一捋思路,为什么要用FutureTask?


多线程是Java进阶的难点,也是面试的重灾区,请确保你把上面的代码都理解了之后再来看这一节。


我们再回过头来想想,如何使用多线程呢,是不是有3个方法?如果记不得了请回过去看看上一个章节【线程类】。


第1种方法是直接继承Thread类,重写run方法。


第2种方法是实现Runnable接口,然后还是要靠Thread类的构造器,把Runnable传进去,最终调用的就是Runnable的run方法。


第3种方法是用线程池技术,用ExecutorService去提交Runnable对象/Callable对象,区别是Runnable没有返回值,Callable对象有返回值。


你发现没有,不管你用哪种方式,最终都是要靠Thread类去开启线程的。因为,有且仅有Thread类能通过start0()方法向操作系统申请线程资源(本地方法)


第一种方法因为耦合性太高,很少会使用,实际开发中我们一般都会使用线程池技术,所以第3种方法是有实战意义的。那么问题来了,Runnable和Callable对象都可以被用作线程池的任务,就有人会乱用了啊,有的人喜欢Runnable,有的喜欢Callable,到时候项目的代码就乱成一锅粥啦!


所以,我私以为Java的创始人意识到这一点,就干脆搞一个FutureTask出来一统江湖。我说的这么白,应该都明白了吧,嘿嘿。


FutureTask的7种状态


既然FutureTask是子类,那么必然有比Callable和Runnable强悍的地方,比如FutureTask的7种状态


private volatile int state;
private static final int NEW          = 0; 
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;


状态含义分别是:


●     0-刚创建

●     1-即将完成

●     2-完成

●     3-抛异常

●     4-任务取消

●     5-任务即将被打断

●     6-任务被打断


为什么要设置这些状态呢,那是因为FutureTask=任务+结果,调用者何时可以去获取这个结果result呢?FutureTask在调用get方法时,会去判断当前任务的状态,只有当任务完成才会给你实际的result,因此get方法是阻塞的。


FutureTask的get() 方法


先看下FutureTask的get() 方法是如何使用的:


FutureTask task = new FutureTask(new Callable() {
    @Override
    public Object call() throws Exception {
        System.out.println(Thread.currentThread().getName() + "========>正在执行!");
        Thread.sleep(2000); //执行耗时操作
        return "SUCCESS";
    }
});
new Thread(task).start();
System.out.println(task.get());


效果:


Thread-0========>正在执行!
SUCCESS


过了两秒后才打印出SUCCESS,说明get确实是阻塞的。再来一个线程池的例子:


ExecutorService executorService = Executors.newSingleThreadExecutor();
/**
 * 往线程池中提交一个Callable,立刻返回Future对象,但是该Future对象里面的返回值目前还是null
 * 只有当你调用get方法时,才会阻塞地获取该任务真实的返回值
 */
Future<Object> objectFuture = executorService.submit(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        System.out.println(Thread.currentThread().getName() + "========>正在执行!");
        Thread.sleep(2000); //执行耗时操作
        return "SUCCESS";
    }
});
Object result = objectFuture.get();
System.out.println(result);
executorService.shutdownNow();


FutureTask的run方法


先来看看源代码吧:


public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}


12.png


FutureTask的run方法第一步果然是获取callable对象,这个callable对象也可能是runnable伪装的,上面介绍了适配器模式,这边就不再赘述了。


13.png


14.png


最终是存储到outcome对象了,简而言之,FutureTask的run方法的作用就是运行callable的call方法,拿到返回值保存到outcome对象,等待有人来取。


薛定谔的FutureTask


为什么说是薛定谔的FutureTask呢?那是因为,当你把FutureTask跑起来的时候,里面的outcome可能没有值,也可能有值。


但是又因为outcome在FutureTask源码中被设置成private,所以如果你要获取这个数据,只能通过get方法。而get方法是阻塞的,当你调用get方法时,一定是等到任务执行成功后,才会返回真实的值。


这就有点像薛定谔的猫,你不去观察它,两种状态皆有可能,一旦你去观察了(调用get方法),就只有一种明确的状态。


其实这真的只是一个小技巧,相信你也能办到,我们用代码来模拟一下这个过程。


首先,新建一个MyFutureTask类:


/**
 * 自定义任务类
 */
public class MyFutureTask implements Runnable{
    /**
     * 为了看到效果,outcome设置为Object
     */
    public Object outcome;
    public void run(){
        try {
            /**
             * 执行耗时操作
             */
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /**
         * 给outcome赋值
         */
        this.outcome = "SUCCESS";
    }
    public Object get(){
        return outcome;
    }
}


为了模拟线程池,新建一个MyExecutorService类:


class MyExecutorService {
    /**
     * 提交任务
     * @param myFutureTask
     * @return
     */
    public MyFutureTask submit(MyFutureTask myFutureTask){
        /**
         * 开启一个线程把myFutureTask跑掉
         */
        new Thread(myFutureTask).start();
        /**
         * 线程有没有跑完不关心,直接把myFutureTask返回
         * 此时myFutureTask很可能不是最终结果,但其中的outcome一定指向最终结果
         */
        return myFutureTask;
    }
}


测试:


public static void main(String[] args) throws InterruptedException {
    MyExecutorService myExecutorService = new MyExecutorService();
    MyFutureTask myFutureTask = myExecutorService.submit(new MyFutureTask());
    /**
     * 线程开启立刻查看outcome
     */
    System.out.println(myFutureTask.outcome);
    /**
     * 主线程继续运作
     */
    Thread.sleep(1100);
    /**
     * 再次查看outcome是否有值
     */
    System.out.println(myFutureTask.get());
}


结果:


null
SUCCESS


总结一下,ExecutorService的submit方法只是提交Runnable或Callable任务到线程池,直接返回FutureTask给你,这个FutureTask是薛定谔的FutureTask,里面的outcome现在可能有值,也可能没有。只有当你主动调用get方法,才可以得到确切的值。


FutureTask的get方法阻塞原理


FutureTask的get方法是阻塞的,当你调用这个方法就一定要等该线程跑完,那么为什么能做到这样呢?


接下来我们看看get方法的阻塞原理是什么,我重新写了一个例子,注释能写的都写了,代码如下:


package com.javaxbfs.thread;
import java.util.concurrent.locks.LockSupport;
class MyExecutorService {
  /**
     * 提交任务
     * @param myFutureTask
     * @return
     */
    public MyFutureTask submit(MyFutureTask myFutureTask){
      /**
         * 开启一个线程把myFutureTask跑掉
         */
        Thread thread = new Thread(myFutureTask);
        thread.start();
      /**
         * 把线程赋给myFutureTask的runner属性,以方便查看线程状态
         */
        myFutureTask.runner = thread;
      /**
         * 线程有没有跑完不关心,直接把myFutureTask返回
         * 此时myFutureTask很可能不是最终结果,但其中的outcome一定指向最终结果
         */
        return myFutureTask;
    }
}
/**
 * 自定义任务类
 */
public class MyFutureTask implements Runnable{
  /**
     * 为了看到效果,outcome设置为Object
     */
    public Object outcome;
  /**
     * 当前任务所在的线程
     */
    public volatile Thread runner;
    public void run(){
        try {
          /**
             * 执行耗时操作
             */
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
      /**
         * 给outcome赋值
         */
        this.outcome = "SUCCESS";
    }
    public Object get() throws InterruptedException {
        if(awaitDone())
            return outcome;
        return null;
    }
    private boolean awaitDone() throws InterruptedException {
      /**
         * 做一个死循环,轮询检查当前线程状态
         */
        for(;;){
          /**
             * 如果当前线程被打断,则抛异常结束任务
             */
            if(runner.isInterrupted()){
                throw new InterruptedException();
            }
            if(runner.getState() == Thread.State.NEW){
                System.out.println(Thread.currentThread().getName() + "新建!");
            }
            if(runner.getState() == Thread.State.RUNNABLE){
                System.out.println(Thread.currentThread().getName() + "准备就绪!");
            }
            if(runner.getState() == Thread.State.TERMINATED){
                System.out.println(Thread.currentThread().getName() + "执行完毕!");
                return true;
            }
            Thread.sleep(200);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        MyExecutorService myExecutorService = new MyExecutorService();
        MyFutureTask myFutureTask = myExecutorService.submit(new MyFutureTask());
      /**
         * 线程开启立刻查看outcome
         */
        System.out.println(myFutureTask.outcome);
      /**
         * 主线程继续运作
         */
        //Thread.sleep(1100);
      /**
         * 再次查看outcome是否有值,现在是阻塞的
         */
        System.out.println(myFutureTask.get());
    }
}


关键就在于这个 awaitDone 方法(源码也叫这个名字),它里面是一个死循环,不断去检查当前FutureTask所在线程的状态,当线程执行结束,就返回true,表示可以给出精确的result了。


真实的get方法实现非常复杂,不过思路是差不多的。


相关文章
|
2天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
4天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
4天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
5天前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
21 3
|
5天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
61 2
|
13天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
44 6
|
Java API UED
Java 并发专题 :FutureTask 实现预加载数据 在线看电子书、浏览器浏览网页等
转自:http://blog.csdn.net/lmj623565791/article/details/26817403  继续并发专题~ FutureTask 有点类似Runnable,都可以通过Thread来启动,不过FutureTask可以返回执行完毕的数据,并且FutureTask的get方法支持阻塞。 由于:FutureTask可以返回执行完毕的数据,
1104 0
|
28天前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
26天前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
28天前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####