你会用RxJS吗?【初识 RxJS中的Observable和Observer】

简介: 概念RxJS是一个库,可以使用可观察队列来编写异步和基于事件的程序的库。RxJS 中管理和解决异步事件的几个关键点:Observable: 表示未来值或事件的可调用集合的概念。Observer: 是一个回调集合,它知道如何监听 Observable 传递的值。Subscription: 表示一个 Observable 的执行,主要用于取消执行。Operators:** 是纯函数,可以使用函数式编程风格来处理具有map、filter、concat、reduce等操作的集合。

概念
RxJS是一个库,可以使用可观察队列来编写异步和基于事件的程序的库。
RxJS 中管理和解决异步事件的几个关键点:

Observable: 表示未来值或事件的可调用集合的概念。
Observer: 是一个回调集合,它知道如何监听 Observable 传递的值。
Subscription: 表示一个 Observable 的执行,主要用于取消执行。
Operators:** 是纯函数,可以使用函数式编程风格来处理具有map、filter、concat、reduce等操作的集合。
Subject: 相当于一个EventEmitter,也是将一个值或事件多播到多个Observers的唯一方式。
Schedulers: 是控制并发的集中调度程序,允许我们在计算发生在 eg setTimeoutor requestAnimationFrame或者其它上时进行协调。

牛刀小试
我们通过在dom上绑定事件的小案例,感受一下Rxjs的魅力。

在dom绑定事件,我们通常这样处理

document.addEventListener('click', () => console.log('Clicked!'));
复制代码
用Rxjs创建一个observable,内容如下
import { fromEvent } from 'rxjs';

fromEvent(document, 'click').subscribe(() => console.log('Clicked!'));
复制代码

这时候我们简单升级一下,需要记录一下点击的数量

let count = 0;
document.addEventListener('click', () => console.log(Clicked ${++count} times));
复制代码
用Rxjs可以隔离状态,
import { fromEvent, scan } from 'rxjs';

fromEvent(document, 'click')
.pipe(scan((count) => count + 1, 0))
.subscribe((count) => console.log(Clicked ${count} times));
复制代码
可以看到,我们用到了scan操作符,该操作符的工作方式和数组的reduce类似,回调函数接收一个值, 回调的返回值作为下一次回调运行暴露的一个值。
通过上面的案例可以看出,RxJS的强大之处在于它能够使用纯函数生成值。这意味着您的代码不太容易出错。 通常你会创建一个不纯的函数,你的代码的其他部分可能会弄乱你的状态。

这时候,需求又有变动了,要求我们一秒内只能有一次点击

let count = 0;
let rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', () => {
if (Date.now() - lastClick >= rate) {

console.log(`Clicked ${++count} times`);
lastClick = Date.now();

}
});
复制代码
使用Rxjs
fromEvent(document, 'click')
.pipe(

throttleTime(1000),
scan((count) => count + 1, 0)

)
.subscribe((count) => console.log(Clicked ${count} times));
复制代码
RxJS 有一系列的操作符,可以帮助你控制事件如何在你的 observables 中流动。

这时候,我们要每次累计鼠标x的值

let count = 0;
const rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', (event) => {
if (Date.now() - lastClick >= rate) {

count += event.clientX;
console.log(count);
lastClick = Date.now();

}
});
复制代码
使用Rxjs
import { fromEvent, throttleTime, map, scan } from 'rxjs';

fromEvent(document, 'click')
.pipe(

throttleTime(1000),
map((event) => event.clientX),
scan((count, clientX) => count + clientX, 0)

)
.subscribe((count) => console.log(count));
复制代码
从上面看可以通过map去转换observables 的值。
Observable
我们先来写一个案例代码,大家可以猜下它的执行顺序
import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {

subscriber.next(4);
subscriber.complete();

}, 1000);
});

console.log('just before subscribe');
observable.subscribe({
next(x) { console.log('got value ' + x); },
error(err) { console.error('something wrong occurred: ' + err); },
complete() { console.log('done'); }
});
console.log('just after subscribe');
复制代码
可以稍微想一下,正确的输出结果
just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done
复制代码
怎么样,和大家想的结果一样吗,我们来一下分析一下。
Observable 剖析

Observable 有两种方式创建,一种是通过new Observable(),还有一种是通过Rx.Observable.create()的方式去创建。

Observable 核心的关注点:

创建Observable
订阅Observable
执行Observable
取消Observable

创建Observable

const observable = new Observable(function subscribe(subscriber) {
const id = setInterval(() => {

subscriber.next('hi')

}, 1000);
});
复制代码
该代码是创建一个Observable,然后每隔1s向订阅者发送消息。我们看到上边的回调函数是subscribe, 该函数是描述Observable最重要的部分。

订阅Observable

observable.subscribe(x => console.log(x));
复制代码
observable中的subscribe中参数是一个回调x => console.log(x),官方叫它Observer,其实Observer有多种形式,后边我们会说到,在这里就简单理解,Observer 可以去消费数据,比如,在react中,我们这可以更新状态数据等。

执行Observable

subscriber.next(1); // Next 通知
subscriber.complete(); // 完成 通知
subscriber.error(err); // Error 通知
复制代码
其实就是执行一个惰性计算,可同步可异步,
Observable Execution 可以传递三种类型的值:

Next:发送数值、字符串、对象等。
Error:发送 JavaScript 错误或异常。
complete:不发送值。

Next通知是最重要和最常见的类型:它们代表传递给订阅者的实际数据。在 Observable 执行期间,Error和complete通知可能只发生一次,并且只能有其中之一。

取消Observable

function subscribe(subscriber) {
const intervalId = setInterval(() => {

subscriber.next('hi');

}, 1000);

return function unsubscribe() {

clearInterval(intervalId);

};
}

const observable = new Observable(subscribe)

const unsubscribe = observable.subscribe({next: (x) => console.log(x)});

// Later:
unsubscribe(); // 取消执行
复制代码
我们有看代码,创建了一个每秒输出一个hi内容的Observable,但在我们的使用场景中,会有取消改行为,这时候就需要返回一个unsubscribe的方法,用于取消。
Observer
我们在上边的场景中也提到了Observer, 但什么是Observer呢,其实就是数据的消费者,先回顾一下上面的代码
observable.subscribe(x => console.log(x));
复制代码
其实可以写成
const observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};

observable.subscribe(observer);

复制代码
这样应就比较清晰了,observer只是具有三个回调的对象,每一个用于Observable 可能传递不同类型的通知。注意,observer 对象中的类型可以不必要全都写。
其实observer有许多变种,我们看下它的TS声明就比较清楚了。

可以直接传递一个observer对象,或者只传递一个next回调函数,在或者传多个可选的回调函数类型。

相关文章
|
6月前
|
JavaScript 前端开发 调度
15_Rxjs
15_Rxjs
50 0
|
存储 缓存 JavaScript
RxJS系列03:主题 Subjects
RxJS系列03:主题 Subjects
114 1
|
JavaScript 前端开发 算法
RxJS系列06:测试 Observable
RxJS系列06:测试 Observable
106 0
|
存储 JavaScript 前端开发
RxJS系列02:可观察者 Observables
RxJS系列02:可观察者 Observables
|
前端开发 JavaScript API
RxJS系列01:响应式编程与异步
RxJS系列01:响应式编程与异步
198 0
|
前端开发 API 开发工具
MobX 源码解析-observable #86
MobX 源码解析-observable #86
115 0
探秘 RxJS Observable 为什么要长成这个样子?!
我们都知道 RxJS Observable 最基础的使用方法:是建立 Observable,即调用 .create API
|
前端开发 JavaScript API
继续解惑,异步处理 —— RxJS Observable
Observable 可观察对象是开辟一个连续的通信通道给观察者 Observer,彼此之前形成一种关系,而这种关系需要由 Subscription 来确立,而在整个通道中允许对数据进行转换我们称为操作符 Operator。
|
JavaScript 前端开发 定位技术
Observable学习笔记
Observable学习笔记
186 0
Observable学习笔记
|
前端开发 JavaScript API
Rxjs源码解析(一)Observable
学习一个库最好的方法就是看其源码,理解其 api 的调用原理,用起来自然也就很清楚自己到底在干什么了,秉持着此观念,为了更好地理解 rxjs,抽空将其源码看了一遍 本文章不会刻意涉及概念性的东西,主线就是解读源码,并在恰当的时候给出一些小例子,源码基于 rxjs v7.4.0 版本
334 0