响应式技术框架
目前在后端Web编程和微服务编程领域,存在多种响应式编程技术框架。
本篇我们从响应式编程规范开始介绍,进一步加深对响应式编程的理解。
响应式编程规范
对于响应式编程来说,响应式流是一种非阻塞、响应式、异步流处理、支持背压的技术标准,包括运行时环境(JVM和JavaScript)及网络协议。JDK 9发布的Flow API(java.util.concurrent.Flow)和响应式流规范呼应,成为响应式编程事实上的标准。
响应式流规范提供了一组最小化的接口、方法和协议来描述必要的操作和实体对象。
● Publisher:消息发布者。发布者只有一种方法,用来接受订阅者进行订阅(Subscribe)。T代表发布者和订阅者之间传输的数据类型,接口声明如下:
● Subscriber:消息订阅者。当接收到Publisher的数据时,会调用响应的回调方法。注册完成时,首先会调用onSubscribe方法,参数Subscriptions包含了注册信息。订阅者有四种事件方法,分别在开启订阅、接收数据、发生错误和数据传输结束时被调用,接口声明如下:
● Subscription:连接Publisher和Subscriber的消息交互的操作对象。Subscriber可以请求数据(request),或者取消订阅(cancel)。当请求数据时,参数“long n”表示希望接收的数据量,防止Publisher发送过多的数据。一旦开始请求,数据就会在流中传输。每接收一个,就会调用onNext(Tt);当发生错误时,onError(Throwable t)被调用;在传输完成后,onComplete()被调用。接口声明如下:
● Processor:同时充当Subscriber和Publisher的组件。可以看出,Processor接口继承了Subscriber和Publisher,是流的中间环节,接口声明如下:
响应式流中的数据从Publisher开始,经过若干Processor,最终到达Subscriber,即完整的数据管道(Pipeline)。
背压(Back Pressure)
在响应式编程规范中,响应式编程采用异步的发布-订阅模式。数据由Publisher推送消息给Subscriber。这种模式容易产生的问题是,当Publisher即生产者产生的数据速度远远大于Subscriber即消费者的消费速度时,消费者会承受巨大的资源压力(Pressure)而可能崩溃。
为了解决以上问题,数据流的速度需要被控制,即流量控制(Flow Control),以防止快速的数据流压垮目标。因此需要反压,即背压(Back Pressure),生产者和消费者之间需要通过一种背压机制来相互操作。这种背压机制要求是异步非阻塞的,如果是同步阻塞的,则消费者在处理数据时,生产者必须等待,会产生性能问题。
Java Flow API
从Java 9开始,增加了java.util.concurrent.Flow API,实现了响应式流规范(Reactive Stream Specification),并且把响应式流标准的接口集成到了JDK中。它和响应式流标准接口定义完全一致,之前需要通过Maven引用的API,Java 9之后可以直接使用了。响应式流的标准Maven依赖如下:
Java9通过java.util.concurrent.Flow和
java.util.concurrent.SubmissionPublisher实现了响应式流Flow类中定义的四个嵌套的静态接口,用于建立流量控制的组件,Publisher在其中生成一个或多个数据项供Subscriber使用。下面是Java 9 FlowAPI的核心组件。
● java.util.concurrent.Flow:这是Flow API的主要类,该类封装了Flow API的所有重要接口。需要说明的是,这个类声明为final类型,所以我们无法扩展它。
●
java.util.concurrent.Flow.Publisher:每个发布者都需要实现此接口,每个发布者都必须实现它的subscribe方法,并添加相关的订阅者以接收消息。
●
java.util.concurrent.Flow.Subscriber:每个订阅者都必须实现此接口,订阅者按照严格的顺序调用方法,此接口有下面四种方法。
○ onSubscribe:这是订阅者订阅了发布者后接收消息时调用的第一个方法。通常我们调用subscription.request就开始从处理器(Processor)接收项目。
○ onNext:当发布者收到项目时调用此方法,这是我们实现业务逻辑来处理流并向发布者请求更多数据的方法。
○ onError:当发生不可恢复的错误时调用此方法,我们可以在此方法中执行清理操作,例如关闭数据库连接。
○ onComplete:这就像finally方法,在发布者没有发布其他项目或者发布者关闭时调用。可以用来发送流成功处理的通知。
●
java.util.concurrent.Flow.Subscription:用于在发布者和订 阅 者 之 间 创 建 异 步 非 阻 塞 连 接 。 订 阅 者 调 用 请 求(request)方法来向发布者请求项目。它还有取消订阅(cancel)的方法,即关闭发布者和订阅者之间的连接。
●
java.util.concurrent.Flow.Processor:此接口同时扩展了Publisher和Subscriber接口,用于在发布者和订阅者之间转换消息。
●
java.util.concurrent.SubmissionPublisher : 这 个 类 是 对Publisher接口的实现,它将提交的项目异步发送给当前订阅者,直到它关闭。它使用Executor框架,我们将在响应式流示例中使用该类来添加订阅者,然后向其提交项目。
Java 9 Flow API接入实例
下面使用Java 9 Flow API实现一个简单的发布消息订阅的例子。
1.创建一个Item类,作为创建从发布者到订阅者之间的流消息的对象
2.实现一个帮助类,创建一个Item列表
3.实现消息的订阅
在步骤3中,Subscription变量保持消费者对生产者的引用,通过onNext ( ) 方 法 进 行 请 求 处 理 ;Count 变 量 记 录 请 求 个 数 ; 在onSubscribe方法中调用订阅请求来开始处理;在onError方法和onComplete方法中调用发生错误和完成时执行的业务逻辑。
4.使用主程序测试完成逻辑
在步骤4中,首先使用SubmissionPublisher、TestSubscriber创建发布者和订阅者。通过publisher.subscribe(subs)建立发布者与订阅者之间的关联关系;然后发布者通过submit方法发送消息给订阅者,这个过程是异步执行的;在主线程的while循环中判断Item的size和消费累计的size;当Item全部消费完成时,退出主线程的While循环;最后关闭发布者以免任何内存泄漏。
下面是程序的输出结果:
RxJava响应式框架
RxJava基于ReactiveX(Reactive Extensions的缩写)库和框架,使用观察者模式、迭代器模式及函数式编程,提供了异步数据流处理、非阻塞背压等特性。
Reactive Extensions
这个概念最早出现在微软的.NET社区中,目前越来越多语言实现了自己的响应式扩展,如Java、Javascript、Ruby等。
Reactive Extensions是响应式编程的一种实现,是解决异步事件流的一种方案。通俗地讲,就是利用它可以很好地控制事件流的异步操作,将事件的发生和对事件的响应解耦,让开发者不再关心复杂的线程处理、锁等并发相关问题。
RxJava的接入实例
RxJava 2.x实现了响应式流规范。它是Netflix开发的一个响应式编程框架。下面是RxJava的典型开发代码:
Observable
Observable可以理解为数据的发射器,对应Java Flow的发布者(Publisher)组件,通过create方法生成Observer对象。它会执行相关 业 务 逻 辑 并 通 过 emit 方 法 发 射 数 据 , 传 入 的 参 数 是ObservableOnSubscribe对象,使用泛型T作为操作对象的类型。你可以重写subscribe方法,里面是具体的数据源计划,前面的例子中是发射三个数字:1、2、3。ObservableEmitter是发射器的意思,有三种发 射 数 据 的 方 法 : void onNext( T value ) 、 voidonError(Throwable error)、void onComplete()。onNext方法可以 无 限 调 用 , 观 察 者 ( Observer ) 可 以 接 收 到 所 有 发 布 者(Publisher)发布的数据库,onError和onComplete是互斥的。
Observer
Observer 是 数 据 的 观 察 者 , 对 应 Java Flow 的 订 阅 者(Subscriber)组件,通过new方法创建并重写内部方法,onNext、onError、onComplete都是与被观察者发射的方法一一对应的。在本例中,订阅者的onNext方法处理消费数据逻辑,当收到的数据等于20时,将取消订阅,此时数据的发布者就不再向观察者推送数据。通过dispose方法可以取消Observer和Observable之前的订阅关系。
Scheduler
RxJava支持异步通信的特性是通过Schedulers组件实现的,Scheduler的中文意思是调度器。在RxJava中,可以通过Scheduler来控制调度线程,从Scheduler的源码可以发现它本质上是操纵Runnable对象,支持用立即、延时、周期形式来调度工作线程。RxJava 2.x中内置了多种Scheduler实现,适用于不同场景。这些Scheduler可以在代码中直接使用,屏蔽了开发者对线程调用的管理和控制。在前面的例子中我们使用了Schedulers.io()作为线程调度策略,下表总结的是Schedulers不同的线程调度策略。
Operator
RxJava在处理事件的流转过程中,提供了丰富的操作符,用来改变事件流中的数据。以Map操作符为例,Map的作用是将发射的事件进行Map函数定义的数据转换,再将转换后的事件发射给Observer。转换过程如下图所示。
(1)通过Emitter发射了1、2、3三个数字。
(2)中间通过Map进行转换,转换后事件变成10、20、30。
(3)最后将转换后的事件发射给Observer。
RxJava2-Android-Samples(GitHub开源项目)的Readme.md中总结了RxJava用到的所有操作符,篇幅所限,其他操作符可以从Reactive官方地址获得详解。RxJava的主要操作符如下表所示。
Reactor响应式框架
Reactor是Pivotal基于Reactive Streams规范实现的响应式框架 。 作 为 Spring 的 兄 弟 项 目 , 它 进 一 步 扩 展 了 基 本 的 ReactiveStreams Publisher及Flux和Mono API等组件,主要使用依赖的组件是Reactor Core模块。
Reactor项目已在GitHub中开源(可使用Reactor关键字搜索),主要包含Reactor Core和Reactor Netty两部分。Reactor Core实现了反应式编程的核心功能,Reactor Netty则是Spring WebFlux等技术的基础。
Reactor的接入实例
1.使用Reactor进行响应式编程,加载对应的Maven依赖
2.使用Reactor进行响应式编程的Demo
3.执行上述程序得到如下结果
在Reactor项目中,主要有与RxJava类似的发布者、订阅者、操作符等关键API和语法概念,下面结合代码实例讲解主要用到的模块。
Reactor的核心模块
● Flux
Flux是Reactor中数据发布者的重要抽象类。从源码中可以发现,Flux实现了Reactive Streams JVM API Publisher。Flux定义了0~N的非阻塞序列,类比非阻塞Stream,在Reactor中充当数据发布者的角色。在上述实例中,Flux通过just方法发布数据流。just方法是Flux常见的创建Stream的方法,此外,还可以通过create、generate、from等方法创建Flux数据流。上面例子中使用最简单的just方法完成了三个数字的构造和声明发布,如下图所示。
● Mono
Mono和Flux类似。从源码中可以发现,Mono同样实现了ReactiveStreams JVM API Publisher,实现了0~1的非阻塞结果,如下图所示。
● Subscriber
订阅者通过订阅操作,可以处理数据的请求,在订阅方法中需要重写onSubscribe、onNext、onError、onComplete方法来实现数据流的消费。Flux调用subscribe方法后会触发数据的发送,订阅者接收到数据后会触发onSubscribe方法。onSubscribe表示订阅动作的方式,准备发送给真正的消息接收者,然后执行subscription.request方法发送请求数据。代码例子中request(1)表示只发送一条数据,也可以使用subscription.cancel取消上游数据的传输。然后执行onNext方法进行消息的响应处理,在onNext方法中执行request方法可以把数据交给subscription链,循环处理所有数据。
● Operator
在Reactor项目中,一个Operator会给一个发布者(Publisher)添加某种行为,并返回一个新的Publisher实例。还可以对返回的Publisher再添加Operator连成一个链条。原始数据沿着链条从第一个Publisher开始向下流动,链条中的每个节点都会以某种方式去转换流入的数据。链条的终点是一个订阅者(Subscriber),Subscriber以某种方式消费这些数据,流程图如下图所示。
下面是对Reactor项目中Operator的总结分类,大致可以分为如下几类。
● 集合Operator:提供集合运算,如map、filter、sort、group、reduce等,和Java 8 Stream的中间操作具有相同的效果。
● 异 常 处 理 Operator : 提 供 异 常 处 理 机 制 , 如 retry 、onErrorReturn等。
● 回 调 Operator : 提 供 Publisher 状 态 转 换 时 的 回 调 , 如doOnCancel、doOnRequest等。
● 行为Operator:修改Publisher的默认行为,为其添加更多功能,如buffer、defaultIfEmpty、onBackpressureXXX等。
● 调试Operator:添加调试信息,如log、elapsed等。
Vert.X响应式编程
Vert.X是基于JVM构建的一个Reactive工具箱。同时,Vert.X和Spring类似,也有一套微服务开发生态。从开发者的角度来看,Vert.X就是一些库包,提供了HTTP客户端和服务器、消息服务、TCP和UDP底层协议等模块。你可以使用这些模块来构建自己的应用,也可以通过向Vert.X Core(Vert.X的基础组件)中增加任意模块来构建自己的系统。
Vert.X的主要功能
● Web开发,Vert.X封装了Web开发常用的组件,支持路由、Session管理、模板等。
● TCP/UDP开发,Vert.X底层基于Netty,提供了丰富的I/O类库,支持多种网络应用开发,不需要处理底层细节(如拆包和粘包),注重业务代码编写。
● 提供对WebSocket的支持,可以做网络聊天室、动态推送等。
● Event Bus(事件总线)是Vert.X的神经系统,通过Event Bus可以实现分布式消息、远程方法调用等。正是因为Event Bus的存在,Vert.X才可以更加便捷地开发微服务应用。
● 支 持 主 流 的 数 据 和 消 息 的 访 问 , 如 Redis 、 MongoDB 、RabbitMQ、Kafka等。
● 支持分布式锁、分布式计数器、分布式Map。
Vert.X的特性
● 异步非阻塞:Vert.X就像是跑在JVM上的Node.js(使用事件驱动、非阻塞式I/O模型的JavaScript运行环境),所以Vert.X的第一个优势就是它实现了一个异步的非阻塞框架。
● Vert.X支持多编程语言,在Vert.X上,可以使用JavaScript、Java、Scala、Ruby等语言。
● 不依赖中间件:Vert.X的底层依赖Netty,因此在使用Vert.X构建Web项目时,不依赖中间件。像Node一样,可以直接创建一个HttpServer,相对会更灵活一些,安全性也会更高一些。
● 完善的生态:Vert.X提供数据库操作、Redis操作、Web客户端操作等丰富的组件功能。
Vert.X的接入实例
1.加载对应的Maven依赖
2.Vert.X提供了一个创建HTTP服务器的简单方法,该服务器会在每次接收到HTTP请求时返回一个“Hello”的response
在这个例子里,我们创建了一个requestHandler来接收HTTP请求事件,并且返回响应。在Vert.X中,所有API都不会阻塞调用线程,如果不能立即响应结果,Handler会在事件准备好后处理,通过异步操作回调Handler方法触发执行。这种非阻塞的开发模型,可以使用较少的线程处理高并发场景。下面是Vert.X中EventLoop的工作模型图。
Verticle是Vert.X中的重要组件,可以理解成Java中的Servlet、POJO Bean或Akka中的Actor。一个组件可以有多个实例,Verticle实例之间的通信通过Event Bus实现。
ProducerVerticle负责监听8080端口,接收前端请求,它可以通过Event Bus发送一个事件,该事件将被传递给多个该事件的订阅者,代码如下。
ConsumeVerticle负责消费Event Bus的数据并返回响应,代码如下。
MainApp是启动类,在main方法中发布两个Verticle,下面代码是启动主流程的方法。
浏览器调用接口http://127.0.0.1:8080/book/1,出现下面结果则表示正确。
Verticle具有以下几个特点。
● 每个Verticle都占用一个EventLoop线程,且只对应一个EventLoop。
● 每个Verticle中创建的HttpServer、EventBus等资源都会在回收Verticle时被同步回收。
● 在多个Verticle中创建同样端口的HttpServer,会变成两个EventLoop线程,处理同一个HttpServer的连接,可以利用Verticle的这一特性来提升并发处理性能。
Spring Boot 2响应式编程
Spring Boot 2.x在Spring Boot 1.x基础上,基于Spring 5实现了响应式编程框架。从Spring MVC注解驱动的时代开始,Spring官方有意识地去Servlet化。不过在Spring MVC时代,Spring仍然摆脱不了对 Servlet 容 器 的 依 赖 , 然 而 借 助 响 应 式 编 程 ( ReactiveProgramming)的势头,Spring加速了这一时代的到来。WebFlux将Servlet容器从必须项变为可选项,并且默认采用Netty Web Server作为HTTP容器的处理引擎,形成Spring全新的技术体系,包括数据存储等技术栈。Spring Boot 2官方提供的基于Reactor与Servlet容器生态和技术栈的对比如下图所示。
对比发现,Spring Boot 2.x与Spring Boot 1.x在技术栈上存在巨大差异。Spring Boot 2.x最显著的变化就是采用了响应式的技术体系。底层的Reactive核心组件、响应式WebFlux框架、响应式数据存储、响应式安全、响应式Web服务引擎组成了Spring响应式技术体系。
下面列举了Spring Boot 2中支持响应式编程的部分模块。
Spring Core
Spring Core 是 Spring 的 核 心 模 块 。 Spring Framework 5 基 于ProjectReactor和RxJava反应式项目及响应式编程规范实现了对响应式编程的支持。在Spring Core中通过引入ReactiveAdapter实现了Object和Publisher<T>的相互转换,代码如下:
使用者可以通过继承ReactiveAdapter实现定制化的数据类型转换 。 ReactiveAdapterRegistry 可 以 作 为 对 象 池 来 保 持ReactiveAdapter实例并提供相应的数据访问方式。
响应式I/O
Spring Core提供了对I/O的响应式编程支持。Spring Core首先引入了一个字节缓存抽象接口DataBuffer,提供了一个DataBufferUtils工具类,可以实现以Reactive方式对I/O进行访问和交互。从下面的示例代码可以看到,DataBufferUtils返回了一个Flux对象,这样就可以使用Reactor相关接口读取test.txt文件,实现背压的响应式特性。
同时,Spring Core通过下面接口实现了基于响应式流的编解码实现类,这样可以方便DataBuffer实例与对象的相互转化,代码如下:
Spring WebFlux构建响应式Web服务
在Web服务方面,Spring 2.x提供了WebFlux框架,基于Flux和Mono对象实现响应式非阻塞Web服务。同时提供了一个响应式的HTTPWebClient,它可以通过函数式的方式异步非阻塞地发起HTTP请求并处理响应。Spring WebFlux也提供了响应式的WebSocketClient。下一节我们会详细讲解Spring的WebFlux框架。
数据层支持响应式
开发基于响应式流的应用,就像搭建数据流的管道,使异步数据能够顺畅流过每个环节。大多数系统免不了要与数据库交互,所以我们也需要响应式的持久层API和支持异步的数据库驱动。在消息的处理过程中,如果数据管道在任何一个环节发生阻塞,都有可能造成整体吞吐量的下降。
各个数据库都开始陆续推出异步驱动的技术支持,目前可以支持响应式数据访问的数据库有MongoDB、Redis、Apache Cassandra和CouchDB。
相关生态的响应式支持
● Spring 5实现了对Spring Security的响应式支持。
● Spring Cloud基于WebFlux框架实现了Spring Cloud Gateway微服务网关。
● Spring Test实现了响应式的支持类WebTestClient。
● 在监控领域,Sleuth也提供对响应式WebFlux的追踪支持。
本文给大家讲解的内容是响应式微服务架构,响应式技术框架
- 觉得文章不错的朋友可以转发此文关注小编;
- 感谢大家的支持!