RxJS系列02:可观察者 Observables

简介: RxJS系列02:可观察者 Observables

RxJS(Reactive Extensions for JavaScript) 是一个非常强大的 JS 库,我们可以使用它轻松编写异步代码。

在本系列文章中,我将带领你学习 RxJS 的最新版本,我们会重点关注如何使用响应式编程范式来解决你在日常工作中碰到的问题。所以这是一个偏实战的系列文章。

在本系列文章中,你将学会 RxJS 中的核心组件是如何使用和运作的。

通过学习这个系列文章,你将亲自使用 RxJS 完成一个完整的项目开发,在这个项目中,你将了解如何处理 DOM 事件、如何构建响应式本地数据库等内容。


数据源和 Observables


通过上一篇文章的学习,我们了解了响应式编程和异步编程相关的概念。接下来我们要了解响应式编程中的基本组件,包括数据源和 Observable(可观察者)。


数据源


数据源可以定义为随着时间推移产生数据或者简单存储数据的一个角色。这个角色可以是很多,比如:

  1. 鼠标或键盘等输入设备,可以由人随着时间不停的触发各种事件和产生各种值。
  2. HTTP 调用。
  3. 包含数组的组件。

虽然它们完全不同,但是通过响应式编程的概念,我们可以以相同的方式处理任何数据源。而使数据从时间点 A 到时间点 B 的动作,我们也可以称为 stream(流)。而承载流的容器,我们叫做数据管道(pipeline)。


生产者-消费者模型


这个范例基于生产者-消费者模型,生产者可以将数据发送到数据管道,也就是 stream。数据可以在 stream 中进行转换,最终到达一个或者多个消费者手中。为了能够让消费者从生产者手中接收数据,消费者必须通知生产者它对生产者产生的数据感兴趣。这种方式我们称之为 subscribe(订阅)。

下面这张图可以很好的解释这个流程:

image.png

RxJS 中的生产者是一个 Observable,它是一个随着时间推移生成数据的组件,无论数据是从键盘鼠标等输入设备中产生的,还是存储在数组中的数据。

RxJS 中的消费者会以 Subscribe 的方式侦听生产者的数据管道。

我们可以在数据管道中可以做非常多的事情。


数据转换


当数据流过管道的时候,可以在数据到达消费者之前对它做多种转换。这些转换包括过滤元素、映射元素、忽略特定元、甚至延迟元素的传递等。

我们可以利用这些灵活的操作做任何事情。

下面这张图解释了数据转换的过程:

image.png


创建 Observables


在学习 RxJS 之前,我们先来看看 Observable 内部的工作原理。下面这段代码将一个数组包装在一个 Observable 中,通过迭代数组来同步的产生值。


import { Observable } from 'rxjs'
const wrapArrayIntoObservable = arr => {
  return new Observable(subscriber => {
    for(let item of arr) {
      subscriber.next(item)
    }
  })
}
const data = [1, 2, 3, 4, 5]
const observable = wrapArrayIntoObservable(data)
observable.subscribe(val => console.log('订阅者1 :', val))
observable.subscribe(val => console.log('订阅者2 :', val))

注意传递给 Observable 构造函数的函数,这个函数会对每个订阅过这个 Observable 的订阅者运行。

除了这种方式创建 Observable,RxJS 还提供了很多种更简单的方法,比如 of、from、interval 等。

下面我们使用 from 来创建同步数据源。


在 RxJS 中创建数据管道


现在我们来创建数据管道定并实现使用这个数据的多个订阅者。


