不会用Java Future,我怀疑你泡茶没我快, 又是超长图文!!(上)

简介: 不会用Java Future,我怀疑你泡茶没我快, 又是超长图文!!(上)

微信图片_20220511114115.png

  • 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
  • If you can NOT explain it simply, you do NOT understand it well enough


微信图片_20220511114142.png


前言


创建线程有几种方式?这个问题的答案应该是可以脱口而出的吧


  • 继承 Thread 类


  • 实现 Runnable 接口


但这两种方式创建的线程是属于”三wu产品“:


  • 没有参数


  • 没有返回值


  • 没办法抛出异常


class MyThread implements Runnable{
   @Override
   public void run() {
      log.info("my thread");
   }
}


Runnable 接口是 JDK1.0 的核心产物


 /**
 * @since   JDK1.0
 */
@FunctionalInterface
public interface Runnable {
    public abstract void run();
}


用着 “三wu产品” 总是有一些弊端,其中没办法拿到返回值是最让人不能忍的,于是 Callable 就诞生了


Callable


又是 Doug Lea 大师,又是 Java 1.5 这个神奇的版本


 /**
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> the result type of method {@code call}
 */
@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}


Callable 是一个泛型接口,里面只有一个 call() 方法,该方法可以返回泛型值 V ,使用起来就像这样:


Callable<String> callable = () -> {
    // Perform some computation
    Thread.sleep(2000);
    return "Return some result";
};


微信图片_20220511114449.png


二者都是函数式接口,里面都仅有一个方法,使用上又是如此相似,除了有无返回值,Runnable 与 Callable 就点差别吗?


Runnable VS Callable


两个接口都是用于多线程执行任务的,但他们还是有很明显的差别的


微信图片_20220511114529.png


执行机制


先从执行机制上来看,Runnable 你太清楚了,它既可以用在 Thread 类中,也可以用在 ExecutorService 类中配合线程池的使用;Bu~~~~t, Callable 只能在 ExecutorService 中使用,你翻遍 Thread 类,也找不到Callable 的身影


微信图片_20220511114553.png


异常处理


Runnable 接口中的 run 方法签名上没有 throws ,自然也就没办法向上传播受检异常;而 Callable 的 call() 方法签名却有 throws,所以它可以处理受检异常;

所以归纳起来看主要有这几处不同点:


微信图片_20220511114619.png


整体差别虽然不大,但是这点差别,却具有重大意义


返回值和处理异常很好理解,另外,在实际工作中,我们通常要使用线程池来管理线程(原因已经在 为什么要使用线程池? 中明确说明),所以我们就来看看 ExecutorService 中是如何使用二者的


ExecutorService


先来看一下 ExecutorService 类图


微信图片_20220511114705.png


我将上图标记的方法单独放在此处


void execute(Runnable command);
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);


可以看到,使用ExecutorService 的 execute() 方法依旧得不到返回值,而 submit() 方法清一色的返回 Future 类型的返回值


细心的朋友可能已经发现, submit() 方法已经在 CountDownLatch 和 CyclicBarrier 傻傻的分不清楚? 文章中多次使用了,只不过我们没有获取其返回值罢了,那么


  • Future 到底是什么呢?


  • 怎么通过它获取返回值呢?


我们带着这些疑问一点点来看


Future


Future 又是一个接口,里面只有五个方法:


微信图片_20220511114817.png


从方法名称上相信你已经能看出这些方法的作用


// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 获取任务执行结果
V get() throws InterruptedException, ExecutionException;
// 获取任务执行结果,带有超时时间限制
V get(long timeout, TimeUnit unit) throws InterruptedException,                             ExecutionException,  TimeoutException;
// 判断任务是否已经取消
boolean isCancelled();
// 判断任务是否已经结束
boolean isDone();


铺垫了这么多,看到这你也许有些乱了,咱们赶紧看一个例子,演示一下几个方法的作用


@Slf4j
public class FutureAndCallableExample {
   public static void main(String[] args) throws InterruptedException, ExecutionException {
      ExecutorService executorService = Executors.newSingleThreadExecutor();
      // 使用 Callable ,可以获取返回值
      Callable<String> callable = () -> {
         log.info("进入 Callable 的 call 方法");
         // 模拟子线程任务,在此睡眠 2s,
         // 小细节:由于 call 方法会抛出 Exception,这里不用像使用 Runnable 的run 方法那样 try/catch 了
         Thread.sleep(5000);
         return "Hello from Callable";
      };
      log.info("提交 Callable 到线程池");
      Future<String> future = executorService.submit(callable);
      log.info("主线程继续执行");
      log.info("主线程等待获取 Future 结果");
      // Future.get() blocks until the result is available
      String result = future.get();
      log.info("主线程获取到 Future 结果: {}", result);
      executorService.shutdown();
   }
}


