深入浅出 RxJS 核心原理(响应式编程篇)

简介: 在最近的项目中,我们面临了一个需求:监听异步数据的更新,并及时通知相关的组件模块进行相应的处理。传统的事件监听和回调函数方式可能无法满足我们的需求,因此决定采用响应式编程的方法来解决这个问题。在实现过程中发现 RxJS 这个响应式编程库,可以很高效、可维护地实现数据的监听和组件通知。

48db98f2fbb11c51aba35cb560b126f6.jpeg

背景

在最近的项目中,我们面临了一个需求:监听异步数据的更新,并及时通知相关的组件模块进行相应的处理。传统的事件监听和回调函数方式可能无法满足我们的需求,因此决定采用响应式编程的方法来解决这个问题。在实现过程中发现 RxJS 这个响应式编程库,可以很高效、可维护地实现数据的监听和组件通知。



响应式编程是什么?

响应式编程是一种编程范式,它通过使用数据流和变化传播的方式来处理数据和事件。在响应式编程中,我们将程序看作是一系列的数据流,并通过定义数据流之间的依赖关系和操作来实现数据的处理和变化。

响应式编程的优势包括:

1. 响应性: 响应式编程能够实时地响应数据的变化,当数据发生改变时,相关的处理逻辑会自动触发。这种特性可以减少手动处理和更新的工作量,提高系统的响应能力。

2. 异步和并发: 响应式编程非常适合处理异步和并发的情况,如网络请求等。通过使用异步数据流和事件处理机制,响应式编程可以有效地处理多个并发任务和事件,并提供简洁而可靠的编程模型。

3. 可组合性: 响应式编程鼓励将程序拆分为独立的组件,并通过将这些组件连接在一起来构建更复杂的系统。可以方便地对数据进行转换、过滤、合并等操作,从而实现复杂的数据处理需求。

4. 声明式: 响应式编程采用声明式的方式来描述数据流和操作,而不是指定具体的控制流程。这使得代码更加清晰、简洁和可读,减少了出错的可能性。

下面是一个简单的示例代码,展示了如何使用 RxJS 进行响应式编程:

// 创建一个 Observable 对象,表示一个数据流
const dataStream = new Observable(observer => {
   
   
  // 模拟异步数据更新
  setTimeout(() => {
   
   
    observer.next('Data Updated');
  }, 1000);
});

// 订阅数据流的变化
dataStream.subscribe(data => {
   
   
  console.log(data);
});

在这个示例中,我们创建了一个 Observable 对象 dataStream,表示一个数据流。通过调用 subscribe 方法,我们订阅了这个数据流的变化,并在回调函数中打印出更新后的数据。

RxJS 的核心概念

RxJS(Reactive Extensions for JavaScript)是一个流行的响应式编程库,用于处理异步数据流。以下是 RxJS 的核心概念:

  1. Observable(可观测对象):Observable 是 RxJS 的核心概念,代表一个可观测的数据源。它可以发出多个值,并在时间上进行推送。Observable 是持续的数据源,可以随着时间的推移不断发出新的值。
  2. Observer(观察者):Observer 是订阅 Observable 并处理数据流的对象。它定义了一系列的回调函数来处理 Observable 发出的不同类型的通知。Observer 的回调函数包括 next(处理正常值)、error(处理错误)和 complete(处理完成信号)。
  3. Subscription(订阅):Subscription 表示 Observable 的订阅关系。当我们订阅一个 Observable 时,会得到一个 Subscription 对象,它用于取消订阅和释放资源。通过调用 Subscription 的 unsubscribe() 方法,可以手动取消订阅并停止接收 Observable 的值。
  4. Operator(操作符):操作符是 RxJS 提供的函数,用于对 Observable 进行转换、过滤和组合等操作。使用操作符,我们可以对数据流进行处理和转换,使代码更简洁和可读。常见的操作符包括 mapfiltermergeconcatdebounceTime 等。
  5. Subject(主体):Subject 是一种特殊类型的 Observable,同时充当 Observable 和 Observer 的角色。Subject 具有多播的特性,可以向多个观察者同时发送值。我们可以使用 Subject 实现事件总线、多播数据和条件订阅等功能。
  6. Scheduler(调度器):调度器用于控制 Observable 的执行时机和顺序。它可以指定何时以及如何执行 Observable 的订阅和发送操作。RxJS 提供了不同的调度器,如 asyncSchedulerqueueScheduler 等。

