@TOC
1. 理解核心概念与基础操作
这是学习 RxJava 的基石,必须牢固掌握。
1.1 核心四要素 (The Observable Contract)
RxJava 的核心是 观察者模式 (Observer Pattern),它定义了四个关键角色:
• Observable (被观察者/数据源): 产生数据流的源头。可以发射 多个数据项,以成功或失败结束。
• Observer (观察者/订阅者): 接收并处理 Observable 发射的数据。它定义了四个方法:
• onSubscribe(Disposable d): 订阅建立时调用,可用于取消订阅。
• onNext(T value):接收到一个数据项时调用。
• onError(Throwable e): 发生错误时调用,流终止。
• onComplete():流成功完成时调用,流终止。
• Subscription (订阅): 连接 Observable 和 Observer 的纽带。通过调用 subscribe() 方法建立。
• Disposable (可dispose的): 代表一个订阅关系,调用其 dispose() 方法可以取消订阅,停止接收 数据并释放资源。
Hello World 示例:
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
public class RxJavaHelloWorld {
public static void main(String[] args) {
// 1. 创建 Observable (数据源)
Observable < String > observable = Observable.just("Hello", "RxJava!");
// 2. 创建 Observer (观察者)
MyObserver observer = new
MyObserver();
// 3. 建立订阅 (Subscription)
Disposable disposable = observable.subscribe(observer);
// 4. (可选) 取消订阅
// disposable.dispose();
}
static class MyObserver implements io.reactivex.rxjava3.core.Observer < String > {
@Override
public void onSubscribe(io.reactivex.rxjava3.disposables.Disposable d) {
System.out.println("订阅建立");
}
@Override
public void onNext(String s) {
System.out.println("接收到: " +
s);
}
@Override
public void onError(Throwable e) {
System.err.println("发生错误: " +
e.getMessage());
}
@Override
public void onComplete() {
System.out.println("流已结束");
}
}
}
// 输出:
// 订阅建立
// 接收到: Hello
// 接收到: RxJava!
// 流已结束
1.2 冷 Observable 与 热 Observable
• 冷 Observable (Cold Observable): 只有在有订阅者订阅时才开始发射数据。每个订阅者都会收到完整的数据流。just, fromArray, create 等创建的通常是冷的。
• 热 Observable (Hot Observable): 无论是否有订阅者,都会发射数据。订阅者只能接收到订阅之后发射的数据。PublishSubject, ReplaySubject 等创建的是热的。
1.3 基础操作符 (Operators)
操作符是 RxJava 的灵魂,用于对数据流进行转换、过滤、组合等。
1.3.1 创建操作符 (Creating):
• just(T...): 发射指定的项。
• fromArray(T[]) / fromIterable(Iterable<? extends T>): 从数组或集合发射数据。
• create(ObservableOnSubscribe): 通过自定义逻辑创建 Observable。
• interval(long, TimeUnit): 定时发射递增的数字。
• range(int start, int count): 发射一个范围内的整数序列。
1.3.2 变换操作符 (Transforming):
• map(Function): 将每个发射的项通过一个函数转换成另一个值。
Observable.just("1", "2", "3"
)
.map(Integer::parseInt) // 将 String 转为 Integer
.subscribe(System.out::println); // 输出: 1, 2, 3
• flatMap(Function>): 将每个发射的项转换成一个新的 Observable,然后将这些 Observable 发射的数据“拍平”并合并到一个单一的 Observable 中。常用于异步操作。
Observable.just("url1", "url2")
.flatMap(url -> fetchDataFromNetwork(url)) // 假设 fetchDataFromNetwork 返回 Observable<Data>
.subscribe(data -> System.out.println("Received: " + data));
• concatMap(Function>>): 类似 flatMap,但保证内部 Observable 的发射顺序与原始项的顺序一致。
1.3.3 过滤操作符 (Filtering):
• filter(Predicate): 只发射满足指定条件的项。
Observable.just(1, 2, 3, 4, 5)
.filter(x -> x % 2 == 0) // 只保留偶数
.subscribe(System.out::println); // 输出: 2, 4
• take(long count): 只取前 N 项。
• skip(long count): 跳过前 N 项。
1.3.4 组合操作符 (Combining):
• merge(ObservableSource<? extends T>...): 将多个 Observable 发射的数据合并,不保证顺序。
• concat(ObservableSource<? extends T>...): 按顺序连接多个 Observable,前一个完成后才订阅下一个。
• zip(ObservableSource, ObservableSource, BiFunction): 将多个 Observable 的数据按顺序一一配对组合。
Observable<String> names = Observable.just("Alice", "Bob");
Observable<Integer> ages = Observable.just(25, 30);
Observable.zip(names, ages, (name, age) -> name + " is " + age + " years old.")
.subscribe(System.out::println);
// 输出:
// Alice is 25 years old.
// Bob is 30 years old.
• combineLatest(ObservableSource, ObservableSource, BiFunction): 当任意一个 Observable 发射新数据时,与其它 Observable 的最新数据组合。
1.4. 线程调度 (Schedulers)
RxJava 强大的异步能力离不开调度器。
• subscribeOn(Scheduler scheduler): 指定 上游(数据产生、操作符执行)在哪个线程执行。只对第一个 subscribeOn有效。
• observeOn(Scheduler scheduler): 指定下游(onNext, onError, onComplete)在哪个线程执行。可以多次调用,切换线程。
常用调度器:
• Schedulers.io(): 用于 I/O 密集型操作(如网络请求、文件读写)。
• Schedulers.computation(): 用于 CPU 密集型计算。
• Schedulers.newThread(): 为每个任务创建一个新线程。
• AndroidSchedulers.mainThread() (RxAndroid): 在 Android 主线程执行,用于更新 UI。
示例:
Observable.just("url")
.subscribeOn(Schedulers.io()) // 网络请求在 IO 线程
.map(url -> fetchData(url)) // 转换操作也在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 切换到主线程更新 UI
.subscribe(data -> updateUI(data));
2. 深入原理与高级操作
掌握了基础后,需要理解其内部机制和更复杂的操作。
2.1 操作符链与 Lift 原理
理解操作符是如何串联起来的至关重要。每个操作符(如 map, filter)内部通常会调用 lift(Operator) 方法,创建一个新的 Observable,这个新的 Observable 会包装上游的 Observable,并在订阅时,将下游的 Observer 包装成一个新的 Observer,从而在数据传递过程中插入转换或过滤逻辑。这是一个典型的装饰者模式。
2.2 背压 (Backpressure) - RxJava 2/3 的核心概念
在异步场景下,如果生产者(Observable)发射数据的速度远快于消费者(Observer)处理数据的速度,会导致内存溢出。RxJava 2 引入了 Flowable 来专门处理背压问题。
• Flowable vs Observable:
• Observable: 无背压支持,适用于 GUI 事件、短序列等。
• Flowable: 有背压支持,适用于网络请求、数据库操作、文件 I/O 等可能产生大量数据的场景。
• 背压策略 (BackpressureStrategy):
• BUFFER: 缓存所有数据,可能导致 OOM。
• DROP: 丢弃无法处理的数据。
• LATEST: 只保留最新的数据,丢弃旧的。
• ERROR: 抛出 MissingBackpressureException。
• MISSING: 不做任何处理,需要手动通过 onBackpressureXXX 操作符处理。
2.3. 高级操作符
• switchMap: 类似 flatMap,但当源 Observable 发射一个新项时,会取消订阅并丢弃前一个内部 Observable 产生的数据,只处理最新的内部 Observable。
• debounce: 如果在一个指定的时间段内没有新的数据发射,则发射最近的一个数据。常用于搜索框防抖。
• distinct / distinctUntilChanged: 过滤掉重复的数据。
• retry / retryWhen: 在发生错误时重试。
• timeout: 如果在指定时间内没有收到数据,则抛出超时异常。
• doOnNext / doOnError / doOnComplete / doFinally: 用于副作用(side-effect),如日志记录、资源清理,不影响数据流本身。
2.4. Subject (主题)
Subject 既是 Observable 又是 Observer,可以手动向数据流中“推送”数据,是创建热 Observable 的主要方式。
• PublishSubject: 向所有订阅者广播数据,订阅后才能收到数据。
• ReplaySubject: 缓存所有发射过的数据,新的订阅者会收到所有历史数据。
• BehaviorSubject: 缓存最后一个数据,新的订阅者会立即收到这个最新数据,然后接收后续数据。
• AsyncSubject: 只在流完成时,向所有订阅者发射最后一个数据。
BehaviorSubject 示例 (模拟状态管理):
BehaviorSubject<String> userNameSubject = BehaviorSubject.createDefault("Guest");
// 订阅者 A
userNameSubject.subscribe(name -> System.out.println("User A sees: "+ name));
// 输出: User A sees: Guest
// 订阅者 B (稍后订阅)
userNameSubject.subscribe(name -> System.out.println("User B sees: "+ name));
// 输出: User B sees: Guest
// 更新用户名
userNameSubject.onNext("Alice");
// 输出: User A sees: Alice
// 输出: User B sees: Alice
2.5. 整体变换 (compose 和 transform)
当一组操作符需要在多个地方复用时,可以使用 compose 或 transform。
• compose(ObservableTransformer): 用于 Observable。
• to(ObservableConverter): 更通用的转换。
示例 - 封装网络请求的通用线程切换:
public class SchedulersTransformer<T> implements ObservableTransformer<T, T> {
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
}
// 使用
apiService.getData()
.compose(new SchedulersTransformer<>()) // 应用通用变换
.subscribe(...);