【精通函数式编程】(十一) CompletableFuture、反应式编程源码解析与实战

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: Future异步编程和CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。

前言📫 作者简介:小明java问道之路,专注于研究计算机底层,就职于金融公司后端高级工程师,擅长交易领域的高安全/可用/并发/性能的设计和架构📫

🏆 Java领域优质创作者、阿里云专家博主、华为云专家🏆

🔥 如果此文还不错的话,还请👍关注、点赞、收藏三连支持👍一下博主哦

本文导读

Java代码为了更好的发展和性能,开发了 异步编程的模式,Future异步编程和CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。

一、同步与异步

1、为什么要有异步

在Java发展的这20年,他只做了一件事不被淘汰,为了不被淘汰不断的更新jdk的版本,以便使用计算机硬件、操作系统以及新的编程概念。

Java一开始提供了 synchronized 锁、Runable,后面java5有引入了 java.util.concurrent 包,java7中的 forkjoin 框架 java.util.concurrent.RecursiveTask,到java8中Stream流、lambda表达式的支持,这一切都是为了支持高并发。

即便如此,多线程虽然极大的提升了性能,如果合理的使用线程池的话,好处,第一可以降低资源消耗,重复利用已创建的线程;第二:提高响应速度,任务可以不需要等到线程创建就能立即执行;第三:提高线程的可管理性。统一分配、调优和监控。但是线程池也不是没有缺点,使用k个线程的线程池就只能并发的执行k个任务,其他任务还是回休眠或者阻塞

这时候如果有线程不和其他任务相关联,又可以不用阻塞,就好了。Java8考虑到了,充分发挥了计算机硬件的处理能力,异步API 应运而生。

2、什么是同步?什么是异步?

同步就是 a 程序强依赖 b 程序,我必须等到你的回复或者执行完毕,才能做出下一步响应,类似于编程中程序被解释器(JVM)顺序执行一样(加载 > 验证 > 准备 > 解析 > 初始化);

异步则相反,a 程序不强依赖 b 程序,响应的时间也无所谓,无论你返回还是不返回,a 程序都能继续运行,也就是说我不存在等待对方的概念,a 程序就是 异步非阻塞的。

下面举一个例子就说明什么是同步、什么是异步

网络异常,图片无法展示
|

异步编程涉及两种风格,Future风格API 和反应式风格API ,Future<Integer> fun(int a){},fun( a , x-> {}),这两个模式的实战会在后面小结讲解。

二、Future异步编程

1、Future 接口

Java5中就引入了Future 接口,他的涉及初衷就是异步计算,例如我们结算一个商户下的所有订单,这个时候并不需要for循环去累加,Future 接口使用的时候只需要封装 Callable中,再提交给ExecutorService。

2、Future 接口的使用

Future 接口的使用看下Java8之前是如何使用异步的

public static void main(String[] args) throws Exception {
    List<OrderInfo> orderInfos = Arrays.asList(new OrderInfo("123", BigDecimal.ONE),
            new OrderInfo("456", BigDecimal.TEN), new OrderInfo("789", BigDecimal.TEN));
    // 创建 ExecutorService 通过它可以向线程池提交任务
    ExecutorService executorService = Executors.newCachedThreadPool();
    // 异步操作的同时,可以进行其他操作
    Future<BigDecimal> decimalFuture = executorService.submit(new Callable<BigDecimal>() {
        @Override
        public BigDecimal call() throws Exception {
            return reduceAmt(orderInfos);
        }
    });
    // Java8 写法
    Future<BigDecimal> decimalFuture = executorService.submit(() -> reduceAmt(orderInfos));
    System.out.println(decimalFuture.get());
}
private static BigDecimal reduceAmt(List<OrderInfo> orderInfos) {
    return orderInfos.stream()
            .map(OrderInfo::getOrderAmt)
            .reduce(BigDecimal.ZERO, BigDecimal::add);
}

异步编程可以在 ExecutorService 中,以并发的方式调用另一个线程执行操作,后续调用get() 方法获取操作结果,如果操作完成会立刻返回,如果操作没有完成则回阻塞线程,直到操作完成返回。

网络异常,图片无法展示
|

3、Future 接口的缺陷(局限性)

Future接口 还提供了方法来检测异步计算是否已经结束(isDone() 方法),等待异步操作结束。

但是当长时间计算任务完成时,该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并。

就会发生很多性能问题:1、将两个异步计算合并为一个(这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。)2、此时就要等待 Future 集合中的所有任务都完成。3、当Future的完成事件发生时会收到通知,使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果,每一步都需要等待。

三、CompletableFuture 接口详解

1、CompletableFuture 的创建

CompletableFuture.runAsync()也可以用来创建CompletableFuture实例。与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值,runAsync适合创建不需要返回值的计算任务

