初识RxJava(四)组合类 操作符

简介:

前言:

前面已经记录了 三种操作符,下面开始第四种操作符的相关使用笔记,每天学一点没什么坏处,而且现在 RxJava 并不是什么新鲜玩意,都到现在了,还不知道 RxJava 怎么使用,那么 笔者请你 打开 Boss直聘 app 看看 Android 的招聘信息,你就知道是什么了。不墨迹了,开始写笔记。

正文:

1、zip 操作符

1)、作用

将多个 被观察者对象 通过一定的 骚操作 组合到一起,发射给 观察者的;
观察者接收到的数据是 按照严格的顺序;
结束点是 数据少的被被观察者对象 的数据队列。

2.1)、代码
 /**
     * zip  操作符
     */
    private void zipMethod() {
        Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                logDUtils("Integer:"+1);
                emitter.onNext(1);
                logDUtils("Integer:"+2);
                emitter.onNext(2);
                logDUtils("Integer:"+3);
                emitter.onNext(3);
                logDUtils("Integer:"+4);
                emitter.onNext(4);
                emitter.onComplete();
            }

        });

        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                logDUtils("String: aaa");
                emitter.onNext("aaa");
                logDUtils("String: bbb");
                emitter.onNext("bbb");
                logDUtils("String: ccc");
                emitter.onNext("ccc");
                emitter.onComplete();
            }
        });

        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                logDUtils("onSubscribe:");
            }

            @Override
            public void onNext(String s) {
                logDUtils("onNext:" + s);
            }

            @Override
            public void onError(Throwable e) {
                logDUtils("onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                logDUtils("onComplete:");
            }
        });
    }
2.2)、效果

主线程运行效果

仔细观察上图输出结果,有没有发现感觉好像很奇异,所有组合全是 第一个 被观察者发射数据之后,第二个被观察者每发射一个,组合一个 ,观察者接收一个。为什么会产生这样的原因呢,因为大家都运行在主线程啊,得排队。具体原因 建议去看 笔者崇拜的大佬的文章 给初学者的RxJava2.0教程(四)

如何解决这个尴尬的问题呢,那就是在不同线程执行咯,show code

3.1)、代码

    /**
     * zip  操作符
     */
    private void zipMethod() {
        Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                logDUtils("Integer:"+1);
                emitter.onNext(1);
                Thread.sleep(1000);
                logDUtils("Integer:"+2);
                emitter.onNext(2);
                Thread.sleep(1000);
                logDUtils("Integer:"+3);
                emitter.onNext(3);
                Thread.sleep(1000);
                logDUtils("Integer:"+4);
                emitter.onNext(4);
                Thread.sleep(1000);
                logDUtils("Integer: onComplete");
                emitter.onComplete();
            }

        }).subscribeOn(Schedulers.io());

        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                logDUtils("String: aaa");
                emitter.onNext("aaa");
                Thread.sleep(1000);
                logDUtils("String: bbb");
                emitter.onNext("bbb");
                Thread.sleep(1000);
                logDUtils("String: ccc");
                emitter.onNext("ccc");
                Thread.sleep(1000);
                logDUtils("String: onComplete");
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io());

        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                logDUtils("onSubscribe:");
            }

            @Override
            public void onNext(String s) {
                logDUtils("onNext:" + s);
            }

            @Override
            public void onError(Throwable e) {
                logDUtils("onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                logDUtils("onComplete:");
            }
        });

    }
3.2)、效果

效果

2、combineLatest 操作符

1)、作用

从两个 被观察对象内 将先发射数据的 被观察对象的最新一个数据与另一个 被观察对象每个数据进行结合,之后发射给 观察者

2)、代码
  /**
     * combineLatest 操作符
     */
    private void combineLatestMethod() {
        Observable.combineLatest(Observable.just("a", "b", "c"), Observable
                //从1 开始发射数据 连续发射 4个 开始发射延迟1秒  发射开始后时间间隔为1秒
                .intervalRange(1, 4, 1, 1, TimeUnit.SECONDS), new
                BiFunction<String, Long, String>() {
                    @Override
                    public String apply(String s, Long l) throws Exception {
                        return s + l;
                    }
                }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                logDUtils("onSubscribe");
            }

            @Override
            public void onNext(String s) {
                logDUtils("onNext:" + s);
            }

            @Override
            public void onError(Throwable e) {
                logDUtils("onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                logDUtils("onComplete");
            }
        });
    }
