Angular与Rxjs学习

简介: Angular与Rxjs学习

简介


RxJSReactiveX编程理念的JavaScript版本。ReactiveX来自微软,它是一种针对异步数据流的编程。简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能。

RxJS 提供了一种对 Observable 类型的实现,直到 Observable 成为了 JavaScript 语言的一部分并且浏览器支持它之前,它都是必要的。这个库还提供了一些工具函数,用于创建和使用可观察对象。这些工具函数可用于:


  • 把现有的异步代码转换成可观察对象
  • 迭代流中的各个值
  • 把这些值映射成其它类型
  • 对流进行过滤
  • 组合多个流


初级核心概念


  • Observable: 一系列值的生产者
  • Observer: 它是observable值的消费者
  • Operator: 可以在数据流的途中对值进行转换的操作符

Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。


Observables 与 Observer 之间的订阅发布关系(观察者模式) 如下:


  • 订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。
  • 发布:Observable 通过回调 next 方法向 Observer 发布事件。


创建Observable


RxJS 中提供了很多操作符,用于创建 Observable 对象,常用的操作符如下:

  • of(), 将普通JavaScript数据转为 Observable
  • from(), 把数组或iterable对象转换成Observable
  • create(), 返回一个可以在Observer上调用方法的Observable.
  • fromEvent(), 把event转换成Observable.
  • fromPromise(), 把Promise转换成Observable.
  • ajax(),  从ajax创建一个observable


of()


import { Component, OnInit } from '@angular/core';
import { Observable,of } from 'rxjs';
@Component({
  selector: 'app-observable',
  templateUrl: '
    <h2>使用of来创建可观察对象</h2>
    <div>
      <button (click)="getData()">Click here</button>
    </div>
  '
})
export class ObservableComponent implements OnInit {
  constructor() { }
  ngOnInit(): void {
  }
  getData() {
    // Create simple observable that emits three values
    const myObservable = of(1, 2, 3);
    // Create observer object
    const myObserver = {
      next: x => console.log('Observer got a next value: ' + x),
      error: err => console.error('Observer got an error: ' + err),
      complete: () => console.log('Observer got a complete notification'),
    };
    // Execute with the observer object
    myObservable.subscribe(myObserver);
  }
}
复制代码


启动该项目,打开页面并点击按钮,出现这样的结果:


1.jpg


from()


import { Component, OnInit } from '@angular/core';
import { Observable,from } from 'rxjs';
@Component({
  selector: 'app-observable',
  templateUrl: '
    <h2>使用from函数创建可观察对象</h2>
    <div>
      <button (click)="fromData()">根据数组创建Observable</button>
    </div>
  '
})
export class ObservableComponent implements OnInit {
  constructor() { }
  ngOnInit(): void {
  }
  fromData() {
    let persons = [
      { name: 'Dave', age: 34, salary: 2000 },
      { name: 'Nick', age: 37, salary: 32000 },
      { name: 'Howie', age: 40, salary: 26000 },
      { name: 'Brian', age: 40, salary: 30000 },
      { name: 'Kevin', age: 47, salary: 24000 },
    ];
    const myObservable = from(persons);
    myObservable.subscribe(person => console.log(person));
  }
}
复制代码


页面测试结果为:


2.jpg


fromEvent()


import { Component, OnInit } from '@angular/core';
import { Observable,fromEvent } from 'rxjs';
@Component({
  selector: 'app-observable',
  templateUrl: '
    <h2>使用fromEvent函数创建可观察对象</h2>
    <div>
      <p id="content">Hello Hresh</p>
    </div>
  '
})
export class ObservableComponent implements OnInit {
  constructor() { }
  ngOnInit(): void {
      this.fromEvent();
  }
  fromEvent() {
    const el = document.getElementById('content');
    const mouseMoves = fromEvent(el, 'click');
    const subscription = mouseMoves.subscribe(() => {
      el.style.color = 'red';
    });
  }
}
复制代码


运行项目,点击文本,文本将会变为红色。


关于 fromEvent 在实际生产中有个典型的应用:输入提示(type-ahead)建议

可观察对象可以简化输入提示建议的实现方式。典型的输入提示要完成一系列独立的任务:


  • 从输入中监听数据。
  • 移除输入值前后的空白字符,并确认它达到了最小长度。
  • 防抖(这样才能防止连续按键时每次按键都发起 API 请求,而应该等到按键出现停顿时才发起)
  • 如果输入值没有变化,则不要发起请求(比如按某个字符,然后快速按退格)。
  • 如果已发出的 AJAX 请求的结果会因为后续的修改而变得无效,那就取消它。


