6.2 构造方法
6.2.1 Callable 参数
6.2.2 Runnable 参数
为协调 callable 属性,辅助 result 参数。
Runnable 是没有返回值的,所以 result 一般没有用,置为 null 即可,正如 JDK 所推荐的写法
Future<?> f = new FutureTask<Void>(runnable, null)}
Executors.callable 方法负责将 runnable 适配成 callable。
通过转化类 RunnableAdapter进行适配
6.2.3 小结
适配器模式把 Runnable 适配成 Callable,那么我们首先要实现 Callable 接口,并且在 Callable 的 call 方法里面调用被适配对象即 Runnable的方法即可.
6.3 get
限时阻塞的 get 方法:
public V get(long timeout, TimeUnit unit) { int s = state; // 任务已经在执行中了 if (s <= COMPLETING && // 并且等待一定时间后,仍在执行中,抛异常 (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); // 任务完成,返回执行结果 return report(s); }
等待任务执行完成
private int awaitDone(boolean timed, long nanos) { // 计算等待终止时间,如果一直等待的话,终止时间为 0 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; // 不排队 boolean queued = false; // 无限循环 for (;;) { // 如果线程已经被打断了,删除,抛异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } // 当前任务状态 int s = state; // 当前任务已经执行完了,返回 if (s > COMPLETING) { // 当前任务的线程置空 if (q != null) q.thread = null; return s; } // 如果正在执行,当前线程让出 cpu,重新竞争,防止 cpu 飙高 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 如果第一次运行,新建 waitNode,当前线程就是 waitNode 的属性 else if (q == null) q = new WaitNode(); // 默认第一次都会执行这里,执行成功之后,queued 就为 true,就不会再执行了 // 把当前 waitNode 当做 waiters 链表的第一个 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果设置了超时时间,并过了超时时间的话,从 waiters 链表中删除当前 wait else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } // 没有过超时时间,线程进入 TIMED_WAITING 状态 LockSupport.parkNanos(this, nanos); } // 没有设置超时时间,进入 WAITING 状态 else LockSupport.park(this); } }
get 是一种阻塞式方法,当发现任务还在进行中,没有完成时,就会阻塞当前进程,等待任务完成后再返回结果值。
阻塞底层使用的是 LockSupport.park 方法,使当前线程进入 WAITING
或 TIMED_WAITING
态。
6.4 run
该方法可被直接调用,也可由线程池调用
public void run() { // 状态非 NEW 或当前任务已有线程在执行,直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // Callable 非空且已 NEW if (c != null && state == NEW) { V result; boolean ran; try { // 真正执行业务代码的地方 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } // 给 outcome 赋值,这样 Future.get 方法执行时,就可以从 outCome 中取值 if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
run 方法没有返回值,通过给 outcome 属性赋值(set(result)),get 时就能从 outcome 属性中拿到返回值。
FutureTask 两种构造器,最终都转化成了 Callable,所以在 run 方法执行的时候,只需要执行 Callable 的 call 方法即可,在执行 c.call() 代码时,如果入参是 Runnable 的话, 调用路径为 c.call() -> RunnableAdapter.call() -> Runnable.run(),如果入参是 Callable 的话,直接调用。
6.5 cancel
// 取消任务,如果正在运行,尝试去打断 public boolean cancel(boolean mayInterruptIfRunning) { // 任务状态不是创建 if (!(state == NEW && // 并且不能把 new 状态置为取消 UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) // 直接返回 false return false; // 进行取消操作,打断可能会抛异常,选择 try/finally结构 try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state //状态设置成已打断 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 清理线程 finishCompletion(); } return true; }