今日榜首|10年高级技术专家用7000字带你详解响应式技术框架

简介: 目前在后端Web编程和微服务编程领域,存在多种响应式编程技术框架。本篇我们从响应式编程规范开始介绍,进一步加深对响应式编程的理解。

响应式技术框架

目前在后端Web编程和微服务编程领域,存在多种响应式编程技术框架。

本篇我们从响应式编程规范开始介绍,进一步加深对响应式编程的理解。

响应式编程规范

对于响应式编程来说,响应式流是一种非阻塞、响应式、异步流处理、支持背压的技术标准,包括运行时环境(JVM和JavaScript)及网络协议。JDK 9发布的Flow API(java.util.concurrent.Flow)和响应式流规范呼应,成为响应式编程事实上的标准。

响应式流规范提供了一组最小化的接口、方法和协议来描述必要的操作和实体对象。

● Publisher:消息发布者。发布者只有一种方法,用来接受订阅者进行订阅(Subscribe)。T代表发布者和订阅者之间传输的数据类型,接口声明如下:

网络异常,图片无法展示
|

● Subscriber:消息订阅者。当接收到Publisher的数据时,会调用响应的回调方法。注册完成时,首先会调用onSubscribe方法,参数Subscriptions包含了注册信息。订阅者有四种事件方法,分别在开启订阅、接收数据、发生错误和数据传输结束时被调用,接口声明如下:

网络异常,图片无法展示
|

● Subscription:连接Publisher和Subscriber的消息交互的操作对象。Subscriber可以请求数据(request),或者取消订阅(cancel)。当请求数据时,参数“long n”表示希望接收的数据量,防止Publisher发送过多的数据。一旦开始请求,数据就会在流中传输。每接收一个,就会调用onNext(Tt);当发生错误时,onError(Throwable t)被调用;在传输完成后,onComplete()被调用。接口声明如下:

网络异常,图片无法展示
|

● Processor:同时充当Subscriber和Publisher的组件。可以看出,Processor接口继承了Subscriber和Publisher,是流的中间环节,接口声明如下:

网络异常,图片无法展示
|

响应式流中的数据从Publisher开始,经过若干Processor,最终到达Subscriber,即完整的数据管道(Pipeline)。

背压(Back Pressure)

在响应式编程规范中,响应式编程采用异步的发布-订阅模式。数据由Publisher推送消息给Subscriber。这种模式容易产生的问题是,当Publisher即生产者产生的数据速度远远大于Subscriber即消费者的消费速度时,消费者会承受巨大的资源压力(Pressure)而可能崩溃。

为了解决以上问题,数据流的速度需要被控制,即流量控制(Flow Control),以防止快速的数据流压垮目标。因此需要反压,即背压(Back Pressure),生产者和消费者之间需要通过一种背压机制来相互操作。这种背压机制要求是异步非阻塞的,如果是同步阻塞的,则消费者在处理数据时,生产者必须等待,会产生性能问题。

Java Flow API

从Java 9开始,增加了java.util.concurrent.Flow API,实现了响应式流规范(Reactive Stream Specification),并且把响应式流标准的接口集成到了JDK中。它和响应式流标准接口定义完全一致,之前需要通过Maven引用的API,Java 9之后可以直接使用了。响应式流的标准Maven依赖如下:

网络异常,图片无法展示
|

Java9通过java.util.concurrent.Flow和
java.util.concurrent.SubmissionPublisher实现了响应式流Flow类中定义的四个嵌套的静态接口,用于建立流量控制的组件,Publisher在其中生成一个或多个数据项供Subscriber使用。下面是Java 9 FlowAPI的核心组件。

● java.util.concurrent.Flow:这是Flow API的主要类,该类封装了Flow API的所有重要接口。需要说明的是,这个类声明为final类型,所以我们无法扩展它。


java.util.concurrent.Flow.Publisher:每个发布者都需要实现此接口,每个发布者都必须实现它的subscribe方法,并添加相关的订阅者以接收消息。


java.util.concurrent.Flow.Subscriber:每个订阅者都必须实现此接口,订阅者按照严格的顺序调用方法,此接口有下面四种方法。

