异步编程 - 06 基于JDK中的Future实现异步编程(中)_CompletableFuture源码解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 异步编程 - 06 基于JDK中的Future实现异步编程(中)_CompletableFuture源码解析


CompletableFuture 类图结构




CompletionStage接口


CompletableFuture实现了CompletionStage接口 。


   1)一个CompletionStage代表着一个异步计算节点,当另外一个CompletionStage计算节点完成后,当前CompletionStage会执行或者计算一个值;一个节点在计算终止时完成,可能反过来触发其他依赖其结果的节点开始计算。


   2)一个节点(CompletionStage)的计算执行可以被表述为一个函数、消费者、可执行的Runable(例如使用apply、accept、run方法),


具体取决于这个节点是否需要参数或者产生结果。例如:

stage.thenApply(x -> square(x))//计算平方和
     .thenAccept(x -> System.out.print(x))//输出计算结果
     .thenRun(() -> System.out.println());//然后执行异步任务


3)CompletionStage节点可以使用3种模式来执行:默认执行、默认异步执行(使用async后缀的方法)和用户自定义的线程执行器执行(通过传递一个Executor方式)。


4)一个节点的执行可以通过一到两个节点的执行完成来触发。一个节点依赖的其他节点通常使用then前缀的方法来进行组织。



属性


result

volatile Object result;       // Either the result or boxed AltResult


result字段用来存放任务执行的结果,如果不为null,则标识任务已经执行完成。而计算任务本身也可能需要返回null值,所以使用AltResult(如下代码)来包装计算任务返回null的情况(ex等于null的时候),AltResult也被用来存放当任务执行出现异常时候的异常信息(ex不为null的时候):

static final class AltResult { // See above
    final Throwable ex;        // null only for NIL
    AltResult(Throwable x) { this.ex = x; }
}


stack

volatile Completion stack;    // Top of Treiber stack of dependent actions



stack字段是当前任务执行完毕后要触发的一系列行为的入口,由于一个任务执行后可以触发多个行为,所以所有行为被组织成一个链表结构,并且使用Treiber stack实现了无锁基于CAS的链式栈,其中stack存放栈顶行为节点,stack是Completion类型的,定义如下所示。

 abstract static class Completion extends ForkJoinTask<Void>
    implements Runnable, AsynchronousCompletionTask {
    volatile Completion next;      // Treiber stack下一个节点
  ...
}


asyncPool

/**
 * Default executor -- ForkJoinPool.commonPool() unless it cannot
 * support parallelism.
 */
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

asyncPool是用来执行异步任务的线程池,如果支持并发则默认为Fork-JoinPool.commonPool(),否则是ThreadPerTaskExecutor


方法


CompletableFuture<Void>runAsync(Runnable runnable)

该方法返回一个新的CompletableFuture对象,其结果值会在给定的runnable行为使用ForkJoinPool.commonPool()异步执行完毕后被设置为null,代码如下所示。

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}


如上代码中,默认情况下asyncPool为ForkJoinPool.commonPool(),其中asyncRunStage代码如下所示。

static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
    //1.任务或者行为为null,则抛出NPE异常
    if (f == null) throw new NullPointerException();
    //2.创建一个future对象
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    //3.包装f和d为异步任务后,投递到线程池执行
    e.execute(new AsyncRun(d, f));
    //4.返回创建的future对象
    return d;
}


   代码1判断行为是否为null,如果是则抛出异常。

   代码2创建一个CompletableFuture对象。

   代码3首先创建一个AsyncRun任务,里面保存了创建的future对象和要执行的行为,然后投递到ForkJoinPool.commonPool()线程池执行。


   代码4直接返回创建的CompletableFuture对象。


可知runAsync方法会马上返回一个CompletableFuture对象,并且当前线程不会被阻塞;代码3投递AsyncRun任务到线程池后,线程池线程会执行其run方法。


下面我们看看在AsyncRun中是如何执行我们设置的行为,并把结果设置到创建的future对象中的。


static final class AsyncRun extends ForkJoinTask<Void>
       implements Runnable, AsynchronousCompletionTask {
        CompletableFuture<Void> dep; Runnable fn;
        //保存创建的future和要执行的行为
        AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
            this.dep = dep; this.fn = fn;
        }
       ...
        public void run() {
            CompletableFuture<Void> d; Runnable f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                //5.如果future的result等于null,说明任务还没完成
                if (d.result == null) {
                    try {
                        //5.1执行传递的行为
                        f.run();
                        //5.2设置future的结果为null
                        d.completeNull();
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                //6弹出当前future中依赖当前结果的行为并执行
                d.postComplete();
            }
        }
}


   这里代码5如果发现future的result不为null,说明当前future还没开始执行,则代码5.1执行我们传递的runnable方法,然后执行代码5.2将future对象的结果设置为null,这时候其他因调用future的get()方法而被阻塞的线程就会从get()处返回null。


   当代码6的future任务结束后,看看其stack栈里面是否有依赖其结果的行为,如果有则从栈中弹出来,并执行。

其实上面代码中的runAsync实现可以用我们自己编写的简单代码来模拟。

public static CompletableFuture runAsync(Runnable runnable) {
    CompletableFuture<String> future = new CompletableFuture<String>();
    // 2.开启线程计算任务结果,并设置
    POOL_EXECUTOR.execute(() -> {
        // 2.1模拟任务计算
        try {
            runnable.run();
            future.complete(null);
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
    });
    return future;
}



CompletableFuture<U> supplyAsync(Supplier<U>supplier)


该方法返回一个新的CompletableFuture对象,其结果值为入参supplier行为执行的结果,代码如下所示。

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}


static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                             Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}