这些核心概念构成了 RxJS,为我们处理异步数据流提供了丰富的工具。以下是一个简单的 RxJS 示例,演示如何使用 Observable、操作符和观察者来处理异步数据流:

import {
   
    from } from 'rxjs';
import {
   
    filter, map } from 'rxjs/operators';

// 创建一个 Observable,发出一系列数字
const numbers = from([1, 2, 3, 4, 5]);

// 使用操作符对数据流进行转换和过滤
const filteredNumbers = numbers.pipe(
  filter((n) => n % 2 === 0), // 过滤出偶数
  map((n) => n * 2) // 将每个数字乘以 2
);

// 创建观察者来处理 Observable 的通知
const observer = {
   
   
  next: (value) => console.log(value), // 处理正常值
  error: (error) => console.error(error), // 处理错误
  complete: () => console.log('Complete') // 处理完成信号
};

// 订阅 Observable,并传入观察者
filteredNumbers.subscribe(observer);

// 输出:
// 4
// 8
// Complete

RxJS 核心原理解析

RxJS的核心原理是一个基于可观测数据流 Stream 结合观察者模式迭代器模式的一种异步编程的应用库。它提供了一组操作符(operators),用于创建和组合可观察对象(Observable),并对数据流进行转换、筛选、合并等操作。

可观测流

可观测流(Observable)是一个持续的数据源,它在时间上进行推送并可以发出多个值。与传统的数据集合不同,可观测流是一种异步的、惰性的数据流。观察者(Observer)是订阅这个可观测流的对象,用于处理流中发出的值。

所以,在 RxJS 中,可观测流与观察者之间的关系如下:

  • 可观测流(Observable)是一个数据源,它发出多个值,并且可以被订阅。
  • 观察者(Observer)订阅可观测流,并处理可观测流发出的值。

在实际使用中,我们可以通过调用可观测流的 subscribe() 方法来订阅它,并传入一个观察者对象,观察者对象中定义了用于处理可观测流发出的值的回调函数。当可观测流发出新的值时,观察者对象的相应回调函数将被调用。

以下是一个简单的示例代码,演示了可观测流和观察者之间的关系:

// 创建一个可观测流
const observable = new Observable((observer) => {
   
   
  observer.next('Hello');
  observer.next('World');
  observer.complete();
});

// 创建一个观察者
const observer = {
   
   
  next: (value) => console.log(value),
  complete: () => console.log('Complete')
};

// 订阅可观测流
observable.subscribe(observer);

在上面的示例中,我们创建了一个可观测流 observable,它发出了两个值("Hello" 和 "World"),并在最后调用了 complete() 方法表示流结束。然后,我们创建了一个观察者 observer,它定义了 nextcomplete 回调函数。最后,我们通过调用 subscribe() 方法,将观察者对象传递给可观测流,实现了订阅和处理可观测流发出的值。

steam.webp

观察者模式

RxJS 中的ObservableObserver是基于观察者模式的概念。Observable代表一个可观察的数据源,它可以发出数据流并通知订阅者。Observer代表一个订阅者,它可以通过订阅Observable来接收数据流。当Observable有新的数据时,它会调用Observernext方法发送数据。Observable还可以发送错误和完成事件,分别通过errorcomplete方法通知订阅者。观察者模式的实现在RxJS中是通过Observable类和subscribe方法来实现的。

class Observable {
   
   
  constructor(subscribe) {
   
   
    if (subscribe) {
   
   
        this._subscribe = subscribe;
    }
  }

  subscribe(observer) {
   
   
    return this._subscribe(observer);
  }
}

