RxJava基础操作符和高级操作符

简介: RxJava基于观察者模式,通过Observable、Observer、Subscription和Disposable四大核心实现响应式编程。支持冷热数据流,提供丰富操作符进行变换、过滤与组合,并借助Scheduler实现线程调度,Flowable解决背压问题,Subject用于状态共享,适用于异步事件处理与复杂数据流管理。

@TOC

1. 理解核心概念与基础操作

这是学习 RxJava 的基石,必须牢固掌握。

1.1 核心四要素 (The Observable Contract)

RxJava 的核心是 观察者模式 (Observer Pattern),它定义了四个关键角色:

Observable (被观察者/数据源): 产生数据流的源头。可以发射 多个数据项,以成功或失败结束。
Observer (观察者/订阅者): 接收并处理 Observable 发射的数据。它定义了四个方法:

• onSubscribe(Disposable d): 订阅建立时调用,可用于取消订阅。
• onNext(T value):接收到一个数据项时调用。
• onError(Throwable e): 发生错误时调用,流终止。
• onComplete():流成功完成时调用,流终止。

Subscription (订阅): 连接 Observable 和 Observer 的纽带。通过调用 subscribe() 方法建立。
Disposable (可dispose的): 代表一个订阅关系,调用其 dispose() 方法可以取消订阅,停止接收 数据并释放资源。

Hello World 示例:


import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;

public class RxJavaHelloWorld {
        public static void main(String[] args) {
            // 1. 创建 Observable (数据源)
            Observable < String > observable = Observable.just("Hello", "RxJava!");
            // 2. 创建 Observer (观察者)
            MyObserver observer = new
            MyObserver();
            // 3. 建立订阅 (Subscription)
            Disposable disposable = observable.subscribe(observer);
            // 4. (可选) 取消订阅
            // disposable.dispose();
        }

        static class MyObserver implements io.reactivex.rxjava3.core.Observer < String > {
            @Override
            public void onSubscribe(io.reactivex.rxjava3.disposables.Disposable d) {
                System.out.println("订阅建立");
            }