通过该supplyAsync 函数创建的CompletableFuture实例会异步执行当前传入的计算任务,在调用端,则可以通过get或join获取最终计算结果。

同事直接new(构造函数创建)也是可以的,下面通过一个实战小例子看下,高并发高性能的添加购物车结构如何设计

import java.util.concurrent.CompletableFuture;
import org.springframework.core.task.AsyncTaskExecutor;
public class CompletableFutureImpl {
    @Autowired
    @Qualifier("asyncExecutor")
    private AsyncTaskExecutor asyncTaskExecutor;
    public DefaultResponseVO addGoodsCart(HttpServletRequest request, @Valid AddCartReqVO reqVo) {
        // 添加购物车
        GetCartItemEntity respVO = mallCartProcess.addGoodsCart(buildAddGoodsCartReqVO(reqBo));
        /**
         * 异步刷新到购物车
         */
        CompletableFuture voidCompletableFuture = CompletableFuture.runAsync(() -> syncAddCacheCheck(respVO), asyncTaskExecutor);
        // 通过supplyAsync 函数创建的
        CompletableFuture<Object> uCompletableFuture = CompletableFuture.supplyAsync(() -> syncAddCacheCheck(respVO));
        // 构造函数创建 
        CompletableFuture completableFuture = new CompletableFuture().runAsync(() -> syncAddCacheCheck(respVO));
        return new DefaultResponseVO(code, msg, respVO);
    }
    /**
     * 添加购物车缓存(异步刷新到购物车)
     */
    public void syncAddCacheCheck(GetCartItemEntity cartItemEntity) {
        try {
            // 添加购物车缓存(异步刷新到购物车)
            mallCartProcess.getCartInfo(cartItemEntity);
        } catch (Exception e) {
            logger.error("syncAddCache error", e);
        }
    }
}

2、CompletableFuture.supplyAsync 源码分析

本小节讲解CompletableFuture的底层实现

网络异常,图片无法展示
|

上面为java 8中 supplyAsync 函数的实现源码。可以看到,当 supplyAsync 入参只有 supplier 时,会默认使用asyncPool作为线程池(一般情况下为ForkJoinPool的commonPool),并调用内部方法asyncSupplyStage执行具体的逻辑。在 asyncSupplyStage 方法中,程序会创建一个空的CompletableFuture 返回给调用方。同时该 CompletableFuture 和传入的 supplier 会被包装在一个AsyncSupply 实例对象中,然后一起提交到线程池中进行处理。

值得注意的是,当supplyAsync返回时,调用方只会拿到一个空的CompletableFuture实例。看到这里,我们可以猜测,当计算最终完成时,计算结果会被set到对应的CompletableFuture的result字段中。调用方通过join或者get就能取到该CompletableFuture的result字段的值。所以,虽然实际创建CompletableFuture的线程和进行任务计算的线程不同,但是最终会通过result来进行结果的传递。这种方式与传统的Future中结果传递方式类似(计算线程set值,使用线程get值)。

网络异常,图片无法展示
|

上面为java 8中 AsyncSupply 的实现源码,AsyncSupply的源码很简单。首先,它实现了Runnable接口,所以被提交到线程池中后,工作线程会执行其run()方法。通过对AsyncSupply中run方法的分析,也基本证实我们之前的猜测。即计算任务由工作线程调用run方法执行,并设置到CompletableFuture的结果中。其他线程中的使用方,则可以调用该CompletableFuture的join或者get方法获取其结果。

因此,我们只需要搞清楚其run()中的实现即可。在 run() 中,程序首先检查了传入的CompletableFuture 和 Supplier 是否为空,如果均不为空,再检查 CompletableFuture 的 d.result是否为空,如果不为空,则说明 CompletableFuture 的值已经被其他线程主动设置过了(这也是CompletableFuture与Future最大的不同之处),因此这里就不会再被重新设置一次。如果 d.result 为空,则调用Supplier(源码中的 f 变量)的get()方法,执行具体的计算,然后通过 completeValue 方法将结果设置到CompletableFuture中。最后,调用CompletableFuture的postComplete()方法,执行连接到当前CompletableFuture上的后置任务。

3、CompletableFuture.runAsync 源码分析

网络异常,图片无法展示
|

通过上面的源码可以看出,runAsync也会生成一个空的CompletableFuture,并包装在AsyncRun中提交到线程池中执行。这与supplyAsync是完全一致的。由于Runnable没有返回值,这里返回的CompletableFuture的结果值是Void类型的。

网络异常,图片无法展示
|

AsyncRun的run中,计算的执行是通过调用传入的Runnable(源码中的 f 变量)的run方法进行的。由于没有返回值,所以这里在设置CompletableFuture的值时,使用其completeNull()方法,设置一个特殊的空值标记。 设计方面和supplyAsync一致

4、CompletableFuture API 实战

thenApply、thenAccept、thenRun