const myObservable = new Observable(observer => {
   
   
  let count = 1;
  const intervalId = setInterval(() => {
   
   
    observer.next(count++);
  }, 1000);

  return {
   
   
    unsubscribe: () => {
   
   
      clearInterval(intervalId);
    }
  };
});

myObservable.subscribe({
   
   
  next: value => console.log(value)
});

Observer接口代表一个订阅者,它定义了一组回调方法,用于处理从Observable接收到的数据流。观察者需要通过订阅(subscribe)一个Observable来接收数据。一旦订阅成功,Observable会调用观察者的next方法来发送数据项,可以调用error方法发送错误通知,或者调用complete方法发送完成通知。

总之,RxJS的ObservableObserver提供了一种便捷的方式来实现观察者模式。Observable作为生产者负责产生数据流,而Observer作为订阅者负责处理数据流的各个事件。通过使用subscribe方法将观察者订阅到可观察对象上,可以实现数据的推送和处理。

迭代器模式

迭代器模式在 RxJS 中被广泛应用,特别是在操作符(operators)和管道(pipes)的实现中。这些操作符和管道接收 Observable 对象并返回一个新的 Observable 对象,用于对数据流进行转换、筛选、组合等操作。

迭代器模式的核心思想是将迭代的过程从业务逻辑中分离出来,通过迭代器对象提供的方法next()顺序访问数据结构的每个元素,而不需要暴露数据结构的内部表示。在 RxJS 中,可观测流(Observable)就是数据结构,而操作符和管道则是迭代器对象,通过调用 next() 方法顺序访问 Observable 发出的值。

迭代器对象本质上,就是一个指针对象。通过指针对象的next(), 用来移动指针。 每调用一次next()方法,都会返回一个对象,都会返回数据结构的信息。这个对象有 value 和 done 两个属性,value属性返回当前位置的成员,done属性是一个布尔值,表示遍历是否结束,即是否有必要再调用一次next()。

let arr = ['foo', 'baz'];
let iter = arr[Symbol.iterator]();
console.log(iter.next()); //{ value: 'foo', done: false }
arr.splice(1, 0, 'bar'); // 在数组中间插入值
console.log(iter.next()); //{ value: 'bar', done: false }
console.log(iter.next()); //{ value: 'baz', done: false }
console.log(iter.next()); //{ value: undefined, done: true } 结束迭代
  • 每次成功调用 next() ,都会返回一个 IteratorResult 对象
  • IteratorResult 对象包含两个属性:done 和 value
    • done :布尔值,是否可以继续迭代
    • value :值
  • 迭代器会阻止垃圾回收程序回收可迭代对象
  • 如果可迭代对象在迭代期间被修改了,那么迭代器也会相应的变化

在 RxJS 中,使用 pipe 方法可以将多个操作符组合起来形成一个管道,从而按顺序对 Observable 进行一系列的转换和处理操作。每个操作符都接收一个 Observable 对象,并返回一个新的 Observable 对象,该对象经过操作符处理后发出转换后的值。

以下是一个示例代码,演示了使用操作符和管道对 Observable 进行转换的过程:

import {
   
    of } from 'rxjs';
import {
   
    map, filter } from 'rxjs/operators';

// 创建一个 Observable 对象
const observable = of(1, 2, 3, 4, 5);

// 使用操作符和管道对 Observable 进行转换
const transformedObservable = observable.pipe(
  filter((value) => value % 2 === 0),
  map((value) => value * 2)
);

// 订阅转换后的 Observable
transformedObservable.subscribe((value) => console.log(value));

在上面的示例中,我们首先创建了一个 Observable 对象 observable,它发出了一系列的值。然后,我们使用 pipe 方法和两个操作符 filtermap 对 Observable 进行转换。filter 操作符用于筛选出偶数,而 map 操作符将每个偶数乘以 2。最后,我们订阅了转换后的 Observable,并在回调函数中打印出每个值。

