上一篇博客提到了几种响应式的方案,以及它们的缺点。本文将介绍Observable
以及它的一个实现,以及它在处理响应式时相对于上篇博客中的方案的巨大优势(推荐两篇博客对比阅读)。
Observable
是一个集合了观察者模式、迭代器模式和函数式的库,提供了基于事件流的强大的异步处理能力,并且已在Stage 1
草案中。本文介绍的Rxjs
是Observable
的一个实现,它是ReactiveX众多语言中的JavaScript
版本。
在JavaScript
中,我们可以使用T | null
去处理一个单值,使用Iterator
去处理多个值得情况,使用Promise
处理异步的单个值,而Observable
则填补了缺失的“异步多个值”。
单个值 | 多个值 | |
---|---|---|
同步 | T | null |
Iterator<T> |
异步 | Promise<T> |
Observable<T> |
使用Rxjs
上文提到使用Event Emitter
做响应式处理,在Rxjs
中稍有些不同:
/*
const change$ = new Subject();
<Input change$={change$} />
<Search change$={change$} />
*/
class Input extends Component {
state = {
value: ''
};
onChange = e => {
this.props.change$.next(e.target.value);
};
componentDidMount() {
this.subscription = this.props.change$.subscribe(value => {
this.setState({
value
});
});
}
componentWillUnmount() {
this.subscription.ubsubscribe();
}
render() {
const { value } = this.state;
return <input value={value} onChange={this.onChange} />;
}
}
class Search extends Component {
// ...
componentDidMount() {
this.subscription = this.props.change$.subscribe(value => {
ajax(/* ... */).then(list =>
this.setState({
list
})
);
});
}
componentWillUnmount() {
this.subscription.ubsubscribe();
}
render() {
const { list } = this.state;
return <ul>{list.map(item => <li key={item.id}>{item.value}</li>)}</ul>;
}
}
在这里,我们虽然也需要手动释放对事件的订阅,但是得益于
Rxjs
的设计,我们不需要像
Event Emitter
那样去存下回调函数的实例,用于释放订阅,因此我们很容易就可以通过高阶组件解决这个问题。例如:
const withObservables = observables => ChildComponent => {
return class extends Component {
constructor(props) {
super(props);
this.subscriptions = {};
this.state = {};
Object.keys(observables).forEach(key => {
this.subscriptions[key] = observables[key].subscribe(value => {
this.setState({
[key]: value
});
});
});
}
onNext = (key, value) => {
observables[key].next(value);
};
componentWillUnmount() {
Object.keys(this.subscriptions).forEach(key => {
this.subscriptions[key].unsubscribe();
});
}
render() {
return (
<ChildComponent {...this.props} {...this.state} onNext={this.onNext} />
);
}
};
};
这样在需要聚合多个数据源时,也不会像
Event Emitter
那样手动释放资源造成麻烦。同时,在
Rxjs
中我们还有专用于聚合数据源的方法:
Observable.combineLatest(foo$, bar$)
.pipe(
// ...
);
Event Emitter
的方式十分高效,同时它相对于
Mobx
也有巨大的优势。在
Mobx
中,我们提到需要聚合多个数据源的时候,采用
autoRun
的方式容易收集到不必要的依赖,使用
observe
则不够高效。在
Rxjs
中,显然不会有这些问题,
combineLatest
可以以很简练的方式声明需要聚合的数据源,同时,得益于
Rxjs
设计,我们不需要像
Mobx
一个一个去调用
observe
返回的析构,只需要处理每一个
subscribe
返回的
subscription
:
class Foo extends Component {
constructor(props) {
super(props);
this.subscription = Observable.combineLatest(foo$, bar$)
.pipe(
// ...
)
.subscribe(() => {
// ...
});
}
componentWillUnmount() {
this.subscription.unsubscribe();
}
}
异步处理
Rxjs
使用操作符去描述各种行为,每一个操作符会返回一个新的Observable
,我们可以对它进行后续的操作。例如,使用map
操作符就可以实现对数据的转换:
foo$.map(event => event.target.value);
Rxjs 5.5
之后所有的
Observable
上都引入了一个
pipe
方法,接收若干个操作符,
pipe
方法会返回一个
Observable
。因此,我们可以很容易配合
tree shaking
实现对操作符的按需引入,而不是把整个
Rxjs
引入进来:
import { map } from 'rxjs/operators';
foo$.pipe(map(event => event.target.value));
推荐使用这种写法。
在讨论面向对象的响应式的响应式中,我们提到对于异步的问题,面向对象的方式不好处理。在Observable
中我们可以通过switchMap
操作符处理异步问题,一个异步搜索看起来会是这样:
input$.pipe(switchMap(keyword => Observable.ajax(/* ... */)));
在处理异步单值时,我们可以使用
Promise
,而
Observable
用于处理异步多个值,我们可以很容易把一个
Promise
转成一个
Observable
,从而复用已有的异步代码:
input$.pipe(switchMap(keyword => fromPromise(search(/* ... */))));
switchMap
接受一个返回
Observable
的函数作为参数,下游的流就会切到这个返回的
Observable
。 而要聚合多个数据源并做异步处理时:
combineLatest(foo$, bar$).pipe(
switchMap(keyword => fromPromise(someAsyncOperation(/* ... */)))
);
同时,由于标准制定的Promise
是没有cancel
方法的,有时候我们要取消异步方法的时候就有些麻烦(主要是为了解决一些并发安全问题)。switchMap
当上游有新值到来时,会忽略结束已有未完成的Observable
然后调用函数返回一个新的Observable
,我们只使用一个函数就解决了并发安全问题。当然,我们可以根据实际需要选用switchMap
、mergeMap
、concatMap
、exhaustMap
等。
而对于时间轴的操作,Rxjs
也有巨大优势。上篇博客中提到当我们需要延时 5 秒做操作时,无论是Event Emitter
还是面向对象的方式都力不从心,而在Rxjs
中我们只需要一个delay
操作符即可解决问题:
input$.pipe(
delay(5000) // 下游会在input$值到来后5秒才接到数据
);
用 Rxjs 处理数据
在实际开发过程中,事件不能解决所有问题,我们往往会需要存储数据,而Observable
被设计成用于处理事件,因此它有很多符合事件直觉的设计。
Observable
被设计为懒(lazy
)的,当当没有订阅者时,一个流不会执行。对于事件而言,没有事件的消费者那么不执行也不会有问题。而在 GUI 中,订阅者可能是View
:
class View extends Component {
state = {
input: ''
};
componentDidMount() {
this.subscription = input$.subscribe(input => {
this.setState({
input
});
});
}
componentWillUnmount() {
this.subscription.unsubscribe();
}
render() {
// ...
}
}
由于这个View
可能不存在,例如路由被切走了,那么我们的事件源就没有了订阅者,他就不会运行。但是我们希望在路由被且走后,后台的数据依然会继续。
对于事件而言,在事件发生之后的订阅者不会受到订阅之前的逻辑。例如在EventEmitter
中:
eventEmitter.emit('hello', 1);
// ...
eventEmitter.on('hello', function listener() {});
由于listener
是在hello
事件发生后在监听的,不会收到值为1
的事件。但是这在处理数据的时候会造成麻烦,我们的数据在View
被卸载(例如路由切走)后丢失。
同时,由于Observable
没有提供直接取到内部状态的方法,当我们使用Observable
处理数据时,我们不方便随时拿到数据。那有办法解决这个问题,从而使Observable
强大抽象能力去赋能数据层呢?
回到Redux
。Redux
的事件(Action)其实是一个事件流,那么我们就可以很自然地把Redux
的事件流融入到Rxjs
流中:
() => next => {
const action$ = new Subject();
return action => {
action$.next(action);
// ...
};
};
通过这样的封装, redux-observable 就能让我们把
Observable
强大的事件描述和处理能力和
Redux
结合。我们可以非常方便地根据
Action
去处理副作用:
action$.pipe(
ofType('ACTION_1'),
switchMap(() => {
// ...
}),
map(res => ({
type: 'ACTION_2',
payload: res
}))
);
action$.pipe(
ofType('ACTION_3'),
mergeMap(() => {
// ...
}),
map(res => ({
type: 'ACTION_4',
payload: res
}))
);
Redux Observable
使我们可以结合
Redux
和
Observable
。在这里,
Action
被视作一个流,
ofType
相当于
filter(action => action.type === 'SOME_ACTION')
,从而得到需要监听的
Action
,得益于
Redux
的设计,我们可以通过监听
Action
去完成副作用的处理或者监听数据变化。最后这个流返回一个新的
Action
流,
Redux Observable
会把这个新的
Action
流中的
Action
dispatch
出去。由此,我们在使用
Redux
存储数据的基础上获得了
Rxjs
对异步事件的强大处理能力。
原文作者:有赞前端
本文来源: 掘金 如需转载请联系原作者