1 什么是响应式编程
一句话总结:响应式编程是一种编程范式,通用和专注于数据流和变化的,并且是异步的。
维基百科原文:
In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s), and that an inferred dependency within the associated execution model exists, which facilitates the automatic propagation of the change involved with data flow.
翻译:
在计算机领域,响应式编程是一个专注于数据流和变化传递的**异步编程范式。**这意味着可以使用编程语言很容易地表示静态(例如数组)或动态(例如事件发射器)数据流,并且在关联的执行模型中,存在着可推断的依赖关系,这个关系的存在有利于自动传播与数据流有关的更改。
举例:
例如,在命令式编程环境中, a:=b+c 表示将表达式的结果赋给a ,而之后改变b 或c 的值不会影响 。但在响应式编程中,a的值会随着b或c 的更新而更新。
电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化 。
响应式编程最初是为了简化交互式用户界面的创建和实时系统动画的绘制而提出来的一种方法,但它本质上是一种通用的编程范式。
2 回顾Reactor
2.1 什么是Reactor
还是维基百科:
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.
翻译:
反应器(reactor)设计模式是一种事件处理模式,用于处理由一个或多个输入并发交付给服务处理程序的服务请求。然后,服务处理程序将传入的请求解复用,并将它们同步地分派给相关的请求处理程序。
2.2 为什么是Reactor
Reactor来源于网络IO中同步非阻塞的I/O多路复用机制的模式。
- 堵塞、非堵塞的区别是在于第一阶段,即数据准备阶段。无论是堵塞还是非堵塞,都是用应用主动找内核要数据,而read数据的过程是‘堵塞’的,直到数据读取完。
- 同步、异步的区别在于第二阶段,若由请求者主动的去获取数据,则为同步操作,需要说明的是:read/write操作也是‘堵塞’的,直到数据读取完。
若数据的read都由kernel内核完成了(在内核read数据的过程中,应用进程依旧可以执行其他的任务),这就是异步操作。
换句话说,
- BIO里用户最关心“我要读”,
- NIO里用户最关心"我可以读了",
- 在AIO模型里用户更需要关注的是“读完了”。
NIO一个重要的特点是:socket主要的读、写、注册和接收函数,在等待就绪阶段都是非阻塞的,真正的I/O操作是同步阻塞的(消耗CPU但性能非常高)。
NIO是一种同步非阻塞的I/O模型,也是I/O多路复用的基础。
Reactor模式基本结构:
- Handle:本质上表示一种资源(比如说文件描述符,或是针对网络编程中的socket描述符),是由操作系统提供的;该资源用于表示一个个的事件,事件既可以来自于外部,也可以来自于内部,Handle是事件产生的发源地。
- Synchronous Event Demultiplexer(同步事件分离器):它本身是一个系统调用,用于等待事件的发生(事件可能是一个,也可能是多个)。调用方在调用它的时候会被阻塞,一直阻塞到同步事件分离器上有事件产生为止。对于Linux来说,同步事件分离器指的就是常用的I/O多路复用机制,比如说select、poll、epoll等。在Java NIO领域中,同步事件分离器对应的组件就是Selector;对应的阻塞方法就是select方法。
- Event Handler(事件处理器):本身由多个回调方法构成,这些回调方法构成了与应用相关的对于某个事件的反馈机制。在Java NIO领域中并没有提供事件处理器机制让我们调用或去进行回调,是由我们自己编写代码完成的。Netty相比于Java NIO来说,在事件处理器这个角色上进行了一个升级,它为我们开发者提供了大量的回调方法,供我们在特定事件产生时实现相应的回调方法进行业务逻辑的处理,即,ChannelHandler。ChannelHandler中的方法对应的都是一个个事件的回调。
- Concrete Event Handler(具体事件处理器):是事件处理器的实现。它本身实现了事件处理器所提供的各种回调方法,从而实现了特定于业务的逻辑。它本质上就是我们所编写的一个个的处理器实现。
- Initiation Dispatcher(初始分发器):实际上就是Reactor角色。它本身定义了一些规范,这些规范用于控制事件的调度方式,同时又提供了应用进行事件处理器的注册、删除等设施。它本身是整个事件处理器的核心所在,Initiation Dispatcher会通过Synchronous Event Demultiplexer来等待事件的发生。一旦事件发生,Initiation Dispatcher首先会分离出每一个事件,然后调用事件处理器,最后调用相关的回调方法来处理这些事件。Netty中ChannelHandler里的一个个回调方法都是由bossGroup或workGroup中的某个EventLoop来调用的。
2.3 Reactor模式的经典实现—Netty
Netty的线程模式就是一个实现了Reactor模式的经典模式。
- 结构对应:
NioEventLoop ———— Initiation Dispatcher
Synchronous EventDemultiplexer ———— Selector
Evnet Handler ———— ChannelHandler
ConcreteEventHandler ———— 具体的ChannelHandler的实现 - 模式对应:
Netty服务端使用了“多Reactor线程模式”
mainReactor ———— bossGroup(NioEventLoopGroup) 中的某个NioEventLoop
subReactor ———— workerGroup(NioEventLoopGroup) 中的某个NioEventLoop
acceptor ———— ServerBootstrapAcceptor
ThreadPool ———— 用户自定义线程池 - 流程:
① 当服务器程序启动时,会配置ChannelPipeline,ChannelPipeline中是一个ChannelHandler链,所有的事件发生时都会触发Channelhandler中的某个方法,这个事件会在ChannelPipeline中的ChannelHandler链里传播。然后,从bossGroup事件循环池中获取一个NioEventLoop来现实服务端程序绑定本地端口的操作,将对应的ServerSocketChannel注册到该NioEventLoop中的Selector上,并注册ACCEPT事件为ServerSocketChannel所感兴趣的事件。
② NioEventLoop事件循环启动,此时开始监听客户端的连接请求。
③ 当有客户端向服务器端发起连接请求时,NioEventLoop的事件循环监听到该ACCEPT事件,Netty底层会接收这个连接,通过accept()方法得到与这个客户端的连接(SocketChannel),然后触发ChannelRead事件(即,ChannelHandler中的channelRead方法会得到回调),该事件会在ChannelPipeline中的ChannelHandler链中执行、传播。
④ ServerBootstrapAcceptor的readChannel方法会该SocketChannel(客户端的连接)注册到workerGroup(NioEventLoopGroup) 中的某个NioEventLoop的Selector上,并注册READ事件为SocketChannel所感兴趣的事件。启动SocketChannel所在NioEventLoop的事件循环,接下来就可以开始客户端和服务器端的通信了。
3 Spring5中多Reactive的支持
3.1 Spring Webflux
3.1.1 依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> 复制代码
3.1.2 Controller代码
@RestController public class HelloController { @GetMapping("/hello") public Mono<String> hello() { return Mono.just("Hello Spring Webflux"); } } 复制代码
3.1.3 测试
C:\Users\xxxx\Desktop\sb-reactive>curl http://localhost:8080/hello Hello Spring Webflux 复制代码
3.1.4 Spring MVC和Spring WebFlux模式上的不同
3.2 Spring Data Reactive Respositories
3.2.1 依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency> 复制代码
3.2.2 配置
public class RedisReactiveConfig { @Bean public ReactiveRedisConnectionFactory connectionFactory() { return new LettuceConnectionFactory("127.0.0.1", 6379); } @Bean public ReactiveStringRedisTemplate reactiveStringRedisTemplate(ReactiveRedisConnectionFactory factory) { return new ReactiveStringRedisTemplate(factory); } } 复制代码
3.3.3 测试
@SpringBootTest class SbReactiveApplicationTests { @Autowired private ReactiveStringRedisTemplate reactiveRedisTemplate; @Test void contextLoads() { reactiveRedisTemplate .opsForValue().set("1", "zs") .subscribe(b -> System.out.println("success"), e -> System.out.println("error")); } } 复制代码
4 最后一问:如何理解Reactive响应式编程?
概念有很多,但是它相较我们的一般请求处理到底有什么更好的价值体现?
以下是来自小马哥的一段解释:
Reactive Programming 作为观察者模式(Observer) 的延伸,不同于传统的命令编程方式( Imperative programming)同步拉取数据的方式,如迭代器模式(Iterator) 。而是采用数据发布者同步或异步地推送到数据流(Data Streams)的方案。当该数据流(Data Steams)订阅者监听到传播变化时,立即作出响应动作。在实现层面上,Reactive Programming 可结合函数式编程简化面向对象语言语法的臃肿性,屏蔽并发实现的复杂细节,提供数据流的有序操作,从而达到提升代码的可读性,以及减少 Bugs 出现的目的。同时,Reactive Programming 结合背压(Backpressure)的技术解决发布端生成数据的速率高于订阅端消费的问题。
参考文章: