Observable学习笔记

简介: Observable学习笔记

简介


可观察对象(Observable)支持在应用中的发布者和订阅者之间传递消息。


可观察对象(Observable)可以发送多个任意类型的值 —— 字面量、消息、事件。


基本用法和词汇


Observable 用于在发送方和接收方之间传输消息。在创建 Observable 对象时,需要传入一个函数作为构造函数的参数,这个函数叫订阅者函数,这个函数也就是生产者向消费者推送消息的地方。在被消费者 subscribe(订阅)之前,订阅者函数不会被执行,直到subscribe()函数被调用,该函数返回一个 subscription 对象,里面有一个unsubscribe()函数,消费者可以随时拒绝消息的接收!


好了,我们看如下案例:


import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
  selector: 'app-observable',
  templateUrl: '
    <h2>Observable对象使用</h2>
    <div>
      <button (click)="getData()" >Click</button>
    </div>
    <h2>计数</h2>
    <div>
      <button (click)="getRxjsIntervalData()" >Click</button>
    </div>
  '
})
export class ObservableComponent implements OnInit {
  constructor() { }
  ngOnInit(): void {
  }
  getData() {
    const myObservable = new Observable((observer) => {
      let uname = 'hresh';
      observer.next(uname);
    });
    let oData = myObservable.subscribe((data) => {
      console.log(data);
    });
  }
  getRxjsIntervalData() {
    let count = 0;
    //开始每秒计数
    const myObservable = new Observable((observer) => {
      setInterval(() => {
        count++;
        observer.next(count);
      }, 1000);
    });
    let oData = myObservable.subscribe((data) => {
      console.log(data);
    });
    setTimeout(() => {
      console.log('取消计数操作');
      oData.unsubscribe();  /*3s后取消数据显示*/
    }, 3000);
  }
}
复制代码


页面测试效果:


1.jpg


地理位置的获取


该案例的功能是获取地理位置,根据官方文档的案例进行扩展,不然直接看官方案例可能会存在疑惑。


在 HTML 规范中,增加了获取用户地理信息的 API,这样使得可以基于用户位置开发互联网应用,即基于位置服务 。


获取当前地信息


navigator.geolocation.getCurrentPosition(successCallback,errorCallback)
复制代码


重复获取当前地理信息


navigator.geolocation.watchPosition(successCallback,errorCallback)
复制代码


使用

当获取地理信息成功后,会调用 successCallback,并返回一个包含位置信息的对象 position。

  • 经度 : coords.longitude
  • 纬度 : coords.latitude
  • 准确度 : coords.accuracy
  • 海拔 : coords.altitude
  • 海拔准确度 : coords.altitudeAcuracy
  • 行进方向 : coords.heading
  • 地面速度 : coords.speed
  • 请求的时间: new Date(position.timestamp)


当获取地理信息失败后,会调用 errorCallback,返回错误信息 error。


observable.component.ts 文件内容如下:


import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
  selector: 'app-observable',
  templateUrl: '
    <h2>获取地理位置案例</h2>
    <div>
      <input type = "button" (click)="getLocationUpdate()" value = "Watch Update"/>
      <input type = "button" (click) = "stopWatch()" value = "Stop Watch"/>
    </div>
  '
})
export class ObservableComponent implements OnInit {
  watchID: number;
  geoLoc: any;
  public num = 0;
  constructor() { }
  ngOnInit(): void {
    // this.geolocationTest();
  }
  getLocationUpdate() {
    let index = this.num;
    function showLocation(position) {
      let latitude = position.coords.latitude;
      let longitude = position.coords.longitude;
      alert('index:' + index + '---Latitude : ' + latitude + '----Longitude: ' + longitude);
    }
    function errorHandler(err) {
      if (err.code === 1) {
        alert('Error: Access is denied!');
      } else if (err.code === 2) {
        // tslint:disable-next-line: quotemark
        alert("Error: Position is unavailable!");
      }
    }
    if (navigator.geolocation) {
      // timeout at 60000 milliseconds (60 seconds)
      const options = {
        enableHighAccuracy: false,  // 位置是否精确获取
        timeout: 5000,    // 获取位置允许的最长时间
        maximumAge: 1000    // 多久更新获取一次位置
      };
      this.geoLoc = navigator.geolocation;
      this.watchID = this.geoLoc.watchPosition(showLocation, errorHandler, options);
      console.log(this.watchID);
      this.num += 1;
    } else {
      alert('Sorry, browser does not support geolocation!');
    }
  }
  stopWatch() {
    this.geoLoc.clearWatch(this.watchID);
    this.num = 0;
  }
}
复制代码


