简介
RxJS
是ReactiveX
编程理念的JavaScript
版本。ReactiveX
来自微软,它是一种针对异步数据流的编程。简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能。
RxJS 提供了一种对 Observable
类型的实现,直到 Observable
成为了 JavaScript 语言的一部分并且浏览器支持它之前,它都是必要的。这个库还提供了一些工具函数,用于创建和使用可观察对象。这些工具函数可用于:
- 把现有的异步代码转换成可观察对象
- 迭代流中的各个值
- 把这些值映射成其它类型
- 对流进行过滤
- 组合多个流
初级核心概念
- Observable: 一系列值的生产者
- Observer: 它是observable值的消费者
- Operator: 可以在数据流的途中对值进行转换的操作符
Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。
Observables 与 Observer 之间的订阅发布关系(观察者模式) 如下:
- 订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。
- 发布:Observable 通过回调 next 方法向 Observer 发布事件。
创建Observable
RxJS 中提供了很多操作符,用于创建 Observable 对象,常用的操作符如下:
- of(), 将普通JavaScript数据转为 Observable
- from(), 把数组或iterable对象转换成Observable
- create(), 返回一个可以在Observer上调用方法的Observable.
- fromEvent(), 把event转换成Observable.
- fromPromise(), 把Promise转换成Observable.
- ajax(), 从ajax创建一个observable
of()
import { Component, OnInit } from '@angular/core'; import { Observable,of } from 'rxjs'; @Component({ selector: 'app-observable', templateUrl: ' <h2>使用of来创建可观察对象</h2> <div> <button (click)="getData()">Click here</button> </div> ' }) export class ObservableComponent implements OnInit { constructor() { } ngOnInit(): void { } getData() { // Create simple observable that emits three values const myObservable = of(1, 2, 3); // Create observer object const myObserver = { next: x => console.log('Observer got a next value: ' + x), error: err => console.error('Observer got an error: ' + err), complete: () => console.log('Observer got a complete notification'), }; // Execute with the observer object myObservable.subscribe(myObserver); } } 复制代码
启动该项目,打开页面并点击按钮,出现这样的结果:
from()
import { Component, OnInit } from '@angular/core'; import { Observable,from } from 'rxjs'; @Component({ selector: 'app-observable', templateUrl: ' <h2>使用from函数创建可观察对象</h2> <div> <button (click)="fromData()">根据数组创建Observable</button> </div> ' }) export class ObservableComponent implements OnInit { constructor() { } ngOnInit(): void { } fromData() { let persons = [ { name: 'Dave', age: 34, salary: 2000 }, { name: 'Nick', age: 37, salary: 32000 }, { name: 'Howie', age: 40, salary: 26000 }, { name: 'Brian', age: 40, salary: 30000 }, { name: 'Kevin', age: 47, salary: 24000 }, ]; const myObservable = from(persons); myObservable.subscribe(person => console.log(person)); } } 复制代码
页面测试结果为:
fromEvent()
import { Component, OnInit } from '@angular/core'; import { Observable,fromEvent } from 'rxjs'; @Component({ selector: 'app-observable', templateUrl: ' <h2>使用fromEvent函数创建可观察对象</h2> <div> <p id="content">Hello Hresh</p> </div> ' }) export class ObservableComponent implements OnInit { constructor() { } ngOnInit(): void { this.fromEvent(); } fromEvent() { const el = document.getElementById('content'); const mouseMoves = fromEvent(el, 'click'); const subscription = mouseMoves.subscribe(() => { el.style.color = 'red'; }); } } 复制代码
运行项目,点击文本,文本将会变为红色。
关于 fromEvent 在实际生产中有个典型的应用:输入提示(type-ahead)建议。
可观察对象可以简化输入提示建议的实现方式。典型的输入提示要完成一系列独立的任务:
- 从输入中监听数据。
- 移除输入值前后的空白字符,并确认它达到了最小长度。
- 防抖(这样才能防止连续按键时每次按键都发起 API 请求,而应该等到按键出现停顿时才发起)
- 如果输入值没有变化,则不要发起请求(比如按某个字符,然后快速按退格)。
- 如果已发出的 AJAX 请求的结果会因为后续的修改而变得无效,那就取消它。
完全用 JavaScript 的传统写法实现这个功能可能需要大量的工作。使用可观察对象,你可以使用这样一个 RxJS 操作符的简单序列:
import { fromEvent } from 'rxjs'; import { ajax } from 'rxjs/ajax'; import { debounceTime, distinctUntilChanged, filter, map, switchMap } from 'rxjs/operators'; const searchBox = document.getElementById('search-box'); const typeahead = fromEvent(searchBox, 'input').pipe( map((e: KeyboardEvent) => (e.target as HTMLInputElement).value), filter(text => text.length > 2), //判断输入内容长度是否大于2 debounceTime(500), //等待,直到用户停止输入(这个例子中是停止 1/2 秒) distinctUntilChanged(), // 等待搜索文本发生变化。 switchMap(() => ajax('/api/endpoint')) //将搜索请求发送到服务。 ); typeahead.subscribe(data => { // Handle the data from the API }); 复制代码
fromPromise()
import { Component, OnInit } from '@angular/core'; import { from, Observable } from 'rxjs'; import { fromPromise } from 'rxjs/internal/observable/fromPromise'; @Component({ selector: 'app-observable', templateUrl: ' <h2>使用from函数创建可观察对象</h2> <div> <button (click)="fromPromiseData()">根据Promise创建Observable</button> </div> ' }) export class ObservableComponent implements OnInit { constructor() { } ngOnInit(): void { } fromPromiseData() { const myObservable = fromPromise(new Promise((resolve, reject) => { setTimeout(() => { // tslint:disable-next-line:prefer-const let username = 'hresh----Promise'; resolve(username); }, 2000); })); myObservable.subscribe({ next(data) { console.log(data); }, error(err) { console.error('Error' + err); }, complete() { console.log('completed'); } }); } } 复制代码
在 Rxjs6 之后,from 可以用来代替 fromPromise,所以上述代码改为 from()也是可以的。页面测试结果为:
这里扩展一下,官方文档有个案例是通过 fetch 方法返回 Promise 对象,这里我做了一些修改:
import { Component, OnInit } from '@angular/core'; import { from, Observable } from 'rxjs'; import { fromPromise } from 'rxjs/internal/observable/fromPromise'; @Component({ selector: 'app-observable', templateUrl: ' <h2>使用from函数创建可观察对象</h2> <div> <button (click)="fromPromiseData()">根据Promise创建Observable</button> <br> <button (click)="fromData2()">根据Promise得到的数组创建Observable</button> </div> ' }) export class ObservableComponent implements OnInit { constructor() { } ngOnInit(): void { } fromPromiseData() { const myObservable = from(fetch('http://a.itying.com/api/productlist')); myObservable.subscribe({ next(data) { console.log(data); }, error(err) { console.error('Error' + err); }, complete() { console.log('completed'); } }); } fromData2() { //fetch()返回的Promise,获取Promise对象中的内容,即数组 let arrayData = []; fetch('http://a.itying.com/api/productlist').then(response => response.json()).then((data => { arrayData = data.result; })); let myObservable = null; setTimeout(() => { myObservable = from(arrayData); myObservable.subscribe(data => console.log(data)); }, 2000); } } 复制代码
页面测试结果为:
interval()
import { Component, OnInit } from '@angular/core'; import { interval, Observable } from 'rxjs'; @Component({ selector: 'app-observable', templateUrl: ' <h2>使用interval函数创建可观察对象</h2> <div> <button (click)="interval2()">根据interval创建Observable</button> </div> ' }) export class ObservableComponent implements OnInit { constructor() { } ngOnInit(): void { } interval2() { const secondsCounter = interval(1000); // Subscribe to begin publishing values const oData = secondsCounter.subscribe(n => console.log(`It's been ${n} seconds since subscribing!`)); setTimeout(() => { console.log('取消计数操作'); oData.unsubscribe(); /*5s后取消数据显示*/ }, 5000); } } 复制代码
interval 操作符支持一个数值类型的参数,用于表示定时的间隔。上面代码表示每隔 1s,会输出一个递增的值,初始值从 0 开始。页面测试结果为:
ajax()
import { Component, OnInit } from '@angular/core'; import {ajax} from 'rxjs/ajax'; @Component({ selector: 'app-observable', templateUrl: ' <h2>使用ajax函数创建可观察对象</h2> <div> <button (click)="ajax2()">根据ajax创建Observable</button> </div> ' }) export class ObservableComponent implements OnInit { constructor() { } ngOnInit(): void { } ajax2(){ const apiData = ajax('http://a.itying.com/api/productlist'); // Subscribe to create the request apiData.subscribe(res => console.log(res.status, res.response)); } } 复制代码
页面测试结果:
操作符(Operators)
上述我们讲解的创建 Observable 的方法其实就是 Rxjs 的操作符,操作符是基于可观察对象构建的一些对集合进行复杂操作的函数。RxJS 还定义了一些操作符,比如 map()
、filter()
、concat()
和 flatMap()
。
操作符接受一些配置项,然后返回一个以来源可观察对象为参数的函数。当执行这个返回的函数时,这个操作符会观察来源可观察对象中发出的值,转换它们,并返回由转换后的值组成的新的可观察对象。
map
import { map } from 'rxjs/operators'; const nums = of(1, 2, 3); const squareValues = map((val: number) => val * val); const squaredNums = squareValues(nums); squaredNums.subscribe(x => console.log(x)); // Logs // 1 // 4 // 9 复制代码
可以看到 map 接受一个 function 作为参数, 通过该 function 可以把每个元素做平方运算进行转换.。
filter
你可以使用管道来把这些操作符链接起来。管道让你可以把多个由操作符返回的函数组合成一个。pipe()
函数以你要组合的这些函数作为参数,并且返回一个新的函数,当执行这个新函数时,就会顺序执行那些被组合进去的函数。
应用于某个可观察对象上的一组操作符就像一个处理流程 —— 也就是说,对你感兴趣的这些值进行处理的一组操作步骤。这个处理流程本身不会做任何事。你需要调用 subscribe()
来通过处理流程得出并生成一个结果。
import { filter, map } from 'rxjs/operators'; const nums = of(1, 2, 3, 4, 5); // Create a function that accepts an Observable. const squareOddVals = pipe( filter((n: number) => n % 2 !== 0), map(n => n * n) ); // Create an Observable that will run the filter and map functions const squareOdd = squareOddVals(nums); // Subscribe to run the combined functions squareOdd.subscribe(x => console.log(x)); // Logs // 1 // 9 // 25 复制代码
pipe()
函数也同时是 RxJS 的 Observable
上的一个方法,所以你可以用下列简写形式来达到同样的效果:
import { filter, map } from 'rxjs/operators'; const squareOdd = of(1, 2, 3, 4, 5) .pipe( filter(n => n % 2 !== 0), map(n => n * n) ); // Subscribe to get values squareOdd.subscribe(x => console.log(x)); 复制代码
share
在讲解 share 操作符前,我们需要了解一下冷热两种模式下的 Observables。官方定义如下:
Cold Observables 在被订阅后运行,也就是说,observables 序列仅在 subscribe 函数被调用后才会推送数据。与 Hot Observables 不同之处在于,Hot Observables 在被订阅之前就已经开始产生数据,例如
mouse move
事件。
从原理来说是这样的: Cold内部会创建一个新的数据生产者, 而Hot则会一直使用外部的数据生产者.
举个例子:
Cold: 就相当于我在B站看英雄联盟赛事重播,可以从头看起。
Hot: 就相当于看英雄联盟赛事直播, 如果来晚了, 那么前面就看不到了。
share() 操作符允许多个订阅者共享同一个 Observable. 也就是把 Cold 变成 Hot。看如下示例:
import { Component, OnInit } from '@angular/core'; import { interval } from 'rxjs'; import { share, take } from 'rxjs/operators'; @Component({ selector: 'app-observable', templateUrl: ' <h2>share操作符</h2> <div> <button (click)="shareData()">Click</button> </div> ' }) export class ObservableComponent implements OnInit { constructor() { } ngOnInit(): void { } shareData() { const numbers = interval(1000).pipe( take(5), share() ); function subscribeToNumbers(name) { numbers.subscribe( x => console.log(`${name}: ${x}`) ); } subscribeToNumbers('Dave'); const anotherSubscription = () => subscribeToNumbers('Nick'); setTimeout(anotherSubscription, 2500); } } 复制代码
页面测试结果为:
这里 interval 是每隔1秒产生一个数据, take(5)表示取5个数, 也就是1,2,3,4,5,然后 share()就把这个 observable 从 cold 变成了 hot 的。后边 Dave 进行了订阅,2.5秒以后, Nick 进行了订阅。
其他
RxJS 提供了很多操作符,不过只有少数是常用的。 如下图所示:
关于这些操作符的使用可以参看 RxJS 系列之三 - Operators 详解 和 RxJS API 文档。
错误处理
除了可以在订阅时提供 error()
处理器外,RxJS 还提供了 catchError
操作符,它允许你在管道中处理已知错误。
假设你有一个可观察对象,它发起 API 请求,然后对服务器返回的响应进行映射。如果服务器返回了错误或值不存在,就会生成一个错误。如果你捕获这个错误并提供了一个默认值,流就会继续处理这些值,而不会报错。
import { ajax } from 'rxjs/ajax'; import { map, catchError } from 'rxjs/operators'; // Return "response" from the API. If an error happens, // return an empty array. const apiData = ajax('http://a.itying.com/api/productlist').pipe( map(res => { if (!res.response) { throw new Error('Value expected!'); } return res.response; }), catchError(err => of([])) ); apiData.subscribe({ next(x) { console.log('data: ', x); }, error(err) { console.log('errors already caught... will not run'); } }); 复制代码
页面测试结果:
重试失败的可观察对象
catchError
提供了一种简单的方式进行恢复,而 retry
操作符让你可以尝试失败的请求。
可以在 catchError
之前使用 retry
操作符。它会订阅到原始的来源可观察对象,它可以重新运行导致结果出错的动作序列。如果其中包含 HTTP 请求,它就会重新发起那个 HTTP 请求。
下列代码把前面的例子改成了在捕获错误之前重发请求:
import { ajax } from 'rxjs/ajax'; import { map, retry, catchError } from 'rxjs/operators'; const apiData = ajax('/api/data').pipe( retry(3), // Retry up to 3 times before failing map(res => { if (!res.response) { throw new Error('Value expected!'); } return res.response; }), catchError(err => of([])) ); apiData.subscribe({ next(x) { console.log('data: ', x); }, error(err) { console.log('errors already caught... will not run'); } }); 复制代码
注意:不要重试登录认证请求,这些请求只应该由用户操作触发。我们肯定不会希望自动重复发送登录请求导致用户的账号被锁定。
问题记录
fromPromise no longer exported in master (v6)
RxJS version:
6.0.0-alpha.3
Code to reproduce:
import { fromPromise } from 'rxjs'; 复制代码
Expected behavior:
should work just as it did with v5.5 (but different location)
解决办法:统一使用 from 来代替 fromPromise。