继续解惑,异步处理 —— RxJS Observable

简介: Observable 可观察对象是开辟一个连续的通信通道给观察者 Observer,彼此之前形成一种关系,而这种关系需要由 Subscription 来确立,而在整个通道中允许对数据进行转换我们称为操作符 Operator。

image.png

接上一篇《Js 异步处理演进,Callback=>Promise=>Observer》,可能不少掘友对 Observer 还心存疑虑,本篇继续解惑~


Observable 称它为可观察对象,它并不是 Angular 的东西,而是 ES7 的一种用来管理异步数据的标准。


Observable 可观察对象是开辟一个连续的通信通道给观察者 Observer,彼此之前形成一种关系,而这种关系需要由 Subscription 来确立,而在整个通道中允许对数据进行转换我们称为操作符 Operator。


有一个形象的比喻:


你订了一个银行卡余额变化短信通知的服务,那么这个时候,每次只要你转账或者是购买商品在使用这张银行卡消费之后,银行的系统就会给你推送一条短信,通知你消费了多少多少钱;


这个场景下,银行卡余额就是 Observable,用户就是 Observer,用户在银行办理这个服务,就是 Subscription,银行卡余额发生转账或购买商品产生了变动,就是 Operator。


将上面的过程转化为代码:


import { Observable } from 'rxjs/Rx';
let sub = Observable
    .interval(1000)
    .map(second => second + '秒')
    .subscribe(res => {
        console.log(res);
    });


利用 Observable.interval 每隔 1 秒产生一个数据,然后交给 map 操作号将内容进行转换(银行卡余额发生变动),最后交给观察者打印结果(通知用户余额发生变化)。

如果你想取消这个服务,可以调用 sub.unsubscribe()


整个过程都在体现一个思想:数据流!这和函数式编程思路一致,数据流就像是工厂流水线,从原材料到成品,经过一层层的处理,所见即所做,非常清晰!(分离材料与加工机器,就是分离 Observable 和 Subscribe)


接下来,我们再具体看看 Observable 细节:


  • 创建
const Rx = require('rxjs/Rx')
const myObservable = Rx.Observable.create(observer => {
  observer.next('foo');
  setTimeout(() => observer.next('bar'), 1000);
});


我们可以调用 Observable.create 方法来创建一个 Observable,入参是 observer,在函数内部通过调用 observer.next() 便可生成有一系列值的一个 Observable


  • 内置方法
const observer = {
  next: function(value) {
    console.log(value);
  },
  error: function(error) {
    console.log(error)
  },
  complete: function() {
    console.log('complete')
  }
}


nexterrorcomplete,都是可选项;


打个比方:用户订牛奶厂商的牛奶,牛奶厂商是 Observable,用户是 Observer。用户打电话(subscribe)给牛奶商,牛奶商送牛奶(next),用户收到牛奶后喝牛奶;送奶过程可能发生意外,送奶失败(error);或者送奶顺利全部完成(complete);

  • Operator:RxJS 提供了大量的 API,熟悉他们需要时间和经验;


创建数据流

  • 单值:ofemptynever
  • 多值:from
  • 定时:intervaltimer
  • 事件:fromEvent
  • Promise:fromPromise
  • 自定义:create


转换

  • 改变数据形态:map, mapTo, pluck
  • 过滤一些值:filter, skip, first, last, take
  • 时间轴上的操作:delay, timeout, throttle, debounce, audit, bufferTime
  • 累加:reduce, scan
  • 异常处理:throw, catch, retry, finally
  • 条件执行:takeUntil, delayWhen, retryWhen, subscribeOn, ObserveOn
  • 转接:switch


组合

  • concat 保持原来的序列顺序连接两个数据流
  • merge 合并序列
  • race 预设条件为其中一个数据流完成
  • forkJoin 预设条件为所有数据流都完成
  • zip 取各来源数据流最后一个值合并为对象
  • combineLatest 取各来源数据流最后一个值合并为数组


Observable 的优势在于:

  • 降低了目标与观察者之间的耦合关系,两者之间是抽象耦合关系;
  • 符合 依赖倒置原则
  • 目标与观察者之间建立了一套触发机制;
  • 支持广播通信多播;


依赖倒置原则:依赖倒置原则(Dependence Inversion Principle)是程序要依赖于抽象接口,不要依赖于具体实现。简单的说就是要求对抽象进行编程,不要对实现进行编程,这样就降低了客户与实现模块间的耦合。

多播(即一个Observable,多个subscribe):

image.png

以上就是关于 RxJS Observable 进一步在概念上的解惑~~

觉得还不错,点个赞吧👍👍👍


更多推荐阅读:


目录
打赏
0
0
0
0
2
分享
相关文章
探秘 RxJS Observable 为什么要长成这个样子?!
我们都知道 RxJS Observable 最基础的使用方法:是建立 Observable,即调用 .create API
面试官:RxJava是如何做到响应式编程的?
RxJava已经出现很多个年头了,但是依然被很多公司使用,如果现在还对RxJava了解的不够透彻, 可以看这个系列对它的分析:相信看完后你对它会有个更全面的认识。 这个系列主要从下面几个方面来讲解: **RxJava基本操作符使用** **RxJava响应式编程是如何实现的** **RxJava的背压机制及Flowable是如何实现背压的** **RxJava的线程切换原理
三连弹!原生实现异步处理利器 —— Observable
本篇带来用原生实现 Observable,一探内部究竟!!
手写代码:实现一个EventBus
EventBus,事件总线。总线一词来自于《计算机组成原理》中的”系统总线“,是指用于连接多个部件的信息传输线,各部件共享的传输介质。我们通常把事件总线也成为自定义事件,一般包含`on`、`once`、`emit`、`off`等方法。在Vue2中想要实现EventBus比较简单,直接暴露出一个`new Vue()`实例即可,以此为思路,我们应该如何自定义实现EventBus呢?
552 0
手写代码:实现一个EventBus
你会用RxJS吗?【初识 RxJS中的Observable和Observer】
概念 RxJS是一个库,可以使用可观察队列来编写异步和基于事件的程序的库。 RxJS 中管理和解决异步事件的几个关键点: Observable: 表示未来值或事件的可调用集合的概念。 Observer: 是一个回调集合,它知道如何监听 Observable 传递的值。 Subscription: 表示一个 Observable 的执行,主要用于取消执行。 Operators:** 是纯函数,可以使用函数式编程风格来处理具有map、filter、concat、reduce等操作的集合。
175 0
Rxjs源码解析(一)Observable
学习一个库最好的方法就是看其源码,理解其 api 的调用原理,用起来自然也就很清楚自己到底在干什么了,秉持着此观念,为了更好地理解 rxjs,抽空将其源码看了一遍 本文章不会刻意涉及概念性的东西,主线就是解读源码,并在恰当的时候给出一些小例子,源码基于 rxjs v7.4.0 版本
356 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等