thenApply 提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果。thenAccept 提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果。thenRun 提交的任务类型需遵从Runnable签名,即没有入参也没有返回值。

thenCombine、thenCompose

thenCombine最大的不同是连接任务可以是一个独立的CompletableFuture。嵌套获取层级也越来越深。因此,需要一种方式,能将这种嵌套模式展开,使其没有那么多层级。thenCompose的主要目的就是解决这个问题(这里也可以将thenCompose的作用类比于stream接口中的flatMap,因为他们都可以将类型嵌套展开)。

whenComplete、handle

whenComplete主要用于注入任务完成时的回调通知逻辑(获得的结果是前置任务的结果,whenComplete中的逻辑不会影响计算结果)。handle与handle接收的处理函数有返回值,而且返回值会影响最终获取的计算结果(产生了新的结果)

四、反应式编程

1、什么是反应式(resctive)编程

反应式编程是最近几年才提出的概念,主要有四个特征:响应式,反应式编程的响应速度应该很快,而且是稳定可预测的。韧性,系统出现失败时,任然可以继续响应服务,任何一个组件都能以异步的方式想其他组件分发任务。弹性,影响代码响应的因素的代码(系统)的负载能力,反应式编程可以增加分配的资源,受流量影响后有自动适配的能力,服务更大的负载。消息驱动,各个组件之间松耦合,组件隔离,跨组件通信使用异步消息传递。

反应式(resctive)编程在应用层的主要特征是任务以异步的方式执行,非阻塞的处理事件流,充分利用多核CPU的特点,大多反应式框架(RxJava等)都可以独立开辟线程池,用于执行阻塞式操作,主线程池中运行都是无阻塞的。

2、反应式流(Flow API)

2.1、发布订阅模式

Future CompletableFuture 的思维模式是计算的执行是独立且并发的。使用 get()方法可以在执行结束后获取 Future 对象的执行结果。因此,Future 是一个一次性对象,它只能从头到尾执行代码一次。

与此相反,反应式编程的思维模式是类 Future 的对象随着时间的推移可以产生很多的结果。举个例子是 Web 服务器的监听组件对象。该组件监听来自网络的 HTTP请求,并根据请求的内容返回相应的数据。

网络异常,图片无法展示
|

2.2、Flow API 源码解析

Java9 使用 java.util.concurrent.Flow 提供的接口对反应式编程进行建模,实现了名为“发布-订阅”的模型(也叫协议,简写为 pub-sub )

反应式编程有三个主要的概念,分别是:订阅者可以订阅的发布者;名为订阅的连接;消息(也叫事件),它们通过连接传输。

反应式流(Flow API)规范可以总结为4个接口:Publisher(发布者)、Subscriber(订阅者)、Subscription(订阅)和Processor(处理者)

网络异常,图片无法展示
|

Publisher负责生成数据,并将数据发送给 Subscription(每个Subscriber对应一个Subscription)。Publisher接口声明了一个方法 subscribe(),Subscriber可以通过该方法向 Publisher发起订阅。

网络异常,图片无法展示
|

一旦Subscriber订阅成功,就可以接收来自Publisher的事件。这些事件是通过Subscriber接口上的方法发送的

网络异常,图片无法展示
|

Subscriber的第一个事件是通过对 onSubscribe()方法的调用接收的。Publisher调用 onSubscribe() 方法时,会将Subscription对象传递给 Subscriber。通过Subscription,Subscriber可以管理其订阅情况

网络异常,图片无法展示
|

Subscriber 可以通过调用 request() 方法来请求 Publisher 发送数据,或者通过调用 cancel()方法表明它不再对数据感兴趣并且取消订阅。当调用 request() 时,Subscriber 可以传入一个long类型的数值以表明它愿意接受多少数据。这也是回压能够发挥作用的地方,以避免Publisher 发送多于 Subscriber能够处理的数据量。在 Publisher 发送完所请求数量的数据项之后,Subscriber 可以再次调用 request()方法来请求更多的数据。

Subscriber 请求数据之后,数据就会开始流经反应式流。Publisher 发布的每个数据项都会通过调用 Subscriber 的 onNext()方法递交给 Subscriber。如果有任何错误,就会调用 onError()方法。如果 Publisher 没有更多的数据,也不会继续产生更多的数据,那么将会调用 Subscriber 的onComplete() 方法来告知 Subscriber 它已经结束

反应式流规范的接口本身并不支持以函数式的方式组成这样的流。Reactor 项目是反应式流规范的一个实现,提供了一组用于组装反应式流的函数式API。有我们自己实现。

2.3、Flow API 实战

FlowImpl :创建Publisher并向其订阅TempSubscriber