3)、效果

效果

3、reduce 操作符

1)、作用

将发射来的数据进行组合,一个被观察对象发射 的数据。

2)、代码
 /**
     * reduce 操作符
     */
    @SuppressLint("CheckResult")
    private void reduceMethod() {
        
        String[] str = {"a", "b", "c", "d"};
        Observable.fromArray(str).reduce(new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) throws Exception {
                logDUtils(s+" 与 "+s2+" 组合");
                return s + s2;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                logDUtils("accept:" + s);
            }
        });
    }
3)、效果

效果

4、collect 操作符

1)、作用

将多个 被观察对象发射的数据 进行组合,添加到一个指定的容器内,有一点像 list 的 add

2)、代码

    /**
     * collect 修饰符
     */
    @SuppressLint("CheckResult")
    private void collectMethod() {
        Observable.just("a", "b", "ccc", "d").collect(new Callable<ArrayList<String>>() {
            @Override
            public ArrayList<String> call() throws Exception {
                //创建收集容器
                return new ArrayList<>();
            }
        }, new BiConsumer<ArrayList<String>, String>() {
            @Override
            public void accept(ArrayList<String> strings, String s) throws Exception {
                //将数据 添加到容器  发射
                strings.add(s);
            }
        }).subscribe(new Consumer<ArrayList<String>>() {
            @Override
            public void accept(ArrayList<String> strings) throws Exception {
                logDUtils(Arrays.toString(strings.toArray()));
            }
        });
    }
3)、效果

效果

5、concat 操作符 和 concatArray 操作符

1)、作用

组合多个被观察者数据然后一起发送,合并后 按发送数据有序

concat 只能组合 小于等于 4 组被观察者的数据、concatArray 可以组合大于 4 组被观察者对象

2)、代码
   /**
     * concat 操作符 和  concatArray 操作符
     */
    private void concatMethod() {
        Observable
                .concat(Observable.just(1, 2, 3, 4), Observable.just("a", "b", "c"),
                        Observable.just(0, 9, 55, 99), Observable.just("一", "二"))
                .subscribe(new Observer<Serializable>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        logDUtils("onSubscribe:" + d);
                    }

                    @Override
                    public void onNext(Serializable serializable) {
                        logDUtils("onNext:" + serializable.toString());
                    }

                    @Override
                    public void onError(Throwable e) {
                        logDUtils("onError:" + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        logDUtils("onComplete:");
                    }
                });
    }
3)、效果

效果

6、merge操作符 和 mergeArray 操作符

1)、作用

使用时间为节点 同一个时间节点 内数据一同发射;
执行完最长的 被观察者 终止发射;

2)、代码
/**
     * merge 操作符 和 mergeArray 操作符
     */
    private void mergeMethod() {
        Observable.merge(Observable.intervalRange(2, 5, 1, 1, TimeUnit.SECONDS)
                , Observable.intervalRange(0, 8, 2, 3, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        logDUtils("onSubscribe:");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        logDUtils("onNext:" + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        logDUtils("onError:" + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        logDUtils("onComplete:");
                    }
                });
    }
3)、效果

效果图

7、startWith 操作符 和 startWithArray 操作符

1)、作用

在一个被观察者发射事件前,发射一些数据 / 一个新的被观察者对象

2)、代码
  /**
     * startWith 操作符 和  startWithArray 操作符
     */
    private void startMethod() {
        Observable.just("a", "b", "c").startWith("11")
                .startWithArray("99", "666").subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                logDUtils("onSubscribe:");
            }

            @Override
            public void onNext(String s) {
                logDUtils("onNext:" + s);
            }

            @Override
            public void onError(Throwable e) {
                logDUtils("onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                logDUtils("onComplete:");
            }
        });
    }

3)、效果

效果图

8、count 操作符

1)、作用

统计被观察对象 发送数据的 数量

2)、代码
 /**
     * count 操作符
     */
    @SuppressLint("CheckResult")
    private void countMethod() {
        Observable.just(1, 2, 3, 4).count().subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                logDUtils("发射数据数量:" + aLong);
            }
        });
    }
3)、效果

效果

9、concatDelayError 操作符 和 mergeDelayError 操作符 和 combineLatestDelayError 操作符

