并发编程——Future & CompletableFuture

简介: Java创建线程的方式,一般常用的是Thread,Runnable。如果需要当前处理的任务有返回结果的话,需要使用Callable。Callable运行需要配合Future。

@[TOC]

Future介绍

Java创建线程的方式,一般常用的是Thread,Runnable。如果需要当前处理的任务有返回结果的话,需要使用Callable。Callable运行需要配合Future。

Future是一个接口,一般会使用FutureTask实现类去接收Callable任务的返回结果。

FutureTask使用

下面示例使用FutureTask来执行一个可以返回结果的异步任务。Callable是要执行的任务,FutureTask是存放任务返回结果的位置。

public static void main(String[] args) throws ExecutionException, InterruptedException {
   
    FutureTask<Integer> futureTask = new FutureTask<>(() -> {
   
        System.out.println("任务执行");
        Thread.sleep(2000);
        return 123+764;
    });

    Thread t = new Thread(futureTask);
    t.start();

    System.out.println("main线程启动了t线程处理任务");
    Integer result = futureTask.get();
    System.out.println(result);
}

FutureTask 分析

首先看一下FutureTask的核心属性

/**
 * NEW -> COMPLETING -> NORMAL          任务正常执行,返回结果是正常的结果
 * NEW -> COMPLETING -> EXCEPTIONAL     任务正常执行,但是返回结果是异常
 * NEW -> CANCELLED              任务直接被取消的流程
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
// 代表当前任务的状态
private volatile int state;
private static final int NEW          = 0;  // 任务的初始化状态
private static final int COMPLETING   = 1;  // Callable的结果(正常结果,异常结果)正在封装给当前的FutureTask
private static final int NORMAL       = 2;  // NORMAL任务正常结束
private static final int EXCEPTIONAL  = 3;  // 执行任务时,发生了异常
private static final int CANCELLED    = 4;  // 任务被取消了。
private static final int INTERRUPTING = 5;  // 线程的中断状态,被设置为了true(现在还在运行)
private static final int INTERRUPTED  = 6;  // 线程被中断了。

// 当前要执行的任务
private Callable<V> callable;
// 存放任务返回结果的属性,也就是futureTask.get需要获取的结果
private Object outcome; 
// 执行任务的线程。
private volatile Thread runner;
// 单向链表,存放通过get方法挂起等待的线程
private volatile WaitNode waiters;

t.start后,是通过run方法执行的Callable的call方法,该方法是同步的,然后将返回结果赋值给了outcome。

// run方法的执行流程,最终会执行Callable的call方法
public void run() {
   
    // 保证任务的状态是NEW才可以运行
    // 基于CAS的方式,将当前线程设置为runner。
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
        return;
    // 准备执行任务
    try {
   
        // 要执行任务 c
        Callable<V> c = callable;
        // 任务不为null,并且任务的状态还处于NEW
        if (c != null && state == NEW) {
   
            // 放返回结果
            V result;
            // 任务执行是否为正常结束
            boolean ran;
            try {
   
                // 运行call方法,拿到返回结果封装到result中
                result = c.call();
                // 正常返回,ran设置为true
                ran = true;
            } catch (Throwable ex) {
   
                // 结果为null
                result = null;
                // 异常返回,ran设置为false
                ran = false;
                // 设置异常信息
                setException(ex);
            }
            if (ran)
                // 正常执行结束,设置返回结果
                set(result);
        }
    } finally {
   
        // 将执行任务的runner设置空
        runner = null;
        // 拿到状态
        int s = state;
        // 中断要做一些后续处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}


// 设置返回结果
protected void set(V v) {
   
    // 首先要将任务状态从NEW设置为COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
   
        // 将返回结果设置给outcome。
        outcome = v;
        // 将状态修改为NORMAL,代表正常技术
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
        finishCompletion();
    }
}

get方法获取返回结果时会查看当前线程状态,如果状态还未达成,也就是说call方法还未执行完未执行set方法,该线程就会被挂起阻塞LockSupport.park(this);。

public V get() throws InterruptedException, ExecutionException {
   
    // 拿状态
    int s = state;
    // 满足找个状态就代表现在可能还没有返回结果
    if (s <= COMPLETING)
        // 尝试挂起线程,等待拿结果
        s = awaitDone(false, 0L);
    return report(s);
}

// 线程要等待任务执行结束,等待任务执行的状态变为大于COMPLETING状态
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
   
    // 计算deadline,如果是get(),就是0,  如果是get(time,unit)那就追加当前系统时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 构建WaitNode
    WaitNode q = null;
    // queued = false
    boolean queued = false;
    // 死循环
    for (;;) {
   
        // 找个get的线程是否中断了。
        if (Thread.interrupted()) {
   
            // 将当前节点从waiters中移除。
            removeWaiter(q);
            // 并且抛出中断异常
            throw new InterruptedException();
        }

        // 拿到现在任务的状态
        int s = state;
        // 判断任务是否已经执行结束了
        if (s > COMPLETING) {
   
            // 如果设置过WaitNode,直接移除WaitNode的线程
            if (q != null)
                q.thread = null;
            // 返回当前任务的状态
            return s;
        }
        // 如果任务的状态处于 COMPLETING ,
        else if (s == COMPLETING)
            // COMPLETING的持续时间非常短,只需要做一手现成的让步即可。
            Thread.yield();

        // 现在线程的状态是NEW,(call方法可能还没执行完呢,准备挂起线程)
        else if (q == null)
            // 封装WaitNode存放当前线程
            q = new WaitNode();
        else if (!queued)
            // 如果WaitNode还没有排在waiters中,现在就排进来(头插法的效果)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        else if (timed) {
   
            // get(time,unit)挂起线程的方式
            // 计算挂起时间
            nanos = deadline - System.nanoTime();
            // 挂起的时间,是否小于等于0
            if (nanos <= 0L) {
   
                // 移除waiters中的当前Node
                removeWaiter(q);
                // 返回任务状态
                return state;
            }
            // 正常指定挂起时间即可。(线程挂起)
            LockSupport.parkNanos(this, nanos);
        }
        else {
   
            // get()挂起线程的方式
            LockSupport.park(this);
        }
    }
}

当任务执行完毕(set方法执行完成),由finishCompletion唤醒线程,LockSupport.unpark(t);

// 任务状态已经变为了NORMAL,做一些后续处理
private void finishCompletion() {
   
    for (WaitNode q; (q = waiters) != null;) {
   
        // 拿到第一个节点后,直接用CAS的方式,将其设置为null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
   
            for (;;) {
   
                // 基于q拿到线程信息
                Thread t = q.thread;
                // 线程不为null
                if (t != null) {
   
                    // 将WaitNode的thread设置为null
                    q.thread = null;
                    // 唤醒这个线程
                    LockSupport.unpark(t);
                }
                // 往后遍历,接着唤醒
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null;
                // 指向next的WaitNode
                q = next;
            }
            break;
        }
    }

    // 扩展方法,没任何实现,你可以自己实现
    done();

    // 任务处理完了,可以拜拜了!
    callable = null;   
}

拿到返回结果的处理

// 任务结束。
private V report(int s) throws ExecutionException {
   
    // 拿到结果
    Object x = outcome;
    // 判断是正常返回结束
    if (s == NORMAL)
        // 返回结果
        return (V)x;
    // 任务状态是大于取消
    if (s >= CANCELLED)
        // 甩异常。
        throw new CancellationException();
    // 扔异常。
    throw new ExecutionException((Throwable)x);
}

// 正常返回 report
// 异常返回 report
// 取消任务 report
// 中断任务 awaitDone

CompletableFuture

FutureTask存在的问题:
问题1:FutureTask获取线程执行的结果前,主线程需要通过get方法一直阻塞等待子线程执行完call方法,才可以拿到返回结果。
问题2:如果不通过get去挂起线程,通过while循环,不停的判断任务的执行状态是否结束,结束后,再拿结果。如果任务长时间没执行完毕,CPU会一直调度查看任务状态的方法,会浪费CPU资源。

FutureTask是一个同步非阻塞处理任务的方式。需要一个异步非阻塞处理任务的方式。CompletableFuture在一定程度上就提供了各种异步非阻塞的处理方案。

CompletableFuture也是实现了Future接口实现的功能,可以不使用FutureTask,提供非常丰富的函数去执行各种异步操作,直接使用CompletableFuture即可。

CompletableFuture的应用

CompletableFuture最重要的就是解决了异步回调的问题

CompletableFuture就是执行一个异步任务,异步任务可以有返回结果,也可以没有返回结果,使用了函数式编程中三个最核心的接口

Supplier - 生产者,没有入参,但是有返回结果
Consumer - 消费者,有入参,但是没有返回结果
Function - 函数,有入参,并且有返回结果

提供了两个最基本运行的基本方法

supplyAsync(Supplier<U> supplier) 异步执行任务,有返回结果
runAsync(Runnable runnable) 异步执行任务,没有返回结果

在不指定线程池的前提下,这两个异步任务都是交给ForkJoinPool去执行的。

但是只是用这两个方法,无法实现异步回调的。如果需要在当前任务执行完毕后,拿着返回结果继续去执行后续任务操作的话,需要基于其他方法去实现。

thenApply(Function<prevResult,currResult>); 等待前一个任务处理结束后,拿着前置任务的返回结果,再做处理,并且返回当前结果
thenApplyAsync(Function<prevResult,currResult>,线程池) 采用全新的线程执行
thenAccept(Consumer<preResult>);等待前一个任务处理结束后,拿着前置任务的返回结果再做处理,没有返回结果
thenAcceptAsync(Consumer<preResult>,线程池);采用全新的线程执行
thenRun(Runnable) 等待前一个任务处理结束后,再做处理。不接收前置任务结果,也不返回结果
thenRunAsync(Runnable[,线程池]) 采用全新的线程执行

其次还有可以执行相对复杂的处理,在前一个任务执行的同时,执行后续任务。等待前置任务和后置任务都搞定之后,再执行最终任务

thenCombine(CompletionStage,Function<prevResult,nextResult,afterResult>) 让prevResult和nextResult一起执行,等待执行完成后,获取前两个任务的结果执行最终处理,最终处理也可以返回结果
thenCombineAsync(CompletionStage,Function<prevResult,nextResult,afterResult>[,线程池]) 采用全新的线程执行
thenAcceptBoth(CompletionStage,Consumer<prevResult,nextResult>);让前置任务和后续任务同时执行,都执行完毕后,拿到两个任务的结果,再做后续处理,但是没有返回结果
thenAcceptBothAsync(CompletionStage,Consumer<prevResult,nextResult>[,线程池])采用全新的线程执行
runAfterBoth(CompletionStage,Runnble) 让前置任务和后续任务同时执行,都执行完毕后再做后续处理
runAfterBothAsync(CompletionStage,Runnble[,线程池]) 采用全新的线程执行

还提供了可以让两个任务一起执行,但是有一个任务结束,有返回结果后,就做最终处理

applyToEither(CompletionStage,Function<firstResult,afterResult>) 前面两个任务同时执行,有一个任务执行完,获取返回结果,做最终处理,再返回结果
acceptEither(CompletionStage,Consumer<firstResult>) 前面两个任务同时执行,有一个任务执行完,获取返回结果,做最终处理,没有返回结果
runAfterEither(CompletionStage,Runnable) 前面两个任务同时执行,有一个任务执行完,做最终处理

还提供了等到前置任务处理完,再做后续处理,后续处理返回的结果为CompletionStage

thenCompose(Function<prevResult,CompletionStage>)

最后还有处理异常的各种姿势

exceptionally(Function<Throwable,currResult>)  
whenComplete(Consumer<prevResult,Throwable>) 
hanle(Function<prevResult,Throwable,currResult>)

CompletableFuture 示例

public static void main(String[] args) throws InterruptedException {
   
    sout("我回家吃饭");

    CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
   
        sout("阿姨做饭!");
        return "锅包肉!";
    });
    sout("我看电视!");

    sout("我吃饭:" + task.join());
}
public static void main(String[] args) throws InterruptedException {
   
    sout("我回家吃饭");

    CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
   
        sout("阿姨炒菜!");
        return "锅包肉!";
    },executor).thenCombineAsync(CompletableFuture.supplyAsync(() -> {
   
        sout("小王焖饭");
        return "大米饭!";
    },executor),(food,rice) -> {
   
        sout("大王端" + food + "," + rice);
        return "饭菜好了!";
    },executor);

    sout("我看电视!");
    sout("我吃饭:" + task.join());
}

总结

Future和CompletableFuture都是用于处理异步任务的接口和类,它们的主要区别在于功能复杂度和使用场景。Future比较简单,主要用于简单的异步任务处理,而CompletableFuture则更加灵活和强大,适用于复杂的异步任务处理场景。

相关文章
|
监控 Java API
并发编程 - CompletableFuture
并发编程 - CompletableFuture
81 0
|
监控 算法 Java
异步编程 - 05 基于JDK中的Future实现异步编程(中)_CompletableFuture
异步编程 - 05 基于JDK中的Future实现异步编程(中)_CompletableFuture
50 0
|
3月前
|
Java
JAVA并发编程系列(13)Future、FutureTask异步小王子
本文详细解析了Future及其相关类FutureTask的工作原理与应用场景。首先介绍了Future的基本概念和接口方法,强调其异步计算特性。接着通过FutureTask实现了一个模拟外卖订单处理的示例,展示了如何并发查询外卖信息并汇总结果。最后深入分析了FutureTask的源码,包括其内部状态转换机制及关键方法的实现原理。通过本文,读者可以全面理解Future在并发编程中的作用及其实现细节。
|
6月前
|
IDE Java API
玩转 CompletableFuture 异步编程
玩转 CompletableFuture 异步编程
34 0
|
7月前
|
Java API
java多线程之FutureTask、Future、CompletableFuture
java多线程之FutureTask、Future、CompletableFuture
305 0
|
7月前
|
Java 编译器
JUC并发编程之CompletableFuture详解
JUC并发编程中的Future接口是Java 5中引入的一种异步编程机制,用于表示一个可能在未来完成的计算结果。它允许我们提交一个任务给线程池或其他执行器执行,并且可以通过Future对象获取任务执行的结果或者判断任务是否已经完成。
169 0
|
存储 Java
并发编程系列教程(09) - Callable与Future模式
并发编程系列教程(09) - Callable与Future模式
58 0
|
Java
异步编程 - 05 基于JDK中的Future实现异步编程(中)_CompletableFuture2
异步编程 - 05 基于JDK中的Future实现异步编程(中)_CompletableFuture2
70 0
|
Java
异步编程 - 04 基于JDK中的Future实现异步编程(上)_Future & FutureTask 源码解析
异步编程 - 04 基于JDK中的Future实现异步编程(上)_Future & FutureTask 源码解析
81 0
|
消息中间件 Java 中间件
Future and CompletableFuture
Future代表异步执行的结果,也就是说异步执行完毕后,结果保存在Future里, 我们在使用线程池submit()时需要传入Callable接口,线程池的返回值为一个Future,而Future则保存了执行的结果 ,可通过Future的get()方法取出结果,如果线程池使用的是execute(),则传入的是Runnable接口 无返回值。
76 0