○ onSubscribe:这是订阅者订阅了发布者后接收消息时调用的第一个方法。通常我们调用subscription.request就开始从处理器(Processor)接收项目。

○ onNext:当发布者收到项目时调用此方法,这是我们实现业务逻辑来处理流并向发布者请求更多数据的方法。

○ onError:当发生不可恢复的错误时调用此方法,我们可以在此方法中执行清理操作,例如关闭数据库连接。

○ onComplete:这就像finally方法,在发布者没有发布其他项目或者发布者关闭时调用。可以用来发送流成功处理的通知。


java.util.concurrent.Flow.Subscription:用于在发布者和订 阅 者 之 间 创 建 异 步 非 阻 塞 连 接 。 订 阅 者 调 用 请 求(request)方法来向发布者请求项目。它还有取消订阅(cancel)的方法,即关闭发布者和订阅者之间的连接。


java.util.concurrent.Flow.Processor:此接口同时扩展了Publisher和Subscriber接口,用于在发布者和订阅者之间转换消息。


java.util.concurrent.SubmissionPublisher : 这 个 类 是 对Publisher接口的实现,它将提交的项目异步发送给当前订阅者,直到它关闭。它使用Executor框架,我们将在响应式流示例中使用该类来添加订阅者,然后向其提交项目。

Java 9 Flow API接入实例

下面使用Java 9 Flow API实现一个简单的发布消息订阅的例子。

1.创建一个Item类,作为创建从发布者到订阅者之间的流消息的对象

网络异常,图片无法展示
|

网络异常,图片无法展示
|

2.实现一个帮助类,创建一个Item列表

网络异常,图片无法展示
|

3.实现消息的订阅

网络异常,图片无法展示
|

网络异常,图片无法展示
|

在步骤3中,Subscription变量保持消费者对生产者的引用,通过onNext ( ) 方 法 进 行 请 求 处 理 ;Count 变 量 记 录 请 求 个 数 ; 在onSubscribe方法中调用订阅请求来开始处理;在onError方法和onComplete方法中调用发生错误和完成时执行的业务逻辑。

4.使用主程序测试完成逻辑

网络异常,图片无法展示
|

在步骤4中,首先使用SubmissionPublisher、TestSubscriber创建发布者和订阅者。通过publisher.subscribe(subs)建立发布者与订阅者之间的关联关系;然后发布者通过submit方法发送消息给订阅者,这个过程是异步执行的;在主线程的while循环中判断Item的size和消费累计的size;当Item全部消费完成时,退出主线程的While循环;最后关闭发布者以免任何内存泄漏。

下面是程序的输出结果:

网络异常,图片无法展示
|

RxJava响应式框架

RxJava基于ReactiveX(Reactive Extensions的缩写)库和框架,使用观察者模式、迭代器模式及函数式编程,提供了异步数据流处理、非阻塞背压等特性。

Reactive Extensions

这个概念最早出现在微软的.NET社区中,目前越来越多语言实现了自己的响应式扩展,如Java、Javascript、Ruby等。

Reactive Extensions是响应式编程的一种实现,是解决异步事件流的一种方案。通俗地讲,就是利用它可以很好地控制事件流的异步操作,将事件的发生和对事件的响应解耦,让开发者不再关心复杂的线程处理、锁等并发相关问题。

RxJava的接入实例

RxJava 2.x实现了响应式流规范。它是Netflix开发的一个响应式编程框架。下面是RxJava的典型开发代码:

网络异常,图片无法展示
|

网络异常,图片无法展示
|

Observable

Observable可以理解为数据的发射器,对应Java Flow的发布者(Publisher)组件,通过create方法生成Observer对象。它会执行相关 业 务 逻 辑 并 通 过 emit 方 法 发 射 数 据 , 传 入 的 参 数 是ObservableOnSubscribe对象,使用泛型T作为操作对象的类型。你可以重写subscribe方法,里面是具体的数据源计划,前面的例子中是发射三个数字:1、2、3。ObservableEmitter是发射器的意思,有三种发 射 数 据 的 方 法 : void onNext( T value ) 、 voidonError(Throwable error)、void onComplete()。onNext方法可以 无 限 调 用 , 观 察 者 ( Observer ) 可 以 接 收 到 所 有 发 布 者(Publisher)发布的数据库,onError和onComplete是互斥的。

