【精通函数式编程】(十一) CompletableFuture、反应式编程源码解析与实战

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: Future异步编程和CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。

前言📫 作者简介:小明java问道之路,专注于研究计算机底层,就职于金融公司后端高级工程师,擅长交易领域的高安全/可用/并发/性能的设计和架构📫

🏆 Java领域优质创作者、阿里云专家博主、华为云专家🏆

🔥 如果此文还不错的话,还请👍关注、点赞、收藏三连支持👍一下博主哦

本文导读

Java代码为了更好的发展和性能,开发了 异步编程的模式,Future异步编程和CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。

一、同步与异步

1、为什么要有异步

在Java发展的这20年,他只做了一件事不被淘汰,为了不被淘汰不断的更新jdk的版本,以便使用计算机硬件、操作系统以及新的编程概念。

Java一开始提供了 synchronized 锁、Runable,后面java5有引入了 java.util.concurrent 包,java7中的 forkjoin 框架 java.util.concurrent.RecursiveTask,到java8中Stream流、lambda表达式的支持,这一切都是为了支持高并发。

即便如此,多线程虽然极大的提升了性能,如果合理的使用线程池的话,好处,第一可以降低资源消耗,重复利用已创建的线程;第二:提高响应速度,任务可以不需要等到线程创建就能立即执行;第三:提高线程的可管理性。统一分配、调优和监控。但是线程池也不是没有缺点,使用k个线程的线程池就只能并发的执行k个任务,其他任务还是回休眠或者阻塞

这时候如果有线程不和其他任务相关联,又可以不用阻塞,就好了。Java8考虑到了,充分发挥了计算机硬件的处理能力,异步API 应运而生。

2、什么是同步?什么是异步?

同步就是 a 程序强依赖 b 程序,我必须等到你的回复或者执行完毕,才能做出下一步响应,类似于编程中程序被解释器(JVM)顺序执行一样(加载 > 验证 > 准备 > 解析 > 初始化);

异步则相反,a 程序不强依赖 b 程序,响应的时间也无所谓,无论你返回还是不返回,a 程序都能继续运行,也就是说我不存在等待对方的概念,a 程序就是 异步非阻塞的。

下面举一个例子就说明什么是同步、什么是异步

网络异常,图片无法展示
|

异步编程涉及两种风格,Future风格API 和反应式风格API ,Future<Integer> fun(int a){},fun( a , x-> {}),这两个模式的实战会在后面小结讲解。

二、Future异步编程

1、Future 接口

Java5中就引入了Future 接口,他的涉及初衷就是异步计算,例如我们结算一个商户下的所有订单,这个时候并不需要for循环去累加,Future 接口使用的时候只需要封装 Callable中,再提交给ExecutorService。

2、Future 接口的使用

Future 接口的使用看下Java8之前是如何使用异步的

public static void main(String[] args) throws Exception {
    List<OrderInfo> orderInfos = Arrays.asList(new OrderInfo("123", BigDecimal.ONE),
            new OrderInfo("456", BigDecimal.TEN), new OrderInfo("789", BigDecimal.TEN));
    // 创建 ExecutorService 通过它可以向线程池提交任务
    ExecutorService executorService = Executors.newCachedThreadPool();
    // 异步操作的同时,可以进行其他操作
    Future<BigDecimal> decimalFuture = executorService.submit(new Callable<BigDecimal>() {
        @Override
        public BigDecimal call() throws Exception {
            return reduceAmt(orderInfos);
        }
    });
    // Java8 写法
    Future<BigDecimal> decimalFuture = executorService.submit(() -> reduceAmt(orderInfos));
    System.out.println(decimalFuture.get());
}
private static BigDecimal reduceAmt(List<OrderInfo> orderInfos) {
    return orderInfos.stream()
            .map(OrderInfo::getOrderAmt)
            .reduce(BigDecimal.ZERO, BigDecimal::add);
}