程序运行结果如下:


微信图片_20220511114930.png


如果你运行上述示例代码,主线程调用 future.get() 方法会阻塞自己,直到子任务完成。我们也可以使用 Future 方法提供的 isDone 方法,它可以用来检查 task 是否已经完成了,我们将上面程序做点小修改:


// 如果子线程没有结束,则睡眠 1s 重新检查
while(!future.isDone()) {
   System.out.println("Task is still not done...");
   Thread.sleep(1000);
}


来看运行结果:


微信图片_20220511115016.png


如果子程序运行时间过长,或者其他原因,我们想 cancel 子程序的运行,则我们可以使用 Future 提供的 cancel 方法,继续对程序做一些修改


while(!future.isDone()) {
   System.out.println("子线程任务还没有结束...");
   Thread.sleep(1000);
   double elapsedTimeInSec = (System.nanoTime() - startTime)/1000000000.0;
      // 如果程序运行时间大于 1s,则取消子线程的运行
   if(elapsedTimeInSec > 1) {
      future.cancel(true);
   }
}


来看运行结果:


微信图片_20220511115109.png


为什么调用 cancel 方法程序会出现 CancellationException 呢? 是因为调用 get() 方法时,明确说明了:


调用 get() 方法时,如果计算结果被取消了,则抛出 CancellationException (具体原因,你会在下面的源码分析中看到)


微信图片_20220511115146.png


有异常不处理是非常不专业的,所以我们需要进一步修改程序,以更友好的方式处理异常


// 通过 isCancelled 方法判断程序是否被取消,如果被取消,则打印日志,如果没被取消,则正常调用 get() 方法
if (!future.isCancelled()){
   log.info("子线程任务已完成");
   String result = future.get();
   log.info("主线程获取到 Future 结果: {}", result);
}else {
   log.warn("子线程任务被取消");
}


查看程序运行结果:


微信图片_20220511115231.png


相信到这里你已经对 Future 的几个方法有了基本的使用印象,但 Future 是接口,其实使用 ExecutorService.submit() 方法返回的一直都是 Future 的实现类 FutureTask


微信图片_20220511115255.png


接下来我们就进入这个核心实现类一探究竟


FutureTask


同样先来看类结构


微信图片_20220511115341.png


public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}


很神奇的一个接口,FutureTask 实现了 RunnableFuture 接口,而 RunnableFuture 接口又分别实现了 RunnableFuture 接口,所以可以推断出 FutureTask 具有这两种接口的特性:


  • Runnable 特性,所以可以用在 ExecutorService 中配合线程池使用


  • Future 特性,所以可以从中获取到执行结果


FutureTask源码分析


如果你完整的看过 AQS 相关分析的文章,你也许会发现,阅读 Java 并发工具类源码,我们无非就是要关注以下这三点:


- 状态 (代码逻辑的主要控制)
- 队列 (等待排队队列)
- CAS (安全的set 值)
脑海中牢记这三点,咱们开始看 FutureTask 源码,看一下它是如何围绕这三点实现相应的逻辑的


文章开头已经提到,实现 Runnable 接口形式创建的线程并不能获取到返回值,而实现 Callable 的才可以,所以 FutureTask 想要获取返回值,必定是和 Callable 有联系的,这个推断一点都没错,从构造方法中就可以看出来:


public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}


即便在 FutureTask 构造方法中传入的是 Runnable 形式的线程,该构造方法也会通过 Executors.callable 工厂方法将其转换为 Callable 类型:


public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}


微信图片_20220511115541.png


但是 FutureTask 实现的是 Runnable 接口,也就是只能重写 run() 方法,run() 方法又没有返回值,那问题来了:


  • FutureTask 是怎样在 run() 方法中获取返回值的?
  • 它将返回值放到哪里了?
  • get() 方法又是怎样拿到这个返回值的呢?


我们来看一下 run() 方法(关键代码都已标记注释)


public void run() {
      // 如果状态不是 NEW,说明任务已经执行过或者已经被取消,直接返回
      // 如果状态是 NEW,则尝试把执行线程保存在 runnerOffset(runner字段),如果赋值失败,则直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
          // 获取构造函数传入的 Callable 值
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                  // 正常调用 Callable 的 call 方法就可以获取到返回值
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                  // 保存 call 方法抛出的异常
                setException(ex);
            }
            if (ran)
                  // 保存 call 方法的执行结果
                set(result);
        }
    } finally {        
        runner = null;       
        int s = state;
          // 如果任务被中断,则执行中断处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}


run() 方法没有返回值,至于 run() 方法是如何将 call() 方法的返回结果和异常都保存起来的呢?其实非常简单, 就是通过 set(result) 保存正常程序运行结果,或通过 setException(ex) 保存程序异常信息


/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
// 保存异常结果
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
// 保存正常结果
protected void set(V v) {
  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    outcome = v;
    UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    finishCompletion();
  }
}