Observer

Observer 是 数 据 的 观 察 者 , 对 应 Java Flow 的 订 阅 者(Subscriber)组件,通过new方法创建并重写内部方法,onNext、onError、onComplete都是与被观察者发射的方法一一对应的。在本例中,订阅者的onNext方法处理消费数据逻辑,当收到的数据等于20时,将取消订阅,此时数据的发布者就不再向观察者推送数据。通过dispose方法可以取消Observer和Observable之前的订阅关系。

Scheduler

RxJava支持异步通信的特性是通过Schedulers组件实现的,Scheduler的中文意思是调度器。在RxJava中,可以通过Scheduler来控制调度线程,从Scheduler的源码可以发现它本质上是操纵Runnable对象,支持用立即、延时、周期形式来调度工作线程。RxJava 2.x中内置了多种Scheduler实现,适用于不同场景。这些Scheduler可以在代码中直接使用,屏蔽了开发者对线程调用的管理和控制。在前面的例子中我们使用了Schedulers.io()作为线程调度策略,下表总结的是Schedulers不同的线程调度策略。

网络异常,图片无法展示
|

Operator

RxJava在处理事件的流转过程中,提供了丰富的操作符,用来改变事件流中的数据。以Map操作符为例,Map的作用是将发射的事件进行Map函数定义的数据转换,再将转换后的事件发射给Observer。转换过程如下图所示。

网络异常,图片无法展示
|

(1)通过Emitter发射了1、2、3三个数字。

(2)中间通过Map进行转换,转换后事件变成10、20、30。

(3)最后将转换后的事件发射给Observer。

RxJava2-Android-Samples(GitHub开源项目)的Readme.md中总结了RxJava用到的所有操作符,篇幅所限,其他操作符可以从Reactive官方地址获得详解。RxJava的主要操作符如下表所示。

网络异常,图片无法展示
|

Reactor响应式框架

Reactor是Pivotal基于Reactive Streams规范实现的响应式框架 。 作 为 Spring 的 兄 弟 项 目 , 它 进 一 步 扩 展 了 基 本 的 ReactiveStreams Publisher及Flux和Mono API等组件,主要使用依赖的组件是Reactor Core模块。

Reactor项目已在GitHub中开源(可使用Reactor关键字搜索),主要包含Reactor Core和Reactor Netty两部分。Reactor Core实现了反应式编程的核心功能,Reactor Netty则是Spring WebFlux等技术的基础。

Reactor的接入实例

1.使用Reactor进行响应式编程,加载对应的Maven依赖

网络异常,图片无法展示
|

2.使用Reactor进行响应式编程的Demo

网络异常,图片无法展示
|

网络异常,图片无法展示
|

3.执行上述程序得到如下结果

网络异常,图片无法展示
|

在Reactor项目中,主要有与RxJava类似的发布者、订阅者、操作符等关键API和语法概念,下面结合代码实例讲解主要用到的模块。

Reactor的核心模块

● Flux

Flux是Reactor中数据发布者的重要抽象类。从源码中可以发现,Flux实现了Reactive Streams JVM API Publisher。Flux定义了0~N的非阻塞序列,类比非阻塞Stream,在Reactor中充当数据发布者的角色。在上述实例中,Flux通过just方法发布数据流。just方法是Flux常见的创建Stream的方法,此外,还可以通过create、generate、from等方法创建Flux数据流。上面例子中使用最简单的just方法完成了三个数字的构造和声明发布,如下图所示。

网络异常,图片无法展示
|

● Mono

Mono和Flux类似。从源码中可以发现,Mono同样实现了ReactiveStreams JVM API Publisher,实现了0~1的非阻塞结果,如下图所示。

网络异常,图片无法展示
|

● Subscriber