异步编程可以在 ExecutorService 中,以并发的方式调用另一个线程执行操作,后续调用get() 方法获取操作结果,如果操作完成会立刻返回,如果操作没有完成则回阻塞线程,直到操作完成返回。

网络异常,图片无法展示
|

3、Future 接口的缺陷(局限性)

Future接口 还提供了方法来检测异步计算是否已经结束(isDone() 方法),等待异步操作结束。

但是当长时间计算任务完成时,该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并。

就会发生很多性能问题:1、将两个异步计算合并为一个(这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。)2、此时就要等待 Future 集合中的所有任务都完成。3、当Future的完成事件发生时会收到通知,使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果,每一步都需要等待。

三、CompletableFuture 接口详解

1、CompletableFuture 的创建

CompletableFuture.runAsync()也可以用来创建CompletableFuture实例。与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值,runAsync适合创建不需要返回值的计算任务

通过该supplyAsync 函数创建的CompletableFuture实例会异步执行当前传入的计算任务,在调用端,则可以通过get或join获取最终计算结果。

同事直接new(构造函数创建)也是可以的,下面通过一个实战小例子看下,高并发高性能的添加购物车结构如何设计

import java.util.concurrent.CompletableFuture;
import org.springframework.core.task.AsyncTaskExecutor;
public class CompletableFutureImpl {
    @Autowired
    @Qualifier("asyncExecutor")
    private AsyncTaskExecutor asyncTaskExecutor;
    public DefaultResponseVO addGoodsCart(HttpServletRequest request, @Valid AddCartReqVO reqVo) {
        // 添加购物车
        GetCartItemEntity respVO = mallCartProcess.addGoodsCart(buildAddGoodsCartReqVO(reqBo));
        /**
         * 异步刷新到购物车
         */
        CompletableFuture voidCompletableFuture = CompletableFuture.runAsync(() -> syncAddCacheCheck(respVO), asyncTaskExecutor);
        // 通过supplyAsync 函数创建的
        CompletableFuture<Object> uCompletableFuture = CompletableFuture.supplyAsync(() -> syncAddCacheCheck(respVO));
        // 构造函数创建 
        CompletableFuture completableFuture = new CompletableFuture().runAsync(() -> syncAddCacheCheck(respVO));
        return new DefaultResponseVO(code, msg, respVO);
    }
    /**
     * 添加购物车缓存(异步刷新到购物车)
     */
    public void syncAddCacheCheck(GetCartItemEntity cartItemEntity) {
        try {
            // 添加购物车缓存(异步刷新到购物车)
            mallCartProcess.getCartInfo(cartItemEntity);
        } catch (Exception e) {
            logger.error("syncAddCache error", e);
        }
    }
}

2、CompletableFuture.supplyAsync 源码分析

本小节讲解CompletableFuture的底层实现

网络异常,图片无法展示
|

上面为java 8中 supplyAsync 函数的实现源码。可以看到,当 supplyAsync 入参只有 supplier 时,会默认使用asyncPool作为线程池(一般情况下为ForkJoinPool的commonPool),并调用内部方法asyncSupplyStage执行具体的逻辑。在 asyncSupplyStage 方法中,程序会创建一个空的CompletableFuture 返回给调用方。同时该 CompletableFuture 和传入的 supplier 会被包装在一个AsyncSupply 实例对象中,然后一起提交到线程池中进行处理。

值得注意的是,当supplyAsync返回时,调用方只会拿到一个空的CompletableFuture实例。看到这里,我们可以猜测,当计算最终完成时,计算结果会被set到对应的CompletableFuture的result字段中。调用方通过join或者get就能取到该CompletableFuture的result字段的值。所以,虽然实际创建CompletableFuture的线程和进行任务计算的线程不同,但是最终会通过result来进行结果的传递。这种方式与传统的Future中结果传递方式类似(计算线程set值,使用线程get值)。

网络异常,图片无法展示
|

