Java多线程专题之Callable、Future与FutureTask(含源码分析)

简介: Java多线程专题之Callable、Future与FutureTask(含源码分析)

前言

大家好,一直以来我都本着用最通俗的话理解核心的知识点, 我认为所有的难点都离不开 基础知识 的铺垫。目前正在出一个Java多线程专题长期系列教程,从入门到进阶, 篇幅会较多, 喜欢的话,给个关注❤️ ~


适合人群

  • 有一定的Java基础
  • 想学习或了解多线程开发
  • 想提高自己的同学

大佬可以绕过 ~


背景

之前给大家讲了一些框架的使用,这些都属于业务层面的东西,你需要熟练掌握它并在项目中会运用它即可,但这些对自身技术的积累是远远不够的,如果你想要提高自己,对于语言本身你需要花更多的时间去挖掘而不是局限于框架的使用,所以之前为什么跟大家一直强调基础的重要性,框架可以千变万化,层出不穷,但是基础它是不变的,不管是学java还是前端或者是其它语言, 这一点大家还是需要认清的。


接下来的几期会专门讲多线程这一块,篇幅会较多,耐心看完你一定会有收获~


情景回顾

上期带大家学习了什么是进阶学习了Thread以及分析了它的一些源码,本期带大家学习Callable、Future与FutureTask的用法以及源码分析, 内容较多, 我们一起来看一下吧~


Callable & Future

之前我们通过Runnable,Thread就可以创建一个线程,但是它也有一个局限,就是没有返回值,有时候我们的需求需要结合多任务处理后的数据做一些事情,所以通过上边的方法就不好解决了。


下面我们看一下Callable

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
复制代码


首先它是一个接口,且还提供了泛型的支持,call方法有返回值, 那怎么使用它呢,肯定是要实现它

public class CallableTest {
    public static class CallableDemo implements Callable<String> {
        @Override
        public String call() throws Exception {
            return "hello";
        }
    }
    public static void main(String[] args) throws Exception {
        CallableDemo demo = new CallableDemo();
        String result = demo.call();
        System.out.println(result);
        System.out.println("main");
    }
}
复制代码


运行一下实际输出

hello
main
复制代码


发现返回的结果输出出去了,但是这里有个问题,这个main输出在hello之后,似乎好像没有开启一个线程,依然是同步执行的,是这样吗,我们看一下call内部的线程环境

public String call() throws Exception {
    System.out.println(Thread.currentThread());
    Thread.sleep(3000);
    return "hello";
}
复制代码


运行一下实际输出

Thread[main,5,main]
hello
main
复制代码


好家伙,还是main线程内部,并且线程还被阻塞了,原来new是开启不了线程的,只是单纯的实现了一下它的接口,我们姿势搞错了。其实它的源码上加了注释的,说通常会借助Excutors类使用,这个类是用来创建线程池的,这个我们后边讲,这里给大家演示一下

public static void main(String[] args) throws Exception {
    CallableDemo demo = new CallableDemo();
    // 创建线程池
    ExecutorService executor = Executors.newCachedThreadPool();
    // 提交任务
    Future<String> future = executor.submit(demo);
    System.out.println("main");
}
复制代码


实际输出:

main
Thread[pool-1-thread-1,5,main]
复制代码


发现是单独线程执行的,并且没有阻塞线程。我们发现这里也用到了Future,这个翻译过来时未来的意思,这里也就是结果发生在后边,它是一个异步情况, 那么我们如何获取到结果呢?

System.out.println(future.get());
System.out.println("main");
复制代码


实际输出:

Thread[pool-1-thread-1,5,main]
hello
main
复制代码


发现结果拿到了,但是运行的时候好像线程被阻塞了,我们可以发现get()会导致线程阻塞,举一反三,我想不阻塞的情况下拿到返回值,可以吗❓那有什么办法呢?开启单独的线程不就好了,那么在单独的线程可以拿到其它线程的值吗,我们来试一下

new Thread(() -> {
    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}).start();