订阅者通过订阅操作,可以处理数据的请求,在订阅方法中需要重写onSubscribe、onNext、onError、onComplete方法来实现数据流的消费。Flux调用subscribe方法后会触发数据的发送,订阅者接收到数据后会触发onSubscribe方法。onSubscribe表示订阅动作的方式,准备发送给真正的消息接收者,然后执行subscription.request方法发送请求数据。代码例子中request(1)表示只发送一条数据,也可以使用subscription.cancel取消上游数据的传输。然后执行onNext方法进行消息的响应处理,在onNext方法中执行request方法可以把数据交给subscription链,循环处理所有数据。

● Operator

在Reactor项目中,一个Operator会给一个发布者(Publisher)添加某种行为,并返回一个新的Publisher实例。还可以对返回的Publisher再添加Operator连成一个链条。原始数据沿着链条从第一个Publisher开始向下流动,链条中的每个节点都会以某种方式去转换流入的数据。链条的终点是一个订阅者(Subscriber),Subscriber以某种方式消费这些数据,流程图如下图所示。

网络异常,图片无法展示
|

下面是对Reactor项目中Operator的总结分类,大致可以分为如下几类。

● 集合Operator:提供集合运算,如map、filter、sort、group、reduce等,和Java 8 Stream的中间操作具有相同的效果。

● 异 常 处 理 Operator : 提 供 异 常 处 理 机 制 , 如 retry 、onErrorReturn等。

● 回 调 Operator : 提 供 Publisher 状 态 转 换 时 的 回 调 , 如doOnCancel、doOnRequest等。

● 行为Operator:修改Publisher的默认行为,为其添加更多功能,如buffer、defaultIfEmpty、onBackpressureXXX等。

● 调试Operator:添加调试信息,如log、elapsed等。

Vert.X响应式编程

Vert.X是基于JVM构建的一个Reactive工具箱。同时,Vert.X和Spring类似,也有一套微服务开发生态。从开发者的角度来看,Vert.X就是一些库包,提供了HTTP客户端和服务器、消息服务、TCP和UDP底层协议等模块。你可以使用这些模块来构建自己的应用,也可以通过向Vert.X Core(Vert.X的基础组件)中增加任意模块来构建自己的系统。

Vert.X的主要功能

● Web开发,Vert.X封装了Web开发常用的组件,支持路由、Session管理、模板等。

● TCP/UDP开发,Vert.X底层基于Netty,提供了丰富的I/O类库,支持多种网络应用开发,不需要处理底层细节(如拆包和粘包),注重业务代码编写。

● 提供对WebSocket的支持,可以做网络聊天室、动态推送等。

● Event Bus(事件总线)是Vert.X的神经系统,通过Event Bus可以实现分布式消息、远程方法调用等。正是因为Event Bus的存在,Vert.X才可以更加便捷地开发微服务应用。

● 支 持 主 流 的 数 据 和 消 息 的 访 问 , 如 Redis 、 MongoDB 、RabbitMQ、Kafka等。

● 支持分布式锁、分布式计数器、分布式Map。

Vert.X的特性

● 异步非阻塞:Vert.X就像是跑在JVM上的Node.js(使用事件驱动、非阻塞式I/O模型的JavaScript运行环境),所以Vert.X的第一个优势就是它实现了一个异步的非阻塞框架。

● Vert.X支持多编程语言,在Vert.X上,可以使用JavaScript、Java、Scala、Ruby等语言。

● 不依赖中间件:Vert.X的底层依赖Netty,因此在使用Vert.X构建Web项目时,不依赖中间件。像Node一样,可以直接创建一个HttpServer,相对会更灵活一些,安全性也会更高一些。

● 完善的生态:Vert.X提供数据库操作、Redis操作、Web客户端操作等丰富的组件功能。

Vert.X的接入实例

1.加载对应的Maven依赖

网络异常,图片无法展示
|

2.Vert.X提供了一个创建HTTP服务器的简单方法,该服务器会在每次接收到HTTP请求时返回一个“Hello”的response

网络异常,图片无法展示
|

在这个例子里,我们创建了一个requestHandler来接收HTTP请求事件,并且返回响应。在Vert.X中,所有API都不会阻塞调用线程,如果不能立即响应结果,Handler会在事件准备好后处理,通过异步操作回调Handler方法触发执行。这种非阻塞的开发模型,可以使用较少的线程处理高并发场景。下面是Vert.X中EventLoop的工作模型图。