完全用 JavaScript 的传统写法实现这个功能可能需要大量的工作。使用可观察对象,你可以使用这样一个 RxJS 操作符的简单序列:


import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { debounceTime, distinctUntilChanged, filter, map, switchMap } from 'rxjs/operators';
const searchBox = document.getElementById('search-box');
const typeahead = fromEvent(searchBox, 'input').pipe(
  map((e: KeyboardEvent) => (e.target as HTMLInputElement).value),
  filter(text => text.length > 2),        //判断输入内容长度是否大于2
  debounceTime(500),    //等待,直到用户停止输入(这个例子中是停止 1/2 秒)
  distinctUntilChanged(),    // 等待搜索文本发生变化。
  switchMap(() => ajax('/api/endpoint'))        //将搜索请求发送到服务。
);
typeahead.subscribe(data => {
 // Handle the data from the API
});
复制代码


fromPromise()


import { Component, OnInit } from '@angular/core';
import { from, Observable } from 'rxjs';
import { fromPromise } from 'rxjs/internal/observable/fromPromise';
@Component({
  selector: 'app-observable',
  templateUrl: '
    <h2>使用from函数创建可观察对象</h2>
    <div>
      <button (click)="fromPromiseData()">根据Promise创建Observable</button>
    </div>
  '
})
export class ObservableComponent implements OnInit {
  constructor() { }
  ngOnInit(): void {
  }
  fromPromiseData() {
    const myObservable = fromPromise(new Promise((resolve, reject) => {
      setTimeout(() => {
        // tslint:disable-next-line:prefer-const
        let username = 'hresh----Promise';
        resolve(username);
      }, 2000);
    }));
    myObservable.subscribe({
      next(data) { console.log(data); },
      error(err) { console.error('Error' + err); },
      complete() { console.log('completed'); }
    });
  }
}
复制代码


在 Rxjs6 之后,from 可以用来代替 fromPromise,所以上述代码改为 from()也是可以的。页面测试结果为:


3.jpg


这里扩展一下,官方文档有个案例是通过 fetch 方法返回 Promise 对象,这里我做了一些修改:


import { Component, OnInit } from '@angular/core';
import { from, Observable } from 'rxjs';
import { fromPromise } from 'rxjs/internal/observable/fromPromise';
@Component({
  selector: 'app-observable',
  templateUrl: '
    <h2>使用from函数创建可观察对象</h2>
    <div>
      <button (click)="fromPromiseData()">根据Promise创建Observable</button>
      <br>
      <button (click)="fromData2()">根据Promise得到的数组创建Observable</button>
    </div>
  '
})
export class ObservableComponent implements OnInit {
  constructor() { }
  ngOnInit(): void {
  }
  fromPromiseData() {
    const myObservable = from(fetch('http://a.itying.com/api/productlist'));
    myObservable.subscribe({
      next(data) { console.log(data); },
      error(err) { console.error('Error' + err); },
      complete() { console.log('completed'); }
    });
  }
  fromData2() {
    //fetch()返回的Promise,获取Promise对象中的内容,即数组
    let arrayData = [];
    fetch('http://a.itying.com/api/productlist').then(response => response.json()).then((data => {
      arrayData = data.result;
    }));
    let myObservable = null;
    setTimeout(() => {
      myObservable = from(arrayData);
      myObservable.subscribe(data => console.log(data));
    }, 2000);
  }
}
复制代码


页面测试结果为:


4.jpg


interval()


import { Component, OnInit } from '@angular/core';
import { interval, Observable } from 'rxjs';
@Component({
  selector: 'app-observable',
  templateUrl: '
    <h2>使用interval函数创建可观察对象</h2>
    <div>
      <button (click)="interval2()">根据interval创建Observable</button>
    </div>
  '
})
export class ObservableComponent implements OnInit {
  constructor() { }
  ngOnInit(): void {
  }
  interval2() {
    const secondsCounter = interval(1000);
    // Subscribe to begin publishing values
    const oData = secondsCounter.subscribe(n =>
      console.log(`It's been ${n} seconds since subscribing!`));
    setTimeout(() => {
      console.log('取消计数操作');
      oData.unsubscribe();  /*5s后取消数据显示*/
    }, 5000);
  }
}
复制代码


interval 操作符支持一个数值类型的参数,用于表示定时的间隔。上面代码表示每隔 1s,会输出一个递增的值,初始值从 0 开始。页面测试结果为:


5.jpg


ajax()