setExceptionset 方法非常相似,都是将异常或者结果保存在 Object 类型的 outcome 变量中,outcome 是成员变量,就要考虑线程安全,所以他们要通过 CAS方式设置 outcome 变量的值,既然是在 CAS 成功后 更改 outcome 的值,这也就是 outcome 没有被 volatile 修饰的原因所在。


微信图片_20220511115658.png


保存正常结果值(set方法)与保存异常结果值(setException方法)两个方法代码逻辑,唯一的不同就是 CAS 传入的 state 不同。我们上面提到,state 多数用于控制代码逻辑,FutureTask 也是这样,所以要搞清代码逻辑,我们需要先对 state 的状态变化有所了解


 /*
 *
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL  //执行过程顺利完成
 * NEW -> COMPLETING -> EXCEPTIONAL //执行过程出现异常
 * NEW -> CANCELLED // 执行过程中被取消
 * NEW -> INTERRUPTING -> INTERRUPTED //执行过程中,线程被中断
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;


7种状态,千万别慌,整个状态流转其实只有四种线路


微信图片_20220511115740.png


FutureTask 对象被创建出来,state 的状态就是 NEW 状态,从上面的构造函数中你应该已经发现了,四个最终状态 NORMAL ,EXCEPTIONAL , CANCELLED , INTERRUPTED 也都很好理解,两个中间状态稍稍有点让人困惑:


  • COMPLETING: outcome 正在被set 值的时候


  • INTERRUPTING:通过 cancel(true) 方法正在中断线程的时候


总的来说,这两个中间状态都表示一种瞬时状态,我们将几种状态图形化展示一下:


微信图片_20220511115819.png


我们知道了 run() 方法是如何保存结果的,以及知道了将正常结果/异常结果保存到了 outcome 变量里,那就需要看一下 FutureTask 是如何通过 get() 方法获取结果的:


public V get() throws InterruptedException, ExecutionException {
    int s = state;
      // 如果 state 还没到 set outcome 结果的时候,则调用 awaitDone() 方法阻塞自己
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
      // 返回结果
    return report(s);
}


awaitDone 方法是 FutureTask 最核心的一个方法


// get 方法支持超时限制,如果没有传入超时时间,则接受的参数是 false 和 0L
// 有等待就会有队列排队或者可响应中断,从方法签名上看有 InterruptedException,说明该方法这是可以被中断的
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
      // 计算等待截止时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
          // 如果当前线程被中断,如果是,则在等待对立中删除该节点,并抛出 InterruptedException
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
          // 状态大于 COMPLETING 说明已经达到某个最终状态(正常结束/异常结束/取消)
          // 把 thread 只为空,并返回结果
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
          // 如果是COMPLETING 状态(中间状态),表示任务已结束,但 outcome 赋值还没结束,这时主动让出执行权,让其他线程优先执行(只是发出这个信号,至于是否别的线程执行一定会执行可是不一定的)
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
          // 等待节点为空
        else if (q == null)
              // 将当前线程构造节点
            q = new WaitNode();
          // 如果还没有入队列,则把当前节点加入waiters首节点并替换原来waiters
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
          // 如果设置超时时间
        else if (timed) {
            nanos = deadline - System.nanoTime();
              // 时间到,则不再等待结果
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
              // 阻塞等待特定时间
            LockSupport.parkNanos(this, nanos);
        }
        else
              // 挂起当前线程,知道被其他线程唤醒
            LockSupport.park(this);
    }
}


总的来说,进入这个方法,通常会经历三轮循环


  1. 第一轮for循环,执行的逻辑是 q == null, 这时候会新建一个节点 q, 第一轮循环结束。


  1. 第二轮for循环,执行的逻辑是 !queue,这个时候会把第一轮循环中生成的节点的 next 指针指向waiters,然后CAS的把节点q 替换waiters, 也就是把新生成的节点添加到waiters 中的首节点。如果替换成功,queued=true。第二轮循环结束。


  1. 第三轮for循环,进行阻塞等待。要么阻塞特定时间,要么一直阻塞知道被其他线程唤醒。


对于第二轮循环,大家可能稍稍有点迷糊,我们前面说过,有阻塞,就会排队,有排队自然就有队列,FutureTask 内部同样维护了一个队列


/** Treiber stack of waiting threads */
private volatile WaitNode waiters;


说是等待队列,其实就是一个 Treiber 类型 stack,既然是 stack, 那就像手枪的弹夹一样(脑补一下子弹放入弹夹的情形),后进先出,所以刚刚说的第二轮循环,会把新生成的节点添加到 waiters stack 的首节点