运行项目后,在 IE 浏览器中打开,谷歌浏览器暂不支持地理位置获取。点击页面中的第一个按钮,效果如下:


2..jpg


Geolocation.watchPosition() 用于注册监听器,在设备的地理位置发生改变的时候自动被调用。该方法会返回一个 ID,从图中可以看到控制台输出的 watchID 值为1。在页面上停留一段时间,可以看到以下现象:


3.jpg


该监听器会一直监听设备的地理位置,一旦有所变动就会自动调用。如要取消监听可以通过 Geolocation.clearWatch() 传入该 ID 实现取消的目的,点击第二个按钮即可。


接下来我们使用可观察对象来实现地理位置的监听。


基于Observable的位置监听


作为发布者,你创建一个 Observable 的实例,其中定义了一个订阅者(subscriber)函数。 当有消费者调用 subscribe() 方法时,这个函数就会执行。 订阅者函数用于定义“如何获取或生成那些要发布的值或消息”。


要执行所创建的可观察对象,并开始从中接收通知,你就要调用它的 subscribe() 方法,并传入一个观察者(observer)。 这是一个 JavaScript 对象,它定义了你收到的这些消息的处理器(handler)。 subscribe() 调用会返回一个 Subscription 对象,该对象具有一个 unsubscribe() 方法。 当调用该方法时,你就会停止接收通知。


observable.component.ts 文件内容如下:


import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
  selector: 'app-observable',
  templateUrl: '
  <h2>Observable基本用法</h2>
    <div>
      <button (click)="geolocationTest()">获取当前地信息</button>
    </div>
  '
})
export class ObservableComponent implements OnInit {
  public num = 0;
  constructor() { }
  ngOnInit(): void {
    // this.geolocationTest();
  }
  // 创建一个Observable,它将开始监听地理位置更新
  // 消费者订阅时
  geolocationTest() {
    const locations = new Observable((observer) => {
      let watchId: number;
      if ('geolocation' in navigator) {
        watchId = navigator.geolocation.watchPosition((position) => {
          observer.next(position);
        }, (error: PositionError) => {
          observer.error(error);
        });
      } else {
        observer.error('Geolocation not available');
      }
      // 当使用者取消订阅时,请清理数据以备下次订阅。
      return {
        unsubscribe() {
          navigator.geolocation.clearWatch(watchId);
        }
      };
    });
    let index = this.num;
    // 调用subscription()开始监听更新。
    const locationsSubscription = locations.subscribe({
      next(position) {
        console.log(index);
        console.log('Current Position: ', position);
      },
      error(msg) {
        console.log('Error Getting Location: ', msg);
      }
    });
    this.num += 1;
    // 10秒后停止监听位置信息
    setTimeout(() => {
      locationsSubscription.unsubscribe();
    }, 10000);
  }
}
复制代码


打开页面后,点击按钮,观察控制台输出情况。


4.jpg


我们定义的事件方法监听了 10s,即不再对地理位置的更新进行监听,所以很长时间后控制台只输出了一条信息。


我们对上述的代码进行修改,查看页面显示效果。首先注释 setTimeout 方法,页面显示内容为:


5.jpg


从上图可以看出,该监听器一直在运行着,所以在不间断的输出位置更新信息。

但是当 setTimeout 方法有效时,无论是否存在 clearWatch 方法,该监听器都不会一直执行下去。


定义观察者


用于接收可观察对象通知的处理器要实现 Observer 接口。这个对象定义了一些回调函数来处理可观察对象可能会发来的三种通知


通知类型 说明
next 必要。用来处理每个送达值。在开始执行后可能执行零次或多次。
error 可选。用来处理错误通知。错误会中断这个可观察对象实例的执行过程。
complete 可选。用来处理执行完毕(complete)通知。当执行完毕后,这些值就会继续传给下一个处理器。

Observer 对象其实就是个包含 next、error、complete 方法的对象。比如说:


const myObserver = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
复制代码


关于 Observer 对象有一些比较重要的原则:


  • 传入的 Observer 对象可以不实现所有规定的方法 (next、error、complete 方法)
  • complete 或者 error 触发之后再调用 next 方法是没用的
  • 调用 unsubscribe 方法后,任何方法都不能再被调用了
  • completeerror 触发后,unsubscribe 也会自动调用
  • nextcompleteerror 出现异常时,unsubscribe 也会自动调用以保证资源不会浪费
  • nextcompleteerror是可选的。按需处理即可,不必全部处理


订阅



只有当有人订阅 Observable 的实例时,它才会开始发布值。订阅时要先调用该实例的 subscribe() 方法,并把一个观察者对象传给它,用来接收通知。

为了展示订阅的原理,我们需要创建新的可观察对象。它有一个构造函数可以用来创建新实例,但是为了更简明,也可以使用 Observable 上定义的一些静态方法来创建一些常用的简单可观察对象:

  • of(…items) —— 返回一个 Observable 实例,它用同步的方式把参数中提供的这些值发送出来。
  • from(iterable) —— 把它的参数转换成一个 Observable 实例。 该方法通常用于把一个数组转换成一个(发送多个值的)可观察对象。



下面的例子会创建并订阅一个简单的可观察对象,它的观察者会把接收到的消息记录到控制台中:


import { Component, OnInit } from '@angular/core';
import { Observable,of } from 'rxjs';
@Component({
  selector: 'app-observable',
  templateUrl: '
    <h2>使用of来创建可观察对象</h2>
    <div>
      <button (click)="getData()">Click here</button>
    </div>
  '
})
export class ObservableComponent implements OnInit {
  constructor() { }
  ngOnInit(): void {
  }
  getData() {
    // Create simple observable that emits three values
    const myObservable = of(1, 2, 3);
    // Create observer object
    const myObserver = {
      next: x => console.log('Observer got a next value: ' + x),
      error: err => console.error('Observer got an error: ' + err),
      complete: () => console.log('Observer got a complete notification'),
    };
    // Execute with the observer object
    myObservable.subscribe(myObserver);
  }
}
复制代码


启动该项目,打开页面并点击按钮,出现这样的结果:


1.jpg


观察者除了单独定义之外,还可以直接同  subscribe() 方法 一起使用,下面的代码和刚才的等价:


myObservable.subscribe(
  x => console.log('Observer got a next value: ' + x),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
);
复制代码


创建可观察对象


在上面提到可以使用 Observable 上定义的一些静态方法来创建一些常用的简单可观察对象,比如 of() 和 from() 方法。接下来我们使用 Observable 构造函数可以创建任何类型的可观察流。 当执行可观察对象的 subscribe() 方法时,这个构造函数就会把它接收到的参数作为订阅函数来运行。 订阅函数会接收一个 Observer 对象,并把值发布给观察者的 next() 方法。


比如,要创建一个与前面的 of(1, 2, 3) 等价的可观察对象,你可以这样做:


import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
  selector: 'app-observable',
  templateUrl: '
    <h2>使用Observable构造函数来创建可观察对象</h2>
    <div>
      <button (click)="getData2()">Click here</button>
    </div>
  '
})
export class ObservableComponent implements OnInit {
  constructor() { }
  ngOnInit(): void {
  }
  getData2() {
    const myObservable = new Observable((observer) => {
      observer.next(1);
      observer.next(2);
      observer.next(3);
      observer.complete();
      return { unsubscribe() { } };
    });
    myObservable.subscribe({
      next(num) { console.log(num); },
      complete() { console.log('Finished sequence'); }
    });
  }
}
复制代码


我们都知道键盘事件分为三种,包括 keydown,keypress 和 keyup,在我们之前学习普通事件的时候,在 input 标签中使用过这三个事件,如下述代码:


<input type="text" (keydown)="keydown()" (keypress)="keypress()" (keyup)="keyup()" />
复制代码


关于这三个事件的定义如下:

  • keydown:按下键盘键
  • keypress:紧接着keydown事件触发(只有按下字符键时触发)
  • keyup:释放键盘键


顺序为:keydown -> keypress ->keyup