import { from } from 'rxjs'
import { tap, filter, map } from 'rxjs/operators'
const arrayDataObservable$ = from([1, 2, 3, 4, 5]);
const dataPipeline = arrayDataObservable$.pipe(
    tap(val => console.log('当前通过流进行传递的值是: ', val)),
    filter(val => val > 2),
    map(val => val * 2)
)
const subscribeToBaseObservable = subscriberName => {
    return arrayDataObservable$.subscribe(val => {
        console.log(subscriberName + ' 接收到的值是: ' + val);
    })
}
const subscribeToDataPipeline = subscriberName => {
    return dataPipeline.subscribe(val => {
        console.log(subscriberName + ' 接收到的值是: ' + val);
    })
}
const handleSubscriptionToBaseObservable = () => {
    const subscription1 = subscribeToBaseObservable('订阅者1');
    const subscription2 = subscribeToBaseObservable('订阅者2');
}
const handleSubscriptionToDataPipeline = () => {
    const subscription1 = subscribeToDataPipeline('订阅者1');
    const subscription2 = subscribeToDataPipeline('订阅者2');
}
// 1. 执行第一个方法
// handleSubscriptionToBaseObservable();
// 2. 执行下一个方法
handleSubscriptionToDataPipeline();

我通过这个示例来解释两件事:

  1. 订阅原始的 Observable,不会有任何数据转换。数据会原封不动的到达订阅者。
  2. 订阅数据管道,数据管道会将数据转换为订阅者感兴趣的内容。

你可以尝试执行第一个函数,你会注意到它们的不同之处。每个对原始 obervable 感兴趣的订阅者都可以订阅它并接受它随时间产生的所有数据,另一方面,其他订阅着可能只对原始数据中大于 2 的值感兴趣。与此同时,控制台将会通过 tap 操作符来输出正在发生一些数据转换操作。

当两个订阅者都订阅数据管道时,会产生以下输出:

makefile

复制代码

当前通过流进行传递的值是:  1当前通过流进行传递的值是:  2当前通过流进行传递的值是:  3订阅者1 接收到的值是: 6当前通过流进行传递的值是:  4订阅者1 接收到的值是: 8当前通过流进行传递的值是:  5订阅者1 接收到的值是: 10当前通过流进行传递的值是:  1当前通过流进行传递的值是:  2当前通过流进行传递的值是:  3订阅者2 接收到的值是: 6当前通过流进行传递的值是:  4订阅者2 接收到的值是: 8当前通过流进行传递的值是:  5订阅者2 接收到的值是: 10

通过输出我们可以总结出以下几点规律:

  1. 只有当有感兴趣的订阅者订阅它时,observable 才会开始产生值。
  2. 每个新的订阅者都会获得从 observable 发出的全部数据。
  3. 数据会通过所有的管道,直到最终到达订阅者。
  4. 订阅者 2 仅在订阅者 1 从管道中检索并处理了值之后才开始检索这些值。

你可能会觉得第 4 点有些奇怪,但是由于原始数据源是同步的,所以才会发生这种情况。

现在我们清楚了:Observable 是以一种惰性的方式工作的。


同步数据源与异步数据源


在处理响应式的数据流时,区分同步数据源和异步数据源是非常重要的。

一个非常常见的误解是,很多人认为响应式数据流都是异步的。


同步数据源


每个响应式的库都会提供从不同类型的数据创建流的方式。我们来举个例子:将一个简单的数组包装在一个 observable 中:


import { from } from 'rxjs'
const source = from([1, 2, 3, 4])
source.subscribe(console.log)

这个例子中,当我们开始订阅 observable 时,数组的值会被一一发出,直到数组中没有任何值。

observable 中可以包装任何数据,包括字符串、数组、Map 和 Set 等。

在每个数据项发送到订阅的回调函数中时,每个后续的数据项都必须等待当前的数据项处理完成。这就是同步数据源的工作方式。


异步数据源


在使用异步数据源时,情况会有些不同。

下面的代码演示了如何创建一个两秒后才会被处理的异步事件。只有在 Promise resolve 后才会调用订阅者的回调。


import { from } from 'rxjs'
const successPromise = new Promise((resolve, reject) => {
  setTimeout(() => {
    resolve('success')
  }, 2000)
})
const source = from(successPromise)
source.subscribe(console.log)
console.log('等待 promise 完成')

这意味着订阅的回调函数会异步执行,只有当数据可用时才会执行回调,而程序的其他代码是正常执行的。