上面为java 8中 AsyncSupply 的实现源码,AsyncSupply的源码很简单。首先,它实现了Runnable接口,所以被提交到线程池中后,工作线程会执行其run()方法。通过对AsyncSupply中run方法的分析,也基本证实我们之前的猜测。即计算任务由工作线程调用run方法执行,并设置到CompletableFuture的结果中。其他线程中的使用方,则可以调用该CompletableFuture的join或者get方法获取其结果。

因此,我们只需要搞清楚其run()中的实现即可。在 run() 中,程序首先检查了传入的CompletableFuture 和 Supplier 是否为空,如果均不为空,再检查 CompletableFuture 的 d.result是否为空,如果不为空,则说明 CompletableFuture 的值已经被其他线程主动设置过了(这也是CompletableFuture与Future最大的不同之处),因此这里就不会再被重新设置一次。如果 d.result 为空,则调用Supplier(源码中的 f 变量)的get()方法,执行具体的计算,然后通过 completeValue 方法将结果设置到CompletableFuture中。最后,调用CompletableFuture的postComplete()方法,执行连接到当前CompletableFuture上的后置任务。

3、CompletableFuture.runAsync 源码分析

网络异常,图片无法展示
|

通过上面的源码可以看出,runAsync也会生成一个空的CompletableFuture,并包装在AsyncRun中提交到线程池中执行。这与supplyAsync是完全一致的。由于Runnable没有返回值,这里返回的CompletableFuture的结果值是Void类型的。

网络异常,图片无法展示
|

AsyncRun的run中,计算的执行是通过调用传入的Runnable(源码中的 f 变量)的run方法进行的。由于没有返回值,所以这里在设置CompletableFuture的值时,使用其completeNull()方法,设置一个特殊的空值标记。 设计方面和supplyAsync一致

4、CompletableFuture API 实战

thenApply、thenAccept、thenRun

thenApply 提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果。thenAccept 提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果。thenRun 提交的任务类型需遵从Runnable签名,即没有入参也没有返回值。

thenCombine、thenCompose

thenCombine最大的不同是连接任务可以是一个独立的CompletableFuture。嵌套获取层级也越来越深。因此,需要一种方式,能将这种嵌套模式展开,使其没有那么多层级。thenCompose的主要目的就是解决这个问题(这里也可以将thenCompose的作用类比于stream接口中的flatMap,因为他们都可以将类型嵌套展开)。

whenComplete、handle

whenComplete主要用于注入任务完成时的回调通知逻辑(获得的结果是前置任务的结果,whenComplete中的逻辑不会影响计算结果)。handle与handle接收的处理函数有返回值,而且返回值会影响最终获取的计算结果(产生了新的结果)

四、反应式编程

1、什么是反应式(resctive)编程

反应式编程是最近几年才提出的概念,主要有四个特征:响应式,反应式编程的响应速度应该很快,而且是稳定可预测的。韧性,系统出现失败时,任然可以继续响应服务,任何一个组件都能以异步的方式想其他组件分发任务。弹性,影响代码响应的因素的代码(系统)的负载能力,反应式编程可以增加分配的资源,受流量影响后有自动适配的能力,服务更大的负载。消息驱动,各个组件之间松耦合,组件隔离,跨组件通信使用异步消息传递。

反应式(resctive)编程在应用层的主要特征是任务以异步的方式执行,非阻塞的处理事件流,充分利用多核CPU的特点,大多反应式框架(RxJava等)都可以独立开辟线程池,用于执行阻塞式操作,主线程池中运行都是无阻塞的。

2、反应式流(Flow API)

2.1、发布订阅模式

Future CompletableFuture 的思维模式是计算的执行是独立且并发的。使用 get()方法可以在执行结束后获取 Future 对象的执行结果。因此,Future 是一个一次性对象,它只能从头到尾执行代码一次。