System.out.println("main");
复制代码


实际运行输出:

Thread[pool-1-thread-1,5,main]
main
hello
复制代码


发现,这下就对了~


Future & FutureTask 源码解析

端起小板凳,这部分好好听,我们主要看下它的源码实现。我们上文使用到了 Future,我们看一下它的定义,发现它也是一个接口

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码


还有一个接口叫做RunnableFuture,FutureTask是它的一个实现类,这个类帮我实现了很多好用的方法,因为我们自己实现的话是很麻烦的

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
复制代码


之前的例子也可以用FutureTask改写成:

public static void main(String[] args) throws Exception {
    CallableDemo demo = new CallableDemo();
    ExecutorService executor = Executors.newCachedThreadPool();
    FutureTask<String> futureTask = new FutureTask<>(demo);
    executor.submit(futureTask);
    System.out.println(futureTask.get());
}
复制代码


它继承了 Runnable, Future接口,我们之前调用的get方法就是其中之一,来一起看一下这个get是如何拿到值的,该部分源码来自FutureTask类实现

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
复制代码


这个state线程的状态值,这里很好理解,一个是阻塞方法awaitDone,一个是抛出结果report,我们重点看一下awaitDone的实现:

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                    q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}
复制代码


首先它是一个内部方法,timed指定是否定时等待,如果传true的话需要指定时间nanos

// 销亡时间  System.nanoTime() 正在运行的 Java 虚拟机的高分辨率时间源的当前值,以纳秒为单位
final long deadline = timed ? System.nanoTime() + nanos : 0L;
复制代码


WaitNode q = null;它是一个链表结构 volatile 被用来修饰会被不同线程访问和修改的变量, 后边还会讲到,此处先有个印象

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}
复制代码


for (;;) {...},这是一个死循环,这里就是阻塞部分了,内部先会判断线程状态

// 判断线程状态 如果中断,直接抛出异常,并且将```q```从节点中移除
if (Thread.interrupted()) {
    removeWaiter(q);
    throw new InterruptedException();
}
复制代码


这里为什么会移除呢,想想看,如果不移除,内部积累太多,每次都要遍历它,如果是有竞争的情况下,是不是很浪费。这里主要是避免不必要的高额开销

// 线程状态 最先是 NEW
int s = state;
if (s > COMPLETING) {
    // 如果线程完成状态 移除q节点 并返回当前线程状态 最终通过report返回结果
    if (q != null)
        q.thread = null;
    return s;
}
复制代码


这里为什么移除?因为完成了,我只要结果就好了,不需要在进一步判断了

else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
复制代码


如果处于COMPLETING,会让出cpu时间

else if (q == null)
                q = new WaitNode();
复制代码


这个很好理解,节点不存在就创建一个

else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
复制代码


如果有新任务进来,会新建一个节点,然后利用CAS操作放入waiter链表的头部,这里是一个原子性操作,CAS的概念我们后边给大家讲,这里一切都是为了安全

compareAndSwap是个原子方法,原理是CAS,即将内存中的值与期望值进行比较,如果相等,就将内存中的值修改成新值并返回true。

else if (timed) {
    nanos = deadline - System.nanoTime();
    if (nanos <= 0L) {
        removeWaiter(q);
        return state;
    }
    LockSupport.parkNanos(this, nanos);
}
else
    LockSupport.park(this);
复制代码


这里判断消亡时间,如果超时了,移除节点,并返回线程状态,LockSupport使线程阻塞,有的同学可能会问,for不是已经阻塞了吗❓那为啥还调用LockSupport,这里其实是线程优化,想想你一直for循环一直判断是不是也会产生开销,加上LockSupport避免不要的操作,其实for的整个过程是实现了自旋锁的操作。


阻塞了不就没法执行了吗,park加锁方法还有一个对应的unpark相当于释放,但此处没有看到这个方法,那么它在哪个地方呢❓我们大体应该可以猜到,它应该是在执行阶段,还记得RunnableFuture接口下的run方法吗?下面我们看一下它的实现

