RxJS(Reactive Extensions for JavaScript) 是一个非常强大的 JS 库,我们可以使用它轻松编写异步代码。
在本系列文章中,我将带领你学习 RxJS 的最新版本,我们会重点关注如何使用响应式编程范式来解决你在日常工作中碰到的问题。所以这是一个偏实战的系列文章。
在本系列文章中,你将学会 RxJS 中的核心组件是如何使用和运作的。
通过学习这个系列文章,你将亲自使用 RxJS 完成一个完整的项目开发,在这个项目中,你将了解如何处理 DOM 事件、如何构建响应式本地数据库等内容。
数据源和 Observables
通过上一篇文章的学习,我们了解了响应式编程和异步编程相关的概念。接下来我们要了解响应式编程中的基本组件,包括数据源和 Observable(可观察者)。
数据源
数据源可以定义为随着时间推移产生数据或者简单存储数据的一个角色。这个角色可以是很多,比如:
- 鼠标或键盘等输入设备,可以由人随着时间不停的触发各种事件和产生各种值。
- HTTP 调用。
- 包含数组的组件。
虽然它们完全不同,但是通过响应式编程的概念,我们可以以相同的方式处理任何数据源。而使数据从时间点 A 到时间点 B 的动作,我们也可以称为 stream(流)。而承载流的容器,我们叫做数据管道(pipeline)。
生产者-消费者模型
这个范例基于生产者-消费者模型,生产者可以将数据发送到数据管道,也就是 stream。数据可以在 stream 中进行转换,最终到达一个或者多个消费者手中。为了能够让消费者从生产者手中接收数据,消费者必须通知生产者它对生产者产生的数据感兴趣。这种方式我们称之为 subscribe(订阅)。
下面这张图可以很好的解释这个流程:
RxJS 中的生产者是一个 Observable,它是一个随着时间推移生成数据的组件,无论数据是从键盘鼠标等输入设备中产生的,还是存储在数组中的数据。
RxJS 中的消费者会以 Subscribe 的方式侦听生产者的数据管道。
我们可以在数据管道中可以做非常多的事情。
数据转换
当数据流过管道的时候,可以在数据到达消费者之前对它做多种转换。这些转换包括过滤元素、映射元素、忽略特定元、甚至延迟元素的传递等。
我们可以利用这些灵活的操作做任何事情。
下面这张图解释了数据转换的过程:
创建 Observables
在学习 RxJS 之前,我们先来看看 Observable 内部的工作原理。下面这段代码将一个数组包装在一个 Observable 中,通过迭代数组来同步的产生值。
import { Observable } from 'rxjs' const wrapArrayIntoObservable = arr => { return new Observable(subscriber => { for(let item of arr) { subscriber.next(item) } }) } const data = [1, 2, 3, 4, 5] const observable = wrapArrayIntoObservable(data) observable.subscribe(val => console.log('订阅者1 :', val)) observable.subscribe(val => console.log('订阅者2 :', val))
注意传递给 Observable 构造函数的函数,这个函数会对每个订阅过这个 Observable 的订阅者运行。
除了这种方式创建 Observable,RxJS 还提供了很多种更简单的方法,比如 of、from、interval 等。
下面我们使用 from 来创建同步数据源。
在 RxJS 中创建数据管道
现在我们来创建数据管道定并实现使用这个数据的多个订阅者。
import { from } from 'rxjs' import { tap, filter, map } from 'rxjs/operators' const arrayDataObservable$ = from([1, 2, 3, 4, 5]); const dataPipeline = arrayDataObservable$.pipe( tap(val => console.log('当前通过流进行传递的值是: ', val)), filter(val => val > 2), map(val => val * 2) ) const subscribeToBaseObservable = subscriberName => { return arrayDataObservable$.subscribe(val => { console.log(subscriberName + ' 接收到的值是: ' + val); }) } const subscribeToDataPipeline = subscriberName => { return dataPipeline.subscribe(val => { console.log(subscriberName + ' 接收到的值是: ' + val); }) } const handleSubscriptionToBaseObservable = () => { const subscription1 = subscribeToBaseObservable('订阅者1'); const subscription2 = subscribeToBaseObservable('订阅者2'); } const handleSubscriptionToDataPipeline = () => { const subscription1 = subscribeToDataPipeline('订阅者1'); const subscription2 = subscribeToDataPipeline('订阅者2'); } // 1. 执行第一个方法 // handleSubscriptionToBaseObservable(); // 2. 执行下一个方法 handleSubscriptionToDataPipeline();
我通过这个示例来解释两件事:
- 订阅原始的 Observable,不会有任何数据转换。数据会原封不动的到达订阅者。
- 订阅数据管道,数据管道会将数据转换为订阅者感兴趣的内容。
你可以尝试执行第一个函数,你会注意到它们的不同之处。每个对原始 obervable 感兴趣的订阅者都可以订阅它并接受它随时间产生的所有数据,另一方面,其他订阅着可能只对原始数据中大于 2 的值感兴趣。与此同时,控制台将会通过 tap 操作符来输出正在发生一些数据转换操作。
当两个订阅者都订阅数据管道时,会产生以下输出:
makefile
复制代码
当前通过流进行传递的值是: 1当前通过流进行传递的值是: 2当前通过流进行传递的值是: 3订阅者1 接收到的值是: 6当前通过流进行传递的值是: 4订阅者1 接收到的值是: 8当前通过流进行传递的值是: 5订阅者1 接收到的值是: 10当前通过流进行传递的值是: 1当前通过流进行传递的值是: 2当前通过流进行传递的值是: 3订阅者2 接收到的值是: 6当前通过流进行传递的值是: 4订阅者2 接收到的值是: 8当前通过流进行传递的值是: 5订阅者2 接收到的值是: 10
通过输出我们可以总结出以下几点规律:
- 只有当有感兴趣的订阅者订阅它时,observable 才会开始产生值。
- 每个新的订阅者都会获得从 observable 发出的全部数据。
- 数据会通过所有的管道,直到最终到达订阅者。
- 订阅者 2 仅在订阅者 1 从管道中检索并处理了值之后才开始检索这些值。
你可能会觉得第 4 点有些奇怪,但是由于原始数据源是同步的,所以才会发生这种情况。
现在我们清楚了:Observable 是以一种惰性的方式工作的。
同步数据源与异步数据源
在处理响应式的数据流时,区分同步数据源和异步数据源是非常重要的。
一个非常常见的误解是,很多人认为响应式数据流都是异步的。
同步数据源
每个响应式的库都会提供从不同类型的数据创建流的方式。我们来举个例子:将一个简单的数组包装在一个 observable 中:
import { from } from 'rxjs' const source = from([1, 2, 3, 4]) source.subscribe(console.log)
这个例子中,当我们开始订阅 observable 时,数组的值会被一一发出,直到数组中没有任何值。
observable 中可以包装任何数据,包括字符串、数组、Map 和 Set 等。
在每个数据项发送到订阅的回调函数中时,每个后续的数据项都必须等待当前的数据项处理完成。这就是同步数据源的工作方式。
异步数据源
在使用异步数据源时,情况会有些不同。
下面的代码演示了如何创建一个两秒后才会被处理的异步事件。只有在 Promise resolve 后才会调用订阅者的回调。
import { from } from 'rxjs' const successPromise = new Promise((resolve, reject) => { setTimeout(() => { resolve('success') }, 2000) }) const source = from(successPromise) source.subscribe(console.log) console.log('等待 promise 完成')
这意味着订阅的回调函数会异步执行,只有当数据可用时才会执行回调,而程序的其他代码是正常执行的。
我们还可以处理不可预测的 DOM 事件,例如鼠标和键盘事件,方法是将这些数据发射器包装到 observable 中,并订阅它。这些 DOM 事件就是一种异步数据源。
下面是监听 mousemove 事件的事件发射器代码:
const eventSource = fromEvent(document, 'mousemove') eventSource.subscribe(console.log)
将同步数据源转换为异步数据源
现在我们可以很轻松的区分数据源的性质。但是同时出现了一个问题:我们能否将同步数据源转换为异步数据源呢?当然可以。我们把创建数据管道那一部分的例子进行改造,让它变为异步数据源:
import { from } from 'rxjs' import { tap, filter, map } from 'rxjs/operators' const getAsynchronousObservable = arr => { return new Observable(subscriber => { subscriber.next(arr[0]) for(let item of arr.slice(1)) { setTimeout(() => subscriber.next(item), 100); } }) } const arrayDataObservable$ = getAsynchronousObservable([1, 2, 3, 4, 5]); const dataPipeline = arrayDataObservable$.pipe( tap(val => console.log('当前通过流进行传递的值是: ' + val)), filter(val => val > 2), map(val => val * 2) ) const subscribeToBaseObservable = subscriberName => { return arrayDataObservable$.subscribe(val => { console.log(subscriberName + ' 接收到的值是: ' + val); }) } const subscribeToDataPipeline = subscriberName => { return dataPipeline.subscribe(val => { console.log(subscriberName + ' 接收到的值是: ' + val); }) } const handleSubscriptionToBaseObservable = () => { const subscription1 = subscribeToBaseObservable('订阅者1'); const subscription2 = subscribeToBaseObservable('订阅者2'); } const handleSubscriptionToDataPipeline = () => { const subscription1 = subscribeToDataPipeline('订阅者1'); const subscription2 = subscribeToDataPipeline('订阅者2'); } // 1. 执行第一个方法 // handleSubscriptionToBaseObservable(); // 2. 执行下一个方法 handleSubscriptionToDataPipeline();
为了更好的演示包装数据数组的 observable 的异步特性,数组中的第一个数据项是立即发出的,也就是同步的方式。而数组中其他的数据项将在 100 毫秒后发出。
处理 Observables
在之前的文章中,我们看到了 Observable 的强大之处,但是我们还需要学习关于它的更多内容。
想象一下,如果我们为一个响应时间非常久或者压根没有响应的 HTTP 请求创建了一个 Promise,同时我们导航到另一个页面。这种情况下,内存会发生什么?你可能会认为 JavaScript 的垃圾回收机制会处理掉它,通常情况下是这样的。但是如果你是使用单页应用程序开发的 App,情况就不一样了。
取消订阅 Observable
这是 Observable 的一个特性。我们可以在接受完成后处理掉它们。这也就意味着 Observable 允许我们随时取消它们的执行上下文。在上述的场景中,我们应该处理始终无法完成的 HTTP 请求呢?答案是 unsubscribe API。调用这个方法后,observable 中的资源都会被释放。
为了了解 unsubscribe 背后的工作原理,我们分析一下上一篇文章中给出的示例并对其进行一些修改。
observable 的数据每 500 毫秒产生相同的值,为了模拟长时间运行的场景,我们用 setInterval 函数来模拟实现。另外需要注意 Observable 返回的那个函数,该函数将会使用 clearInterval 在每个事件间隔停止执行回调函数。
import { Observable } from 'rxjs' const getAsynchronousObservable = () => { return new Observable(subscriber => { intervalId = setInterval(() => subscriber.next('异步的值'), 500) return () => clearInterval(intervalId) }) } const arrayDataObservable$ = getAsynchronousObservable() const subscribeToBaseObservable = subscriberName => { return arrayDataObservable$.subscribe(val => { console.log(subscriberName + ' 接收到的值是: ' + val) }) } const subscriber = subscribeToBaseObservable("订阅者1"); var UNSUBSCRIBE_TIMEOUT_VALUE = 1600 setTimeout(() => { subscriber.unsubscribe() }, UNSUBSCRIBE_TIMEOUT_VALUE)
Observable 返回的函数实际上是释放它持有的所有资源并终止任何数据项继续发送给订阅者的函数。一旦调用 subscribe 方法,这个函数将会被执行。它可以在任意时刻被执行。
上面的示例中展示了如何在一些事件发生之后取消这个 HTTP 请求,比如在页面跳转时。
你可以尝试改变上面的代码中的 UNSUBSCRIBE_TIMEOUT_VALUE 变量来观察程序是如何变化的。这个变量可以控制程序在多少毫秒后取消订阅 observable。
下面这张图可以很好的描述背后发生的事情。
现在我们已经学习了响应式编程世界中的基本功能。