如上代码与runAsync类似,不同点在于,其提交到线程池的是AsyncSupply类型的任务,下面我们来看其代码。

static final class AsyncSupply<T> extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
    CompletableFuture<T> dep; Supplier<T> fn;
    AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
        this.dep = dep; this.fn = fn;
    }
   ...
    public void run() {
        CompletableFuture<T> d; Supplier<T> f;
        if ((d = dep) != null && (f = fn) != null) {
            dep = null; fn = null;
            //1.如果future的result等于null,说明任务还没完成
            if (d.result == null) {
                try {
                    //1.1 f.get()执行行为f的方法,并获取结果
                    //1.2 把f.get()执行结果设置到future对象
                    d.completeValue(f.get());
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
            //2.弹出当前future中依赖当前结果的行为并执行
            d.postComplete();
        }
    }
}


如上代码与runAsync的不同点在于,这里的行为方法是Supplier,其get()方法有返回值,且返回值会被设置到future中,然后调用future的get()方法的线程就会获取到该值。


CompletableFuture<U> supplyAsync(Supplier<U>supplier,Executor executor)


该方法返回一个新的CompletableFuture对象,其结果值为入参supplier行为执行的结果,需要注意的是,supplier行为的执行不再是ForkJoinPool.commonPool(),而是业务自己传递的executor,其代码如下所示。

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }
static Executor screenExecutor(Executor e) {
    //如果使用commonpool并且传递的e本身就是commonpool
    if (!useCommonPool && e == ForkJoinPool.commonPool())
        return asyncPool;
    //如果传递的线程池为null,则抛出NPE异常
    if (e == null) throw new NullPointerException();
    //返回业务传递的线程池e
    return e;
}


如上代码通过使用screenExecutor方法来判断传入的线程池是否是一个可用的线程池,如果不是则抛出异常。

相关文章
|
4月前
|
缓存 Java 调度
Java并发编程:深入解析线程池与Future任务
【7月更文挑战第9天】线程池和Future任务是Java并发编程中非常重要的概念。线程池通过重用线程减少了线程创建和销毁的开销,提高了资源利用率。而Future接口则提供了检查异步任务状态和获取任务结果的能力,使得异步编程更加灵活和强大。掌握这些概念,将有助于我们编写出更高效、更可靠的并发程序。
|
14天前
|
Java 关系型数据库 MySQL
【编程基础知识】Eclipse连接MySQL 8.0时的JDK版本和驱动问题全解析
本文详细解析了在使用Eclipse连接MySQL 8.0时常见的JDK版本不兼容、驱动类错误和时区设置问题,并提供了清晰的解决方案。通过正确配置JDK版本、选择合适的驱动类和设置时区,确保Java应用能够顺利连接MySQL 8.0。
79 1
|
26天前
|
缓存 Java 索引
查看并解析当前jdk的垃圾收集器
本文介绍了如何查看和解析当前JDK使用的垃圾收集器,通过在IDEA中配置JVM选项并运行示例代码来展示G1垃圾回收器的详细信息和命令行标志。
21 0
查看并解析当前jdk的垃圾收集器
|
18天前
|
前端开发 JavaScript UED
JavaScript异步编程深入解析
【10月更文挑战第8天】JavaScript异步编程深入解析
10 0
|
3月前
|
Java 开发者 UED
“Java开发者必看:异步编程实战解析,掌握这些技巧,让你的代码跑得更快!
【8月更文挑战第30天】随着互联网技术的发展,系统性能和用户体验成为关注焦点。异步编程作为提高应用响应速度和吞吐量的技术,在Java中广泛采用。本文详细介绍了Java异步编程的概念与优势,并通过实战示例展示了如何利用Future、Callable及CompletableFuture在实际项目中实施异步编程,帮助开发者更好地理解和应用这一技术。
48 2
|
3月前
|
运维 Cloud Native JavaScript
云端新纪元:云原生技术深度解析深入理解Node.js事件循环及其在异步编程中的应用
【8月更文挑战第27天】随着云计算技术的飞速发展,云原生已成为推动现代软件开发和运维的关键力量。本文将深入探讨云原生的基本概念、核心价值及其在实际业务中的应用,帮助读者理解云原生如何重塑IT架构,提升企业的创新能力和市场竞争力。通过具体案例分析,我们将揭示云原生技术背后的哲学思想,以及它如何影响企业决策和操作模式。
|
3月前
|
算法 安全 Java
深入JDK源码:揭开ConcurrentHashMap底层结构的神秘面纱
【8月更文挑战第24天】`ConcurrentHashMap`是Java并发编程中不可或缺的线程安全哈希表实现。它通过精巧的锁机制和无锁算法显著提升了并发性能。本文首先介绍了早期版本中使用的“段”结构,每个段是一个带有独立锁的小型哈希表,能够减少线程间竞争并支持动态扩容以应对高并发场景。随后探讨了JDK 8的重大改进:取消段的概念,采用更细粒度的锁控制,并引入`Node`等内部类以及CAS操作,有效解决了哈希冲突并实现了高性能的并发访问。这些设计使得`ConcurrentHashMap`成为构建高效多线程应用的强大工具。
50 2
|
5月前
|
数据采集 数据处理 API
深度解析Python中的异步编程
本文将深入探讨Python中的异步编程模型,包括基本概念、常用库、以及实际应用场景,帮助读者更好地理解和应用异步编程技术来提升程序的性能与响应速度。
|
5月前
|
Java Spring
深入解析Spring源码,揭示JDK动态代理的工作原理。
深入解析Spring源码,揭示JDK动态代理的工作原理。
56 0
|
19天前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
39 0

推荐镜像

更多