网络异常,图片无法展示
|

Verticle是Vert.X中的重要组件,可以理解成Java中的Servlet、POJO Bean或Akka中的Actor。一个组件可以有多个实例,Verticle实例之间的通信通过Event Bus实现。

ProducerVerticle负责监听8080端口,接收前端请求,它可以通过Event Bus发送一个事件,该事件将被传递给多个该事件的订阅者,代码如下。

网络异常,图片无法展示
|

网络异常,图片无法展示
|

ConsumeVerticle负责消费Event Bus的数据并返回响应,代码如下。

网络异常,图片无法展示
|

MainApp是启动类,在main方法中发布两个Verticle,下面代码是启动主流程的方法。

网络异常,图片无法展示
|

浏览器调用接口http://127.0.0.1:8080/book/1,出现下面结果则表示正确。

网络异常,图片无法展示
|

Verticle具有以下几个特点。

● 每个Verticle都占用一个EventLoop线程,且只对应一个EventLoop。

● 每个Verticle中创建的HttpServer、EventBus等资源都会在回收Verticle时被同步回收。

● 在多个Verticle中创建同样端口的HttpServer,会变成两个EventLoop线程,处理同一个HttpServer的连接,可以利用Verticle的这一特性来提升并发处理性能。

Spring Boot 2响应式编程

Spring Boot 2.x在Spring Boot 1.x基础上,基于Spring 5实现了响应式编程框架。从Spring MVC注解驱动的时代开始,Spring官方有意识地去Servlet化。不过在Spring MVC时代,Spring仍然摆脱不了对 Servlet 容 器 的 依 赖 , 然 而 借 助 响 应 式 编 程 ( ReactiveProgramming)的势头,Spring加速了这一时代的到来。WebFlux将Servlet容器从必须项变为可选项,并且默认采用Netty Web Server作为HTTP容器的处理引擎,形成Spring全新的技术体系,包括数据存储等技术栈。Spring Boot 2官方提供的基于Reactor与Servlet容器生态和技术栈的对比如下图所示。

网络异常,图片无法展示
|

对比发现,Spring Boot 2.x与Spring Boot 1.x在技术栈上存在巨大差异。Spring Boot 2.x最显著的变化就是采用了响应式的技术体系。底层的Reactive核心组件、响应式WebFlux框架、响应式数据存储、响应式安全、响应式Web服务引擎组成了Spring响应式技术体系。

下面列举了Spring Boot 2中支持响应式编程的部分模块。

Spring Core

Spring Core 是 Spring 的 核 心 模 块 。 Spring Framework 5 基 于ProjectReactor和RxJava反应式项目及响应式编程规范实现了对响应式编程的支持。在Spring Core中通过引入ReactiveAdapter实现了Object和Publisher<T>的相互转换,代码如下:

网络异常,图片无法展示
|

使用者可以通过继承ReactiveAdapter实现定制化的数据类型转换 。 ReactiveAdapterRegistry 可 以 作 为 对 象 池 来 保 持ReactiveAdapter实例并提供相应的数据访问方式。

响应式I/O

Spring Core提供了对I/O的响应式编程支持。Spring Core首先引入了一个字节缓存抽象接口DataBuffer,提供了一个DataBufferUtils工具类,可以实现以Reactive方式对I/O进行访问和交互。从下面的示例代码可以看到,DataBufferUtils返回了一个Flux对象,这样就可以使用Reactor相关接口读取test.txt文件,实现背压的响应式特性。

网络异常,图片无法展示
|

同时,Spring Core通过下面接口实现了基于响应式流的编解码实现类,这样可以方便DataBuffer实例与对象的相互转化,代码如下:

网络异常,图片无法展示
|

网络异常,图片无法展示
|

Spring WebFlux构建响应式Web服务

在Web服务方面,Spring 2.x提供了WebFlux框架,基于Flux和Mono对象实现响应式非阻塞Web服务。同时提供了一个响应式的HTTPWebClient,它可以通过函数式的方式异步非阻塞地发起HTTP请求并处理响应。Spring WebFlux也提供了响应式的WebSocketClient。下一节我们会详细讲解Spring的WebFlux框架。