public void run() {
        // 判断线程状态 如果不为NEW 或者 并判断值是否一样,如果不一样就直接返回
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            // 这一步是执行我们的任务
            Callable<V> c = callable;
            // 如果任务存在 并且处于NEW状态
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 执行任务
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    // 异常检测
                    result = null;
                    ran = false;
                    setException(ex);
                }
                // 执行成功,设置返回值
                if (ran)
                    set(result);
            }
        } finally {
            // 这里其实是释放阶段 防止并发调用
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            // 这一步其实是防止在中断时提交任务,内部是调用了一个Thread.yield()
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
复制代码


下面我们重点看一下这个set方法

protected void set(V v) {
        // 先比较是否相同
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // outcome 是返回的结果或者异常 setException这里是设置异常结果 异常赋值给outcome
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 设置最终状态
            finishCompletion();
        }
}
复制代码


UNSAFE类是一个很特殊的类,它的内部几乎都是native方法,它可以使得我们能够操作内存空间来获得更高的性能,但一般我们很少使用它,因为它不被gc控制,使用不当jvm可能都会挂了。我们重点关注一下 finishCompletion这个方法

private void finishCompletion() {
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                // 遍历节点释放锁
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        // 默认下它是一个空方法,可以用于执行完成的回调方法, 可以覆盖实现
        done();
        callable = null;        // to reduce footprint
    }
复制代码


我们可以看到在这个内部它是调了一个unpark方法的,可以看出之前awaitDone()方法内部的线程阻塞在这个地方被唤醒了, 再回回过头看awaitDone()方法,就明白为啥要调用park方法了,因为线程没有达到大于COMPLETING状态,它会一直for

最后一个就是report了,返回值

