RxJS(Reactive Extensions for JavaScript) 是一个非常强大的 JS 库,我们可以使用它轻松编写异步代码。
在本系列文章中,我将带领你学习 RxJS 的最新版本,我们会重点关注如何使用响应式编程范式来解决你在日常工作中碰到的问题。所以这是一个偏实战的系列文章。
在本系列文章中,你将学会 RxJS 中的核心组件是如何使用和运作的。
通过学习这个系列文章,你将亲自使用 RxJS 完成一个完整的项目开发,在这个项目中,你将了解如何处理 DOM 事件、如何构建响应式本地数据库等内容。
主题的概念
每个订阅者通过 subscribe 函数来订阅 observable 的数据流,obervable 的值对每个订阅者而言都是独立执行的,这种值传递模式被称为单播(unicast)。
除了这种情况外,我们可能还需要一种不同的方式,在每个订阅者之间共享 Observable 的同一个执行上下文。这种值传递模式被称为多播(multicast)。
Subjects 是 RxJS 中一种特殊类型的 Observable,它支持多播。
普通主题 VoidSubject
主题是 RxJS 中的一种特殊结构,它可以同时充当 Observable 和订阅者两种角色。
- 我们可以订阅一个主题,就像订阅一个 Observable 一样。
- 我们可以通过调用 next 方法将值推向主题,就像 Observable 中的 subscribe 方法一样。
每个对数据感兴趣的订阅者都可以通过简单的订阅来接收主题产生的值,而不需要担心 Observable 上下文是一个新的还是每个订阅者共享的。
随着时间的推移,每个数据生产者都可以向主题推送值,然后这些值将会被多播到每一个订阅的订阅者身上。
主题维护者一个订阅者列表,传入的值会在同一个执行上下文中多播到每个订阅者。
现在,你可能会问了,我们既然可以轻松的使用 Observable,为什么还需要主题呢?
想象一下,我们的 App 中有多个服务会随着时间的推移产生数据。这些服务如果将数据放入主题中,然后主题将数据多播到每个订阅者。这种模式会让业务逻辑更加清晰,所需要的代码也会更少。
下面这张图描述了整个工作流程。
现在我们为上面的图来编写具体代码。
import { Subject } from "rxjs" const dataSubject = new Subject(); const produceData = val => { dataSubject.next(val); } const producers = { producer1: () => produceData(1), producer2: () => produceData(2), producer3: () => produceData(3) } const consumers = { consumer1: () => dataSubject.subscribe(val => { console.log('订阅者1 接收到的值: ' + val); }), consumer2: () => dataSubject.subscribe(val => { console.log('订阅者2 接收到的值: ' + val); }), consumer3: () => dataSubject.subscribe(val => { console.log('订阅者3 接收到的值: ' + val); }) } consumers.consumer1(); consumers.consumer2(); consumers.consumer3(); producers.producer1(); producers.producer2(); producers.producer3();
在这个实现中,每个消费者都会订阅主题。每个生产者会向主题推送一个值。注意每个订阅者是如何接收每个生产者发布的值的。这就是使用主题带来的一个优势,和直接订阅 observable 相比,这样的代码看上去会简单很多。
主题的另外一个关键特性,是允许生产者在注册任何订阅者之前将值推送到主题。你可以将第 27-29 行代码和第 31-33 行代码颠倒一下位置。这时控制台将不会再有输出。
这是因为主题的工作原理是“随时间变化来产生值”。这就意味着当一个值被推送到主题时,所有注册的订阅者都可以接受到值,但是在值推送到主题之后再注册的订阅者将无法接收到注册之前的值。如果我们想要每个新加入的订阅者也可以接受到 Observable 所有的值该怎么办?这就需要使用 RxJS 中提供的多种类型的主题了。
主题的类型
RxJS 为不同的情况提供了其他三种不同的主题,分别是:
- 行为主题
- 重播主题
- 异步主题
接下来我们要探索每一种类型的主题,以便于在后续的项目中灵活使用它们。
行为主题 BehaviorSubject
行为主题是一种特殊类型的主题,它可以缓冲最新发出的那个值并将该值推送到每个新订阅它的订阅者。需要注意的是,行为主题必须有一个默认值作为缓存值。
假设有一个聊天室的场景,每个新加入的用户都会收到默认的欢迎消息。现在我们想要新加入的用户可以看到一条之前发送的消息。该如何使用 RxJS 来实现呢?
下面是使用行为主题实现的代码:
import { BehaviorSubject } from "rxjs" const defaultWelcomeMessage = "欢迎来到聊天室"; const chatRoomSubject = new BehaviorSubject(defaultWelcomeMessage); const sendMessage = (userFrom, userMessage) => { chatRoomSubject.next(userFrom + ":" + userMessage); } const alice = chatRoomSubject.subscribe(val => { console.log('----Alice 的房间----'); console.log(val); console.log('------------------'); }) const bob = chatRoomSubject.subscribe(val => { console.log('----Bob 的房间----'); console.log(val); console.log('------------------'); }) setTimeout(() => sendMessage('alice', '你好,我是 Alice'), 2000); setTimeout(() => sendMessage('bob', '大家好,我是 Bob'), 300); setTimeout(() => { const eve = chatRoomSubject.subscribe(val => { console.log('----Eve 的房间----'); console.log(val); console.log('------------------'); }) }, 3000);
我们来分析上面的代码都做了些什么。
Alice 最先加入聊天室,收到了“欢迎来到聊天室”的消息。
紧接着 Bob 也加入聊天室,因为没有任何人发消息,所以 Bob 也会收到“欢迎来到聊天室”。
300 毫秒后,Bob 发送了“大家好,我是 Bob”,Alice 和 Bob 都收到了这条消息。缓存值被更新为“大家好,我是 Bob”。
2000 毫秒后,Alice 发送了“你好,我是 Alice”,Alice 和 Bob 都会收到这条消息。缓存值被更新为“你好,我是 Alice”
3000 毫秒后,Eve 加入聊天室,他收到了缓存消息“你好,我是 Alice”。
这就是行为主题在聊天室场景下的作用,可以存储之前交换过的消息,让后面新加入的用户看到大家的最后一条聊天记录。
重播主题 ReplaySubject
顾名思义,这种类型的主题可以将一些之前产生的值发送给新的订阅者。但是重播主题不能将过去所有的值全部缓存,我们需要给它设置一个缓存的长度。重播主题会在内部维护一个缓冲区,当新值推送到主题时,不断更新缓冲区的内容。
继续拿上面聊天室的例子举例,我们的需求变为新用户加入可以看到 10 条历史聊天记录。这时可以利用重播主题来实现这个功能。
import { ReplaySubject } from "rxjs" const chatRoomSubject = new ReplaySubject(10); const sendMessage = (userFrom, userMessage) => { chatRoomSubject.next(userFrom + ":" + userMessage); } const alice = chatRoomSubject.subscribe(val => { console.log('----Alice 的房间----'); console.log(val); console.log('------------------'); }) const bob = chatRoomSubject.subscribe(val => { console.log('----Bob 的房间----'); console.log(val); console.log('------------------'); }) setTimeout(() => sendMessage('alice', '你好,我是 Alice'), 2000); setTimeout(() => sendMessage('bob', '大家好,我是 Bob'), 300); setTimeout(() => sendMessage('mike', '你好 Alice,你在做什么呢?'), 2500); setTimeout(() => { const eve = chatRoomSubject.subscribe(val => { console.log('----Eve 的房间----'); console.log(val); console.log('------------------'); }) }, 3000);
你应该能够发现,行为主题有点像只能缓存一个值的重播主题。
异步主题 AsyncSubject
异步主题的工作原理很简单,只有在异步主题执行完成时,每个订阅者才能接收到最后一个值。
下面是一个简单的例子:
import { AsyncSubject } from "rxjs" const asyncSubject = new AsyncSubject(); asyncSubject.subscribe(val => { console.log("接收到的值: " + val) }) asyncSubject.next(1); asyncSubject.next(2); console.log('已将 2 个值发送到主题') asyncSubject.complete();
在示例中,有两个值发送到了异步主题,但是订阅者并没有第一时间获取到这两个值,而是在异步主题执行结束后,订阅者才收到最后一个推送的值。
下图是异步主题的工作流程:
无论异步主题有多少个订阅者,只有在主题完成,也就是调用了 complete 方法后,每个订阅者才可以收到最后一个值。