作用:

保证合并中的 代码一方出现错误,另一位的合并项科研继续运行 分别用于 concat 、merge 、combineLatest操作符。

相关文章
|
安全 物联网 传感器
带你读《工业物联网安全》之一:一个前所未有的机会
本书为读者提供了针对IIoT安全各个方面的综合理解,以及用来构建部署安全IIoT解决方案的实践技术。书中介绍IIoT安全的基本原则、威胁模型、参考架构,以及现实生活中的实例分析学习,涵盖了用来设计基于风险安全控制方案的各种实用工具,并且深入讨论了多层防御相关技术,包括IAM、终端安全、互联技术以及基于边界和云环境的应用。读者能够从本书中获得保护IIoT生命周期流程、标准化、治理与评估新兴技术适用性方面的实用经验,从而实现成规模、可靠且具有社会效益的互联系统。
|
uml 测试技术 数据库
kde
|
4天前
|
JSON Linux 数据格式
Docker镜像加速指南:手把手教你配置国内镜像源
配置国内镜像源可大幅提升 Docker 拉取速度,解决访问 Docker Hub 缓慢问题。本文详解 Linux、Docker Desktop 配置方法,并提供测速对比与常见问题解答,附最新可用镜像源列表,助力高效开发部署。
kde
2394 6
|
13天前
|
Java Linux Maven
2025年最新版最细致Maven安装与配置指南(任何版本都可以依据本文章配置)
本文详细介绍了Maven的项目管理工具特性、安装步骤和配置方法。主要内容包括: Maven概述:解释Maven作为基于POM的构建工具,具备依赖管理、构建生命周期和仓库管理等功能。 安装步骤: 从官网下载最新版本 解压到指定目录 创建本地仓库文件夹 关键配置: 修改settings.xml文件 配置阿里云和清华大学镜像仓库以加速依赖下载 设置本地仓库路径 附加说明:包含详细的配置示例和截图指导,适用于各种操作系统环境。 本文提供了完整的Maven安装和配置
2025年最新版最细致Maven安装与配置指南(任何版本都可以依据本文章配置)
|
7天前
|
人工智能 定位技术 API
Dify MCP 保姆级教程来了!
大语言模型,例如 DeepSeek,如果不能联网、不能操作外部工具,只能是聊天机器人。除了聊天没什么可做的。
658 5
|
4天前
|
JavaScript Ubuntu IDE
国内如何安装和使用 Claude Code镜像教程 - Windows 用户篇
国内如何安装和使用 Claude Code镜像教程 - Windows 用户篇
451 0
|
8天前
|
数据采集 JSON API
Excel数据治理新思路:引入智能体实现自动纠错【Python+Agent】
本文介绍如何利用智能体与Python代码批量处理Excel中的脏数据,解决人工录入导致的格式混乱、逻辑错误等问题。通过构建具备数据校验、异常标记及自动修正功能的系统,将数小时的人工核查任务缩短至分钟级,大幅提升数据一致性和办公效率。
|
2天前
|
人工智能 Java Spring
【保姆级图文详解】大模型、Spring AI编程调用大模型
【保姆级图文详解】大模型、Spring AI编程调用大模型
240 5
【保姆级图文详解】大模型、Spring AI编程调用大模型
|
6天前
|
人工智能 大数据 开发者
让AI时代的卓越架构触手可及,阿里云技术解决方案开放免费试用
阿里云推出基于场景的解决方案免费试用活动,新老用户均可领取100点试用点,完成部署还可再领最高100点,相当于一年可获得最高200元云资源。覆盖AI、大数据、互联网应用开发等多个领域,支持热门场景如DeepSeek部署、模型微调等,助力企业和开发者快速验证方案并上云。
258 18
让AI时代的卓越架构触手可及,阿里云技术解决方案开放免费试用
|
7天前
|
存储 人工智能 自然语言处理
DeepSeek R1+Open WebUI实现本地知识库的搭建和局域网访问
本文介绍了使用 DeepSeek R1 和 Open WebUI 搭建本地知识库的详细步骤与注意事项,涵盖核心组件介绍、硬件与软件准备、模型部署、知识库构建及问答功能实现等内容,适用于本地文档存储、向量化与检索增强生成(RAG)场景的应用开发。
335 0

热门文章

最新文章