import { Component, OnInit } from '@angular/core';
import {ajax} from 'rxjs/ajax';
@Component({
  selector: 'app-observable',
  templateUrl: '
    <h2>使用ajax函数创建可观察对象</h2>
    <div>
      <button (click)="ajax2()">根据ajax创建Observable</button>
    </div>
  '
})
export class ObservableComponent implements OnInit {
  constructor() { }
  ngOnInit(): void {
  }
  ajax2(){
    const apiData = ajax('http://a.itying.com/api/productlist');
    // Subscribe to create the request
    apiData.subscribe(res => console.log(res.status, res.response));
  }
}
复制代码


页面测试结果:


6.jpg


操作符(Operators)


上述我们讲解的创建 Observable 的方法其实就是 Rxjs 的操作符,操作符是基于可观察对象构建的一些对集合进行复杂操作的函数。RxJS 还定义了一些操作符,比如 map()filter()concat()flatMap()


操作符接受一些配置项,然后返回一个以来源可观察对象为参数的函数。当执行这个返回的函数时,这个操作符会观察来源可观察对象中发出的值,转换它们,并返回由转换后的值组成的新的可观察对象。


10.jpg


map


import { map } from 'rxjs/operators';
const nums = of(1, 2, 3);
const squareValues = map((val: number) => val * val);
const squaredNums = squareValues(nums);
squaredNums.subscribe(x => console.log(x));
// Logs
// 1
// 4
// 9
复制代码


可以看到 map 接受一个 function 作为参数, 通过该 function 可以把每个元素做平方运算进行转换.。


filter


你可以使用管道来把这些操作符链接起来。管道让你可以把多个由操作符返回的函数组合成一个。pipe() 函数以你要组合的这些函数作为参数,并且返回一个新的函数,当执行这个新函数时,就会顺序执行那些被组合进去的函数。


应用于某个可观察对象上的一组操作符就像一个处理流程 —— 也就是说,对你感兴趣的这些值进行处理的一组操作步骤。这个处理流程本身不会做任何事。你需要调用 subscribe() 来通过处理流程得出并生成一个结果。


import { filter, map } from 'rxjs/operators';
const nums = of(1, 2, 3, 4, 5);
// Create a function that accepts an Observable.
const squareOddVals = pipe(
  filter((n: number) => n % 2 !== 0),
  map(n => n * n)
);
// Create an Observable that will run the filter and map functions
const squareOdd = squareOddVals(nums);
// Subscribe to run the combined functions
squareOdd.subscribe(x => console.log(x));
// Logs
// 1
// 9
// 25
复制代码


pipe() 函数也同时是 RxJS 的 Observable 上的一个方法,所以你可以用下列简写形式来达到同样的效果:


import { filter, map } from 'rxjs/operators';
const squareOdd = of(1, 2, 3, 4, 5)
  .pipe(
    filter(n => n % 2 !== 0),
    map(n => n * n)
  );
// Subscribe to get values
squareOdd.subscribe(x => console.log(x));
复制代码

share


在讲解 share 操作符前,我们需要了解一下冷热两种模式下的 Observables。官方定义如下:


Cold Observables 在被订阅后运行,也就是说,observables 序列仅在 subscribe 函数被调用后才会推送数据。与 Hot Observables 不同之处在于,Hot Observables 在被订阅之前就已经开始产生数据,例如mouse move事件。


从原理来说是这样的: Cold内部会创建一个新的数据生产者, 而Hot则会一直使用外部的数据生产者.


举个例子:


Cold: 就相当于我在B站看英雄联盟赛事重播,可以从头看起。


Hot: 就相当于看英雄联盟赛事直播, 如果来晚了, 那么前面就看不到了。


share() 操作符允许多个订阅者共享同一个 Observable. 也就是把 Cold 变成 Hot。看如下示例:


import { Component, OnInit } from '@angular/core';
import { interval } from 'rxjs';
import { share, take } from 'rxjs/operators';
@Component({
  selector: 'app-observable',
  templateUrl: '
    <h2>share操作符</h2>
    <div>
      <button (click)="shareData()">Click</button>
    </div>
  '
})
export class ObservableComponent implements OnInit {
  constructor() { }
  ngOnInit(): void {
  }
  shareData() {
    const numbers = interval(1000).pipe(
      take(5),
      share()
    );
    function subscribeToNumbers(name) {
      numbers.subscribe(
        x => console.log(`${name}: ${x}`)
      );
    }
    subscribeToNumbers('Dave');
    const anotherSubscription = () => subscribeToNumbers('Nick');
    setTimeout(anotherSubscription, 2500);
  }
}
复制代码


页面测试结果为:


11.jpg


这里 interval 是每隔1秒产生一个数据, take(5)表示取5个数, 也就是1,2,3,4,5,然后 share()就把这个 observable 从 cold 变成了 hot 的。后边 Dave 进行了订阅,2.5秒以后, Nick 进行了订阅。


