Rxjs源码解析(一)Observable

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 学习一个库最好的方法就是看其源码,理解其 api 的调用原理,用起来自然也就很清楚自己到底在干什么了,秉持着此观念,为了更好地理解 rxjs,抽空将其源码看了一遍本文章不会刻意涉及概念性的东西,主线就是解读源码,并在恰当的时候给出一些小例子,源码基于 rxjs v7.4.0 版本

从 new Observable 开始
import { Observable } from 'rxjs'

const observable = new Observable(subscriber => {
subscriber.next(1)
subscriber.next(2)
subscriber.complete()
})
observable.subscribe({
next: data => console.log('next data:', data),
complete: () => {

console.log('complete')

}
})

输出如下:
// 开始输出
next data: 1
next data: 2
complete
// 结束输出

通过 new Observable() 方法创建了一个可观察对象 observable,然后通过 subscribe 方法订阅这个observable,订阅的时候会执行在 new Observable时候传入的函数参数,那么就来看下 new Observable到底做了什么
// /src/internal/Observable.ts
export class Observable implements Subscribable {
// ...
constructor(subscribe?: (this: Observable, subscriber: Subscriber) => TeardownLogic) {

if (subscribe) {
  this._subscribe = subscribe;
}

}
// ...
}

Observable的初始化方法很简单,就是将回调函数绑定到实例的 _subscribe属性上
subscribe
Observable 实现 (implements) 了 Subscribable(订阅)接口
// /src/internal/types.ts
export interface Subscribable {
subscribe(observer: Partial<Observer>): Unsubscribable;
}

这个 subscribe正是下一步要用于订阅的方法,在当前版本中 subscribe的方法签名有三个,三个只是传参形式不同,最终都会处理成相同的对象,着重看第一个
subscribe(observer?: Partial<Observer>): Subscription;

对于第一个签名,接收的参数与Observer接口相关,这个接口有三个方法属性
export interface Observer {
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}