private V report(int s) throws ExecutionException {
        Object x = outcome;
        // 在set的时候 我们可以看到有设置为这个状态。 V就是传入的类型
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
复制代码


于是我们的get就拿到返回值了


FutureTask 状态

这里给大家补充一下FutureTask的状态值

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;
复制代码


state可能的状态转变路径如下:

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED


结束语

本期到这里就结束了, 总结一下,本节主要讲了Callable、Future与FutureTask的常用方法,以及从问题触发,带大家分析了一下FutureTask的源码,这里大家要好好理解,不要去背,想要告诉大家的是学习要带着问题, 看源码一定要大胆猜测,冷静分析 ~


下期预告

之前提到过线程组的概念,下期就带大家学习线程组和线程的优先级。关注公众号加群,一起学习进步。关注我,不迷路, 下期不见不散 ~

相关文章
|
4月前
|
安全 算法 Java
Java 多线程:线程安全与同步控制的深度解析
本文介绍了 Java 多线程开发的关键技术,涵盖线程的创建与启动、线程安全问题及其解决方案,包括 synchronized 关键字、原子类和线程间通信机制。通过示例代码讲解了多线程编程中的常见问题与优化方法,帮助开发者提升程序性能与稳定性。
200 0
|
5月前
|
Java 数据挖掘 调度
Java 多线程创建零基础入门新手指南:从零开始全面学习多线程创建方法
本文从零基础角度出发,深入浅出地讲解Java多线程的创建方式。内容涵盖继承`Thread`类、实现`Runnable`接口、使用`Callable`和`Future`接口以及线程池的创建与管理等核心知识点。通过代码示例与应用场景分析,帮助读者理解每种方式的特点及适用场景,理论结合实践,轻松掌握Java多线程编程 essentials。
364 5
|
5月前
|
监控 搜索推荐 Java
Java 多线程最新实操技术与应用场景全解析:从基础到进阶
本文深入探讨了Java多线程的现代并发编程技术,涵盖Java 8+新特性,如CompletableFuture异步处理、Stream并行流操作,以及Reactive编程中的Reactor框架。通过具体代码示例,讲解了异步任务组合、并行流优化及响应式编程的核心概念(Flux与Mono)。同时对比了同步、CompletableFuture和Reactor三种实现方式的性能,并总结了最佳实践,帮助开发者构建高效、扩展性强的应用。资源地址:[点击下载](https://pan.quark.cn/s/14fcf913bae6)。
375 3
|
6月前
|
存储 缓存 安全
JUC并发—11.线程池源码分析
本文主要介绍了线程池的优势和JUC提供的线程池、ThreadPoolExecutor和Excutors创建的线程池、如何设计一个线程池、ThreadPoolExecutor线程池的执行流程、ThreadPoolExecutor的源码分析、如何合理设置线程池参数 + 定制线程池。
JUC并发—11.线程池源码分析
|
6月前
|
算法 Java 调度
Java多线程基础
本文主要讲解多线程相关知识,分为两部分。第一部分涵盖多线程概念(并发与并行、进程与线程)、Java程序运行原理(JVM启动多线程特性)、实现多线程的两种方式(继承Thread类与实现Runnable接口)及其区别。第二部分涉及线程同步(同步锁的应用场景与代码示例)及线程间通信(wait()与notify()方法的使用)。通过多个Demo代码实例,深入浅出地解析多线程的核心知识点,帮助读者掌握其实现与应用技巧。
131 1
|
6月前
|
Java
java 多线程异常处理
本文介绍了Java中ThreadGroup的异常处理机制,重点讲解UncaughtExceptionHandler的使用。通过示例代码展示了当线程的run()方法抛出未捕获异常时,JVM如何依次查找并调用线程的异常处理器、线程组的uncaughtException方法或默认异常处理器。文章还提供了具体代码和输出结果,帮助理解不同处理器的优先级与执行逻辑。
163 1
|
5月前
|
人工智能 Java API
Java并发编程之Future与FutureTask
本文深入解析了Future接口及其实现类FutureTask的原理与使用。Future接口定义了获取任务结果、取消任务及查询任务状态的规范,而FutureTask作为其核心实现类,结合了Runnable与Future的功能。文章通过分析FutureTask的成员变量、状态流转、关键方法(如run、set、get、cancel等)的源码,展示了异步任务的执行与结果处理机制。最后,通过示例代码演示了FutureTask的简单用法,帮助读者更直观地理解其工作原理。适合希望深入了解Java异步编程机制的开发者阅读。
108 0
|
10月前
|
Java 程序员
Java社招面试中的高频考点:Callable、Future与FutureTask详解
大家好,我是小米。本文主要讲解Java多线程编程中的三个重要概念:Callable、Future和FutureTask。它们在实际开发中帮助我们更灵活、高效地处理多线程任务,尤其适合社招面试场景。通过 Callable 可以定义有返回值且可能抛出异常的任务;Future 用于获取任务结果并提供取消和检查状态的功能;FutureTask 则结合了两者的优势,既可执行任务又可获取结果。掌握这些知识不仅能提升你的编程能力,还能让你在面试中脱颖而出。文中结合实例详细介绍了这三个概念的使用方法及其区别与联系。希望对大家有所帮助!
513 60
|
8月前
|
存储 网络协议 安全
Java网络编程,多线程,IO流综合小项目一一ChatBoxes
**项目介绍**:本项目实现了一个基于TCP协议的C/S架构控制台聊天室,支持局域网内多客户端同时聊天。用户需注册并登录,用户名唯一,密码格式为字母开头加纯数字。登录后可实时聊天,服务端负责验证用户信息并转发消息。 **项目亮点**: - **C/S架构**:客户端与服务端通过TCP连接通信。 - **多线程**:采用多线程处理多个客户端的并发请求,确保实时交互。 - **IO流**:使用BufferedReader和BufferedWriter进行数据传输,确保高效稳定的通信。 - **线程安全**:通过同步代码块和锁机制保证共享数据的安全性。
347 23
|
7月前
|
数据采集 存储 网络协议
Java HttpClient 多线程爬虫优化方案
Java HttpClient 多线程爬虫优化方案

热门文章

最新文章

下一篇
oss云网关配置