数据层支持响应式

开发基于响应式流的应用,就像搭建数据流的管道,使异步数据能够顺畅流过每个环节。大多数系统免不了要与数据库交互,所以我们也需要响应式的持久层API和支持异步的数据库驱动。在消息的处理过程中,如果数据管道在任何一个环节发生阻塞,都有可能造成整体吞吐量的下降。

各个数据库都开始陆续推出异步驱动的技术支持,目前可以支持响应式数据访问的数据库有MongoDB、Redis、Apache Cassandra和CouchDB。

相关生态的响应式支持

● Spring 5实现了对Spring Security的响应式支持。

● Spring Cloud基于WebFlux框架实现了Spring Cloud Gateway微服务网关。

● Spring Test实现了响应式的支持类WebTestClient。

● 在监控领域,Sleuth也提供对响应式WebFlux的追踪支持。

本文给大家讲解的内容是响应式微服务架构,响应式技术框架


  1. 觉得文章不错的朋友可以转发此文关注小编;
  2. 感谢大家的支持!


相关文章
|
人工智能 数据库 开发者
社区供稿 | 零一万物 Yi-34B开源大模型,邀请全球开发者共建创新生态
大语言模型已经成为AI核心基础能力,全球发展方兴未艾,头部企业并驱争先。近日,全球AI专家李开复博士带队创办AI 2.0公司零一万物,正式开源发布 Yi Open-source。
|
数据可视化 关系型数据库 开发工具
开放原子训练营(第三季)inBuilder低代码开发实验室之探秘
开放原子训练营(第三季)inBuilder低代码开发实验室之探秘
180 0
开放原子训练营(第三季)inBuilder低代码开发实验室之探秘
|
12月前
|
安全 Java 虚拟化
涅槃重生!字节大牛力荐大型分布式手册,凤凰架构让你浴火成神
从大型机到单体架构,从微服务架构到无服务架构,每一次架构模式的演进都是一次涅槃。每一个软件系统都是由大量服务构成的生态体系,个体服务的“死亡”和“重生”是整个系统能否持续可靠运行的关键因素。笔记从5个方面全面剖析了如何构建一个可靠的分布式系统,同时给出了Spring Boot、Spring Cloud、Kubernetes、Istio、AWS Lambda五种架构风格的样例工程。
|
人工智能 安全 算法
【社区图书馆】《新程序员005:开源深度指南 & 新金融背后的科技力量》
【社区图书馆】《新程序员005:开源深度指南 & 新金融背后的科技力量》
|
IDE 前端开发 数据可视化
QCon 2022·上海站 | 学习笔记7: 动态布局技术、原理及实践
QCon 2022·上海站 | 学习笔记7: 动态布局技术、原理及实践
293 0
QCon 2022·上海站 | 学习笔记7: 动态布局技术、原理及实践
|
数据采集 缓存 移动开发
前端进阶:细数2年内我做的15个开源项目
从19年到21年,笔者利用空余时间陆陆续续做了一些开源项目, 大部分开源项目都立足于企业实际业务需求, 所以笔者觉得有必要做一个总结和复盘,在复盘的过程中希望也能对大家有所帮助.今后笔者的开源项目都会放在这篇文章中,如果想学习的可以多交流.
379 0
|
弹性计算 运维 前端开发
最受欢迎五大技术图谱出炉!看看大佬们都在学什么
阿里云开发者学堂15个技术图谱,哪些最受开发者们喜欢?小助手已经帮你整理出榜单了,快来学习吧!
专访开源之道主创 · 适兕:真实的开源世界依旧冷清
专访开源之道主创 · 适兕:真实的开源世界依旧冷清
|
小程序 容器
|
人工智能 运维 Cloud Native
一图剧透!阿里巴巴研发效能峰会核心看点速览
6月12日-13日 阿里巴巴内部研发效能峰会首次对外直播! 7大论坛,35个议题,1300分钟技术干货! 39位技术大咖,4万阿里工程师,邀你共享研发效能盛宴!
2515 0
一图剧透!阿里巴巴研发效能峰会核心看点速览