1、RxJava 2.0
1.1、什么是RxJava和响应式编程(reactive programming)
在响应编程中,消费者对数据进行反应,这就是为什么异步编程也被称为响应式编程的原因。 响应式编程允许将事件更改传播到已注册的观察者。
RxJava是从Netflix的反向扩展(Rx)到Java的端口。 RxJava是2014年开源的,托管于http://reactivex.io/。
“观察者模式做的正确。 ReactiveX是来自Observer模式,Iterator模式和功能编程的最佳创意的组合。“
--activex.io
这个概念的Java版本叫做RxJava,它托管在https://github.com/ReactiveX/RxJava下。 RxJava根据Apache 2.0许可证发布。
RxJava将自己描述为用于具有可观察流的异步编程的API。
1.2、定义与RxJava 2.0的依赖关系
在撰写本文时,2.0.4版本目前是发布版本。 将g.a.v替换为2.0.6或更高版本。
对于Gradle构建,您可以通过以下依赖关系语句添加RxJava。
compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: 'g.a.v'
对于Maven,您可以添加以下代码段的依赖关系
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>g.a.v</version>
</dependency>
对于OSGi环境,例如Eclipse RCP开发,https://dl.bintray.com/simon-scholz/RxJava-OSGi/可用作p2更新站点。
1.3、 异步编程
现在的编程以一种迫切的单向线程的方式进行编程通常会导致奇怪的行为,阻塞不响应的用户界面,从而导致糟糕的用户体验。例如,如果网络不响应,则主动等待数据库查询或Web服务调用可能导致应用程序冻结。
这可以通过异步处理不可预测的事情来避免。
一个例子是:
public List<Todo> getTodos() {
List<Todo> todosFromWeb = // query a webservice (with bad network latency)
return todosFromDb;
}
从主线程或UI线程调用getTodos()方法将导致一个非响应的应用程序,直到todosFromWeb到达。
为了改进这个查询,这需要不可预测的时间量,这个代码应该运行在不同的线程中,并在结果进入时通知主线程。
public void getTodos(Consumer<List<Todo>> todosCallback) {
Thread thread = new Thread(()-> {
List<Todo> todosFromWeb = // query a webservice
todosCallback.accept(todosFromWeb);
});
thread.start();
}
现在调用
getTodos(Consumer <List <Todo >> todosConsumer)后,主线程可以继续工作,一旦调用了给定的Consumer的accept方法,就不会被阻塞,并且可以做出反应。
现在的代码真正是异步的。
但是如果发生Web服务查询中的错误怎么办?
public void getTodos(FailableCallback<List<Todo>> todosCallback) {
Thread thread = new Thread(()-> {
try {
List<Todo> todosFromWeb = // query a web service
todosCallback.accept(todosFromWeb);
} catch(Exception ex) {
todosCallback.error(ex);
}
});
thread.start();
}
使用自定义
FailableCallback界面可以工作,但也增加了复杂性。
还有更多的问题可以发生:
- 与UI同步(SWT和Android中的小部件必须从UI线程更新)
- 如果FailableCallback的消费者不再存在,该怎么办?
- 如果这样的FailableCallback取决于另一个FailableCallback怎么办?
public void getUserPermission(FailableCallback<UserPermission> permissionCallback) {
Thread thread = new Thread(()-> {
try {
UserPermission permission = // query a web service
permissionCallback.accept(permission);
} catch(Exception ex) {
permission.error(ex);
}
});
thread.start();
}
public void getTodos(FailableCallback<List<Todo>> todosCallback) {
Thread thread = new Thread(()-> {
getUserPermission(new FailableCallback() {
public void accept(UserPermission permission) {
if(permission.isValid()) {
try {
List<Todo> todosFromWeb = // query a web service
if(!todosCallbackInstance.isDisposed()) {
if(syncWithUIThread()) {
todosCallback.accept(todosFromWeb);
}
}
} catch(Exception ex) {
if(!todosCallbackInstance.isDisposed()) {
if(syncWithUIThread()) {
todosCallback.error(ex);
}
}
}
}
}
public void error(Exception ex) {
// Oh no!
}
});
});
thread.start();
}
这是非常糟糕的编码,它可能会变得更糟,应该只显示一个可以用ReactiveX解决的例子。 这些问题通常被认为是回调地狱/大坑。
2、RxJava可观察类型
可以重复或甚至无限发射数据的类型 Flowable<T> 和 Obervable<T>.
Observable<Todo> todoObservable = Observable.create(emitter -> {
try {
List<Todo> todos = getTodos();
for (Todo todo : todos) {
emitter.onNext(todo);
}
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
});
典型的Observable可能会发出无限数据,就像一个点击监听器一样,UI监听器是不可预测的,通常用户可能会点击按钮或其他UI小部件。通常终止成功或失败的类型
Maybe<T>,
Single<T>和
Completable。Maybe<T>对象是一种异步java.util.Optional从Java 8。
Maybe<List<Todo>> todoMaybe = Maybe.create(emitter -> {
try {
List<Todo> todos = getTodos();
if(todos != null && !todos.isEmpty()) {
emitter.onSuccess(todos); (1)
}else {
emitter.onComplete(); (2)
}
} catch (Exception e) {
emitter.onError(e); (3)
}
});
(1) | java.util.Optional 与一个值 |
(2) | java.util.Optional 不包含值→null |
(3) | 发生错误 |
Single<T>对象也可以被认为是承诺,在异步框架中也很受欢迎,并且类似于 Maybe<T>对象,但只有没有 onComplete()方法。
Completable对象与 Single<T>对象非常相似,但没有返回值,因此也不具有类似其他类型的泛型。 Completable对象也可以看作是反应式 java.lang.Runnable对象。
除了这些可观察类型的最流行的 create()方法之外,还有更多的方便方法来创建这些类型之一。
- Observable.just() - 允许在其他数据类型周围创建一个可观察的包装
- Observable.fromIterable() - 接受一个java.lang.Iterable <T>,并在数据结构中按顺序排列它们的值
- Observable.fromArray() - 获取一个数组,并在数据结构中按顺序排列它们的值
- Observable.fromCallable() - 允许为java.util.concurrent.Callable <V>创建一个observable
- Observable.fromFuture() - 允许为java.util.concurrent.Future创建一个observable
- Observable.interval() - 在给定间隔内发出长对象的可观察值
- ...
3、在RxJava中订阅
一个可观察的实例是可用的监听器/用户可以附加。所有可观察类型都提供了各种各样的订阅方法。
Observable<Todo> todoObservable = Observable.create(emitter -> { ... });
// Simply subscribe with a io.reactivex.functions.Consumer<T>, which will be informed onNext()
Disposable disposable = todoObservable.subscribe(t -> System.out.print(t));
// Dispose the subscription when not interested in the emitted data any more
disposable.dispose();
// Also handle the error case with a second io.reactivex.functions.Consumer<T>
Disposable subscribe = todoObservable.subscribe(t -> System.out.print(t), e -> e.printStackTrace());
// ...
- 有更多的io.reactivex.functions.Consumer <T> onNext,onSuccess,onFailure,onComplete等符合可观察类型。
- io.reactivex.functions.Consumer <T>几乎等于java 8中的java.util.function.Consumer,除了它的accept方法可以抛出异常。 此外,RxJava也不依赖Java 8,但与Java 6兼容。
DisposableObserver<Todo> disposableObserver = todoObservable.subscribeWith(new DisposableObserver<Todo>() {
@Override
public void onNext(Todo t) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
4、处理订阅并使用CompositeDisposable
当注册者或订阅者被附加时,他们通常不应该永久地聆听。所以可能会发生这样的情况:由于某些状态的改变,一个可观察的发射的事件可能不再是有趣的。import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
Single<List<Todo>> todosSingle = getTodos();
Disposable disposable = todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {
@Override
public void onSuccess(List<Todo> todos) {
// work with the resulting todos
}
@Override
public void onError(Throwable e) {
// handle the error case
}
});
// continue working and dispose when value of the Single is not interesting any more
disposable.dispose();
- 单一类和其他可观察类提供不同的订阅方法,返回一个Disposable对象。
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.disposables.CompositeDisposable;
CompositeDisposable compositeDisposable = new CompositeDisposable();
Single<List<Todo>> todosSingle = getTodos();
Single<Happiness> happiness = getHappiness();
compositeDisposable.add(todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {
@Override
public void onSuccess(List<Todo> todos) {
// work with the resulting todos
}
@Override
public void onError(Throwable e) {
// handle the error case
}
}));
compositeDisposable.add(happiness.subscribeWith(new DisposableSingleObserver<Happiness>() {
@Override
public void onSuccess(Happiness happiness) {
// celebrate the happiness :-D
}
@Override
public void onError(Throwable e) {
System.err.println("Don't worry, be happy! :-P");
}
}));
// continue working and dispose all subscriptions when the values from the Single objects are not interesting any more
compositeDisposable.dispose();
5.缓存已完成的可观察值的值
当使用可观察器对可观察器上的每个订阅进行异步调用时,通常不是必需的。可能会发生在应用程序中传递观察器,而不需要在添加订阅的同时进行这样一个昂贵的调用。以下代码执行昂贵的网页查询4次,即使这样做一次会很好,因为应该显示相同的Todo对象,而只能以不同的方式显示。Single<List<Todo>> todosSingle = Single.create(emitter -> {
Thread thread = new Thread(() -> {
try {
List<Todo> todosFromWeb = // query a webservice
System.out.println("Called 4 times!");
emitter.onSuccess(todosFromWeb);
} catch (Exception e) {
emitter.onError(e);
}
});
thread.start();
});
todosSingle.subscribe(... " Show todos times in a bar chart " ...);
showTodosInATable(todosSingle);
todosSingle.subscribe(... " Show todos in gant diagram " ...);
anotherMethodThatsSupposedToSubscribeTheSameSingle(todosSingle);
Single<List<Todo>> todosSingle = Single.create(emitter -> {
Thread thread = new Thread(() -> {
try {
List<Todo> todosFromWeb = // query a webservice
System.out.println("I am only called once!");
emitter.onSuccess(todosFromWeb);
} catch (Exception e) {
emitter.onError(e);
}
});
thread.start();
});
// cache the result of the single, so that the web query is only done once
Single<List<Todo>> cachedSingle = todosSingle.cache();
cachedSingle.subscribe(... " Show todos times in a bar chart " ...);
showTodosInATable(cachedSingle);
cachedSingle.subscribe(... " Show todos in gant diagram " ...);
anotherMethodThatsSupposedToSubscribeTheSameSingle(cachedSingle);
6. Flowable<T> 和 Backpressure
RxJava 2.0引入了一种新型的 Flowable <T>,它与API相当于 Observable <T>,但 Flowable <T>支持 Backpressure,而 Observable <T>则不支持。回到RxJava 1.0 Backpressure的概念太晚了,被添加到了 Observable的类型,但有些则抛出了一个 MissingBackpressureException,所以区分 Flowable <T>和 Observable <T>是一件好事。除了 Observable <T>也可能 <T>,Single <T>和 Completable没有 Backpressure。7.类型之间的转换
很容易在不同的RxJava类型之间进行转换。From / To | Flowable | Observable | Maybe | Single | Completable |
---|---|---|---|---|---|
Flowable |
toObservable() |
reduce() |
scan() |
ignoreElements() |
|
Observable |
toFlowable() |
reduce() |
scan() |
ignoreElements() |
|
Maybe |
toFlowable() |
toObservable() |
toSingle() |
toCompletable() |
|
Single |
toFlowable() |
toObservable() |
toMaybe() |
toCompletable() |
|
Completable |
toFlowable() |
toObservable() |
toMaybe() |
toSingle() |
8、测试RxJava可观察和订阅
8.1、 测试可观察量
您可以通过RxJava库提供的TestSubscriber类来测试可观察值。Observable<String> obs = ...// assume creation code here
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
obs.subscribe(testSubscriber);
testSubscriber.assertNoErrors();
List<String> chickens = testSubscriber.getOnNextEvents();
// TODO assert your string integrity...