如果程序运行正常,通常调用 get() 方法,会将当前线程挂起,那谁来唤醒呢?自然是 run() 方法运行完会唤醒,设置返回结果(set方法)/异常的方法(setException方法) 两个方法中都会调用 finishCompletion 方法,该方法就会唤醒等待队列中的线程


private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                      // 唤醒等待队列中的线程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}








相关文章
|
2月前
|
缓存 Java 调度
Java并发编程:深入解析线程池与Future任务
【7月更文挑战第9天】线程池和Future任务是Java并发编程中非常重要的概念。线程池通过重用线程减少了线程创建和销毁的开销,提高了资源利用率。而Future接口则提供了检查异步任务状态和获取任务结果的能力,使得异步编程更加灵活和强大。掌握这些概念,将有助于我们编写出更高效、更可靠的并发程序。
|
4月前
|
Java 调度
Java一分钟之线程池:ExecutorService与Future
【5月更文挑战第12天】Java并发编程中,`ExecutorService`和`Future`是关键组件,简化多线程并提供异步执行能力。`ExecutorService`是线程池接口,用于提交任务到线程池,如`ThreadPoolExecutor`和`ScheduledThreadPoolExecutor`。通过`submit()`提交任务并返回`Future`对象,可检查任务状态、获取结果或取消任务。注意处理`ExecutionException`和避免无限等待。实战示例展示了如何异步执行任务并获取结果。理解这些概念对提升并发性能至关重要。
59 5
|
4月前
|
Java
Java 并发编程:深入理解 ExecutorService 和 Future
【5月更文挑战第29天】本文将深入探讨 Java 中的 ExecutorService 和 Future,这两个在并发编程中非常重要的概念。我们将详细解释他们的作用,如何使用,以及他们的一些高级用法。通过本文,你将能够更好地理解和使用 Java 的并发工具,提高你的编程效率和代码质量。
|
4月前
|
安全 Java
【亮剑】Java中的`Future`接口代表异步计算结果,常与`ExecutorService`配合启动任务并获取结果
【4月更文挑战第30天】Java中的`Future`接口代表异步计算结果,常与`ExecutorService`配合启动任务并获取结果。`Future`接口提供`isDone()`、`get()`、`get(timeout, unit)`和`cancel(mayInterruptIfRunning)`等方法。`FutureTask`是`Future`的实现类,可作为`Runnable`执行并返回结果。
49 1
|
4月前
|
Java
Java并发编程:理解并使用Future和Callable接口
【2月更文挑战第25天】 在Java中,多线程编程是一个重要的概念,它允许我们同时执行多个任务。然而,有时候我们需要等待一个或多个线程完成,然后才能继续执行其他任务。这就需要使用到Future和Callable接口。本文将深入探讨这两个接口的用法,以及它们如何帮助我们更好地管理多线程。
|
4月前
|
Java API
java多线程之FutureTask、Future、CompletableFuture
java多线程之FutureTask、Future、CompletableFuture
178 0
|
4月前
|
Java
深入理解 Java 异步编程:Future 和 CompletableFuture 的全面比较
深入理解 Java 异步编程:Future 和 CompletableFuture 的全面比较
131 0
|
4月前
|
网络协议 Java 程序员
完美!腾讯技术官发布Java零基础就业宝典,不用再怀疑人生了
近几年来,互联网行业变化非常大,除了龙头企业的更替,“裁员潮”“失业潮”也不断掀起,尤其是对于年纪太大的程序员真的是不太友好。但是,根据数据统计表明,自2018来,学习IT行业的人不减反增,更有不少其他行业的人转学转行。
|
4月前
|
Java
Java 并发编程 Future及CompletionService
Java 并发编程 Future及CompletionService `Future`用于异步结果计算。它提供了一些方法来检查计算是否完成,使用`get`方法将阻塞线程直到结果返回 `CompletionService`整合了`Executor`和`BlockingQueue`的功能。将`Callable`任务提交给它去执行,使用`take()`和`poll()`获取最新完成的任务执行结果.
Java 并发编程 Future及CompletionService
|
12月前
|
消息中间件 Java UED
Java并发编程异步操作Future和FutureTask
生活是一个洗礼自己的过程,这个洗礼并不是传统意义上的洗礼,传统意义上的洗礼通常认为这个人的思想得到洗礼,灵魂得到洗礼,十分的清新脱俗,不世故,不圆滑,而现实的洗礼实则是让一个人褪去幼稚,褪去无知,让你变得点头哈腰,圆滑世故,我们都是动物,需要物质满足,更需要欲望填补,所以,变成自己小时候唾骂的对象也是可以理解,不过这是一个选择,你可以进行选择,只是在物欲横流的时代,多数人没有这种选择的权力!
69 0