RxJS 转换操作符,继续冲冲冲!熟悉的温故知新,不熟悉的混个脸熟先~
buffer
buffer
顾名思义就是“缓存”,可以在某些条件下进行值的收集,然后再在某些条件下,将收集的值发出。除了 buffer
同类的还有:
- bufferCount:收集发出的值,直到收集完提供的数量的值才将其作为数组发出。
- bufferTime:收集发出的值,直到经过了提供的时间才将其作为数组发出。
- bufferToggle:开启开关以捕获源 observable 所发出的值,关闭开关以将缓冲的值作为数组发出。
- bufferWhen:收集值,直到关闭选择器发出值才发出缓冲的值
使用方法大同小异,简单理解为:车站安检,人很多的时候,就有专人在那设卡,控制流量,当设卡的人觉得在某个条件下可以了,就放卡,这里的条件可以是:数量、时间、自定义开启、其它条件值;
e.g.
// 创建每1秒发出值的 observable const myInterval = interval(1000); // 创建页面点击事件的 observable const bufferBy = fromEvent(document, 'click'); /* 收集由 myInterval 发出的所有值,直到我们点击页面。此时 bufferBy 会发出值以完成缓存。 将自上次缓冲以来收集的所有值传递给数组。 */ const myBufferedInterval = myInterval.pipe(buffer(bufferBy)); // 打印值到控制台 // 例如 输出: [1,2,3] ... [4,5,6,7,8] const subscribe = myBufferedInterval.subscribe(val => console.log(' Buffered Values:', val) );
concatMap
concatMap
可以将值进行映射,还有一个与之相似的是 mergeMap
,类比来说:一个是 reduce promise,一个是 PromiseAll;
// concatMap // 发出 'Hello' 和 'Goodbye' const source = of('Hello', 'Goodbye'); // 使用 promise 的示例 const examplePromise = val => new Promise(resolve => resolve(`${val} World!`)); // 将 source 的值映射成内部 observable,当一个完成发出结果后再继续下一个 const example = source.pipe(concatMap(val => examplePromise(val))); // 输出: 'Example w/ Promise: 'Hello World', Example w/ Promise: 'Goodbye World' const subscribe = example.subscribe(val => console.log('Example w/ Promise:', val) ); // mergeMap // 发出 'Hello' const source = of('Hello'); // mergeMap 还会发出 promise 的结果 const myPromise = val => new Promise(resolve => resolve(`${val} World From Promise!`)); // 映射成 promise 并发出结果 const example = source.pipe(mergeMap(val => myPromise(val))); // 输出: 'Hello World From Promise' const subscribe = example.subscribe(val => console.log(val));
map
map
最关键了,它能对源 observable 的每个值应用投射函数。
// 发出 (1,2,3,4,5) const source = from([1, 2, 3, 4, 5]); // 每个数字加10 const example = source.pipe(map(val => val + 10)); // 输出: 11,12,13,14,15 const subscribe = example.subscribe(val => console.log(val));
reduce
常见的还有 reduce
,它能将源 observalbe 的值归并为单个值,当源 observable 完成时将这个值发出。
const source = of(1, 2, 3, 4); const example = source.pipe(reduce((acc, val) => acc + val)); // 输出: Sum: 10' const subscribe = example.subscribe(val => console.log('Sum:', val));
window
还有:window
操作符,是时间窗口值的 observable;
// RxJS v6+ import { timer, interval } from 'rxjs'; import { window, scan, mergeAll } from 'rxjs/operators'; // 立即发出值,然后每秒发出值 const source = timer(0, 1000); const example = source.pipe(window(interval(3000))); const count = example.pipe(scan((acc, curr) => acc + 1, 0)); /* "Window 1:" 0 1 2 "Window 2:" 3 4 5 ... */ const subscribe = count.subscribe(val => console.log(`Window ${val}:`)); const subscribeTwo = example .pipe(mergeAll()) .subscribe(val => console.log(val));
其实除了:window
,还有其衍生的 windowCount、windowTime、windowToggle、windowWhen。与 buffer
的衍生也很像。
OK,以上便是本篇分享,往期关于 RxJS 的内容:
- 3 分钟温故知新 RxJS 创建实例操作符
- 你就是函数响应式编程(FRP)啊?!【附 RxJS 实战】
- 为什么说:被观察者是 push 数据,迭代者是 pull 数据?
- 探秘 RxJS Observable 为什么要长成这个样子?!
- Js 异步处理演进,Callback=>Promise=>Observer
- 继续解惑,异步处理 —— RxJS Observable
我是掘金安东尼,输出暴露输入,技术洞见生活,再会~ 👍👍👍