Runnable和Callable是多线程中的两个任务接口,实现接口的类将拥有多线程的功能,FutureTask类与这两个类是息息相关!
FutureTask继承体系
看下这张图,原来FutureTask类实现了Runnable和Future,既然是Runnable的实现类,我们可以写如下的代码:
public static void main(String[] args) { FutureTask task = new FutureTask(new Callable() { @Override public Object call() throws Exception { System.out.println(Thread.currentThread().getName() + "========>正在执行!"); return "SUCCESS"; } }); new Thread(task).start(); }
因为FutureTask是Runnable的实现类嘛,根据多态的特性,肯定可以传到Thread的构造器中。
FutureTask的构造方法
构造方法1 接收Callable对象
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
构造方法2 接收Runnable对象 和一个泛型的result
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
原来,FutureTask内部维护Callable类型的成员变量,对于Callable任务,直接赋值即可。而对于Runnable任务,需要先调用Executors#callable()把Runnable先包装成Callable。
Executors.callable(runnable, result);
这行代码用了适配器模式,你给我一个runnable对象,我还你一个callable对象。
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new Executors.RunnableAdapter<T>(task, result); }
RunnableAdapter是Executors中的静态内部类,上面代码意思是调用该静态内部类的构造方法,生成RunnableAdapter对象,而RunnableAdapter对象实现了Callable接口,根据多态也就相当于得到了一个Callable对象。
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
RunnableAdapter作为Callable的适配器,也拥有call方法,这就是适配器模式。
如果你是用第二种方式来构造FutureTask对象,因为传入的是Runnable,Runnable的run方法是没有返回值的,而Callable的call方法是有返回值的,所以这边就折中一下,返回值需要你在构建FutureTask对象时自己传进去,最后再原封不动地还给你。
如果你是用第一种方式来构造FutureTask对象,那就简单多了,直接传入一个Callable对象即可,返回值你自己决定。
总而言之,FutureTask的构造方法就为了做一件事,即统一Callable和Runnable!
为什么FutureTask要花这么大的精力去搞定Callable和Runnable呢?就是因为统一了好办事啊,以后在线程池的章节中,你还会频繁看到这个类。
捋一捋思路,为什么要用FutureTask?
多线程是Java进阶的难点,也是面试的重灾区,请确保你把上面的代码都理解了之后再来看这一节。
我们再回过头来想想,如何使用多线程呢,是不是有3个方法?如果记不得了请回过去看看上一个章节【线程类】。
第1种方法是直接继承Thread类,重写run方法。
第2种方法是实现Runnable接口,然后还是要靠Thread类的构造器,把Runnable传进去,最终调用的就是Runnable的run方法。
第3种方法是用线程池技术,用ExecutorService去提交Runnable对象/Callable对象,区别是Runnable没有返回值,Callable对象有返回值。
你发现没有,不管你用哪种方式,最终都是要靠Thread类去开启线程的。因为,有且仅有Thread类能通过start0()方法向操作系统申请线程资源(本地方法)
第一种方法因为耦合性太高,很少会使用,实际开发中我们一般都会使用线程池技术,所以第3种方法是有实战意义的。那么问题来了,Runnable和Callable对象都可以被用作线程池的任务,就有人会乱用了啊,有的人喜欢Runnable,有的喜欢Callable,到时候项目的代码就乱成一锅粥啦!
所以,我私以为Java的创始人意识到这一点,就干脆搞一个FutureTask出来一统江湖。我说的这么白,应该都明白了吧,嘿嘿。
FutureTask的7种状态
既然FutureTask是子类,那么必然有比Callable和Runnable强悍的地方,比如FutureTask的7种状态
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
状态含义分别是:
● 0-刚创建
● 1-即将完成
● 2-完成
● 3-抛异常
● 4-任务取消
● 5-任务即将被打断
● 6-任务被打断
为什么要设置这些状态呢,那是因为FutureTask=任务+结果,调用者何时可以去获取这个结果result呢?FutureTask在调用get方法时,会去判断当前任务的状态,只有当任务完成才会给你实际的result,因此get方法是阻塞的。
FutureTask的get() 方法
先看下FutureTask的get() 方法是如何使用的:
FutureTask task = new FutureTask(new Callable() { @Override public Object call() throws Exception { System.out.println(Thread.currentThread().getName() + "========>正在执行!"); Thread.sleep(2000); //执行耗时操作 return "SUCCESS"; } }); new Thread(task).start(); System.out.println(task.get());
效果:
Thread-0========>正在执行! SUCCESS
过了两秒后才打印出SUCCESS,说明get确实是阻塞的。再来一个线程池的例子:
ExecutorService executorService = Executors.newSingleThreadExecutor(); /** * 往线程池中提交一个Callable,立刻返回Future对象,但是该Future对象里面的返回值目前还是null * 只有当你调用get方法时,才会阻塞地获取该任务真实的返回值 */ Future<Object> objectFuture = executorService.submit(new Callable<Object>() { @Override public Object call() throws Exception { System.out.println(Thread.currentThread().getName() + "========>正在执行!"); Thread.sleep(2000); //执行耗时操作 return "SUCCESS"; } }); Object result = objectFuture.get(); System.out.println(result); executorService.shutdownNow();
FutureTask的run方法
先来看看源代码吧:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
FutureTask的run方法第一步果然是获取callable对象,这个callable对象也可能是runnable伪装的,上面介绍了适配器模式,这边就不再赘述了。
最终是存储到outcome对象了,简而言之,FutureTask的run方法的作用就是运行callable的call方法,拿到返回值保存到outcome对象,等待有人来取。
薛定谔的FutureTask
为什么说是薛定谔的FutureTask呢?那是因为,当你把FutureTask跑起来的时候,里面的outcome可能没有值,也可能有值。
但是又因为outcome在FutureTask源码中被设置成private,所以如果你要获取这个数据,只能通过get方法。而get方法是阻塞的,当你调用get方法时,一定是等到任务执行成功后,才会返回真实的值。
这就有点像薛定谔的猫,你不去观察它,两种状态皆有可能,一旦你去观察了(调用get方法),就只有一种明确的状态。
其实这真的只是一个小技巧,相信你也能办到,我们用代码来模拟一下这个过程。
首先,新建一个MyFutureTask类:
/** * 自定义任务类 */ public class MyFutureTask implements Runnable{ /** * 为了看到效果,outcome设置为Object */ public Object outcome; public void run(){ try { /** * 执行耗时操作 */ Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } /** * 给outcome赋值 */ this.outcome = "SUCCESS"; } public Object get(){ return outcome; } }
为了模拟线程池,新建一个MyExecutorService类:
class MyExecutorService { /** * 提交任务 * @param myFutureTask * @return */ public MyFutureTask submit(MyFutureTask myFutureTask){ /** * 开启一个线程把myFutureTask跑掉 */ new Thread(myFutureTask).start(); /** * 线程有没有跑完不关心,直接把myFutureTask返回 * 此时myFutureTask很可能不是最终结果,但其中的outcome一定指向最终结果 */ return myFutureTask; } }
测试:
public static void main(String[] args) throws InterruptedException { MyExecutorService myExecutorService = new MyExecutorService(); MyFutureTask myFutureTask = myExecutorService.submit(new MyFutureTask()); /** * 线程开启立刻查看outcome */ System.out.println(myFutureTask.outcome); /** * 主线程继续运作 */ Thread.sleep(1100); /** * 再次查看outcome是否有值 */ System.out.println(myFutureTask.get()); }
结果:
null SUCCESS
总结一下,ExecutorService的submit方法只是提交Runnable或Callable任务到线程池,直接返回FutureTask给你,这个FutureTask是薛定谔的FutureTask,里面的outcome现在可能有值,也可能没有。只有当你主动调用get方法,才可以得到确切的值。
FutureTask的get方法阻塞原理
FutureTask的get方法是阻塞的,当你调用这个方法就一定要等该线程跑完,那么为什么能做到这样呢?
接下来我们看看get方法的阻塞原理是什么,我重新写了一个例子,注释能写的都写了,代码如下:
package com.javaxbfs.thread; import java.util.concurrent.locks.LockSupport; class MyExecutorService { /** * 提交任务 * @param myFutureTask * @return */ public MyFutureTask submit(MyFutureTask myFutureTask){ /** * 开启一个线程把myFutureTask跑掉 */ Thread thread = new Thread(myFutureTask); thread.start(); /** * 把线程赋给myFutureTask的runner属性,以方便查看线程状态 */ myFutureTask.runner = thread; /** * 线程有没有跑完不关心,直接把myFutureTask返回 * 此时myFutureTask很可能不是最终结果,但其中的outcome一定指向最终结果 */ return myFutureTask; } } /** * 自定义任务类 */ public class MyFutureTask implements Runnable{ /** * 为了看到效果,outcome设置为Object */ public Object outcome; /** * 当前任务所在的线程 */ public volatile Thread runner; public void run(){ try { /** * 执行耗时操作 */ Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } /** * 给outcome赋值 */ this.outcome = "SUCCESS"; } public Object get() throws InterruptedException { if(awaitDone()) return outcome; return null; } private boolean awaitDone() throws InterruptedException { /** * 做一个死循环,轮询检查当前线程状态 */ for(;;){ /** * 如果当前线程被打断,则抛异常结束任务 */ if(runner.isInterrupted()){ throw new InterruptedException(); } if(runner.getState() == Thread.State.NEW){ System.out.println(Thread.currentThread().getName() + "新建!"); } if(runner.getState() == Thread.State.RUNNABLE){ System.out.println(Thread.currentThread().getName() + "准备就绪!"); } if(runner.getState() == Thread.State.TERMINATED){ System.out.println(Thread.currentThread().getName() + "执行完毕!"); return true; } Thread.sleep(200); } } public static void main(String[] args) throws InterruptedException { MyExecutorService myExecutorService = new MyExecutorService(); MyFutureTask myFutureTask = myExecutorService.submit(new MyFutureTask()); /** * 线程开启立刻查看outcome */ System.out.println(myFutureTask.outcome); /** * 主线程继续运作 */ //Thread.sleep(1100); /** * 再次查看outcome是否有值,现在是阻塞的 */ System.out.println(myFutureTask.get()); } }
关键就在于这个 awaitDone 方法(源码也叫这个名字),它里面是一个死循环,不断去检查当前FutureTask所在线程的状态,当线程执行结束,就返回true,表示可以给出精确的result了。
真实的get方法实现非常复杂,不过思路是差不多的。