我们还可以处理不可预测的 DOM 事件,例如鼠标和键盘事件,方法是将这些数据发射器包装到 observable 中,并订阅它。这些 DOM 事件就是一种异步数据源。

下面是监听 mousemove 事件的事件发射器代码:


const eventSource = fromEvent(document, 'mousemove')
eventSource.subscribe(console.log)


将同步数据源转换为异步数据源


现在我们可以很轻松的区分数据源的性质。但是同时出现了一个问题:我们能否将同步数据源转换为异步数据源呢?当然可以。我们把创建数据管道那一部分的例子进行改造,让它变为异步数据源:


import { from } from 'rxjs'
import { tap, filter, map } from 'rxjs/operators'
const getAsynchronousObservable = arr => {
    return new Observable(subscriber => {
        subscriber.next(arr[0])
        for(let item of arr.slice(1)) {
            setTimeout(() => subscriber.next(item), 100);
        }
    })
}
const arrayDataObservable$ = getAsynchronousObservable([1, 2, 3, 4, 5]);
const dataPipeline = arrayDataObservable$.pipe(
    tap(val => console.log('当前通过流进行传递的值是: ' + val)),
    filter(val => val > 2),
    map(val => val * 2)
)
const subscribeToBaseObservable = subscriberName => {
    return arrayDataObservable$.subscribe(val => {
        console.log(subscriberName + ' 接收到的值是: ' + val);
    })
}
const subscribeToDataPipeline = subscriberName => {
    return dataPipeline.subscribe(val => {
        console.log(subscriberName + ' 接收到的值是: ' + val);
    })
}
const handleSubscriptionToBaseObservable = () => {
    const subscription1 = subscribeToBaseObservable('订阅者1');
    const subscription2 = subscribeToBaseObservable('订阅者2');
}
const handleSubscriptionToDataPipeline = () => {
    const subscription1 = subscribeToDataPipeline('订阅者1');
    const subscription2 = subscribeToDataPipeline('订阅者2');
}
// 1. 执行第一个方法
// handleSubscriptionToBaseObservable();
// 2. 执行下一个方法
handleSubscriptionToDataPipeline();

为了更好的演示包装数据数组的 observable 的异步特性,数组中的第一个数据项是立即发出的,也就是同步的方式。而数组中其他的数据项将在 100 毫秒后发出。


处理 Observables


在之前的文章中,我们看到了 Observable 的强大之处,但是我们还需要学习关于它的更多内容。

想象一下,如果我们为一个响应时间非常久或者压根没有响应的 HTTP 请求创建了一个 Promise,同时我们导航到另一个页面。这种情况下,内存会发生什么?你可能会认为 JavaScript 的垃圾回收机制会处理掉它,通常情况下是这样的。但是如果你是使用单页应用程序开发的 App,情况就不一样了。


取消订阅 Observable


这是 Observable 的一个特性。我们可以在接受完成后处理掉它们。这也就意味着 Observable 允许我们随时取消它们的执行上下文。在上述的场景中,我们应该处理始终无法完成的 HTTP 请求呢?答案是 unsubscribe API。调用这个方法后,observable 中的资源都会被释放。

为了了解 unsubscribe 背后的工作原理,我们分析一下上一篇文章中给出的示例并对其进行一些修改。

observable 的数据每 500 毫秒产生相同的值,为了模拟长时间运行的场景,我们用 setInterval 函数来模拟实现。另外需要注意 Observable 返回的那个函数,该函数将会使用 clearInterval 在每个事件间隔停止执行回调函数。