其他


RxJS 提供了很多操作符,不过只有少数是常用的。  如下图所示:


image.png


关于这些操作符的使用可以参看 RxJS 系列之三 - Operators 详解 和  RxJS API 文档


错误处理


除了可以在订阅时提供 error() 处理器外,RxJS 还提供了 catchError 操作符,它允许你在管道中处理已知错误。


假设你有一个可观察对象,它发起 API 请求,然后对服务器返回的响应进行映射。如果服务器返回了错误或值不存在,就会生成一个错误。如果你捕获这个错误并提供了一个默认值,流就会继续处理这些值,而不会报错。


import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
// Return "response" from the API. If an error happens,
// return an empty array.
const apiData = ajax('http://a.itying.com/api/productlist').pipe(
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  catchError(err => of([]))
);
apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});
复制代码


页面测试结果:


12.jpg


重试失败的可观察对象


catchError 提供了一种简单的方式进行恢复,而 retry 操作符让你可以尝试失败的请求。


可以在 catchError 之前使用 retry 操作符。它会订阅到原始的来源可观察对象,它可以重新运行导致结果出错的动作序列。如果其中包含 HTTP 请求,它就会重新发起那个 HTTP 请求。


下列代码把前面的例子改成了在捕获错误之前重发请求:


import { ajax } from 'rxjs/ajax';
import { map, retry, catchError } from 'rxjs/operators';
const apiData = ajax('/api/data').pipe(
  retry(3), // Retry up to 3 times before failing
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  catchError(err => of([]))
);
apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});
复制代码


注意:不要重试登录认证请求,这些请求只应该由用户操作触发。我们肯定不会希望自动重复发送登录请求导致用户的账号被锁定。


问题记录


fromPromise no longer exported in master (v6)


RxJS version:


6.0.0-alpha.3


Code to reproduce:


import { fromPromise } from 'rxjs';
复制代码


Expected behavior:


should work just as it did with v5.5 (but different location)

解决办法:统一使用 from 来代替 fromPromise。


目录
相关文章
|
27天前
|
缓存 前端开发 JavaScript
前端serverless探索之组件单独部署时,利用rxjs实现业务状态与vue-react-angular等框架的响应式状态映射
本文深入探讨了如何将RxJS与Vue、React、Angular三大前端框架进行集成,通过抽象出辅助方法`useRx`和`pushPipe`,实现跨框架的状态管理。具体介绍了各框架的响应式机制,展示了如何将RxJS的Observable对象转化为框架的响应式数据,并通过示例代码演示了使用方法。此外,还讨论了全局状态源与WebComponent的部署优化,以及一些实践中的改进点。这些方法不仅简化了异步编程,还提升了代码的可读性和可维护性。
|
6月前
|
JavaScript 前端开发 架构师
Angular进阶:理解RxJS在Angular应用中的高效运用
RxJS(Reactive Extensions for JavaScript)是JavaScript的一个响应式编程库,特别适用于处理异步数据流。
88 0
|
3月前
|
开发者 开发工具 UED
JSF应用的社交革命:一键解锁社交媒体超级功能,引爆用户参与度的奇迹!
【8月更文挑战第31天】本文探讨了在JavaServer Faces (JSF)应用中集成社交媒体功能的最佳实践,包括选择合适的API和SDK、示例代码及实现细节。通过集成社交媒体,应用能提供即时内容分享、互动交流和个性化体验,提升用户参与度。文章还强调了用户体验优化、安全性及隐私保护的重要性,并总结了社交媒体集成对企业竞争优势的意义。
42 0
|
设计模式 JavaScript 前端开发
学习Angular的编程之旅
学习Angular的编程之旅
|
前端开发 JavaScript 网络架构
Angular基础知识学习(三)
Angular基础知识学习(三)
152 0
Angular基础知识学习(三)
|
缓存 前端开发 JavaScript
Javascript学习-angular开发环境搭建及新建项目并运行
Javascript学习-angular开发环境搭建及新建项目并运行
114 0
Javascript学习-angular开发环境搭建及新建项目并运行
|
前端开发 JavaScript 安全
Angular基础知识学习(二)下
Angular基础知识学习(二)下
128 0
Angular基础知识学习(二)下
|
JavaScript 开发者
Angular基础知识学习(二)上
Angular基础知识学习(二)上
128 0
Angular基础知识学习(二)上
|
JavaScript
Angular基础知识学习(一)下
Angular基础知识学习(一)下
180 0
Angular基础知识学习(一)下
|
前端开发 JavaScript 安全
Angular基础知识学习(一)上
Angular基础知识学习(一)上
173 0
Angular基础知识学习(一)上