与此相反,反应式编程的思维模式是类 Future 的对象随着时间的推移可以产生很多的结果。举个例子是 Web 服务器的监听组件对象。该组件监听来自网络的 HTTP请求,并根据请求的内容返回相应的数据。

网络异常,图片无法展示
|

2.2、Flow API 源码解析

Java9 使用 java.util.concurrent.Flow 提供的接口对反应式编程进行建模,实现了名为“发布-订阅”的模型(也叫协议,简写为 pub-sub )

反应式编程有三个主要的概念,分别是:订阅者可以订阅的发布者;名为订阅的连接;消息(也叫事件),它们通过连接传输。

反应式流(Flow API)规范可以总结为4个接口:Publisher(发布者)、Subscriber(订阅者)、Subscription(订阅)和Processor(处理者)

网络异常,图片无法展示
|

Publisher负责生成数据,并将数据发送给 Subscription(每个Subscriber对应一个Subscription)。Publisher接口声明了一个方法 subscribe(),Subscriber可以通过该方法向 Publisher发起订阅。

网络异常,图片无法展示
|

一旦Subscriber订阅成功,就可以接收来自Publisher的事件。这些事件是通过Subscriber接口上的方法发送的

网络异常,图片无法展示
|

Subscriber的第一个事件是通过对 onSubscribe()方法的调用接收的。Publisher调用 onSubscribe() 方法时,会将Subscription对象传递给 Subscriber。通过Subscription,Subscriber可以管理其订阅情况

网络异常,图片无法展示
|

Subscriber 可以通过调用 request() 方法来请求 Publisher 发送数据,或者通过调用 cancel()方法表明它不再对数据感兴趣并且取消订阅。当调用 request() 时,Subscriber 可以传入一个long类型的数值以表明它愿意接受多少数据。这也是回压能够发挥作用的地方,以避免Publisher 发送多于 Subscriber能够处理的数据量。在 Publisher 发送完所请求数量的数据项之后,Subscriber 可以再次调用 request()方法来请求更多的数据。

Subscriber 请求数据之后,数据就会开始流经反应式流。Publisher 发布的每个数据项都会通过调用 Subscriber 的 onNext()方法递交给 Subscriber。如果有任何错误,就会调用 onError()方法。如果 Publisher 没有更多的数据,也不会继续产生更多的数据,那么将会调用 Subscriber 的onComplete() 方法来告知 Subscriber 它已经结束

反应式流规范的接口本身并不支持以函数式的方式组成这样的流。Reactor 项目是反应式流规范的一个实现,提供了一组用于组装反应式流的函数式API。有我们自己实现。

2.3、Flow API 实战

FlowImpl :创建Publisher并向其订阅TempSubscriber

public class FlowImpl {
    public static void main(String[] args) {
        // 创建 Publisher 并向其订阅 Subscriber
        getOrderAmt("20220727123").subscribe(new SubscriberImpl());
    }
    // Publisher是个函数式接口
    private static Flow.Publisher<OrderInfo> getOrderAmt(String orderId) {
        return subscriber -> subscriber.onSubscribe(new SubscriptionImpl(subscriber, orderId));
    }
}

Subscription接口:实现向 Subscriber 发送 OrderInfo Steam

public class SubscriptionImpl implements Flow.Subscription {
    private final Flow.Subscriber<? super OrderInfo> subscriber;
    private final String orderId;
    public SubscriptionImpl(Flow.Subscriber<? super OrderInfo> subscriber, String orderId) {
        this.subscriber = subscriber;
        this.orderId = orderId;
    }
    @Override
    public void request(long n) {
        // 另起一个线程向 subscriber 发送下一个元素
        Executors.newSingleThreadExecutor().submit(() -> {
            for (long i = 0L; i < n; i++) // subscriber 每处理一个请求执行一次循环
                try {
                    // 将当前 订单号 发送给 Subscriber
                    subscriber.onNext(OrderInfo.reduceAmt(orderId));
                } catch (Exception e) {
                    // 查询报错将这个报错信息传给 Subscriber
                    subscriber.onError(e);
                    e.printStackTrace();
                    break;
                }
        });
    }
    @Override
    public void cancel() {
        // 如果 Subscription 被取消了,向 subscriber 发送一个完成信号
        subscriber.onComplete();
    }
}

