线程池中的Future:
1.线程池的典型使用场景
ExecutorService executorService = Executors.newFixedThreadPool(10); //此处Task为实现Runnable接口的类 Future future = executorService.submit(new Task()); try { future.get();//此处会阻塞等待结果 } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
线程池的提交接口会返回一个Future对象,该对象的get方法会造成调用线程阻塞,等任务执行完毕时,会唤醒调用线程继续执行.
2.线程池Future的实现原理;
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); //此处构造了一个future对象 RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } //构造一个FutureTask返回 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
可以看到线程池的任务提交后内部会构造一个FutureTask对象返回
3.FutureTask对象:
//类名FutureTask public FutureTask(Runnable runnable, V result) { //包装成callable接口 this.callable = Executors.callable(runnable, result); this.state = NEW; // 设置任务的初始状态 } /** * 等待任务执行完毕返回结果 * @throws CancellationException {@inheritDoc} */ public V get() throws InterruptedException, ExecutionException { int s = state; //如果任务状态为处理中,或者未处理则等待处理完成 if (s <= COMPLETING) s = awaitDone(false, 0L);//此处会导致阻塞 //报告结果 return report(s); }
造成调用线程阻塞的方法是awaitDone,任务处理完成后会唤醒阻塞线程,从而继续调用report方法返回处理结果.
4.awaitDone
//类名FutureTask /** * Awaits completion or aborts on interrupt or timeout. * 此方法有3种处理结果,等待任务完成或者被interrupt退出,或者超时退出 * @param timed true表示设置了等待时间 * @param nanos 等待时长 * @return state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { //任务到期时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; //等待节点,链表结构,记录了所有调用get方法的线程 WaitNode q = null; boolean queued = false; for (;;) { //判断当前线程是否被中断 if (Thread.interrupted()) { //删除链表中该线程所在的节点 removeWaiter(q); //抛出中断异常 throw new InterruptedException(); } int s = state; //任务已经执行结束 if (s > COMPLETING) { if (q != null) q.thread = null;//help gc return s;//返回执行状态 } //任务正在执行中 else if (s == COMPLETING) // cannot time out yet Thread.yield();//让出cpu执行权 else if (q == null) q = new WaitNode();//初始一个等待节点 else if (!queued)//CAS操作,如果节点没有放入链表(栈结构),就放入栈中 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) {//如果设置了超时时间,则进行超时等待 nanos = deadline - System.nanoTime(); if (nanos <= 0L) {//判断是否超时 removeWaiter(q);//删除等待节点 return state; } LockSupport.parkNanos(this, nanos);//阻塞当前线程,nanos时间 } else LockSupport.park(this);//阻塞当前线程 } } //等待节点,记录等待执行结果的线程 static final class WaitNode { volatile Thread thread; //等待线程 volatile WaitNode next;//指向下一节点,链表结构 WaitNode() { thread = Thread.currentThread(); } } //删除等待的节点 private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null;//设置待删除节点的线程为空,用于标记待删除线程 retry: for (;;) { // 处理多个节点同时删除的情况,restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next;//s指向q的后继节点 if (q.thread != null)//判断当前节点是否是待删除的节点 pred = q;//pred指向q的前驱节点 else if (pred != null) {//走到这个分支,说明q.thread==null,也就是说q为待删除节点 pred.next = s;//将q的前驱指向q的后继 if (pred.thread == null) // 如果此时前驱节点也变成待删除节点,说明此时有竞争,CAS重试 continue retry; } //走到这个分支,说明前驱节点为空,此时设置q的后继节点为链表的头节点 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry;//失败则跳出,cas重试 } break; } } } //返回任务执行结果 private V report(int s) throws ExecutionException { Object x = outcome;//执行结果 if (s == NORMAL)//判断状态,执行完成则返回 return (V)x; if (s >= CANCELLED)//取消执行 throw new CancellationException(); throw new ExecutionException((Throwable)x);//执行异常 }
以上,我们分析了Future造成调用线程阻塞的逻辑,那么何时会唤醒阻塞的线程呢?有两种情况,一是任务执行完毕,一是任务被取消执行。
5.唤醒阻塞的线程:
//类名FutureTask //线程池的任务执行逻辑 public void run() { //不是初始状态,或者CAS设置任务的当前处理线程失败,则返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) {//判断状态,任务有可能被取消执行 V result; boolean ran; try { result = c.call();//执行具体任务 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex);//设置任务异常 } if (ran) set(result);//设置执行结果 } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null;//执行线程置空 // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void set(V v) { //cas操作,防止此时其他线程修改状态 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v;//记录结果 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 设置最终状态,final state finishCompletion();//执行完成,唤醒等待结果的线程 } } protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } //任务执行完成,唤醒等待节点 private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { //cas设置链表头结点为空,设置失败,重新循环判断 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t);//唤醒等待节点 } WaitNode next = q.next; if (next == null)//到尾部后,退出循环 break; q.next = null; // unlink to help gc q = next;//移动节点,循环判断 } break; } } done(); callable = null; // to reduce footprint } //取消任务执行,返回false表示取消失败,true表示成功 public boolean cancel(boolean mayInterruptIfRunning) { //如果状态不是NEW,或者CAS设置状态失败,返回false if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) {//运行的时候是否允许中断 try { Thread t = runner; if (t != null) t.interrupt();//中断任务执行线程 } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion();//通知任务已执行完毕 } return true; }
6.Java中线程池的Future的运行原理:任务提交到线程池后,线程池会生成一个Future对象用于跟踪任务的执行状态,并可以取消任务的执行,如果线程池还没有开始执行任务,那么取消操作将导致任务不会被线程池执行,否则的话取消操作可以对线程池中执行任务的线程进行中断。