源代码:
const source$ = range(0, 10);
range(0,10)返回一个新的Observable,但是不会立即执行,直到遇到subscribe调用为止:
下图高亮的这段代码,在Observable.subscribe后会执行:
注册到Observable的_subscribe里:
源代码:
ngOnInit(): void { console.log('before ngOnInit'); const source$ = range(0, 10); source$.pipe( filter((x, index) => { console.log('inside filter!: ' + x + ' index: ' + index); return x % 2 === 0 }), map( x => { console.log('inside map: ' + x); return (x + x); } ), // scan((acc, x) => acc + x, 0) scan(this.accumulator) ) .subscribe(x => console.log('result: ' + x)); }
pipe调用的参数传入的是filter, map和scan三个操作的结果,因此首先执行filter:
位于operators目录里:
filter操作接收的参数predicate, 类型是一个函数,该函数接收x和index两个参数,返回boolean类型,这就是所谓的filter-过滤器。filter操作返回一个新的函数,该函数接收一个新的输入source,对source调用lift操作,施加predicate操作。
然后执行pipe操作:
pipe的输入参数就是filter调用的返回结果:
pipeFromArray返回的就是包裹了filter操作的filterOperatorFunction:
进入之前filter调用返回的新函数:
这个source应该是range(0,10)返回的Observable:
对原始Observable对象调用filter返回的predicate操作:
life操作内部新建了一个Observable对象,source是原始Observable对象,operator就是filter对应的predicate.
Observable遇到subscribe方法才会真正地执行Observable内部的方法:
新建一个subscriber,第一个输入参数nextOrObserver就是应用程序里subscribe方法里传入的箭头函数:
subscriber是subscription的子类:
next就是应用程序传入的箭头函数:
注意这里,sink已经准备调用operator了。operator就是filter操作:
进入filter操作:
开始执行subscribe:
执行complete的条件:
调用next:
由此可见,range Observable里包含的值,逐一调用filter:
如果filter返回true,继续调用下一个Observable对象:
这里最终就调用应用程序里传入的subscribe方法了:
range = 0的value已经执行完毕了,下面进行range1:
因为range = 1时filter返回false,所以Observable链式执行到这里就中断了:
range = 2,下面的原理类似,不重复介绍了。