Rxjs源码解析(一)Observable

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 学习一个库最好的方法就是看其源码,理解其 api 的调用原理,用起来自然也就很清楚自己到底在干什么了,秉持着此观念,为了更好地理解 rxjs,抽空将其源码看了一遍本文章不会刻意涉及概念性的东西,主线就是解读源码,并在恰当的时候给出一些小例子,源码基于 rxjs v7.4.0 版本

从 new Observable 开始
import { Observable } from 'rxjs'

const observable = new Observable(subscriber => {
subscriber.next(1)
subscriber.next(2)
subscriber.complete()
})
observable.subscribe({
next: data => console.log('next data:', data),
complete: () => {

console.log('complete')

}
})

输出如下:
// 开始输出
next data: 1
next data: 2
complete
// 结束输出

通过 new Observable() 方法创建了一个可观察对象 observable,然后通过 subscribe 方法订阅这个observable,订阅的时候会执行在 new Observable时候传入的函数参数,那么就来看下 new Observable到底做了什么
// /src/internal/Observable.ts
export class Observable implements Subscribable {
// ...
constructor(subscribe?: (this: Observable, subscriber: Subscriber) => TeardownLogic) {

if (subscribe) {
  this._subscribe = subscribe;
}

}
// ...
}

Observable的初始化方法很简单,就是将回调函数绑定到实例的 _subscribe属性上
subscribe
Observable 实现 (implements) 了 Subscribable(订阅)接口
// /src/internal/types.ts
export interface Subscribable {
subscribe(observer: Partial<Observer>): Unsubscribable;
}

这个 subscribe正是下一步要用于订阅的方法,在当前版本中 subscribe的方法签名有三个,三个只是传参形式不同,最终都会处理成相同的对象,着重看第一个
subscribe(observer?: Partial<Observer>): Subscription;

对于第一个签名,接收的参数与Observer接口相关,这个接口有三个方法属性
export interface Observer {
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}