import { Observable } from 'rxjs'
const getAsynchronousObservable = () => {
    return new Observable(subscriber => {
        intervalId = setInterval(() =>  subscriber.next('异步的值'), 500)
        return () => clearInterval(intervalId)
    })
}
const arrayDataObservable$ = getAsynchronousObservable()
const subscribeToBaseObservable = subscriberName => {
    return arrayDataObservable$.subscribe(val => {
        console.log(subscriberName + ' 接收到的值是: ' + val)
    })
}
const subscriber = subscribeToBaseObservable("订阅者1");
var UNSUBSCRIBE_TIMEOUT_VALUE = 1600
setTimeout(() => {
    subscriber.unsubscribe()
}, UNSUBSCRIBE_TIMEOUT_VALUE)

Observable 返回的函数实际上是释放它持有的所有资源并终止任何数据项继续发送给订阅者的函数。一旦调用 subscribe 方法,这个函数将会被执行。它可以在任意时刻被执行。

上面的示例中展示了如何在一些事件发生之后取消这个 HTTP 请求,比如在页面跳转时。

你可以尝试改变上面的代码中的 UNSUBSCRIBE_TIMEOUT_VALUE 变量来观察程序是如何变化的。这个变量可以控制程序在多少毫秒后取消订阅 observable。

下面这张图可以很好的描述背后发生的事情。

image.png

现在我们已经学习了响应式编程世界中的基本功能。



相关文章
|
7月前
|
JavaScript 前端开发 调度
15_Rxjs
15_Rxjs
53 0
|
JavaScript 前端开发 算法
RxJS系列06:测试 Observable
RxJS系列06:测试 Observable
113 0
探秘 RxJS Observable 为什么要长成这个样子?!
我们都知道 RxJS Observable 最基础的使用方法:是建立 Observable,即调用 .create API
|
前端开发 JavaScript API
继续解惑,异步处理 —— RxJS Observable
Observable 可观察对象是开辟一个连续的通信通道给观察者 Observer,彼此之前形成一种关系,而这种关系需要由 Subscription 来确立,而在整个通道中允许对数据进行转换我们称为操作符 Operator。
|
开发者
观察者(observer)
观察者(observer)
142 0
观察者(observer)
|
设计模式 Java 测试技术
浅析Java设计模式【3.3】——观察者
Java常用设计模式,观察者模式
97 0
浅析Java设计模式【3.3】——观察者
|
JavaScript 前端开发 调度
你会用RxJS吗?【初识 RxJS中的Observable和Observer】
概念 RxJS是一个库,可以使用可观察队列来编写异步和基于事件的程序的库。 RxJS 中管理和解决异步事件的几个关键点: Observable: 表示未来值或事件的可调用集合的概念。 Observer: 是一个回调集合,它知道如何监听 Observable 传递的值。 Subscription: 表示一个 Observable 的执行,主要用于取消执行。 Operators:** 是纯函数,可以使用函数式编程风格来处理具有map、filter、concat、reduce等操作的集合。
151 0
|
前端开发 JavaScript API
Rxjs源码解析(一)Observable
学习一个库最好的方法就是看其源码,理解其 api 的调用原理,用起来自然也就很清楚自己到底在干什么了,秉持着此观念,为了更好地理解 rxjs,抽空将其源码看了一遍 本文章不会刻意涉及概念性的东西,主线就是解读源码,并在恰当的时候给出一些小例子,源码基于 rxjs v7.4.0 版本
337 0
|
设计模式 大数据
行为型-Observer
行为型设计模式主要解决的就是“类或对象之间的交互”问题。 原理及应用场景剖析 观察者模式(Observer Design Pattern)也被称为发布订阅模式(Publish-Subscribe Design Pattern)。在 GoF 的《设计模式》一书中,它的定义是这样的: Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically. 翻译成中文就是:在对象之间
119 0
|
JSON JavaScript 前端开发
一起来看 rxjs
## 更新日志 - 2018-05-26 校正 - 2016-12-03 第一版翻译 ## 过去你错过的 Reactive Programming 的简介 你好奇于这名为Reactive Programming(反应式编程)的新事物, 更确切地说,你想了解它各种不同的实现(比如 [Rx*], [Bacon.js], RAC 以及其它各种各样的框架或库) 学习它比较困难, 因为
1168 0
下一篇
DataWorks