Subscriber接口:实现打印输出收到的 订单 数据

public class SubscriberImpl implements Flow.Subscriber<OrderInfo> {
    private Flow.Subscription subscription;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
    @Override
    public void onNext(OrderInfo orderInfo) {
        System.out.println(orderInfo);
        subscription.request(1);
    }
    @Override
    public void onError(Throwable throwable) {
        System.out.println(throwable.getMessage());
    }
    @Override
    public void onComplete() {
        System.out.println("Done!");
    }
}

总结

Java代码为了更好的发展和性能,开发了 异步编程的模式,Future异步编程和CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。

相关文章
|
2天前
|
开发工具
Flutter-AnimatedWidget组件源码解析
Flutter-AnimatedWidget组件源码解析
|
11天前
|
开发者 图形学 Java
揭秘Unity物理引擎核心技术:从刚体动力学到关节连接,全方位教你如何在虚拟世界中重现真实物理现象——含实战代码示例与详细解析
【8月更文挑战第31天】Unity物理引擎对于游戏开发至关重要,它能够模拟真实的物理效果,如刚体运动、碰撞检测及关节连接等。通过Rigidbody和Collider组件,开发者可以轻松实现物体间的互动与碰撞。本文通过具体代码示例介绍了如何使用Unity物理引擎实现物体运动、施加力、使用关节连接以及模拟弹簧效果等功能,帮助开发者提升游戏的真实感与沉浸感。
28 1
|
4天前
|
存储 JSON API
Python编程:解析HTTP请求返回的JSON数据
使用Python处理HTTP请求和解析JSON数据既直接又高效。`requests`库的简洁性和强大功能使得发送请求、接收和解析响应变得异常简单。以上步骤和示例提供了一个基础的框架,可以根据你的具体需求进行调整和扩展。通过合适的异常处理,你的代码将更加健壮和可靠,为用户提供更加流畅的体验。
21 0
|
11天前
|
开发者 图形学 API
从零起步,深度揭秘:运用Unity引擎及网络编程技术,一步步搭建属于你的实时多人在线对战游戏平台——详尽指南与实战代码解析,带你轻松掌握网络化游戏开发的核心要领与最佳实践路径
【8月更文挑战第31天】构建实时多人对战平台是技术与创意的结合。本文使用成熟的Unity游戏开发引擎,从零开始指导读者搭建简单的实时对战平台。内容涵盖网络架构设计、Unity网络API应用及客户端与服务器通信。首先,创建新项目并选择适合多人游戏的模板,使用推荐的网络传输层。接着,定义基本玩法,如2D多人射击游戏,创建角色预制件并添加Rigidbody2D组件。然后,引入网络身份组件以同步对象状态。通过示例代码展示玩家控制逻辑,包括移动和发射子弹功能。最后,设置服务器端逻辑,处理客户端连接和断开。本文帮助读者掌握构建Unity多人对战平台的核心知识,为进一步开发打下基础。
32 0
|
11天前
|
开发者 图形学 C#
揭秘游戏沉浸感的秘密武器:深度解析Unity中的音频设计技巧,从背景音乐到动态音效,全面提升你的游戏氛围艺术——附实战代码示例与应用场景指导
【8月更文挑战第31天】音频设计在游戏开发中至关重要,不仅能增强沉浸感,还能传递信息,构建氛围。Unity作为跨平台游戏引擎,提供了丰富的音频处理功能,助力开发者轻松实现复杂音效。本文将探讨如何利用Unity的音频设计提升游戏氛围,并通过具体示例代码展示实现过程。例如,在恐怖游戏中,阴森的背景音乐和突然的脚步声能增加紧张感;在休闲游戏中,轻快的旋律则让玩家感到愉悦。
24 0
|
11天前
|
C# 开发者 Windows
勇敢迈出第一步:手把手教你如何在WPF开源项目中贡献你的第一行代码,从选择项目到提交PR的全过程解析与实战技巧分享
【8月更文挑战第31天】本文指导您如何在Windows Presentation Foundation(WPF)相关的开源项目中贡献代码。无论您是初学者还是有经验的开发者,参与这类项目都能加深对WPF框架的理解并拓展职业履历。文章推荐了一些适合入门的项目如MvvmLight和MahApps.Metro,并详细介绍了从选择项目、设置开发环境到提交代码的全过程。通过具体示例,如添加按钮点击事件处理程序,帮助您迈出第一步。此外,还强调了提交Pull Request时保持专业沟通的重要性。参与开源不仅能提升技能,还能促进社区交流。
22 0
|
11天前
|
数据库 Windows
超详细步骤解析:从零开始,手把手教你使用 Visual Studio 打造你的第一个 Windows Forms 应用程序,菜鸟也能轻松上手的编程入门指南来了!
【8月更文挑战第31天】创建你的第一个Windows Forms (WinForms) 应用程序是一个激动人心的过程,尤其适合编程新手。本指南将带你逐步完成一个简单WinForms 应用的开发。首先,在Visual Studio 中创建一个“Windows Forms App (.NET)”项目,命名为“我的第一个WinForms 应用”。接着,在空白窗体中添加一个按钮和一个标签控件,并设置按钮文本为“点击我”。然后,为按钮添加点击事件处理程序`button1_Click`,实现点击按钮后更新标签文本为“你好,你刚刚点击了按钮!”。
30 0
|
11天前
|
安全 开发者 数据安全/隐私保护
Xamarin 的安全性考虑与最佳实践:从数据加密到网络防护,全面解析构建安全移动应用的六大核心技术要点与实战代码示例
【8月更文挑战第31天】Xamarin 的安全性考虑与最佳实践对于构建安全可靠的跨平台移动应用至关重要。本文探讨了 Xamarin 开发中的关键安全因素,如数据加密、网络通信安全、权限管理等,并提供了 AES 加密算法的代码示例。
24 0
|
11天前
|
开发者 API 开发框架
Xamarin 在教育应用开发中的应用:从课程笔记到互动测验,全面解析使用Xamarin.Forms构建多功能教育平台的技术细节与实战示例
【8月更文挑战第31天】Xamarin 作为一款强大的跨平台移动开发框架,在教育应用开发中展现了巨大潜力。它允许开发者使用单一的 C# 代码库构建 iOS、Android 和 Windows 应用,确保不同设备上的一致体验。Xamarin 提供广泛的 API 支持,便于访问摄像头、GPS 等原生功能。本文通过一个简单的教育应用示例——课程笔记和测验功能,展示了 Xamarin 在实际开发中的应用过程。从定义用户界面到实现保存笔记和检查答案的逻辑,Xamarin 展现了其在教育应用开发中的高效性和灵活性。
21 0
|
11天前
|
开发框架 Android开发 iOS开发
跨平台开发的双重奏:Xamarin在不同规模项目中的实战表现与成功故事解析
【8月更文挑战第31天】在移动应用开发领域,选择合适的开发框架至关重要。Xamarin作为一款基于.NET的跨平台解决方案,凭借其独特的代码共享和快速迭代能力,赢得了广泛青睐。本文通过两个案例对比展示Xamarin的优势:一是初创公司利用Xamarin.Forms快速开发出适用于Android和iOS的应用;二是大型企业借助Xamarin实现高性能的原生应用体验及稳定的后端支持。无论是资源有限的小型企业还是需求复杂的大公司,Xamarin均能提供高效灵活的解决方案,彰显其在跨平台开发领域的强大实力。
20 0

热门文章

最新文章

推荐镜像

更多