学习 RxJS 我的主要有三大步骤:
- 理解相关概念及思想
- 熟悉各种操作符
- 联想使用场景
github 地址:github.com/ReactiveX/r…
官方文档地址(建议直接看官方的):rxjs.dev/
中文社区文档地址(辅助阅读):cn.rx.js.org/manual/over…
中文文档(辅助阅读):rxjs.tech/
学习RxJS操作符和响应式编程原则: reactive.how/
RxJS 可视化理解: rxviz.com/
RxJS 简介
本篇文章是基于 V7.8.0
官方简介:JavaScript 的响应式扩展库
RxJS(Reactive Extensions for JavaScript) 是一个使用 Observables 进行响应式编程的库,可以更轻松地编写异步或基于回调的代码。
该项目是对 Reactive-Extensions/RxJS(RxJS 4) 的重写,具有更好的性能、更好的模块化、更好的可调试调用堆栈,同时保持大部分向后兼容,只有一些破坏性的变更(breaking changes)是为了减少外层的 API 。
RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 Array 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。
Think of RxJS as Lodash for events.
RxJS 的 Logo 是鱼,是因为 RxJS 的概念和操作符可以被看作是一条鱼在游动,数据流就像是鱼在水中游动的路径。鱼的形象也代表了 RxJS 的灵活性和可扩展性,可以适应不同的应用场景和需求。此外,鱼也象征着 RxJS 的响应式编程思想,即数据流的变化会引起相应的响应和处理。
代表“流”的变量标示符,都是用 $ 符号结尾,这是 RxJS 编程中普遍使用的风格,被称为“芬兰式命名法”(Finnish Notation)。
为什么选 RxJS
随着时代的发展,技术也在不断更新换代。
面向对象式编程 ——》 函数式编程
函数式编程就是非常强调使用函数来解决问题的一种编程方式。
函数式编程对函数的使用有一些特殊的要求,这些要求包括以下几点:
- 声明式(Declarative)
- 纯函数(Pure Function)
- 数据不可变性(Immutability)
从语言角度讲,JavaScript 当然不算一个纯粹意义上的函数式编程语言,但是,JavaScript 中的函数有第一公民的身份,因为函数本身就是一个对象,可以被赋值给一个变量,可以作为参数传递,由此可以很方便地应用函数式编程的许多思想。
JavaScript 并不是纯粹的函数式编程语言,但是,通过应用一些编程规范,再借助一点工具的帮助,我们完全可以用 JavaScript 写出函数式的代码,RxJS 就是辅助我们写出函数式代码的一种工具。
指令式编程 ——》 响应式编程
RxJS 就是兼具函数式和响应式两种先进编程风格的框架。
RxJS 是一个组织异步逻辑的库,它有很多 operator,可以极大的简化异步逻辑的编写。
它是由数据源产生数据,经过一系列 operator 的处理,最后传给接收者。
但是 RxJS 的 operator 多呀,组合起来可以实现非常复杂的异步逻辑处理。
还能帮助我们解决一些问题:
- 如何控制大量代码的复杂度
- 如何保持代码可读性
- 如何处理异步操作
- 数据流抽象了很多现实问题
相关概念介绍
说明:以下部分主要出自官方文档(因为最好的概念介绍就是官方),对一些核心的概念、知识点做了一些总结(是摘录总结翻译,不是全盘抄录)。同时也是对官方文档意译文,帮助辅助中文式阅读。
Reactive Extension
Reactive Extension,也叫 ReactiveX,简称 Rx,是基于响应式的扩展,是各种语言实现的一个统称。
Rx 是一套通过可监听流来做异步编程的API。
ReactiveX 将 Observer 模式与 Iterator 模式以及函数式编程与集合相结合,以满足对管理事件序列的理想方式的需求。
Observer 模式
Iterator 模式
函数式编程
集合
RxJS 中解决异步事件管理的基本概念是:
- Observable:表示一个可调用的未来值或事件的集合的概念。
- Observer:是回调的集合,知道如何监听 Observable 传递的值。
- Subscription:表示 Observable 的执行,主要用于取消执行。
- Operators:采用函数式编程风格的纯函数,支持使用
map
、filter
、concat
、reduce
等操作处理集合。 - Subject:相当于一个 EventEmitter,是将一个值或事件多播给多个 Observers 的唯一途径。
- Schedulers:是控制并发的集中式调度程序,允许我们在计算发生时进行协调,例如
setTimeout
或requestAnimationFrame
或其它。
Observable *
被观察者,用来产生消息/数据。
Observable | Promise | |
使用场景 | 同步、异步均可使用 | 用 Promise 包裹的多数是异步场景 |
执行时机 | 声明式惰性执行,只有在订阅后才会执行 | 创建时就立即执行 |
执行次数 | 多次调用 subscribe 函数会执行多次 | 只有第一次执行,后续都是取值 |
流程控制 | 相较于 Promise 有更为全面的操作符 | 提供串行、并行的函数 |
错误处理 | subscribe 函数捕获错误 | .catch 捕获 |
Observable 是多个值的惰性推送集合。本质其实就是一个随时间不断产生数据的一个集合,称之为流更容易理解。
js
复制代码
import { Observable } from'rxjs';
const observable = newObservable((subscriber) => {
subscriber.next(1); subscriber.next(2); subscriber.next(3); setTimeout(() => { subscriber.next(4); subscriber.complete(); }, 1000);});// 订阅查看这些值的变化console.log('just before subscribe');
observable.subscribe({ next(x) { console.log('got value ' + x); }, error(err) { console.error('something wrong occurred: ' + err); }, complete() { console.log('done'); },});console.log('just after subscribe');
// 控制台打印出的订阅数据just before subscribegot value 1got value 2got value 3just after subscribegot value 4done
Pull vs Push
pull 和 push 是两种不同的协议,用来描述数据生产者 (Producer) 如何与数据消费者 (Consumer) 进行通信的。
Observables 作为函数的泛化
Observables 像是没有参数, 但可以泛化为多个值的函数。
订阅 Observable 类似于调用函数。
Observables 能够同步或异步传递值。
Observable 剖析
Observable 的核心关注点:
- 创建 Observables
- 订阅 Observables
- 执行 Observables
- 清理 Observables
创建 Observables:
js
复制代码
import { Observable } from'rxjs';
const observable = newObservable(functionsubscribe(subscriber) {
const id = setInterval(() => { subscriber.next('hi'); }, 1000);});
订阅 Observables
js
复制代码
observable.subscribe((x) =>console.log(x));// 订阅一个 Observable 就像调用一个函数,在数据将被传送到的地方提供回调。
执行 Observables
在 Observable 执行中, 可能会发送零个到无穷多个 "Next" 通知。如果发送的是 "Error" 或 "Complete" 通知的话,那么之后不会再发送任何通知了。
js
复制代码
import { Observable } from'rxjs';
const observable = newObservable(functionsubscribe(subscriber) {
subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); subscriber.next(4); // Is not delivered because it would violate the contract});const observable2 = newObservable(functionsubscribe(subscriber) {
try { subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); } catch (err) { subscriber.error(err); // delivers an error if it caught one }});
清理 Observables
js
复制代码
import { from } from'rxjs';
const observable = from([10, 20, 30]);
const subscription = observable.subscribe((x) =>console.log(x));
// Later:subscription.unsubscribe();// 当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。// 只要调用 `unsubscribe()` 方法就可以取消执行。
js
复制代码
import { Observable } from'rxjs';
const observable = newObservable(functionsubscribe(subscriber) {
// Keep track of the interval resource const intervalId = setInterval(() => { subscriber.next('hi'); }, 1000); // Provide a way of canceling and disposing the interval resource returnfunctionunsubscribe() { clearInterval(intervalId); };});
js
复制代码
functionsubscribe(subscriber) {
const intervalId = setInterval(() => { subscriber.next('hi'); }, 1000); returnfunctionunsubscribe() { clearInterval(intervalId); };}const unsubscribe = subscribe({ next: (x) =>console.log(x) });
// Later:unsubscribe(); // dispose the resources
Observer
从行为上来看,无非就是定义了如何处理上述流产生的数据,称之为流的处理方法。
js
复制代码
const observer = {
next: x => console.log('Observer got a next value: ' + x), error: err => console.error('Observer got an error: ' + err), complete: () =>console.log('Observer got a complete notification'),};
Operators
观察者,用来消费消息/数据。
Subscription *
Subscription 是表示可清理资源的对象,它是由 Observable 执行之后产生的。
本质就是暂存了一个启动后的流,每一个启动后的流都是相互独立的,而这个启动后的流,就存储在subscription
中,提供了unsubscribe
,来停止这个流。
Subject *
Subject
是一类特殊的Observable
,它可以向多个Observer
多路推送数值。
Observable | Subject | |
角色 | 生产者(单向) | 生产者、消费者(双向) |
消费策略 | 单播 | 多播 |
流转方式 | 内部发送/接收数据 | 外部发送/接收数据 |
数据特性 | 冷数据流 | 热数据流 |
消费时机 | 调用 subscribe | 调用 next |
Scheduler
简单 demo
推荐在 stackblitz.com/ 练习。
事件监听器
js
复制代码
// 以前document.addEventListener('click', () =>console.log('我被“惦记”了!'));
// 现在import { fromEvent } from'rxjs';
fromEvent(document, 'click').subscribe(() =>console.log('我被“惦记”了!'));
Purity
使 RxJS 强大的是它使用纯函数产生值的能力。这意味着您的代码不太容易出错。
所谓纯函数,指的是满足下面两个条件的函数:
- 函数的执行过程完全由输入参数决定,不会受除参数之外的任何数据影响。
- 函数不会修改任何外部状态,比如修改全局变量或传入的参数对象。
js
复制代码
// 你的其他代码部分可能会弄乱你的状态let count = 0;
document.addEventListener('click', () =>console.log(`我被“惦记”了 ${++count} 次`));
// 使用 RxJS 可以隔离状态import { fromEvent, scan } from'rxjs';
fromEvent(document, 'click')
.pipe(scan((count) => count + 1, 0)) .subscribe((count) =>console.log(`我被“惦记”了 ${count} 次`));
// scan 运算符的工作方式与数组的 reduce 类似。// 它接受一个暴露给回调的值。回调的返回值将成为下次运行回调时下一个暴露的值。
Flow
RxJS 有一整套运算符,可以帮助您控制事件如何通过 observables 流动。
js
复制代码
// 每秒最多点击一次的方式let count = 0;
let rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', () => {
if (Date.now() - lastClick >= rate) { console.log(`我被“惦记”了 ${++count} 次`); lastClick = Date.now(); }});// import { fromEvent, throttleTime, scan } from'rxjs';
fromEvent(document, 'click')
.pipe( throttleTime(1000), scan((count) => count + 1, 0) ) .subscribe((count) =>console.log(`我被“惦记”了 ${count} 次`));
Values
js
复制代码
// 在纯 JavaScript 中为每次点击添加当前鼠标 x 位置let count = 0;
const rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', (event) => {
if (Date.now() - lastClick >= rate) { count += event.clientX; console.log(count); lastClick = Date.now(); }});// import { fromEvent, throttleTime, map, scan } from'rxjs';
fromEvent(document, 'click')
.pipe( throttleTime(1000), map((event) => event.clientX), scan((count, clientX) => count + clientX, 0) ) .subscribe((count) =>console.log(count));
场景介绍
都有哪些场景能用到 RxJS 呢,以下举出了一些场景例子。供大家参考,其实还有更广泛的场景,还望大家自己发觉。
异步场景
- AJAX / XHR(XMLHttpRequest) / fetch API
- Service Worker / Node Stream
- setTimeout / setInterval
- Promise
事件场景
- 各种 DOM 事件(click,dbclick,keyup、keydown...)
- css3 动画事件(transition)
- html5 Geolocation
- WebSocket / Server-Sent Events
微前端通信
js
复制代码
// 主应用 main.js 部分内容import { Subject } from'rxjs'// 按需引入减少依赖包大小const pager = newSubject()
// 在主应用注册呼机监听器,这里可以监听到其他应用的广播pager.subscribe(v => { console.log(`main主应用监听到子应用${v.from}发来消息:`, v) // store.dispatch('app/setToken', v.token) // 这里处理主应用监听到改变后的逻辑 // rxjs 事件 qkEvent.toggleSidebar(v) qkEvent.generateHeaderNavDoc(v) qkEvent.generateDropdownMenu(v)})// 结合下章主应用下发资源给子应用,将pager作为一个模块传入子应用const msg = {
// data: store.getters, // 从主应用仓库读出的数据 data: {}, // components: LibraryUi, // 从主应用读出的组件库 // utils: LibraryJs, // 从主应用读出的工具类库 // emitFnc: childEmit, // 从主应用下发emit函数来收集子应用反馈 pager // 从主应用下发应用间通信呼机}exportconst qiankunRegisterMicroApps = [
{ name: FUSANG_NAME, entry: qiankunEntryConfig(FUSANG_NAME), container: '#appContainer', activeRule: '/ops-fe', loader (loading) { }, props: { store, pager: msg, parentRoute: router } },]
js
复制代码
// 子应用// 引入主应用Subject实例if (props.pager) {
Vue.prototype.$pager = props.pager}// 使用this.$pager && this.$pager.next({
from: 'logcenter-fe', // 从哪来 子应用的名字 event: 'sendDoc', // 跳转文档 data: [{ path: 'https://xxxx.com/docs/logcenter/', target: '_blank' }] // 传什么数据})
实现一个批量请求函数 multiRequest(urls, maxNum)
要求如下:
- 要求最大并发数 maxNum
- 每当有一个请求返回,就留下一个空位,可以增加新的请求
- 所有请求完成后,结果按照 urls 里面的顺序依次打出
js
复制代码
// promisefunctionmultiRequest(urls = [], maxNum) {
// 请求总数量 const len = urls.length; // 根据请求数量创建一个数组来保存请求的结果 const result = newArray(len).fill(false); // 当前完成的数量 let count = 0; returnnewPromise((resolve, reject) => { // 请求maxNum个 while (count < maxNum) { next(); } functionnext() { let current = count++; // 处理边界条件 if (current >= len) { // 请求全部完成就将promise置为成功状态, 然后将result作为promise值返回 !result.includes(false) && resolve(result); return; } const url = urls[current]; console.log(`开始 ${current}`, newDate().toLocaleString()); fetch(url) .then((res) => { // 保存请求结果 result[current] = res; console.log(`完成 ${current}`, newDate().toLocaleString()); // 请求没有全部完成, 就递归 if (current < len) { next(); } }) .catch((err) => { console.log(`结束 ${current}`, newDate().toLocaleString()); result[current] = err; // 请求没有全部完成, 就递归 if (current < len) { next(); } }); } });}
js
复制代码
// RxJS// 假设这是你的http请求函数functionhttpGet(url) {
returnnewPromise(resolve => setTimeout(() =>resolve(`Result: ${url}`), 2000));}const array = [
'https://httpbin.org/ip', 'https://httpbin.org/user-agent', 'https://httpbin.org/delay/3',];// mergeMap 是专门用来处理并发处理的 rxjs 操作符// mergeMap 第二个参数2的意思是,from(array)每次并发量是2,只有promise执行结束才接着取array里面的数据// mergeMap第一个参数 httpGet的意思是每次并发,从from(array)中取的数据如何包装,这里是作为httpGet的参数const source = from(array)
.pipe(mergeMap(httpGet, 2)) .subscribe(val => console.log(val));
RxJS 和 Nest
nest 的 interceptor 就用了 rxjs 来处理响应,但常用的 operator 也就几个:
- tap: 不修改响应数据,执行一些额外逻辑,比如记录日志、更新缓存等
- map:对响应数据做修改,一般都是改成 {code, data, message} 的格式
- catchError:在 exception filter 之前处理抛出的异常,可以记录或者抛出别的异常
- timeout:处理响应超时的情况,抛出一个 TimeoutError,配合 catchErrror 可以返回超时的响应
使用 tap operator 来添加一些日志、缓存等逻辑:
js
复制代码
import { AppService } from'./app.service';
import { CallHandler, ExecutionContext, Injectable, Logger, NestInterceptor } from'@nestjs/common';
import { Observable, tap } from'rxjs';
@Injectable()exportclassTapTestInterceptor implements NestInterceptor {
constructor(private appService: AppService) {} private readonly logger = newLogger(TapTestInterceptor.name); intercept(context: ExecutionContext, next: CallHandler): Observable<any> { return next.handle().pipe(tap((data) => {
// 这里是更新缓存的操作,这里模拟下 this.appService.getHello(); this.logger.log(`log something`, data); })) }}
使用 map operator 来对 controller 返回的数据做一些修改:
js
复制代码
import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from'@nestjs/common';
import { map, Observable } from'rxjs';
@Injectable()exportclassMapTestInterceptor implements NestInterceptor {
intercept(context: ExecutionContext, next: CallHandler): Observable<any> { return next.handle().pipe(map(data => { return { code: 200, message: 'success', data } })) }}
使用 catchError 处理抛出的异常:
js
复制代码
import { CallHandler, ExecutionContext, Injectable, Logger, NestInterceptor } from'@nestjs/common';
import { catchError, Observable, throwError } from'rxjs';
@Injectable()exportclassCatchErrorTestInterceptor implements NestInterceptor {
private readonly logger = newLogger(CatchErrorTestInterceptor.name) intercept (context: ExecutionContext, next: CallHandler): Observable<any> { return next.handle().pipe(catchError(err => { this.logger.error(err.message, err.stack) returnthrowError(() => err) })) }}
RxJS 和 Angular
RxJS 和 Vue
RxJS 和 React
RxJS 和 React 一样,实践的都是响应式编程的概念,从取名上就可以看出来。
React 版本(无 RxJS):
js
复制代码
importReact, { useState } from'react';
// 傻瓜组件,无状态组件constCounterView = ({ count, onIncrement, onDecrement }) => (
<div> <h1>Count: {count}</h1> <buttononClick={onIncrement}>+</button> <buttononClick={onDecrement}>-</button> </div>);// 聪明组件,有状态组件constCounter = () => {
const [count, setCount] = useState(0); constonIncrement = () => { setCount(count + 1); }; constonDecrement = () => { setCount(count - 1); }; return ( <CounterView count={count} onIncrement={onIncrement} onDecrement={onDecrement} /> );};exportdefaultCounter;
React + RxJS 版本:
- 把 onIncrement 和 onDecrement 的函数调用转化为数据流中的数据。(Observable)
- 把数据流中的数据改变转变为对组件状态的修改。(Observer)
在 RxJS 中 Subject 能既扮演 Observable 又扮演 Observer 的角色。
js
复制代码
importReact, { useState, useEffect } from'react';
import { Subject } from'rxjs/Subject';
import { scan } from'rxjs/operators';
constCounter = () => {
const [count, setCount] = useState(0);
// 创造了一个Subject对象 // 这个对象就是连接 RxJS 和 React 的纽带 const counter = newSubject(); useEffect(() => { constobserver = value => setCount(value);
// 利用 scan 累计 counter 中所有数据的总和 // scan 产生的 Observable 对象吐出的每个数据都通过 setCount 来修改当前组件的状态就可以 counter .pipe(scan((result, inc) => result + inc, 0)) .subscribe(observer); }, [counter]); // 从 Observable 的角度,将 counter 代表的 Observable 定位成所有加减数字的数据流 // 当需要加 1 时,往 counter 里推送一个 1;当需要减 1 时,往 counter 里推送一个 -1 // 如果有需求改变想要加减其他的数值,那也只需要往 counter 里推送对应的正数或者负数就可以了 return ( <div> <h1>{count}</h1> <buttononClick={() => counter.next(1)}>+</button> <buttononClick={() => counter.next(-1)}>-</button> </div> );};exportdefaultCounter;
Subject:
用于封装和管理状态。在使用 seed
值(第二个参数)或来自源的第一个值建立了初始状态之后,对来自源的每个值调用累加器(或“reducer 函数”)。
React 和 RxJS 高阶组件版本:
js
复制代码
importReact, { useState } from'react';
import { BehaviorSubject } from'rxjs/BehaviorSubject';
import { scan } from'rxjs/operators';
constCounterView = ({ count, onIncrement, onDecrement }) => (
<div> <h1>Count: {count}</h1> <buttononClick={onIncrement}>+</button> <buttononClick={onDecrement}>-</button> </div>);constuseCounter = () => {
const [counter] = useState(() =>newBehaviorSubject(0)); const count$ = counter.pipe( scan((result, inc) => result + inc, 0) ); constonIncrement = () => counter.next(1); constonDecrement = () => counter.next(-1); const [count, setCount] = useState(0); count$.subscribe(value => setCount(value)); return { count, onIncrement, onDecrement };};constCounter = () => {
const { count, onIncrement, onDecrement } = useCounter(); return ( <CounterView count={count} onIncrement={onIncrement} onDecrement={onDecrement} /> );};exportdefaultCounter;
BehaviorSubject:rxjs.dev/api/index/c…
Subject的一个变体,需要初始值,并在订阅时发出其当前值。
BehaviorSubject 可以指定一个“默认数据”,如果不给某个 BehaviorSubject 塞任何数据,每一个观察者在订阅 BehaviorSubject 的时候依然可以获得一个数据,这非常适合 Counter 这个应用的要求,因为计数器需要一个初始默认值为 0。
React 和 RxJS 版本的一个秒表⏱️:
js
复制代码
importReact, { useState, useEffect } from"react";
import { Subject, BehaviorSubject, interval, of, EMPTY} from"rxjs";import {
scan, switchMap, mergeMap, map, timeInterval, take,} from"rxjs/operators";import padStart from"lodash/padStart";
constms2Time = (milliseconds) => {
let ms = parseInt(milliseconds % 1000, 10); let seconds = parseInt((milliseconds / 1000) % 60, 10); let minutes = parseInt((milliseconds / (1000 * 60)) % 60, 10); let hours = parseInt(milliseconds / (1000 * 60 * 60), 10); return ( padStart(hours, 2, "0") + ":" + padStart(minutes, 2, "0") + ":" + padStart(seconds, 2, "0") + "." + padStart(ms, 3, "0") );};constStopWatchView = ({ milliseconds, onStart, onStop, onReset }) => {
return ( <div> <h1>{ms2Time(milliseconds)}</h1> <buttononClick={onStart}>开始</button> <buttononClick={onStop}>停止</button> <buttononClick={onReset}>重设</button> </div> );};constSTART = "start";
constSTOP = "stop";
constRESET = "reset";
constStopWatch = () => {
const [milliseconds, setMilliseconds] = useState(0); useEffect(() => { // button 代表秒表上按钮点击动作的数据流 const button = newSubject(); // time$代表秒表当前应该展示的时间,无论哪个按钮被点击,都会打断time$原有的产生数据方式 const time$ = button.pipe( switchMap((value) => { switch (value) { caseSTART: { // 当点击“开始”时,我们使用 interval 配合 scan 来产生累积递增的毫秒数, // 精确度是 10 毫秒而不是 1, // 是因为 JS 运行环境往往也不会达到毫秒级别的绝对精确。 returninterval(10).pipe( timeInterval(), scan((result, ti) => result + ti.interval, 0) ); } caseSTOP: returnEMPTY; caseRESET: returnof(0); default: returnthrowError("Invalid value ", value); } }) ); const stopWatch = newBehaviorSubject(0); const subscription = stopWatch .pipe( mergeMap((value) => time$), map((value) =>setMilliseconds(value)) ) .subscribe(); return() => subscription.unsubscribe(); }, []); constonStop = () => setMilliseconds(STOP); constonStart = () => setMilliseconds(START); constonReset = () => setMilliseconds(RESET); return ( <StopWatchView milliseconds={milliseconds} onStart={onStart} onStop={onStop} onReset={onReset} /> );};exportdefaultStopWatch;
三个按钮的点击操作当然可以看作数据流来看待。对于 StopWatchView,需要渲染 milliseconds 属性,而且这个 milliseconds 的序列也可以看作一个数据流,当点击开始之后,这个数据流应该是持续不断地产生新的数据;当点击停止之后,这个数据流就不应该再产生数据。
EMPTY:rxjs.dev/api/index/c…
常量,一个简单的 Observable,不向 Observer 发出任何项,并立即发出完整的通知。
RxJS 和 Redux
Redux 原版:
js
复制代码
// Store.jsimport {createStore} from'redux';
import reducer from'./Reducer.js';
const initValues = {
count: 0};const store = createStore(reducer, initValues);
exportdefault store;
Redux 和 RxJS 版本
js
复制代码
// Store.js// 不使用 Redux 的 createStore 来创造 Store 对象,// 而是使用我们自己定制的 createReactiveStoreimport createReactiveStore from'./createReactiveStore';
import reducer from'./Reducer.js';
const initValues = {
count: 0};const store = createReactiveStore(reducer, initValues);
exportdefault store;
js
复制代码
import { Subject } from'rxjs';
import { scan, startWith, tap } from'rxjs/operators';
constcreateReactiveStore = (reducer, initialState) => {
const action$ = newSubject(); let currentState = initialState; const store$ = action$.pipe( startWith(initialState), scan(reducer), tap((state) => { currentState = state; }) ); return { dispatch: (action) => { return action$.next(action); }, getState: () => currentState, subscribe: (func) => { store$.subscribe(func); }, };};exportdefault createReactiveStore;
createReactiveStore 和 createStore 基本能达到一致的效果,但是还是有一点小小的功能差异,因为 createReactiveStore 依赖于 RxJS 的数据流,而数据流如果不被订阅的话,整个管道上每个环节的操作是不会运行的。
假设,在调用 createReactiveStore 产生的 Store 对象的 subscribe 之前,先利用这个 Store 的 dispatch 函数派送了 action 对象,是不会引起数据流操作的,所以对应的 currentState 也不会发生改变,这样,当晚些时候调用 Store 的 subscribe 时候,得到的状态就不是正确的结果。
为了克服这个问题,一定要保证 createReactiveStore 产生的 Store 对象第一时间被订阅,这并不是什么困难的事情,react-redux 的 connect 函数实际上就替我们做了对 Store 的订阅。如下:
js
复制代码
importReactfrom'react';
import {connect} from'react-redux';
import * asActionsfrom'./Actions.js';
constCounterView = ({count, onIncrement, onDecrement}) => (
<div> <h1>Count: {count}</h1> <buttononClick={onIncrement}>+</button> <buttononClick={onDecrement}>-</button> </div>);functionmapStateToProps(state, ownProps) {
return { count: state.count }}functionmapDispatchToProps(dispatch, ownProps) {
return { onIncrement: () =>dispatch(Actions.increment()), onDecrement: () =>dispatch(Actions.decrement()), }}constCounter = connect(
mapStateToProps, mapDispatchToProps)(CounterView);exportdefaultCounter;
最后
其实 RxJS 最重要的部分就是操作符,但是由于操作符太多,也就不一一介绍了,可根据自己需求去官网找对应的操作符。找操作服的方法推荐去:rxjs.dev/operator-de…
RxJS 是不是前端开发的未来?还请各位小伙伴做出自己的思考!