public class FlowImpl {
    public static void main(String[] args) {
        // 创建 Publisher 并向其订阅 Subscriber
        getOrderAmt("20220727123").subscribe(new SubscriberImpl());
    }
    // Publisher是个函数式接口
    private static Flow.Publisher<OrderInfo> getOrderAmt(String orderId) {
        return subscriber -> subscriber.onSubscribe(new SubscriptionImpl(subscriber, orderId));
    }
}

Subscription接口:实现向 Subscriber 发送 OrderInfo Steam

public class SubscriptionImpl implements Flow.Subscription {
    private final Flow.Subscriber<? super OrderInfo> subscriber;
    private final String orderId;
    public SubscriptionImpl(Flow.Subscriber<? super OrderInfo> subscriber, String orderId) {
        this.subscriber = subscriber;
        this.orderId = orderId;
    }
    @Override
    public void request(long n) {
        // 另起一个线程向 subscriber 发送下一个元素
        Executors.newSingleThreadExecutor().submit(() -> {
            for (long i = 0L; i < n; i++) // subscriber 每处理一个请求执行一次循环
                try {
                    // 将当前 订单号 发送给 Subscriber
                    subscriber.onNext(OrderInfo.reduceAmt(orderId));
                } catch (Exception e) {
                    // 查询报错将这个报错信息传给 Subscriber
                    subscriber.onError(e);
                    e.printStackTrace();
                    break;
                }
        });
    }
    @Override
    public void cancel() {
        // 如果 Subscription 被取消了,向 subscriber 发送一个完成信号
        subscriber.onComplete();
    }
}

Subscriber接口:实现打印输出收到的 订单 数据

public class SubscriberImpl implements Flow.Subscriber<OrderInfo> {
    private Flow.Subscription subscription;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
    @Override
    public void onNext(OrderInfo orderInfo) {
        System.out.println(orderInfo);
        subscription.request(1);
    }
    @Override
    public void onError(Throwable throwable) {
        System.out.println(throwable.getMessage());
    }
    @Override
    public void onComplete() {
        System.out.println("Done!");
    }
}

总结

Java代码为了更好的发展和性能,开发了 异步编程的模式,Future异步编程和CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。

相关文章
|
9天前
|
供应链 搜索推荐 API
深度解析1688 API对电商的影响与实战应用
在全球电子商务迅猛发展的背景下,1688作为知名的B2B电商平台,为中小企业提供商品批发、分销、供应链管理等一站式服务,并通过开放的API接口,为开发者和电商企业提供数据资源和功能支持。本文将深入解析1688 API的功能(如商品搜索、详情、订单管理等)、应用场景(如商品展示、搜索优化、交易管理和用户行为分析)、收益分析(如流量增长、销售提升、库存优化和成本降低)及实际案例,帮助电商从业者提升运营效率和商业收益。
80 17
|
29天前
|
自然语言处理 搜索推荐 数据安全/隐私保护
鸿蒙登录页面好看的样式设计-HarmonyOS应用开发实战与ArkTS代码解析【HarmonyOS 5.0(Next)】
鸿蒙登录页面设计展示了 HarmonyOS 5.0(Next)的未来美学理念,结合科技与艺术,为用户带来视觉盛宴。该页面使用 ArkTS 开发,支持个性化定制和无缝智能设备连接。代码解析涵盖了声明式 UI、状态管理、事件处理及路由导航等关键概念,帮助开发者快速上手 HarmonyOS 应用开发。通过这段代码,开发者可以了解如何构建交互式界面并实现跨设备协同工作,推动智能生态的发展。
145 10
鸿蒙登录页面好看的样式设计-HarmonyOS应用开发实战与ArkTS代码解析【HarmonyOS 5.0(Next)】
|
28天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
27天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
27天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
27天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
14天前
|
数据采集 XML API
深入解析BeautifulSoup:从sohu.com视频页面提取关键信息的实战技巧
深入解析BeautifulSoup:从sohu.com视频页面提取关键信息的实战技巧
|
25天前
|
安全 API 数据安全/隐私保护
速卖通AliExpress商品详情API接口深度解析与实战应用
速卖通(AliExpress)作为全球化电商的重要平台,提供了丰富的商品资源和便捷的购物体验。为了提升用户体验和优化商品管理,速卖通开放了API接口,其中商品详情API尤为关键。本文介绍如何获取API密钥、调用商品详情API接口,并处理API响应数据,帮助开发者和商家高效利用这些工具。通过合理规划API调用策略和确保合法合规使用,开发者可以更好地获取商品信息,优化管理和营销策略。
|
3天前
|
自然语言处理 数据处理 索引
mindspeed-llm源码解析(一)preprocess_data
mindspeed-llm是昇腾模型套件代码仓,原来叫"modelLink"。这篇文章带大家阅读一下数据处理脚本preprocess_data.py(基于1.0.0分支),数据处理是模型训练的第一步,经常会用到。
15 0
|
28天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。

热门文章

最新文章

推荐镜像

更多