subscribe可以是一个对象,这个对象包含三个方法属性 next、error、complete,当你不关心 error 和 complete 这两个属性的时候,那么可以按照第二个函数签名直接传入一个方法,这个方法就默认代表 next 方法属性
进入 subscribe方法
subscribe(
observerOrNext?: Partial<Observer> | ((value: T) => void) | null,
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription {
const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
errorContext(() => {

// ...

});
return subscriber;
}

subscribe的第一个参数可以是一个 subscriber(具有 next、error、complete三个属性,所以类型合法),不过这种传参形式一般都是库内部使用,我们正常写法还是传入一个纯粹的对象或者方法,那么就意味着会执行 new SafeSubscriber(observerOrNext, error, complete)
// node_modules/rxjs/src/internal/Subscriber.ts
export class SafeSubscriber extends Subscriber {
// ...
}

SafeSubscriber继承了 Subscriber,主要作用是对 next、error、complete 三个方法属性进行了一层封装,保证能够更好地进行错误处理
subscriber.add(
operator

? // We're dealing with a subscription in the
  // operator chain to one of our lifted operators.
  operator.call(subscriber, source)
: source
? // If `source` has a value, but `operator` does not, something that
  // had intimate knowledge of our API, like our `Subject`, must have
  // set it. We're going to just call `_subscribe` directly.
  this._subscribe(subscriber)
: // In all other cases, we're likely wrapping a user-provided initializer
  // function, so we need to catch errors and handle them appropriately.
  this._trySubscribe(subscriber)

);

errorContext也是一个错误处理的包装方法,里面只调用了一个 subscriber.add方法,这个方法的参数用了两个嵌套的三元表达式。
rxjs内置的众多操作符(operator) 会调用 Observable,这个场景下,this.operator就有值了,所以如果是操作符调用,就会走 operator.call(subscriber, source);rxjs内部的一些 Subject在某些情况下会执行到第二个逻辑 this._subscribe(subscriber);其他情况(即开发者正常使用的情况)会执行 this._trySubscribe(subscriber),前两个涉及到 operator 和 Subject,而且最终的大概流程跟直接执行第三个是差不多的,所以这里只看第三个
this._subscribe 就是在最开始 new Observable的时候传入的参数,所以只要有订阅操作(subscribe),就会执行这个方法
protected _trySubscribe(sink: Subscriber): TeardownLogic {
try {

return this._subscribe(sink);

} catch (err) {

// We don't need to return anything in this case,
// because it's just going to try to `add()` to a subscription
// above.
sink.error(err);

}
}

而在本文的例子里,new Observable的函数参数里,调用了 subscriber.next 和 subscriber.complete
protected _next(value: T): void {
this.destination.next(value);
}
protected _error(err: any): void {
try {

this.destination.error(err);

} finally {

this.unsubscribe();

}
}
protected _complete(): void {
try {

this.destination.complete();

} finally {

this.unsubscribe();

}
}

this.destination 这个对象,在 new SafeSubscriber的时候,被设置了 next、error、complete三个方法属性,就是订阅的时候传入的三个自定义方法,在这里调用到了
// 简化后的代码
subscriber.add(this._trySubscribe(subscriber));

这个是为了收集 teardown,也就是订阅取消(unsubscribe)的时候执行的收尾/清理方法,比如在订阅里启动了一个轮询方法,那么结束订阅的时候,你想同时也取消掉这个轮询逻辑,那么就可以在 new Observable 的方法体里,最后返回一个取消轮询的方法,那么在 unsubscribe 的时候就会自动调用这个 teardown方法执行你定义的取消轮询逻辑,类似于 React.useEffect 最后返回的那个方法
add(teardown: TeardownLogic): void {
// Only add the teardown if it's not undefined
// and don't add a subscription to itself.
if (teardown && teardown !== this) {

if (this.closed) {
  // If this subscription is already closed,
  // execute whatever teardown is handed to it automatically.
  execTeardown(teardown);
} else {
  if (teardown instanceof Subscription) {
    // We don't add closed subscriptions, and we don't add the same subscription
    // twice. Subscription unsubscribe is idempotent.
    if (teardown.closed || teardown._hasParent(this)) {
      return;
    }
    teardown._addParent(this);
  }
  (this._teardowns = this._teardowns ?? []).push(teardown);
}

}
}

this.closed的值用于标识当前 subscription 是否已经取消订阅了(complete、error、unsubscribe都会将此值置为 true),this._teardowns就是用于存放与当前 subscription所有有关的 teardown,可以看到,teardown 除了是一个自定义的清理方法外,还可以是一个 Subscription
一个 subscription(称为父 subscription)可以通过 add 连接到另外一个 subscription(称为子 subscription),那么在父 subscription 调用 unsubscribe方法取消订阅的时候,由于会执行 this._teardowns 里所有的方法,也就会调用子 subscription 的 unsubscribe,取消其下所有子孙 subscription 的订阅
这种关系看起来是一种父子关系,所以通过私有属性 _parentage 来标明这种关系,作用是避免 B subscription 被同一个 subscription重复订阅的问题,Subscription 里定义了几个方法用于管理 _parentage 的数据,例如 _hasParent、_addParent、_removeParent
const observable1 = interval(100)
const observable2 = interval(200)

const subscription1 = observable1.subscribe(x => console.log('first: ' + x))
const subscription2 = observable2.subscribe(x => console.log('second: ' + x))

subscription2.add(subscription1)
setTimeout(() => {
subscription2.unsubscribe()
}, 400)

上述代码中,subscription2通过 add 方法连接到了 subscription1,那么在 subscription2 调用 unsubscribe的时候,也会同时执行 subscription1的 unsubscribe,所以输出为
// 开始输出
first: 0
first: 1
second: 0
first: 2
first: 3
second: 1
// 结束输出

unsubscribe
有订阅就有取消订阅,unsubscribe主要用作执行一些清理动作,例如执行在 subscribe 的时候收集到的 teardown,以及更新 _parentage 的数据
// node_modules/rxjs/src/internal/Subscription.ts
unsubscribe(): void {
// ...
const { _parentage } = this;
if (_parentage) {

// 更新 _parentage

}

const { initialTeardown } = this;
if (isFunction(initialTeardown)) {

// 执行 initialTeardown

}

const { _teardowns } = this;
if (_teardowns) {

// ...
// 执行 teardown

}
// ...
}

这里有个 initialTeardown 方法,可以理解为 Subscription 取消订阅时会执行的函数,作为使用者一般不需要关心这个,库内部会使用到
const subscription = new Subscription(() => {
console.log('取消订阅时执行 initialTeardown')
})
const observable = new Observable(subscribe => {
subscribe.next(1)
return subscription
})
const subscription1 = observable.subscribe(d => console.log(d))
subscription1.unsubscribe()
// 开始输出
// 1
// 取消订阅时执行 initialTeardown
// 结束输出

至此,由文章开头例子所引申出来的源码逻辑都看完了,关于 Subscription的也看得差不多,再回头看看 Observable中没提到的地方
lift
lift(operator?: Operator<T, R>): Observable {
const observable = new Observable();
observable.source = this;
observable.operator = operator;
return observable;
}

lift通过 new Observable返回新的 observable,并且标记了 source 和 operator,这是为了方便链式操作,在当前版本中,官方已经不建议开发者直接调用这个方法了,主要是供给 rxjs内部众多的 operators 使用
forEach
forEach(next: (value: T) => void, promiseCtor?: PromiseConstructorLike): Promise {
promiseCtor = getPromiseCtor(promiseCtor);

return new promiseCtor((resolve, reject) => {

// Must be declared in a separate statement to avoid a ReferenceError when
// accessing subscription below in the closure due to Temporal Dead Zone.
let subscription: Subscription;
subscription = this.subscribe(
  (value) => {
    try {
      next(value);
    } catch (err) {
      reject(err);
      subscription?.unsubscribe();
    }
  },
  reject,
  resolve
);

}) as Promise;
}

getPromiseCtor 可以理解为 js 中的 Promise 对象,主要看调用 this.subscribe 这一句
subscribe(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): Subscription;

subscribe 的函数定义前面已经看过了,这里调用 subscribe 传入的三个参数与 next、error、complete一一对应,next 会持续调用直到 complete 执行,这个 promise才算是结束了,所以如果你想要使用这个方法,就必须确保所使用的 observable 最终会调用 complete 方法,否则意味着 promise 不会结束,forEach也就一直处于 hung up 的状态
一般情况下,我们是不会使用到这个方法的,因为很多需要 forEach的场景完全可以用操作符来代替,比如针对forEach源码中给的一个使用例子
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

const source$ = interval(1000).pipe(take(4));
async function getTotal() {
let total = 0;
await source$.forEach(value => {

 total += value;
 console.log('observable -> ', value);

});
return total;
}
getTotal().then(
total => console.log('Total:', total)
)

如果用 reduce 操作符来实现会更加直观
import { interval } from 'rxjs';
import { reduce } from 'rxjs/operators';

const source$ = interval(1000).pipe(take(4));
source$.pipe(
reduce((acc, value) => {

console.log('observable -> ', value);
return acc + value;

}, 0)
).subscribe(total => console.log('Total:', total));

pipe
pipe 的类型签名很多,实际上是为了辅助类型的自动推导,只要 pipe传入的参数数量在 9 个及以内,则就可以正确推导出类型,而一旦超过 9个,自动推导就失效了,必须使用者自己指定类型
// node_modules/rxjs/src/internal/Observable.ts
pipe(...operations: OperatorFunction<any, any>[]): Observable {
return pipeFromArray(operations)(this);
}

// node_modules/rxjs/src/internal/util/identity.ts
export function identity(x: T): T {
return x;
}

// node_modules/rxjs/src/internal/util/pipe.ts
/* @internal /
export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
if (fns.length === 0) {

return identity as UnaryFunction<any, any>;

}

if (fns.length === 1) {

return fns[0];

}

return function piped(input: T): R {

return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);

};
}

pipe 调用了 pipeFromArray,pipeFromArray的参数 fns 即所有传入 pipe的参数,也就是操作符 operator
如果没有传入任何操作符方法,则直接返回 Observable 对象;如果只传入了一个操作符方法,则直接返回该操作符方法,否则返回一个函数,将在函数体里通过reduce方法依次执行所有的操作符,执行的逻辑是将上一个操作符方法返回的值作为下一个操作符的参数,就像是一个管道串联起了所有的操作符,这里借鉴了函数式编程的思想,通过一个 pipe 函数将函数组合起来,上一个函数的输出成为下一个函数的输入参数
最后,不管是传入了几个操作符,最终返回的都是一个 Observable 的实例,所以可以接着调用 subscribe 方法
toPromise
// node_modules/rxjs/src/internal/Observable.ts
toPromise(promiseCtor?: PromiseConstructorLike): Promise {
promiseCtor = getPromiseCtor(promiseCtor);

return new promiseCtor((resolve, reject) => {

let value: T | undefined;
this.subscribe(
  (x: T) => (value = x),
  (err: any) => reject(err),
  () => resolve(value)
);

}) as Promise;
}

toPromise 方法跟上面提到的 forEach的实现很相似,将一个 Observable 对象转换成了一个 Promise 对象,会在 .then的时候返回这个 Observable最后一个值,这个方法已经被标记为 deprecated了,将会在 v8.x 中被移除,并且作者在源码注释里建议我们使用 firstValueFrom 和 lastValueFrom 来代替这个方法
const source$ = interval(100).pipe(take(4))
source$.toPromise().then(total => console.log(total))

// 相当于
const source$ = interval(100).pipe(take(4))
lastValueFrom(source$).then(total => console.log(total))

// 输出
// 3

用法上看着好像区别不大,实际上 lastValueFrom 的实现和 toPromise 也差不多,但从方法名上来说显然更加语义化
// node_modules/rxjs/src/internal/lastValueFrom.ts
export function lastValueFrom<T, D>(source: Observable, config?: LastValueFromConfig): Promise {
const hasConfig = typeof config === 'object';
return new Promise((resolve, reject) => {

let _hasValue = false;
let _value: T;
source.subscribe({
  next: (value) => {
    _value = value;
    _hasValue = true;
  },
  error: reject,
  complete: () => {
    if (_hasValue) {
      resolve(_value);
    } else if (hasConfig) {
      resolve(config!.defaultValue);
    } else {
      reject(new EmptyError());
    }
  },
});

});
}

小结
Observable、Subscription 部分的代码还是比较简单的,并没有什么七拐八拐的逻辑,官方源码中的注释也非常详细(甚至在注释里写 example),简直就是在文档里写代码,再加上 ts的助攻,可以说源码看起来没啥难度,当然了,这只是 rxjs 系统中两个最基础的概念,一般情况下使用 rxjs 是不会用到这两个概念的,Subject 和 operators 才是常客

相关文章
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
87 2
|
12天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
12天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
12天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
57 12
|
1月前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
13天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
3月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
87 0
|
3月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
68 0
|
3月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
74 0

推荐镜像

更多