通过使用迭代器模式,RxJS 提供了一种方便且可组合的方式来处理可观测流中的数据。操作符和管道的组合可以让我们以一种简洁、可读的方式对数据流进行处理和转换,同时将业务逻辑与数据处理逻辑分离开来,提高了代码的可维护性和扩展性。

RxJS 响应式编程使用

v2-6abe4b3332a30e7d57d8a83a6bc123b1\_r.jpeg

Observable

Observable 称之为流更容易理解。他能被多个observer(观察者)订阅,每个订阅关系相互独立、互不影响。

它具有以下关键方法和属性:

  • subscribe(observer):订阅可观察者,并返回 Subscription 对象。
  • next(value):向观察者发送新的值。
  • error(error):向观察者发送错误信号。
  • complete():向观察者发送完成信号。

示例:

import {
   
    Observable } from 'rxjs';

const observable = Observable.create(observer => {
   
   
  observer.next('foo');
  observer.next('bar');
})

Observer

Observer (观察者), 称之为流的处理方法。它具有以下关键方法:

  • next(value):处理可观察者发送的新值。
  • error(error):处理可观察者发送的错误信号。
  • complete():处理可观察者发送的完成信号。

我们创建一个 Observable 时。是需要传入一个函数作为参数来生成 Observable 的值,这个函数的入参就是 observe,通过在函数内部调用 next() 来执行。当 Observable 被创建时,它能被多个 observer 订阅,而且每个订阅者之间相互独立、互不影响。

但是如何订阅呢?通过 subscribe 实现。

// 创建observable
const observable = new Observable(observer => {
   
   
    observer.next(1)
    setTimeout(() => {
   
   
        observer.next(2)
    }, 1000)
})

// 订阅observable
observable.subscribe({
   
   
  next: (value) => console.log(value),
  error: (err) => console.log(err),
  complete: () => console.log("done"),
});

// 输出:1 -> 2 -> done

rxjs的subscribe是同步执行的。

Subscription

它的本质就是暂存了一个启动后的流,之前提到,每一个启动后的流都是相互独立的,而这个启动后的流,就存储在 subscription 中。它具有以下关键方法和属性:

  • unsubscribe():取消订阅(也可以理解成停止这个流)。

我们收到信息后,如何清理订阅占用的资源,这时候就需要用到 Subscription 了。

const subscription = observable.subscribe(data => console.log(data));
subscription.unsubscribe();

Subject

Observable表示一个可观察的数据源。Observable 可以发出多个值,并且能够被观察者(Observer)订阅,以接收这些值。Observable 是惰性的,只有在被订阅时才会开始发出值。

Subject 是一种特殊类型的 Observable。与普通的 Observable 不同,Subject 具有多播(multicasting)的特性,即可以同时发送值给多个观察者。Subject 既是一个可观察的数据源,也是一个观察者。



示例:

import {
   
    Subject } from 'rxjs';

// 创建一个Subject对象
const subject = new Subject();

// 订阅Subject
const subscription = subject.subscribe({
   
   
  next: (value) => console.log('Received value:', value),
  error: (error) => console.error('Error:', error),
  complete: () => console.log('Complete'),
});

// 发送数据到Subject
subject.next('Hello');
subject.next('World');

// 完成Subject
subject.complete();

// 取消订阅
subscription.unsubscribe();

首先创建了一个Subject对象。然后,我们通过subscribe方法订阅了这个Subject,并指定了nexterrorcomplete回调函数。这些回调函数分别用于处理Subject发送的数据、错误和完成事件。

接下来,我们通过subject.next()方法发送了两个值:"Hello"和"World"。这些值会被发送给所有订阅了这个Subject的观察者。我们调用了subject.complete()方法来表示数据的发送已经完成。这将触发complete回调函数。

最后,我们使用subscription.unsubscribe()方法取消了对Subject的订阅。

总结来说,Subject可以作为一个可观察对象和观察者,可以用来发送数据和订阅数据。

应用场景:

  1. 事件总线:Subject 常用于实现事件总线的功能。我们可以通过创建一个 Subject 实例,并在不同的组件中订阅这个 Subject,以实现组件之间的事件通信和数据传递。
  2. 条件订阅:Subject 允许在任何时候进行订阅,这使得它非常适用于条件订阅的场景。我们可以在某个条件满足时订阅 Subject,从而开始接收值。
  3. 多播操作:当需要将同一个值序列发送给多个观察者时,可以使用 Subject 进行多播。这在一些场景中非常有用,例如在多个订阅者之间共享一个数据源。

Observable Vs Subject

Observable Subject
角色 生产者(单向) 生产者、消费者(双向)
消费策略 单播 多播
流转方式 内部发送/接收数据 外部发送/接收数据
消费时机 调用 subscribe 调用 next

Subject 和 Observable 是 RxJS 中的重要概念。Observable 是单播的,每个订阅者都会独立接收值,适用于一对一的场景。而 Subject 是多播的,可以同时发送值给多个订阅者,适用于一对多的场景。根据具体的需求,选择合适的概念可以帮助我们更好地处理数据流和实现响应式编程。

其他 Subject

Subject类型 用法 区别 适用场景
Subject 将值或事件广播给多个观察者 无法回放历史数据 - 广播值或事件给多个观察者
- 将非RxJS代码转换为响应式
BehaviorSubject 将最新值发送给新的观察者 记住最新值 - 初始值或当前状态的广播
- 状态管理的中心数据源
ReplaySubject 向新的观察者发送历史数据 可以回放历史数据 - 重新发送过去数据给新的观察者
- 缓存历史数据的场景
AsyncSubject 在完成时发送最后一个值 只发送最后一个值 - 只关心Subject完成后的最终结果
- 等待异步操作完成后获取结果

具体使用及应用场景,详见RxJS中四种Subject的用法和区别

总结

希望本文对你理解 RxJS 的工作原理以及响应式编程在实际应用中的用法有所帮助。如果您有任何疑问、建议或意见,欢迎在评论区留言,我会及时根据反馈进行修改和完善。

目录
相关文章
|
6月前
|
存储 前端开发 JavaScript
第六章(原理篇) 微前端间的通信机制
第六章(原理篇) 微前端间的通信机制
159 0
编程问题之响应式编程使用了哪些技术
编程问题之响应式编程使用了哪些技术
|
存储 缓存 JavaScript
深入浅出 RxJS 核心原理(响应式编程篇)
在最近的项目中,我们面临了一个需求:监听异步数据的更新,并及时通知相关的组件模块进行相应的处理。传统的事件监听和回调函数方式可能无法满足我们的需求,因此决定采用响应式编程的方法来解决这个问题。在实现过程中发现 RxJS 这个响应式编程库,可以很高效、可维护地实现数据的监听和组件通知。
151 0
深入浅出 RxJS 核心原理(响应式编程篇)
|
JavaScript 前端开发 中间件
redux的实现原理是什么,核心代码?
redux的实现原理是什么,核心代码?
|
消息中间件 前端开发 JavaScript
图解 Google V8 # 18 :异步编程(一):V8是如何实现微任务的?
图解 Google V8 # 18 :异步编程(一):V8是如何实现微任务的?
458 0
图解 Google V8 # 18 :异步编程(一):V8是如何实现微任务的?
|
JavaScript 前端开发 中间件
Redux 原理探秘
Redux 是一个非常不错的状态管理库,和 Vuex 不同的是 Redux 并不和 React 强绑定,你甚至可以在 Vue 中使用 Redux。当初的目标是创建一个状态管理库,来提供最简化 API。
134 0
|
存储 前端开发 Dubbo
响应式编程的实践
响应式编程的实践
响应式编程的实践
|
Scala 开发工具 git
剖析响应式编程的本质
剖析响应式编程的本质
剖析响应式编程的本质
|
监控 安全 Java
【精通函数式编程】(十一) CompletableFuture、反应式编程源码解析与实战
Future异步编程和CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。