RxJS 是前端开发的未来吗?

简介: RxJS 是前端开发的未来吗?

学习 RxJS 我的主要有三大步骤:

  1. 理解相关概念及思想
  2. 熟悉各种操作符
  3. 联想使用场景

github 地址:github.com/ReactiveX/r…

官方文档地址(建议直接看官方的):rxjs.dev/

中文社区文档地址(辅助阅读):cn.rx.js.org/manual/over…

中文文档(辅助阅读):rxjs.tech/

学习RxJS操作符和响应式编程原则: reactive.how/

RxJS 可视化理解: rxviz.com/

RxJS 简介

本篇文章是基于 V7.8.0

官方简介:JavaScript 的响应式扩展库

RxJS(Reactive Extensions for JavaScript) 是一个使用 Observables 进行响应式编程的库,可以更轻松地编写异步或基于回调的代码。

该项目是对 Reactive-Extensions/RxJS(RxJS 4) 的重写,具有更好的性能、更好的模块化、更好的可调试调用堆栈,同时保持大部分向后兼容,只有一些破坏性的变更(breaking changes)是为了减少外层的 API 。

RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable附属类型 (Observer、 Schedulers、 Subjects) 和受 Array 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。

Think of RxJS as Lodash for events.

RxJS 的 Logo 是鱼,是因为 RxJS 的概念和操作符可以被看作是一条鱼在游动,数据流就像是鱼在水中游动的路径。鱼的形象也代表了 RxJS 的灵活性和可扩展性,可以适应不同的应用场景和需求。此外,鱼也象征着 RxJS 的响应式编程思想,即数据流的变化会引起相应的响应和处理。

代表“流”的变量标示符,都是用 $ 符号结尾,这是 RxJS 编程中普遍使用的风格,被称为“芬兰式命名法”(Finnish Notation)。

为什么选 RxJS

随着时代的发展,技术也在不断更新换代。

面向对象式编程 ——》 函数式编程

函数式编程就是非常强调使用函数来解决问题的一种编程方式。

函数式编程对函数的使用有一些特殊的要求,这些要求包括以下几点:

  • 声明式(Declarative)
  • 纯函数(Pure Function)
  • 数据不可变性(Immutability)

从语言角度讲,JavaScript 当然不算一个纯粹意义上的函数式编程语言,但是,JavaScript 中的函数有第一公民的身份,因为函数本身就是一个对象,可以被赋值给一个变量,可以作为参数传递,由此可以很方便地应用函数式编程的许多思想。

JavaScript 并不是纯粹的函数式编程语言,但是,通过应用一些编程规范,再借助一点工具的帮助,我们完全可以用 JavaScript 写出函数式的代码,RxJS 就是辅助我们写出函数式代码的一种工具。

指令式编程 ——》 响应式编程

RxJS 就是兼具函数式和响应式两种先进编程风格的框架。

RxJS 是一个组织异步逻辑的库,它有很多 operator,可以极大的简化异步逻辑的编写。

它是由数据源产生数据,经过一系列 operator 的处理,最后传给接收者。

但是 RxJS 的 operator 多呀,组合起来可以实现非常复杂的异步逻辑处理。

还能帮助我们解决一些问题:

  • 如何控制大量代码的复杂度
  • 如何保持代码可读性
  • 如何处理异步操作
  • 数据流抽象了很多现实问题

相关概念介绍

说明:以下部分主要出自官方文档(因为最好的概念介绍就是官方),对一些核心的概念、知识点做了一些总结(是摘录总结翻译,不是全盘抄录)。同时也是对官方文档意译文,帮助辅助中文式阅读。

Reactive Extension

Reactive Extension,也叫 ReactiveX,简称 Rx,是基于响应式的扩展,是各种语言实现的一个统称。

Rx 是一套通过可监听流来做异步编程的API。

ReactiveXObserver 模式Iterator 模式以及函数式编程集合相结合,以满足对管理事件序列的理想方式的需求。

Observer 模式

Iterator 模式

函数式编程

集合

RxJS 中解决异步事件管理的基本概念是:

  • Observable:表示一个可调用的未来值或事件的集合的概念。
  • Observer:是回调的集合,知道如何监听 Observable 传递的值。
  • Subscription:表示 Observable 的执行,主要用于取消执行。
  • Operators:采用函数式编程风格的纯函数,支持使用 mapfilterconcatreduce 等操作处理集合。
  • Subject:相当于一个 EventEmitter,是将一个值或事件多播给多个 Observers 的唯一途径。
  • Schedulers:是控制并发的集中式调度程序,允许我们在计算发生时进行协调,例如 setTimeoutrequestAnimationFrame 或其它。

Observable *

被观察者,用来产生消息/数据。

Observable Promise
使用场景 同步、异步均可使用 用 Promise 包裹的多数是异步场景
执行时机 声明式惰性执行,只有在订阅后才会执行 创建时就立即执行
执行次数 多次调用 subscribe 函数会执行多次 只有第一次执行,后续都是取值
流程控制 相较于 Promise 有更为全面的操作符 提供串行、并行的函数
错误处理 subscribe 函数捕获错误 .catch 捕获

Observable 是多个值的惰性推送集合。本质其实就是一个随时间不断产生数据的一个集合,称之为流更容易理解。

js

复制代码

import { Observable } from'rxjs';

const observable = newObservable((subscriber) => {
  subscriber.next(1);  subscriber.next(2);  subscriber.next(3);  setTimeout(() => {    subscriber.next(4);    subscriber.complete();  }, 1000);});// 订阅查看这些值的变化console.log('just before subscribe');
observable.subscribe({  next(x) {    console.log('got value ' + x);  },  error(err) {    console.error('something wrong occurred: ' + err);  },  complete() {    console.log('done');  },});console.log('just after subscribe');

// 控制台打印出的订阅数据just before subscribegot value 1got value 2got value 3just after subscribegot value 4done

Pull vs Push

pullpush 是两种不同的协议,用来描述数据生产者 (Producer) 如何与数据消费者 (Consumer) 进行通信的。

Observables 作为函数的泛化

Observables 像是没有参数, 但可以泛化为多个值的函数。

订阅 Observable 类似于调用函数。

Observables 能够同步或异步传递值。

Observable 剖析

Observable 的核心关注点:

  • 创建 Observables
  • 订阅 Observables
  • 执行 Observables
  • 清理 Observables

创建 Observables:

js

复制代码

import { Observable } from'rxjs';

const observable = newObservable(functionsubscribe(subscriber) {
  const id = setInterval(() => {    subscriber.next('hi');  }, 1000);});

订阅 Observables

js

复制代码

observable.subscribe((x) =>console.log(x));// 订阅一个 Observable 就像调用一个函数,在数据将被传送到的地方提供回调。

执行 Observables

在 Observable 执行中, 可能会发送零个到无穷多个 "Next" 通知。如果发送的是 "Error" 或 "Complete" 通知的话,那么之后不会再发送任何通知了。

js

复制代码

import { Observable } from'rxjs';

const observable = newObservable(functionsubscribe(subscriber) {
  subscriber.next(1);  subscriber.next(2);  subscriber.next(3);  subscriber.complete();  subscriber.next(4); // Is not delivered because it would violate the contract});const observable2 = newObservable(functionsubscribe(subscriber) {
  try {    subscriber.next(1);    subscriber.next(2);    subscriber.next(3);    subscriber.complete();  } catch (err) {    subscriber.error(err); // delivers an error if it caught one  }});

清理 Observables

js

复制代码

import { from } from'rxjs';

const observable = from([10, 20, 30]);
const subscription = observable.subscribe((x) =>console.log(x));
// Later:subscription.unsubscribe();// 当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。// 只要调用 `unsubscribe()` 方法就可以取消执行。

js

复制代码

import { Observable } from'rxjs';

const observable = newObservable(functionsubscribe(subscriber) {
  // Keep track of the interval resource  const intervalId = setInterval(() => {    subscriber.next('hi');  }, 1000);  // Provide a way of canceling and disposing the interval resource  returnfunctionunsubscribe() {    clearInterval(intervalId);  };});

js

复制代码

functionsubscribe(subscriber) {
  const intervalId = setInterval(() => {    subscriber.next('hi');  }, 1000);  returnfunctionunsubscribe() {    clearInterval(intervalId);  };}const unsubscribe = subscribe({ next: (x) =>console.log(x) });

// Later:unsubscribe(); // dispose the resources

Observer

从行为上来看,无非就是定义了如何处理上述流产生的数据,称之为流的处理方法。

js

复制代码

const observer = {
  next: x => console.log('Observer got a next value: ' + x),  error: err => console.error('Observer got an error: ' + err),  complete: () =>console.log('Observer got a complete notification'),};

Operators

观察者,用来消费消息/数据。

Subscription *

Subscription 是表示可清理资源的对象,它是由 Observable 执行之后产生的。

本质就是暂存了一个启动后的流,每一个启动后的流都是相互独立的,而这个启动后的流,就存储在subscription中,提供了unsubscribe,来停止这个流。

Subject *

Subject是一类特殊的Observable,它可以向多个Observer多路推送数值。

Observable Subject
角色 生产者(单向) 生产者、消费者(双向)
消费策略 单播 多播
流转方式 内部发送/接收数据 外部发送/接收数据
数据特性 冷数据流 热数据流
消费时机 调用 subscribe 调用 next

Scheduler

简单 demo

推荐在 stackblitz.com/ 练习。

事件监听器

js

复制代码

// 以前document.addEventListener('click', () =>console.log('我被“惦记”了!'));

// 现在import { fromEvent } from'rxjs';

fromEvent(document, 'click').subscribe(() =>console.log('我被“惦记”了!'));

Purity

使 RxJS 强大的是它使用纯函数产生值的能力。这意味着您的代码不太容易出错。

所谓纯函数,指的是满足下面两个条件的函数:

  • 函数的执行过程完全由输入参数决定,不会受除参数之外的任何数据影响。
  • 函数不会修改任何外部状态,比如修改全局变量或传入的参数对象。

js

复制代码

// 你的其他代码部分可能会弄乱你的状态let count = 0;
document.addEventListener('click', () =>console.log(`我被“惦记”了 ${++count} 次`));

// 使用 RxJS 可以隔离状态import { fromEvent, scan } from'rxjs';

fromEvent(document, 'click')
  .pipe(scan((count) => count + 1, 0))  .subscribe((count) =>console.log(`我被“惦记”了 ${count} 次`));
  
// scan 运算符的工作方式与数组的 reduce 类似。// 它接受一个暴露给回调的值。回调的返回值将成为下次运行回调时下一个暴露的值。

Flow

RxJS 有一整套运算符,可以帮助您控制事件如何通过 observables 流动。

js

复制代码

// 每秒最多点击一次的方式let count = 0;
let rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', () => {
  if (Date.now() - lastClick >= rate) {    console.log(`我被“惦记”了 ${++count} 次`);    lastClick = Date.now();  }});// import { fromEvent, throttleTime, scan } from'rxjs';

fromEvent(document, 'click')
  .pipe(    throttleTime(1000),    scan((count) => count + 1, 0)  )  .subscribe((count) =>console.log(`我被“惦记”了 ${count} 次`));

Values

js

复制代码

// 在纯 JavaScript 中为每次点击添加当前鼠标 x 位置let count = 0;
const rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', (event) => {
  if (Date.now() - lastClick >= rate) {    count += event.clientX;    console.log(count);    lastClick = Date.now();  }});// import { fromEvent, throttleTime, map, scan } from'rxjs';

fromEvent(document, 'click')
  .pipe(    throttleTime(1000),    map((event) => event.clientX),    scan((count, clientX) => count + clientX, 0)  )  .subscribe((count) =>console.log(count));

场景介绍

都有哪些场景能用到 RxJS 呢,以下举出了一些场景例子。供大家参考,其实还有更广泛的场景,还望大家自己发觉。

异步场景

  • AJAX / XHR(XMLHttpRequest) / fetch API
  • Service Worker / Node Stream
  • setTimeout / setInterval
  • Promise

事件场景

  • 各种 DOM 事件(click,dbclick,keyup、keydown...)
  • css3 动画事件(transition)
  • html5 Geolocation
  • WebSocket / Server-Sent Events

微前端通信

js

复制代码

// 主应用 main.js 部分内容import { Subject } from'rxjs'// 按需引入减少依赖包大小const pager = newSubject()

// 在主应用注册呼机监听器,这里可以监听到其他应用的广播pager.subscribe(v => {   console.log(`main主应用监听到子应用${v.from}发来消息:`, v)  // store.dispatch('app/setToken', v.token) // 这里处理主应用监听到改变后的逻辑  // rxjs 事件  qkEvent.toggleSidebar(v)  qkEvent.generateHeaderNavDoc(v)  qkEvent.generateDropdownMenu(v)})// 结合下章主应用下发资源给子应用,将pager作为一个模块传入子应用const msg = { 
  // data: store.getters, // 从主应用仓库读出的数据  data: {},  // components: LibraryUi, // 从主应用读出的组件库  // utils: LibraryJs, // 从主应用读出的工具类库  // emitFnc: childEmit, // 从主应用下发emit函数来收集子应用反馈  pager // 从主应用下发应用间通信呼机}exportconst qiankunRegisterMicroApps = [
  {    name: FUSANG_NAME,    entry: qiankunEntryConfig(FUSANG_NAME),    container: '#appContainer',    activeRule: '/ops-fe',    loader (loading) {    },    props: { store, pager: msg, parentRoute: router }  },]

js

复制代码

// 子应用// 引入主应用Subject实例if (props.pager) {
    Vue.prototype.$pager = props.pager}// 使用this.$pager && this.$pager.next({
  from: 'logcenter-fe', // 从哪来 子应用的名字  event: 'sendDoc', // 跳转文档  data: [{ path: 'https://xxxx.com/docs/logcenter/', target: '_blank' }]   // 传什么数据})

实现一个批量请求函数 multiRequest(urls, maxNum)

要求如下:

  • 要求最大并发数 maxNum
  • 每当有一个请求返回,就留下一个空位,可以增加新的请求
  • 所有请求完成后,结果按照 urls 里面的顺序依次打出

js

复制代码

// promisefunctionmultiRequest(urls = [], maxNum) {
  // 请求总数量  const len = urls.length;  // 根据请求数量创建一个数组来保存请求的结果  const result = newArray(len).fill(false);  // 当前完成的数量  let count = 0;  returnnewPromise((resolve, reject) => {    // 请求maxNum个    while (count < maxNum) {      next();    }    functionnext() {      let current = count++;      // 处理边界条件      if (current >= len) {        // 请求全部完成就将promise置为成功状态, 然后将result作为promise值返回        !result.includes(false) && resolve(result);        return;      }      const url = urls[current];      console.log(`开始 ${current}`, newDate().toLocaleString());      fetch(url)        .then((res) => {          // 保存请求结果          result[current] = res;          console.log(`完成 ${current}`, newDate().toLocaleString());          // 请求没有全部完成, 就递归          if (current < len) {            next();          }        })        .catch((err) => {          console.log(`结束 ${current}`, newDate().toLocaleString());          result[current] = err;          // 请求没有全部完成, 就递归          if (current < len) {            next();          }        });    }  });}

js

复制代码

// RxJS// 假设这是你的http请求函数functionhttpGet(url) {
  returnnewPromise(resolve => setTimeout(() =>resolve(`Result: ${url}`), 2000));}const array = [
  'https://httpbin.org/ip',   'https://httpbin.org/user-agent',  'https://httpbin.org/delay/3',];// mergeMap 是专门用来处理并发处理的 rxjs 操作符// mergeMap 第二个参数2的意思是,from(array)每次并发量是2,只有promise执行结束才接着取array里面的数据// mergeMap第一个参数 httpGet的意思是每次并发,从from(array)中取的数据如何包装,这里是作为httpGet的参数const source = from(array)
    .pipe(mergeMap(httpGet, 2))    .subscribe(val => console.log(val));

RxJS 和 Nest

nest 的 interceptor 就用了 rxjs 来处理响应,但常用的 operator 也就几个:

  • tap: 不修改响应数据,执行一些额外逻辑,比如记录日志、更新缓存等
  • map:对响应数据做修改,一般都是改成 {code, data, message} 的格式
  • catchError:在 exception filter 之前处理抛出的异常,可以记录或者抛出别的异常
  • timeout:处理响应超时的情况,抛出一个 TimeoutError,配合 catchErrror 可以返回超时的响应

使用 tap operator 来添加一些日志、缓存等逻辑:

js

复制代码

import { AppService } from'./app.service';
import { CallHandler, ExecutionContext, Injectable, Logger, NestInterceptor } from'@nestjs/common';
import { Observable, tap } from'rxjs';