            @Override
            public void onNext(String s) {
                System.out.println("接收到: " +
                    s);
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("发生错误: " +
                    e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("流已结束");
            }
        }
    }
    // 输出:
    // 订阅建立
    // 接收到: Hello
    // 接收到: RxJava!
    // 流已结束

1.2 冷 Observable 与 热 Observable

• 冷 Observable (Cold Observable): 只有在有订阅者订阅时才开始发射数据。每个订阅者都会收到完整的数据流。just, fromArray, create 等创建的通常是冷的。
• 热 Observable (Hot Observable): 无论是否有订阅者,都会发射数据。订阅者只能接收到订阅之后发射的数据。PublishSubject, ReplaySubject 等创建的是热的。

1.3 基础操作符 (Operators)

操作符是 RxJava 的灵魂,用于对数据流进行转换、过滤、组合等。

1.3.1 创建操作符 (Creating):

• just(T...): 发射指定的项。
• fromArray(T[]) / fromIterable(Iterable<? extends T>): 从数组或集合发射数据。
• create(ObservableOnSubscribe): 通过自定义逻辑创建 Observable。
• interval(long, TimeUnit): 定时发射递增的数字。
• range(int start, int count): 发射一个范围内的整数序列。

1.3.2 变换操作符 (Transforming):

• map(Function): 将每个发射的项通过一个函数转换成另一个值。


Observable.just("1", "2", "3"
)
    .map(Integer::parseInt) // 将 String 转为 Integer
    .subscribe(System.out::println); // 输出: 1, 2, 3

• flatMap(Function>): 将每个发射的项转换成一个新的 Observable,然后将这些 Observable 发射的数据“拍平”并合并到一个单一的 Observable 中。常用于异步操作。


Observable.just("url1", "url2")
    .flatMap(url -> fetchDataFromNetwork(url)) // 假设 fetchDataFromNetwork 返回 Observable<Data>
    .subscribe(data -> System.out.println("Received: " + data));

• concatMap(Function>>): 类似 flatMap,但保证内部 Observable 的发射顺序与原始项的顺序一致。

1.3.3 过滤操作符 (Filtering):

• filter(Predicate): 只发射满足指定条件的项。


Observable.just(1, 2, 3, 4, 5)
    .filter(x -> x % 2 == 0) // 只保留偶数
    .subscribe(System.out::println); // 输出: 2, 4

• take(long count): 只取前 N 项。
• skip(long count): 跳过前 N 项。

1.3.4 组合操作符 (Combining):

• merge(ObservableSource<? extends T>...): 将多个 Observable 发射的数据合并,不保证顺序。
• concat(ObservableSource<? extends T>...): 按顺序连接多个 Observable,前一个完成后才订阅下一个。
• zip(ObservableSource, ObservableSource, BiFunction): 将多个 Observable 的数据按顺序一一配对组合。


Observable<String> names = Observable.just("Alice", "Bob");
Observable<Integer> ages = Observable.just(25, 30);

Observable.zip(names, ages, (name, age) -> name + " is " + age + " years old.")
   .subscribe(System.out::println);
// 输出:
// Alice is 25 years old.
// Bob is 30 years old.

• combineLatest(ObservableSource, ObservableSource, BiFunction): 当任意一个 Observable 发射新数据时,与其它 Observable 的最新数据组合。

1.4. 线程调度 (Schedulers)

RxJava 强大的异步能力离不开调度器。
• subscribeOn(Scheduler scheduler): 指定 上游(数据产生、操作符执行)在哪个线程执行。只对第一个 subscribeOn有效。
• observeOn(Scheduler scheduler): 指定下游(onNext, onError, onComplete)在哪个线程执行。可以多次调用,切换线程。

常用调度器:

• Schedulers.io(): 用于 I/O 密集型操作(如网络请求、文件读写)。
• Schedulers.computation(): 用于 CPU 密集型计算。
• Schedulers.newThread(): 为每个任务创建一个新线程。
• AndroidSchedulers.mainThread() (RxAndroid): 在 Android 主线程执行,用于更新 UI。

示例:


Observable.just("url")
    .subscribeOn(Schedulers.io()) // 网络请求在 IO 线程
    .map(url -> fetchData(url))   // 转换操作也在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 切换到主线程更新 UI
    .subscribe(data -> updateUI(data));

2. 深入原理与高级操作

掌握了基础后,需要理解其内部机制和更复杂的操作。

2.1 操作符链与 Lift 原理

理解操作符是如何串联起来的至关重要。每个操作符(如 map, filter)内部通常会调用 lift(Operator) 方法,创建一个新的 Observable,这个新的 Observable 会包装上游的 Observable,并在订阅时,将下游的 Observer 包装成一个新的 Observer,从而在数据传递过程中插入转换或过滤逻辑。这是一个典型的装饰者模式。

2.2 背压 (Backpressure) - RxJava 2/3 的核心概念

在异步场景下,如果生产者(Observable)发射数据的速度远快于消费者(Observer)处理数据的速度,会导致内存溢出。RxJava 2 引入了 Flowable 来专门处理背压问题。
• Flowable vs Observable:
• Observable: 无背压支持,适用于 GUI 事件、短序列等。
• Flowable: 有背压支持,适用于网络请求、数据库操作、文件 I/O 等可能产生大量数据的场景。
• 背压策略 (BackpressureStrategy):
• BUFFER: 缓存所有数据,可能导致 OOM。
• DROP: 丢弃无法处理的数据。
• LATEST: 只保留最新的数据,丢弃旧的。
• ERROR: 抛出 MissingBackpressureException。
• MISSING: 不做任何处理,需要手动通过 onBackpressureXXX 操作符处理。

2.3. 高级操作符

• switchMap: 类似 flatMap,但当源 Observable 发射一个新项时,会取消订阅并丢弃前一个内部 Observable 产生的数据,只处理最新的内部 Observable。
• debounce: 如果在一个指定的时间段内没有新的数据发射,则发射最近的一个数据。常用于搜索框防抖。
• distinct / distinctUntilChanged: 过滤掉重复的数据。
• retry / retryWhen: 在发生错误时重试。
• timeout: 如果在指定时间内没有收到数据,则抛出超时异常。
• doOnNext / doOnError / doOnComplete / doFinally: 用于副作用(side-effect),如日志记录、资源清理,不影响数据流本身。

2.4. Subject (主题)

Subject 既是 Observable 又是 Observer,可以手动向数据流中“推送”数据,是创建热 Observable 的主要方式。
• PublishSubject: 向所有订阅者广播数据,订阅后才能收到数据。
• ReplaySubject: 缓存所有发射过的数据,新的订阅者会收到所有历史数据。
• BehaviorSubject: 缓存最后一个数据,新的订阅者会立即收到这个最新数据,然后接收后续数据。
• AsyncSubject: 只在流完成时,向所有订阅者发射最后一个数据。
BehaviorSubject 示例 (模拟状态管理):

BehaviorSubject<String> userNameSubject = BehaviorSubject.createDefault("Guest");

// 订阅者 A
userNameSubject.subscribe(name -> System.out.println("User A sees: "+ name));
// 输出: User A sees: Guest

// 订阅者 B (稍后订阅)
userNameSubject.subscribe(name -> System.out.println("User B sees: "+ name));
// 输出: User B sees: Guest

// 更新用户名
userNameSubject.onNext("Alice");

// 输出: User A sees: Alice
// 输出: User B sees: Alice

2.5. 整体变换 (compose 和 transform)

当一组操作符需要在多个地方复用时,可以使用 compose 或 transform。
• compose(ObservableTransformer): 用于 Observable。
• to(ObservableConverter): 更通用的转换。

示例 - 封装网络请求的通用线程切换:


public class SchedulersTransformer<T> implements ObservableTransformer<T, T> {
    @Override
    public ObservableSource<T> apply(Observable<T> upstream) {
        return  upstream.subscribeOn(Schedulers.io())
                                  .observeOn(AndroidSchedulers.mainThread());
    }
}

// 使用
apiService.getData()
    .compose(new SchedulersTransformer<>()) // 应用通用变换
    .subscribe(...);
目录
相关文章
|
2天前
|
云安全 人工智能 安全
AI被攻击怎么办?
阿里云提供 AI 全栈安全能力,其中对网络攻击的主动识别、智能阻断与快速响应构成其核心防线,依托原生安全防护为客户筑牢免疫屏障。
|
12天前
|
域名解析 人工智能
【实操攻略】手把手教学,免费领取.CN域名
即日起至2025年12月31日,购买万小智AI建站或云·企业官网,每单可免费领1个.CN域名首年!跟我了解领取攻略吧~
|
6天前
|
安全 Java Android开发
深度解析 Android 崩溃捕获原理及从崩溃到归因的闭环实践
崩溃堆栈全是 a.b.c?Native 错误查不到行号?本文详解 Android 崩溃采集全链路原理,教你如何把“天书”变“说明书”。RUM SDK 已支持一键接入。
491 201
|
4天前
|
人工智能 移动开发 自然语言处理
2025最新HTML静态网页制作工具推荐:10款免费在线生成器小白也能5分钟上手
晓猛团队精选2025年10款真正免费、无需编程的在线HTML建站工具,涵盖AI生成、拖拽编辑、设计稿转代码等多种类型,均支持浏览器直接使用、快速出图与文件导出,特别适合零基础用户快速搭建个人网站、落地页或企业官网。
620 157
|
10天前
|
人工智能 自然语言处理 安全
国内主流Agent工具功能全维度对比:从技术内核到场景落地,一篇读懂所有选择
2024年全球AI Agent市场规模达52.9亿美元,预计2030年将增长至471亿美元,亚太地区增速领先。国内Agent工具呈现“百花齐放”格局,涵盖政务、金融、电商等多场景。本文深入解析实在智能实在Agent等主流产品,在技术架构、任务规划、多模态交互、工具集成等方面进行全维度对比,结合市场反馈与行业趋势,为企业及个人用户提供科学选型指南,助力高效落地AI智能体应用。
|
4天前
|
数据采集 消息中间件 人工智能
跨系统数据搬运的全方位解析,包括定义、痛点、技术、方法及智能体解决方案
跨系统数据搬运打通企业数据孤岛,实现CRM、ERP等系统高效互通。伴随数字化转型,全球市场规模超150亿美元,中国年增速达30%。本文详解其定义、痛点、技术原理、主流方法及智能体新范式,结合实在Agent等案例,揭示从数据割裂到智能流通的实践路径,助力企业降本增效,释放数据价值。
|
存储 人工智能 监控
从代码生成到自主决策:打造一个Coding驱动的“自我编程”Agent
本文介绍了一种基于LLM的“自我编程”Agent系统,通过代码驱动实现复杂逻辑。该Agent以Python为执行引擎,结合Py4j实现Java与Python交互,支持多工具调用、记忆分层与上下文工程,具备感知、认知、表达、自我评估等能力模块,目标是打造可进化的“1.5线”智能助手。
625 46