背景
在最近的项目中,我们面临了一个需求:监听异步数据的更新,并及时通知相关的组件模块进行相应的处理。传统的事件监听和回调函数方式可能无法满足我们的需求,因此决定采用响应式编程的方法来解决这个问题。在实现过程中发现 RxJS 这个响应式编程库,可以很高效、可维护地实现数据的监听和组件通知。
响应式编程是什么?
响应式编程是一种编程范式,它通过使用数据流和变化传播的方式来处理数据和事件。在响应式编程中,我们将程序看作是一系列的数据流,并通过定义数据流之间的依赖关系和操作来实现数据的处理和变化。
响应式编程的优势包括:
1. 响应性: 响应式编程能够实时地响应数据的变化,当数据发生改变时,相关的处理逻辑会自动触发。这种特性可以减少手动处理和更新的工作量,提高系统的响应能力。
2. 异步和并发: 响应式编程非常适合处理异步和并发的情况,如网络请求等。通过使用异步数据流和事件处理机制,响应式编程可以有效地处理多个并发任务和事件,并提供简洁而可靠的编程模型。
3. 可组合性: 响应式编程鼓励将程序拆分为独立的组件,并通过将这些组件连接在一起来构建更复杂的系统。可以方便地对数据进行转换、过滤、合并等操作,从而实现复杂的数据处理需求。
4. 声明式: 响应式编程采用声明式的方式来描述数据流和操作,而不是指定具体的控制流程。这使得代码更加清晰、简洁和可读,减少了出错的可能性。
下面是一个简单的示例代码,展示了如何使用 RxJS 进行响应式编程:
// 创建一个 Observable 对象,表示一个数据流
const dataStream = new Observable(observer => {
// 模拟异步数据更新
setTimeout(() => {
observer.next('Data Updated');
}, 1000);
});
// 订阅数据流的变化
dataStream.subscribe(data => {
console.log(data);
});
在这个示例中,我们创建了一个 Observable 对象 dataStream
,表示一个数据流。通过调用 subscribe
方法,我们订阅了这个数据流的变化,并在回调函数中打印出更新后的数据。
RxJS 的核心概念
RxJS(Reactive Extensions for JavaScript)是一个流行的响应式编程库,用于处理异步数据流。以下是 RxJS 的核心概念:
- Observable(可观测对象):Observable 是 RxJS 的核心概念,代表一个可观测的数据源。它可以发出多个值,并在时间上进行推送。Observable 是持续的数据源,可以随着时间的推移不断发出新的值。
- Observer(观察者):Observer 是订阅 Observable 并处理数据流的对象。它定义了一系列的回调函数来处理 Observable 发出的不同类型的通知。Observer 的回调函数包括
next
(处理正常值)、error
(处理错误)和complete
(处理完成信号)。 - Subscription(订阅):Subscription 表示 Observable 的订阅关系。当我们订阅一个 Observable 时,会得到一个 Subscription 对象,它用于取消订阅和释放资源。通过调用 Subscription 的
unsubscribe()
方法,可以手动取消订阅并停止接收 Observable 的值。 - Operator(操作符):操作符是 RxJS 提供的函数,用于对 Observable 进行转换、过滤和组合等操作。使用操作符,我们可以对数据流进行处理和转换,使代码更简洁和可读。常见的操作符包括
map
、filter
、merge
、concat
、debounceTime
等。 - Subject(主体):Subject 是一种特殊类型的 Observable,同时充当 Observable 和 Observer 的角色。Subject 具有多播的特性,可以向多个观察者同时发送值。我们可以使用 Subject 实现事件总线、多播数据和条件订阅等功能。
- Scheduler(调度器):调度器用于控制 Observable 的执行时机和顺序。它可以指定何时以及如何执行 Observable 的订阅和发送操作。RxJS 提供了不同的调度器,如
asyncScheduler
、queueScheduler
等。
这些核心概念构成了 RxJS,为我们处理异步数据流提供了丰富的工具。以下是一个简单的 RxJS 示例,演示如何使用 Observable、操作符和观察者来处理异步数据流:
import {
from } from 'rxjs';
import {
filter, map } from 'rxjs/operators';
// 创建一个 Observable,发出一系列数字
const numbers = from([1, 2, 3, 4, 5]);
// 使用操作符对数据流进行转换和过滤
const filteredNumbers = numbers.pipe(
filter((n) => n % 2 === 0), // 过滤出偶数
map((n) => n * 2) // 将每个数字乘以 2
);
// 创建观察者来处理 Observable 的通知
const observer = {
next: (value) => console.log(value), // 处理正常值
error: (error) => console.error(error), // 处理错误
complete: () => console.log('Complete') // 处理完成信号
};
// 订阅 Observable,并传入观察者
filteredNumbers.subscribe(observer);
// 输出:
// 4
// 8
// Complete
RxJS 核心原理解析
RxJS的核心原理是一个基于可观测数据流 Stream
结合观察者模式
和迭代器模式
的一种异步编程的应用库。它提供了一组操作符(operators),用于创建和组合可观察对象(Observable),并对数据流进行转换、筛选、合并等操作。
可观测流
可观测流(Observable)是一个持续的数据源,它在时间上进行推送并可以发出多个值。与传统的数据集合不同,可观测流是一种异步的、惰性的数据流。观察者(Observer)是订阅这个可观测流的对象,用于处理流中发出的值。
所以,在 RxJS 中,可观测流与观察者之间的关系如下:
- 可观测流(Observable)是一个数据源,它发出多个值,并且可以被订阅。
- 观察者(Observer)订阅可观测流,并处理可观测流发出的值。
在实际使用中,我们可以通过调用可观测流的 subscribe()
方法来订阅它,并传入一个观察者对象,观察者对象中定义了用于处理可观测流发出的值的回调函数。当可观测流发出新的值时,观察者对象的相应回调函数将被调用。
以下是一个简单的示例代码,演示了可观测流和观察者之间的关系:
// 创建一个可观测流
const observable = new Observable((observer) => {
observer.next('Hello');
observer.next('World');
observer.complete();
});
// 创建一个观察者
const observer = {
next: (value) => console.log(value),
complete: () => console.log('Complete')
};
// 订阅可观测流
observable.subscribe(observer);
在上面的示例中,我们创建了一个可观测流 observable
,它发出了两个值("Hello" 和 "World"),并在最后调用了 complete()
方法表示流结束。然后,我们创建了一个观察者 observer
,它定义了 next
和 complete
回调函数。最后,我们通过调用 subscribe()
方法,将观察者对象传递给可观测流,实现了订阅和处理可观测流发出的值。
观察者模式
RxJS 中的Observable
和Observer
是基于观察者模式的概念。Observable
代表一个可观察的数据源,它可以发出数据流并通知订阅者。Observer
代表一个订阅者,它可以通过订阅Observable
来接收数据流。当Observable
有新的数据时,它会调用Observer
的next
方法发送数据。Observable
还可以发送错误和完成事件,分别通过error
和complete
方法通知订阅者。观察者模式的实现在RxJS中是通过Observable
类和subscribe
方法来实现的。
class Observable {
constructor(subscribe) {
if (subscribe) {
this._subscribe = subscribe;
}
}
subscribe(observer) {
return this._subscribe(observer);
}
}
const myObservable = new Observable(observer => {
let count = 1;
const intervalId = setInterval(() => {
observer.next(count++);
}, 1000);
return {
unsubscribe: () => {
clearInterval(intervalId);
}
};
});
myObservable.subscribe({
next: value => console.log(value)
});
Observer
接口代表一个订阅者,它定义了一组回调方法,用于处理从Observable
接收到的数据流。观察者需要通过订阅(subscribe
)一个Observable
来接收数据。一旦订阅成功,Observable
会调用观察者的next
方法来发送数据项,可以调用error
方法发送错误通知,或者调用complete
方法发送完成通知。
总之,RxJS的Observable
和Observer
提供了一种便捷的方式来实现观察者模式。Observable
作为生产者负责产生数据流,而Observer
作为订阅者负责处理数据流的各个事件。通过使用subscribe
方法将观察者订阅到可观察对象上,可以实现数据的推送和处理。
迭代器模式
迭代器模式在 RxJS 中被广泛应用,特别是在操作符(operators)和管道(pipes)的实现中。这些操作符和管道接收 Observable 对象并返回一个新的 Observable 对象,用于对数据流进行转换、筛选、组合等操作。
迭代器模式的核心思想是将迭代的过程从业务逻辑中分离出来,通过迭代器对象提供的方法next()
顺序访问数据结构的每个元素,而不需要暴露数据结构的内部表示。在 RxJS 中,可观测流(Observable)就是数据结构,而操作符和管道则是迭代器对象,通过调用 next()
方法顺序访问 Observable 发出的值。
迭代器对象本质上,就是一个指针对象。通过指针对象的next(), 用来移动指针。 每调用一次next()方法,都会返回一个对象,都会返回数据结构的信息。这个对象有 value 和 done 两个属性,value属性返回当前位置的成员,done属性是一个布尔值,表示遍历是否结束,即是否有必要再调用一次next()。
let arr = ['foo', 'baz'];
let iter = arr[Symbol.iterator]();
console.log(iter.next()); //{ value: 'foo', done: false }
arr.splice(1, 0, 'bar'); // 在数组中间插入值
console.log(iter.next()); //{ value: 'bar', done: false }
console.log(iter.next()); //{ value: 'baz', done: false }
console.log(iter.next()); //{ value: undefined, done: true } 结束迭代
- 每次成功调用 next() ,都会返回一个 IteratorResult 对象
- IteratorResult 对象包含两个属性:done 和 value
- done :布尔值,是否可以继续迭代
- value :值
- 迭代器会阻止垃圾回收程序回收可迭代对象
- 如果可迭代对象在迭代期间被修改了,那么迭代器也会相应的变化
在 RxJS 中,使用 pipe
方法可以将多个操作符组合起来形成一个管道,从而按顺序对 Observable 进行一系列的转换和处理操作。每个操作符都接收一个 Observable 对象,并返回一个新的 Observable 对象,该对象经过操作符处理后发出转换后的值。
以下是一个示例代码,演示了使用操作符和管道对 Observable 进行转换的过程:
import {
of } from 'rxjs';
import {
map, filter } from 'rxjs/operators';
// 创建一个 Observable 对象
const observable = of(1, 2, 3, 4, 5);
// 使用操作符和管道对 Observable 进行转换
const transformedObservable = observable.pipe(
filter((value) => value % 2 === 0),
map((value) => value * 2)
);
// 订阅转换后的 Observable
transformedObservable.subscribe((value) => console.log(value));
在上面的示例中,我们首先创建了一个 Observable 对象 observable
,它发出了一系列的值。然后,我们使用 pipe
方法和两个操作符 filter
和 map
对 Observable 进行转换。filter
操作符用于筛选出偶数,而 map
操作符将每个偶数乘以 2。最后,我们订阅了转换后的 Observable,并在回调函数中打印出每个值。
通过使用迭代器模式,RxJS 提供了一种方便且可组合的方式来处理可观测流中的数据。操作符和管道的组合可以让我们以一种简洁、可读的方式对数据流进行处理和转换,同时将业务逻辑与数据处理逻辑分离开来,提高了代码的可维护性和扩展性。
RxJS 响应式编程使用
Observable
Observable 称之为流更容易理解。他能被多个observer
(观察者)订阅,每个订阅关系相互独立、互不影响。
它具有以下关键方法和属性:
subscribe(observer)
:订阅可观察者,并返回 Subscription 对象。next(value)
:向观察者发送新的值。error(error)
:向观察者发送错误信号。complete()
:向观察者发送完成信号。
示例:
import {
Observable } from 'rxjs';
const observable = Observable.create(observer => {
observer.next('foo');
observer.next('bar');
})
Observer
Observer (观察者), 称之为流的处理方法。它具有以下关键方法:
next(value)
:处理可观察者发送的新值。error(error)
:处理可观察者发送的错误信号。complete()
:处理可观察者发送的完成信号。
我们创建一个 Observable 时。是需要传入一个函数作为参数来生成 Observable 的值,这个函数的入参就是 observe,通过在函数内部调用 next() 来执行。当 Observable 被创建时,它能被多个 observer 订阅,而且每个订阅者之间相互独立、互不影响。
但是如何订阅呢?通过 subscribe 实现。
// 创建observable
const observable = new Observable(observer => {
observer.next(1)
setTimeout(() => {
observer.next(2)
}, 1000)
})
// 订阅observable
observable.subscribe({
next: (value) => console.log(value),
error: (err) => console.log(err),
complete: () => console.log("done"),
});
// 输出:1 -> 2 -> done
rxjs的subscribe是同步执行的。
Subscription
它的本质就是暂存了一个启动后的流,之前提到,每一个启动后的流都是相互独立的,而这个启动后的流,就存储在 subscription
中。它具有以下关键方法和属性:
unsubscribe()
:取消订阅(也可以理解成停止这个流)。
我们收到信息后,如何清理订阅占用的资源,这时候就需要用到 Subscription 了。
const subscription = observable.subscribe(data => console.log(data));
subscription.unsubscribe();
Subject
Observable表示一个可观察的数据源。Observable 可以发出多个值,并且能够被观察者(Observer)订阅,以接收这些值。Observable 是惰性的,只有在被订阅时才会开始发出值。
Subject 是一种特殊类型的 Observable。与普通的 Observable 不同,Subject 具有多播(multicasting)的特性,即可以同时发送值给多个观察者。Subject 既是一个可观察的数据源,也是一个观察者。
示例:
import {
Subject } from 'rxjs';
// 创建一个Subject对象
const subject = new Subject();
// 订阅Subject
const subscription = subject.subscribe({
next: (value) => console.log('Received value:', value),
error: (error) => console.error('Error:', error),
complete: () => console.log('Complete'),
});
// 发送数据到Subject
subject.next('Hello');
subject.next('World');
// 完成Subject
subject.complete();
// 取消订阅
subscription.unsubscribe();
首先创建了一个Subject
对象。然后,我们通过subscribe
方法订阅了这个Subject
,并指定了next
、error
和complete
回调函数。这些回调函数分别用于处理Subject
发送的数据、错误和完成事件。
接下来,我们通过subject.next()
方法发送了两个值:"Hello"和"World"。这些值会被发送给所有订阅了这个Subject
的观察者。我们调用了subject.complete()
方法来表示数据的发送已经完成。这将触发complete
回调函数。
最后,我们使用subscription.unsubscribe()
方法取消了对Subject
的订阅。
总结来说,Subject
可以作为一个可观察对象和观察者,可以用来发送数据和订阅数据。
应用场景:
- 事件总线:Subject 常用于实现事件总线的功能。我们可以通过创建一个 Subject 实例,并在不同的组件中订阅这个 Subject,以实现组件之间的事件通信和数据传递。
- 条件订阅:Subject 允许在任何时候进行订阅,这使得它非常适用于条件订阅的场景。我们可以在某个条件满足时订阅 Subject,从而开始接收值。
- 多播操作:当需要将同一个值序列发送给多个观察者时,可以使用 Subject 进行多播。这在一些场景中非常有用,例如在多个订阅者之间共享一个数据源。
Observable Vs Subject
Observable | Subject | |
---|---|---|
角色 | 生产者(单向) | 生产者、消费者(双向) |
消费策略 | 单播 | 多播 |
流转方式 | 内部发送/接收数据 | 外部发送/接收数据 |
消费时机 | 调用 subscribe | 调用 next |
Subject 和 Observable 是 RxJS 中的重要概念。Observable 是单播的,每个订阅者都会独立接收值,适用于一对一的场景。而 Subject 是多播的,可以同时发送值给多个订阅者,适用于一对多的场景。根据具体的需求,选择合适的概念可以帮助我们更好地处理数据流和实现响应式编程。
其他 Subject
Subject类型 | 用法 | 区别 | 适用场景 |
---|---|---|---|
Subject | 将值或事件广播给多个观察者 | 无法回放历史数据 | - 广播值或事件给多个观察者 - 将非RxJS代码转换为响应式 |
BehaviorSubject | 将最新值发送给新的观察者 | 记住最新值 | - 初始值或当前状态的广播 - 状态管理的中心数据源 |
ReplaySubject | 向新的观察者发送历史数据 | 可以回放历史数据 | - 重新发送过去数据给新的观察者 - 缓存历史数据的场景 |
AsyncSubject | 在完成时发送最后一个值 | 只发送最后一个值 | - 只关心Subject完成后的最终结果 - 等待异步操作完成后获取结果 |
具体使用及应用场景,详见RxJS中四种Subject的用法和区别
总结
希望本文对你理解 RxJS 的工作原理以及响应式编程在实际应用中的用法有所帮助。如果您有任何疑问、建议或意见,欢迎在评论区留言,我会及时根据反馈进行修改和完善。