@Injectable()exportclassTapTestInterceptor implements NestInterceptor {
  constructor(private appService: AppService) {}  private readonly logger = newLogger(TapTestInterceptor.name);  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {    return next.handle().pipe(tap((data) => {
      
      // 这里是更新缓存的操作,这里模拟下      this.appService.getHello();      this.logger.log(`log something`, data);    }))  }}

使用 map operator 来对 controller 返回的数据做一些修改:

js

复制代码

import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from'@nestjs/common';
import { map, Observable } from'rxjs';

@Injectable()exportclassMapTestInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {    return next.handle().pipe(map(data => {      return {        code: 200,        message: 'success',        data      }    }))  }}

使用 catchError 处理抛出的异常:

js

复制代码

import { CallHandler, ExecutionContext, Injectable, Logger, NestInterceptor } from'@nestjs/common';
import { catchError, Observable, throwError } from'rxjs';

@Injectable()exportclassCatchErrorTestInterceptor implements NestInterceptor {
  private readonly logger = newLogger(CatchErrorTestInterceptor.name)  intercept (context: ExecutionContext, next: CallHandler): Observable<any> {    return next.handle().pipe(catchError(err => {      this.logger.error(err.message, err.stack)      returnthrowError(() => err)    }))  }}

RxJS 和 Angular

RxJS 和 Vue

RxJS 和 React

RxJS 和 React 一样,实践的都是响应式编程的概念,从取名上就可以看出来。

React 版本(无 RxJS):

js

复制代码

importReact, { useState } from'react';

// 傻瓜组件,无状态组件constCounterView = ({ count, onIncrement, onDecrement }) => (
  <div>    <h1>Count: {count}</h1>    <buttononClick={onIncrement}>+</button>    <buttononClick={onDecrement}>-</button>  </div>);// 聪明组件,有状态组件constCounter = () => {
  const [count, setCount] = useState(0);  constonIncrement = () => {    setCount(count + 1);  };  constonDecrement = () => {    setCount(count - 1);  };  return (    <CounterView      count={count}      onIncrement={onIncrement}      onDecrement={onDecrement}    />  );};exportdefaultCounter;

React + RxJS 版本:

  1. 把 onIncrement 和 onDecrement 的函数调用转化为数据流中的数据。(Observable)
  2. 把数据流中的数据改变转变为对组件状态的修改。(Observer)

在 RxJS 中 Subject 能既扮演 Observable 又扮演 Observer 的角色。

js

复制代码

importReact, { useState, useEffect } from'react';
import { Subject } from'rxjs/Subject';
import { scan } from'rxjs/operators';

constCounter = () => {
  const [count, setCount] = useState(0);
  
  // 创造了一个Subject对象  // 这个对象就是连接 RxJS 和 React 的纽带  const counter = newSubject();  useEffect(() => {    constobserver = value => setCount(value);
    
    // 利用 scan 累计 counter 中所有数据的总和    // scan 产生的 Observable 对象吐出的每个数据都通过 setCount 来修改当前组件的状态就可以    counter        .pipe(scan((result, inc) => result + inc, 0))        .subscribe(observer);  }, [counter]);  // 从 Observable 的角度,将 counter 代表的 Observable 定位成所有加减数字的数据流  // 当需要加 1 时,往 counter 里推送一个 1;当需要减 1 时,往 counter 里推送一个 -1  // 如果有需求改变想要加减其他的数值,那也只需要往 counter 里推送对应的正数或者负数就可以了  return (    <div>      <h1>{count}</h1>      <buttononClick={() => counter.next(1)}>+</button>      <buttononClick={() => counter.next(-1)}>-</button>    </div>  );};exportdefaultCounter;

Subject:

scan:rxjs.dev/api/index/f…

用于封装和管理状态。在使用 seed 值(第二个参数)或来自源的第一个值建立了初始状态之后,对来自源的每个值调用累加器(或“reducer 函数”)。

React 和 RxJS 高阶组件版本:

js

复制代码

importReact, { useState } from'react';
import { BehaviorSubject } from'rxjs/BehaviorSubject';
import { scan } from'rxjs/operators';

constCounterView = ({ count, onIncrement, onDecrement }) => (
  <div>    <h1>Count: {count}</h1>    <buttononClick={onIncrement}>+</button>    <buttononClick={onDecrement}>-</button>  </div>);constuseCounter = () => {
  const [counter] = useState(() =>newBehaviorSubject(0));  const count$ = counter.pipe(    scan((result, inc) => result + inc, 0)  );  constonIncrement = () => counter.next(1);  constonDecrement = () => counter.next(-1);  const [count, setCount] = useState(0);  count$.subscribe(value => setCount(value));  return { count, onIncrement, onDecrement };};constCounter = () => {
  const { count, onIncrement, onDecrement } = useCounter();  return (      <CounterView          count={count}          onIncrement={onIncrement}          onDecrement={onDecrement} />  );};exportdefaultCounter;

BehaviorSubject:rxjs.dev/api/index/c…

Subject的一个变体,需要初始值,并在订阅时发出其当前值。

BehaviorSubject 可以指定一个“默认数据”,如果不给某个 BehaviorSubject 塞任何数据,每一个观察者在订阅 BehaviorSubject 的时候依然可以获得一个数据,这非常适合 Counter 这个应用的要求,因为计数器需要一个初始默认值为 0。

React 和 RxJS 版本的一个秒表⏱️:

js

复制代码

importReact, { useState, useEffect } from"react";

import { Subject, BehaviorSubject, interval, of, EMPTY} from"rxjs";import {
  scan,  switchMap,  mergeMap,  map,  timeInterval,  take,} from"rxjs/operators";import padStart from"lodash/padStart";

constms2Time = (milliseconds) => {
  let ms = parseInt(milliseconds % 1000, 10);  let seconds = parseInt((milliseconds / 1000) % 60, 10);  let minutes = parseInt((milliseconds / (1000 * 60)) % 60, 10);  let hours = parseInt(milliseconds / (1000 * 60 * 60), 10);  return (    padStart(hours, 2, "0") +    ":" +    padStart(minutes, 2, "0") +    ":" +    padStart(seconds, 2, "0") +    "." +    padStart(ms, 3, "0")  );};constStopWatchView = ({ milliseconds, onStart, onStop, onReset }) => {
  return (    <div>      <h1>{ms2Time(milliseconds)}</h1>      <buttononClick={onStart}>开始</button>      <buttononClick={onStop}>停止</button>      <buttononClick={onReset}>重设</button>    </div>  );};constSTART = "start";
constSTOP = "stop";
constRESET = "reset";

constStopWatch = () => {
  const [milliseconds, setMilliseconds] = useState(0);  useEffect(() => {    // button 代表秒表上按钮点击动作的数据流    const button = newSubject();    // time$代表秒表当前应该展示的时间,无论哪个按钮被点击,都会打断time$原有的产生数据方式    const time$ = button.pipe(      switchMap((value) => {        switch (value) {          caseSTART: {             // 当点击“开始”时,我们使用 interval 配合 scan 来产生累积递增的毫秒数,             // 精确度是 10 毫秒而不是 1,             // 是因为 JS 运行环境往往也不会达到毫秒级别的绝对精确。            returninterval(10).pipe(              timeInterval(),              scan((result, ti) => result + ti.interval, 0)            );          }          caseSTOP:            returnEMPTY;          caseRESET:            returnof(0);          default:            returnthrowError("Invalid value ", value);        }      })    );    const stopWatch = newBehaviorSubject(0);    const subscription = stopWatch      .pipe(        mergeMap((value) => time$),        map((value) =>setMilliseconds(value))      )      .subscribe();    return() => subscription.unsubscribe();  }, []);  constonStop = () => setMilliseconds(STOP);  constonStart = () => setMilliseconds(START);  constonReset = () => setMilliseconds(RESET);  return (    <StopWatchView      milliseconds={milliseconds}      onStart={onStart}      onStop={onStop}      onReset={onReset}    />  );};exportdefaultStopWatch;

三个按钮的点击操作当然可以看作数据流来看待。对于 StopWatchView,需要渲染 milliseconds 属性,而且这个 milliseconds 的序列也可以看作一个数据流,当点击开始之后,这个数据流应该是持续不断地产生新的数据;当点击停止之后,这个数据流就不应该再产生数据。

EMPTY:rxjs.dev/api/index/c…

常量,一个简单的 Observable,不向 Observer 发出任何项,并立即发出完整的通知。

RxJS 和 Redux

Redux 原版:

js

复制代码

// Store.jsimport {createStore} from'redux';
import reducer from'./Reducer.js';

const initValues = {
  count: 0};const store = createStore(reducer, initValues);

exportdefault store;

Redux 和 RxJS 版本

js

复制代码

// Store.js// 不使用 Redux 的 createStore 来创造 Store 对象,// 而是使用我们自己定制的 createReactiveStoreimport createReactiveStore from'./createReactiveStore';
import reducer from'./Reducer.js';

const initValues = {
  count: 0};const store = createReactiveStore(reducer, initValues);

exportdefault store;

js

复制代码

import { Subject } from'rxjs';
import { scan, startWith, tap } from'rxjs/operators';

constcreateReactiveStore = (reducer, initialState) => {
  const action$ = newSubject();  let currentState = initialState;  const store$ = action$.pipe(    startWith(initialState),    scan(reducer),    tap((state) => {      currentState = state;    })  );  return {    dispatch: (action) => {      return action$.next(action);    },    getState: () => currentState,    subscribe: (func) => {      store$.subscribe(func);    },  };};exportdefault createReactiveStore;

createReactiveStore 和 createStore 基本能达到一致的效果,但是还是有一点小小的功能差异,因为 createReactiveStore 依赖于 RxJS 的数据流,而数据流如果不被订阅的话,整个管道上每个环节的操作是不会运行的。

假设,在调用 createReactiveStore 产生的 Store 对象的 subscribe 之前,先利用这个 Store 的 dispatch 函数派送了 action 对象,是不会引起数据流操作的,所以对应的 currentState 也不会发生改变,这样,当晚些时候调用 Store 的 subscribe 时候,得到的状态就不是正确的结果。

为了克服这个问题,一定要保证 createReactiveStore 产生的 Store 对象第一时间被订阅,这并不是什么困难的事情,react-redux 的 connect 函数实际上就替我们做了对 Store 的订阅。如下:

js

复制代码

importReactfrom'react';
import {connect} from'react-redux';

import * asActionsfrom'./Actions.js';

constCounterView = ({count, onIncrement, onDecrement}) => (
  <div>    <h1>Count: {count}</h1>    <buttononClick={onIncrement}>+</button>    <buttononClick={onDecrement}>-</button>  </div>);functionmapStateToProps(state, ownProps) {
  return {    count: state.count  }}functionmapDispatchToProps(dispatch, ownProps) {
  return {    onIncrement: () =>dispatch(Actions.increment()),    onDecrement: () =>dispatch(Actions.decrement()),  }}constCounter = connect(
    mapStateToProps,    mapDispatchToProps)(CounterView);exportdefaultCounter;

最后

其实 RxJS 最重要的部分就是操作符,但是由于操作符太多,也就不一一介绍了,可根据自己需求去官网找对应的操作符。找操作服的方法推荐去:rxjs.dev/operator-de…

RxJS 是不是前端开发的未来?还请各位小伙伴做出自己的思考!



相关文章
|
5天前
|
前端开发 JavaScript 测试技术
VueX解耦:前端开发的音乐大师
VueX解耦:前端开发的音乐大师
15 4
|
2月前
|
开发框架 前端开发 JavaScript
深入探究React:前端开发的利器
深入探究React:前端开发的利器
22 1
|
5月前
|
存储 移动开发 前端开发
前端知识笔记(二十五)———JS中的异步编程与Promise
前端知识笔记(二十五)———JS中的异步编程与Promise
21 0
|
11月前
|
存储 前端开发 JavaScript
【深入浅出 React 和 Redux】 笔记(上)
【深入浅出 React 和 Redux】 笔记(上)
52 0
|
11月前
|
前端开发 JavaScript API
|
12月前
|
存储 JSON JavaScript
「前端架构」Redux vs.MobX的权威指南
「前端架构」Redux vs.MobX的权威指南
|
存储 JavaScript 前端开发
前端开发:Vuex的基本使用
前端开发:Vuex的基本使用
62 0
|
缓存 前端开发 JavaScript
前端知识库Reactjs基础系列五mobx初识
在reactjs中,父子组件,同级组件间的状态传递只能通过props传递,如果需要传递的变量过多,就会导致项目混乱难以维护,并且容易产生意想不到的bug。所以我们需要一个能管理全局状态的工具,如redux,mobx这类的就是为了解决这个问题。
|
前端开发 C# C++
掌握React的基本使用,重塑前端开发
React change the way that Web apps should be build. UI -> Web apps 四步: Break The UI Into A Component Hierarchy 将UI结构拆解成组件结构 Build A Static Version in React It’s best to decouple these processes because building a static version requires a lot of typing and no thinking, and adding interactivity requi
101 0
掌握React的基本使用,重塑前端开发
|
JavaScript 前端开发
【前端】学习前端框架Vue-基础概念
Vue是当前市场上前端必须要掌握的一门前端框架,它是一套构建用户界面的渐进式前端框架。
167 1