subscribe可以是一个对象,这个对象包含三个方法属性 next、error、complete,当你不关心 error 和 complete 这两个属性的时候,那么可以按照第二个函数签名直接传入一个方法,这个方法就默认代表 next 方法属性
进入 subscribe方法
subscribe(
observerOrNext?: Partial<Observer> | ((value: T) => void) | null,
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription {
const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
errorContext(() => {

// ...

});
return subscriber;
}

subscribe的第一个参数可以是一个 subscriber(具有 next、error、complete三个属性,所以类型合法),不过这种传参形式一般都是库内部使用,我们正常写法还是传入一个纯粹的对象或者方法,那么就意味着会执行 new SafeSubscriber(observerOrNext, error, complete)
// node_modules/rxjs/src/internal/Subscriber.ts
export class SafeSubscriber extends Subscriber {
// ...
}

SafeSubscriber继承了 Subscriber,主要作用是对 next、error、complete 三个方法属性进行了一层封装,保证能够更好地进行错误处理
subscriber.add(
operator

? // We're dealing with a subscription in the
  // operator chain to one of our lifted operators.
  operator.call(subscriber, source)
: source
? // If `source` has a value, but `operator` does not, something that
  // had intimate knowledge of our API, like our `Subject`, must have
  // set it. We're going to just call `_subscribe` directly.
  this._subscribe(subscriber)
: // In all other cases, we're likely wrapping a user-provided initializer
  // function, so we need to catch errors and handle them appropriately.
  this._trySubscribe(subscriber)

);

errorContext也是一个错误处理的包装方法,里面只调用了一个 subscriber.add方法,这个方法的参数用了两个嵌套的三元表达式。
rxjs内置的众多操作符(operator) 会调用 Observable,这个场景下,this.operator就有值了,所以如果是操作符调用,就会走 operator.call(subscriber, source);rxjs内部的一些 Subject在某些情况下会执行到第二个逻辑 this._subscribe(subscriber);其他情况(即开发者正常使用的情况)会执行 this._trySubscribe(subscriber),前两个涉及到 operator 和 Subject,而且最终的大概流程跟直接执行第三个是差不多的,所以这里只看第三个
this._subscribe 就是在最开始 new Observable的时候传入的参数,所以只要有订阅操作(subscribe),就会执行这个方法
protected _trySubscribe(sink: Subscriber): TeardownLogic {
try {

return this._subscribe(sink);

} catch (err) {

// We don't need to return anything in this case,
// because it's just going to try to `add()` to a subscription
// above.
sink.error(err);

}
}

而在本文的例子里,new Observable的函数参数里,调用了 subscriber.next 和 subscriber.complete
protected _next(value: T): void {
this.destination.next(value);
}
protected _error(err: any): void {
try {

this.destination.error(err);

} finally {

this.unsubscribe();

}
}
protected _complete(): void {
try {

this.destination.complete();

} finally {

this.unsubscribe();

}
}

this.destination 这个对象,在 new SafeSubscriber的时候,被设置了 next、error、complete三个方法属性,就是订阅的时候传入的三个自定义方法,在这里调用到了
// 简化后的代码
subscriber.add(this._trySubscribe(subscriber));

这个是为了收集 teardown,也就是订阅取消(unsubscribe)的时候执行的收尾/清理方法,比如在订阅里启动了一个轮询方法,那么结束订阅的时候,你想同时也取消掉这个轮询逻辑,那么就可以在 new Observable 的方法体里,最后返回一个取消轮询的方法,那么在 unsubscribe 的时候就会自动调用这个 teardown方法执行你定义的取消轮询逻辑,类似于 React.useEffect 最后返回的那个方法
add(teardown: TeardownLogic): void {
// Only add the teardown if it's not undefined
// and don't add a subscription to itself.
if (teardown && teardown !== this) {

if (this.closed) {
  // If this subscription is already closed,
  // execute whatever teardown is handed to it automatically.
  execTeardown(teardown);
} else {
  if (teardown instanceof Subscription) {
    // We don't add closed subscriptions, and we don't add the same subscription
    // twice. Subscription unsubscribe is idempotent.
    if (teardown.closed || teardown._hasParent(this)) {
      return;
    }
    teardown._addParent(this);
  }
  (this._teardowns = this._teardowns ?? []).push(teardown);
}

}
}

this.closed的值用于标识当前 subscription 是否已经取消订阅了(complete、error、unsubscribe都会将此值置为 true),this._teardowns就是用于存放与当前 subscription所有有关的 teardown,可以看到,teardown 除了是一个自定义的清理方法外,还可以是一个 Subscription
一个 subscription(称为父 subscription)可以通过 add 连接到另外一个 subscription(称为子 subscription),那么在父 subscription 调用 unsubscribe方法取消订阅的时候,由于会执行 this._teardowns 里所有的方法,也就会调用子 subscription 的 unsubscribe,取消其下所有子孙 subscription 的订阅
这种关系看起来是一种父子关系,所以通过私有属性 _parentage 来标明这种关系,作用是避免 B subscription 被同一个 subscription重复订阅的问题,Subscription 里定义了几个方法用于管理 _parentage 的数据,例如 _hasParent、_addParent、_removeParent
const observable1 = interval(100)
const observable2 = interval(200)

const subscription1 = observable1.subscribe(x => console.log('first: ' + x))
const subscription2 = observable2.subscribe(x => console.log('second: ' + x))

subscription2.add(subscription1)
setTimeout(() => {
subscription2.unsubscribe()
}, 400)

上述代码中,subscription2通过 add 方法连接到了 subscription1,那么在 subscription2 调用 unsubscribe的时候,也会同时执行 subscription1的 unsubscribe,所以输出为
// 开始输出
first: 0
first: 1
second: 0
first: 2
first: 3
second: 1
// 结束输出

unsubscribe
有订阅就有取消订阅,unsubscribe主要用作执行一些清理动作,例如执行在 subscribe 的时候收集到的 teardown,以及更新 _parentage 的数据
// node_modules/rxjs/src/internal/Subscription.ts
unsubscribe(): void {
// ...
const { _parentage } = this;
if (_parentage) {

// 更新 _parentage

}

const { initialTeardown } = this;
if (isFunction(initialTeardown)) {

// 执行 initialTeardown

}

const { _teardowns } = this;
if (_teardowns) {

// ...
// 执行 teardown

}
// ...
}

这里有个 initialTeardown 方法,可以理解为 Subscription 取消订阅时会执行的函数,作为使用者一般不需要关心这个,库内部会使用到
const subscription = new Subscription(() => {
console.log('取消订阅时执行 initialTeardown')
})
const observable = new Observable(subscribe => {
subscribe.next(1)
return subscription
})
const subscription1 = observable.subscribe(d => console.log(d))
subscription1.unsubscribe()
// 开始输出
// 1
// 取消订阅时执行 initialTeardown
// 结束输出

至此,由文章开头例子所引申出来的源码逻辑都看完了,关于 Subscription的也看得差不多,再回头看看 Observable中没提到的地方
lift
lift(operator?: Operator<T, R>): Observable {
const observable = new Observable();
observable.source = this;
observable.operator = operator;
return observable;
}

lift通过 new Observable返回新的 observable,并且标记了 source 和 operator,这是为了方便链式操作,在当前版本中,官方已经不建议开发者直接调用这个方法了,主要是供给 rxjs内部众多的 operators 使用
forEach
forEach(next: (value: T) => void, promiseCtor?: PromiseConstructorLike): Promise {
promiseCtor = getPromiseCtor(promiseCtor);

return new promiseCtor((resolve, reject) => {

// Must be declared in a separate statement to avoid a ReferenceError when
// accessing subscription below in the closure due to Temporal Dead Zone.
let subscription: Subscription;
subscription = this.subscribe(
  (value) => {
    try {
      next(value);
    } catch (err) {
      reject(err);
      subscription?.unsubscribe();
    }
  },
  reject,
  resolve
);

}) as Promise;
}

getPromiseCtor 可以理解为 js 中的 Promise 对象,主要看调用 this.subscribe 这一句
subscribe(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): Subscription;

subscribe 的函数定义前面已经看过了,这里调用 subscribe 传入的三个参数与 next、error、complete一一对应,next 会持续调用直到 complete 执行,这个 promise才算是结束了,所以如果你想要使用这个方法,就必须确保所使用的 observable 最终会调用 complete 方法,否则意味着 promise 不会结束,forEach也就一直处于 hung up 的状态
一般情况下,我们是不会使用到这个方法的,因为很多需要 forEach的场景完全可以用操作符来代替,比如针对forEach源码中给的一个使用例子
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

const source$ = interval(1000).pipe(take(4));
async function getTotal() {
let total = 0;
await source$.forEach(value => {

 total += value;
 console.log('observable -> ', value);

});
return total;
}
getTotal().then(
total => console.log('Total:', total)
)

如果用 reduce 操作符来实现会更加直观
import { interval } from 'rxjs';
import { reduce } from 'rxjs/operators';

const source$ = interval(1000).pipe(take(4));
source$.pipe(
reduce((acc, value) => {

console.log('observable -> ', value);
return acc + value;

}, 0)
).subscribe(total => console.log('Total:', total));

pipe
pipe 的类型签名很多,实际上是为了辅助类型的自动推导,只要 pipe传入的参数数量在 9 个及以内,则就可以正确推导出类型,而一旦超过 9个,自动推导就失效了,必须使用者自己指定类型
// node_modules/rxjs/src/internal/Observable.ts
pipe(...operations: OperatorFunction<any, any>[]): Observable {
return pipeFromArray(operations)(this);
}

// node_modules/rxjs/src/internal/util/identity.ts
export function identity(x: T): T {
return x;
}

// node_modules/rxjs/src/internal/util/pipe.ts
/* @internal /
export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
if (fns.length === 0) {

return identity as UnaryFunction<any, any>;

}

if (fns.length === 1) {

return fns[0];

}

return function piped(input: T): R {

return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);

};
}

pipe 调用了 pipeFromArray,pipeFromArray的参数 fns 即所有传入 pipe的参数,也就是操作符 operator
如果没有传入任何操作符方法,则直接返回 Observable 对象;如果只传入了一个操作符方法,则直接返回该操作符方法,否则返回一个函数,将在函数体里通过reduce方法依次执行所有的操作符,执行的逻辑是将上一个操作符方法返回的值作为下一个操作符的参数,就像是一个管道串联起了所有的操作符,这里借鉴了函数式编程的思想,通过一个 pipe 函数将函数组合起来,上一个函数的输出成为下一个函数的输入参数
最后,不管是传入了几个操作符,最终返回的都是一个 Observable 的实例,所以可以接着调用 subscribe 方法
toPromise
// node_modules/rxjs/src/internal/Observable.ts
toPromise(promiseCtor?: PromiseConstructorLike): Promise {
promiseCtor = getPromiseCtor(promiseCtor);

return new promiseCtor((resolve, reject) => {

let value: T | undefined;
this.subscribe(
  (x: T) => (value = x),
  (err: any) => reject(err),
  () => resolve(value)
);

}) as Promise;
}

toPromise 方法跟上面提到的 forEach的实现很相似,将一个 Observable 对象转换成了一个 Promise 对象,会在 .then的时候返回这个 Observable最后一个值,这个方法已经被标记为 deprecated了,将会在 v8.x 中被移除,并且作者在源码注释里建议我们使用 firstValueFrom 和 lastValueFrom 来代替这个方法
const source$ = interval(100).pipe(take(4))
source$.toPromise().then(total => console.log(total))

// 相当于
const source$ = interval(100).pipe(take(4))
lastValueFrom(source$).then(total => console.log(total))

// 输出
// 3

用法上看着好像区别不大,实际上 lastValueFrom 的实现和 toPromise 也差不多,但从方法名上来说显然更加语义化
// node_modules/rxjs/src/internal/lastValueFrom.ts
export function lastValueFrom<T, D>(source: Observable, config?: LastValueFromConfig): Promise {
const hasConfig = typeof config === 'object';
return new Promise((resolve, reject) => {

let _hasValue = false;
let _value: T;
source.subscribe({
  next: (value) => {
    _value = value;
    _hasValue = true;
  },
  error: reject,
  complete: () => {
    if (_hasValue) {
      resolve(_value);
    } else if (hasConfig) {
      resolve(config!.defaultValue);
    } else {
      reject(new EmptyError());
    }
  },
});

});
}

小结
Observable、Subscription 部分的代码还是比较简单的,并没有什么七拐八拐的逻辑,官方源码中的注释也非常详细(甚至在注释里写 example),简直就是在文档里写代码,再加上 ts的助攻,可以说源码看起来没啥难度,当然了,这只是 rxjs 系统中两个最基础的概念,一般情况下使用 rxjs 是不会用到这两个概念的,Subject 和 operators 才是常客

相关文章
|
1月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
71 2
|
17天前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
22天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
50 12
|
1月前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
1月前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
54 3
|
2月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
63 5
|
2月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
148 5
|
2月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
76 0
|
2月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
62 0
|
2月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
66 0

推荐镜像

更多
下一篇
DataWorks