在上述代码可以看出,如果像捕捉到不同的事件需要单独进行定义。接下来我们将对上述代码进行修改,创建一个用来发布事件的可观察对象。在这个例子中,订阅函数是用内联方式定义的。


import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
  selector: 'app-observable',
  templateUrl: '
    <h2>发布键盘事件的可观察对象</h2>
    <div>
      <input type="text" class="name" id="name" placeholder="please input data" (keydown)="inputFunc()">
    </div>
  '
})
export class ObservableComponent implements OnInit {
  constructor() { }
  ngOnInit(): void {
  }
    myObservable.subscribe({
      next(num) { console.log(num); },
      complete() { console.log('Finished sequence'); }
    });
  }
  fromEvent(target, eventName) {
    return new Observable((observer) => {
      const handler = (e) => observer.next(e);
      // Add the event handler to the target
      target.addEventListener(eventName, handler);
      return () => {
        // Detach the event handler from the target
        target.removeEventListener(eventName, handler);
      };
    });
  }
  inputFunc() {
    const ESC_KEY = 27;
    const nameInput = document.getElementById('name') as HTMLInputElement;
    const subscription = this.fromEvent(nameInput, 'keydown')
      .subscribe((e: KeyboardEvent) => {
        if (e.keyCode === ESC_KEY) {
          nameInput.value = '';
        }
      });
    const subscription2 = this.fromEvent(nameInput, 'keyup')
      .subscribe(() => {
        console.log('keyup释放键盘:' + nameInput.value);
      });
  }
}
复制代码


页面测试效果如下:


1.jpg


从结果可以看出,你可以在 inputFunc 方法中创建可发布 keydown 事件的可观察对象以及 keyup事件的可观察对象。


多播


典型的可观察对象会为每一个观察者创建一次新的、独立的执行。 当观察者进行订阅时,该可观察对象会连上一个事件处理器,并且向那个观察者发送一些值。当第二个观察者订阅时,这个可观察对象就会连上一个新的事件处理器,并独立执行一次,把这些值发送给第二个可观察对象。


有时候,不应该对每一个订阅者都独立执行一次,你可能会希望每次订阅都得到同一批值 —— 即使是那些你已经发送过的。这在某些情况下有用,比如用来发送 document 上的点击事件的可观察对象。


多播用来让可观察对象在一次执行中同时广播给多个订阅者。借助支持多播的可观察对象,你不必注册多个监听器,而是复用第一个(next)监听器,并且把值发送给各个订阅者。


来看一个从 1 到 3 进行计数的例子, 如果你订阅了两次,就会有两个独立的流 。


import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
  selector: 'app-observable',
  templateUrl: '
    <h2>使用Observable构造函数来创建可观察对象</h2>
    <div>
      <button (click)="getData2()">Click here</button>
    </div>
  '
})
export class ObservableComponent implements OnInit {
  constructor() { }
  ngOnInit(): void {
  }
  getData2() {
    const myObservable = new Observable((observer) => {
      observer.next(1);
      observer.next(2);
      observer.next(3);
      observer.complete();
      return { unsubscribe() { } };
    });
    myObservable.subscribe({
      next(num) { console.log('1st subscribe: ' + num); },
      complete() { console.log('Finished sequence'); }
    });
    myObservable.subscribe({
      next(num) { console.log('2nd subscribe: ' + num); },
      complete() { console.log('Finished sequence'); }
    });
  }
}
复制代码


页面结果如下:


1.jpg


可以发现,这两个观察者之间互不干扰,即生产者先将值发送给 observer1,再将值发送给 observer2。


修改上述代码使其支持多播,代码如下:


import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
  selector: 'app-observable',
  templateUrl: '
    <h2>使用Observable构造函数来创建可观察对象</h2>
    <div>
      <button (click)="getData2()">Click here</button>
    </div>
  '
})
export class ObservableComponent implements OnInit {
  constructor() { }
  ngOnInit(): void {
  }
    sequenceSubscriber(observer) {
    const seq = [1, 2, 3];
    let timeoutId;
    const mydate = this.date;
    // Will run through an array of numbers, emitting one value
    // per second until it gets to the end of the array.
    function doSequence(arr, idx) {
      timeoutId = setTimeout(() => {
        const dd = new Date();
        // console.log(dd.toLocaleTimeString());
        observer.next(dd.toLocaleTimeString() + '----:' + arr[idx]);
        if (idx === arr.length - 1) {
          observer.complete();
        } else {
          doSequence(arr, ++idx);
        }
      }, 1000);
    }
    doSequence(seq, 0);
    // Unsubscribe should clear the timeout to stop execution
    return {
      unsubscribe() {
        clearTimeout(timeoutId);
      }
    };
  }
  getNum() {
    const sequence = new Observable(this.sequenceSubscriber);
    // Subscribe starts the clock, and will emit after 1 second
    sequence.subscribe({
      next(num) { console.log('1st subscribe: ' + num); },
      complete() { console.log('1st sequence finished.'); }
    });
    // After 1/2 second, subscribe again.
    // setTimeout(() => {
    //   sequence.subscribe({
    //     next(num) { console.log('2nd subscribe: ' + num); },
    //     complete() { console.log('2nd sequence finished.'); }
    //   });
    // }, 500);
    sequence.subscribe({
      next(num) { console.log('2nd subscribe: ' + num); },
      complete() { console.log('2nd sequence finished.'); }
    });
}
复制代码


页面测试结果为:


1.jpg


问题记录


angular中clearTimeout不生效的问题

原因:安装了@types/node 所致

解决办法:

使用 window.setTimeout 和 window.clearTimeout。


总结


本文内容参考官方文档总结的,官方文档毕竟有深度,所以对于刚接触 Observable 对象的新人来说理解起来比较困难,所以如果各位想快速入门的话,可以参看 Angular7入门辅助教程(五)——Observable(可观察对象) ,该文通俗易懂,希望对大家有所帮助。回到本文,本文只是想按照官方文档理解一遍,对于其中的案例进行扩展,希望能帮助你更好的理解。



目录
相关文章
|
6月前
|
JavaScript 前端开发 调度
15_Rxjs
15_Rxjs
50 0
|
JavaScript 前端开发 算法
RxJS系列06:测试 Observable
RxJS系列06:测试 Observable
106 0
|
存储 JavaScript 前端开发
RxJS系列02:可观察者 Observables
RxJS系列02:可观察者 Observables
|
前端开发 API 开发工具
MobX 源码解析-observable #86
MobX 源码解析-observable #86
115 0
探秘 RxJS Observable 为什么要长成这个样子?!
我们都知道 RxJS Observable 最基础的使用方法:是建立 Observable,即调用 .create API
|
前端开发 JavaScript API
继续解惑,异步处理 —— RxJS Observable
Observable 可观察对象是开辟一个连续的通信通道给观察者 Observer,彼此之前形成一种关系,而这种关系需要由 Subscription 来确立,而在整个通道中允许对数据进行转换我们称为操作符 Operator。
rxJava中 Subscriber 与Observer
rxJava中 Subscriber 与Observer
216 0
|
JavaScript 前端开发 调度
你会用RxJS吗?【初识 RxJS中的Observable和Observer】
概念 RxJS是一个库,可以使用可观察队列来编写异步和基于事件的程序的库。 RxJS 中管理和解决异步事件的几个关键点: Observable: 表示未来值或事件的可调用集合的概念。 Observer: 是一个回调集合,它知道如何监听 Observable 传递的值。 Subscription: 表示一个 Observable 的执行,主要用于取消执行。 Operators:** 是纯函数,可以使用函数式编程风格来处理具有map、filter、concat、reduce等操作的集合。
146 0
|
前端开发 JavaScript API
Rxjs源码解析(一)Observable
学习一个库最好的方法就是看其源码,理解其 api 的调用原理,用起来自然也就很清楚自己到底在干什么了,秉持着此观念,为了更好地理解 rxjs,抽空将其源码看了一遍 本文章不会刻意涉及概念性的东西,主线就是解读源码,并在恰当的时候给出一些小例子,源码基于 rxjs v7.4.0 版本
334 0
|
存储 搜索推荐 开发工具
Rxjava源码解析笔记 | 创建Observable 与 Observer/Subscriber 以及之间订阅实现的源码分析
一篇关于Rxjava源码解析笔记,内容主要是 创建Observable 与 Observer/Subscriber 以及之间订阅实现的源码分析