RxJava2学习笔记(3)

简介: 接上回继续,今天来学习下zip(打包)操作 一、zip操作 @Test public void zipTest() { Observable.zip(Observable.

上回继续,今天来学习下zip(打包)操作

一、zip操作

    @Test
    public void zipTest() {
        Observable.zip(Observable.create(emitter -> {
            for (int i = 0; i < 10; i++) {
                emitter.onNext(100 + i);
            }
        }), Observable.create(emitter -> {
            for (int i = 0; i < 5; i++) {
                emitter.onNext(new Character((char) (65 + i)));
            }
        }), (integer, character) -> integer + "" + character).subscribe(s -> System.out.println(s));
    }

zip字面意义,就是打包操作,把多个Obserable合并在一起,形成一个新的Obserable,类似文件1、文件2 ... 文件n,合成一个新文件。上面这段代码的输出:

100A
101B
102C
103D
104E

第1个生产者,发射了10个数字(100~109),第1个生产者发射了5个字符(A~E),合并处理时,会把 “数字+字符",变成一个新字符串,然后继续发射。注意:这里有一个类似"木桶原理",即决定一个木桶能盛多少水的,永远是最短的那块木头。10发A型子弹 + 5发B型子弹,按1:1来合成,最终只有得到5发新型子弹。

 

二、限流

生产者-消费者模型中,有可能会遇到这样一种情况:生产者精力旺盛,狂生产数据,然后消费者力不从心,根本来不及处理,这样上游就堵住了,严重的话,可能导致内存耗尽。最简单的办法,就是把来不及处理的内容给扔掉(即:丢弃策略)。刚刚提到的zip操作中的木桶原理,就可以派上用场了。

    @Test
    public void zipTest1() throws InterruptedException {
        Observable.zip(Observable.create(emitter -> {
            for (int i = 0; ; i++) { //一直不停的发
                emitter.onNext(i);
            }
        }).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> {
            for (int i = 0; i < 5; i++) {
                emitter.onNext(0); //这里技巧性的处理:发1个0过去
            }
        }).subscribeOn(Schedulers.newThread()),
                (BiFunction<Object, Object, Object>) (i1, i2) -> (Integer) i1 + (Integer) i2) //1个数字+0,不影响原值
                .subscribe(integer -> System.out.println(integer));

        Thread.sleep(200);
    }

  输出:

0
1
2
3
4

  如果是字符串,可以参考下面这样处理:

        Observable.zip(Observable.create(emitter -> {
                    for (int i = 0; ; i++) {
                        emitter.onNext("A" + i);
                    }
                }).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> {
                    for (int i = 0; i < 5; i++) {
                        emitter.onNext("");
                    }
                }).subscribeOn(Schedulers.newThread()),
                (BiFunction<Object, Object, Object>) (i1, i2) -> (String) i1 + (String) i2)
                .subscribe(s -> System.out.println(s));
        Thread.sleep(200);

  输出:

A0
A1
A2
A3
A4

 

三、Flowable

刚才用zip这种"奇淫技巧"实现了限流,但其实rxjava还有更科学的做法(Flowable)。再思考一下“限流”这种场景,生产者太猛,一下喷出来的量太多,而消费者太弱,完全吸收不下。比较温和的方式,最好是生产者喷发前先问下消费者,你1次能接承受多大的量?我根据你的能力来

作者: 菩提树下的杨过
出处: http://yjmyzz.cnblogs.com
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
目录
相关文章
|
12月前
|
存储 分布式计算 Hadoop
Hadoop-33 HBase 初识简介 项目简介 整体架构 HMaster HRegionServer Region
Hadoop-33 HBase 初识简介 项目简介 整体架构 HMaster HRegionServer Region
150 2
|
11月前
计算标准偏差
【10月更文挑战第30天】计算标准偏差。
232 3
|
存储 小程序 API
【微信小程序-原生开发+云开发+TDesign】修改用户头像(含wx.chooseMedia,wx.cloud.uploadFile,wx.cloud.deleteFile的使用)
【微信小程序-原生开发+云开发+TDesign】修改用户头像(含wx.chooseMedia,wx.cloud.uploadFile,wx.cloud.deleteFile的使用)
212 0
【微信小程序-原生开发+云开发+TDesign】修改用户头像(含wx.chooseMedia,wx.cloud.uploadFile,wx.cloud.deleteFile的使用)
|
存储 Kubernetes Docker
k8s-配置与存储-配置管理
k8s-配置与存储-配置管理
235 1
|
11月前
|
存储 JSON Java
ELK 圣经:Elasticsearch、Logstash、Kibana 从入门到精通
ELK是一套强大的日志管理和分析工具,广泛应用于日志监控、故障排查、业务分析等场景。本文档将详细介绍ELK的各个组件及其配置方法,帮助读者从零开始掌握ELK的使用。
|
Java C语言 Python
Python安装Jnius库报错DLL load failed:找不到模块
Python安装Jnius库报错DLL load failed:找不到模块
506 0
Python安装Jnius库报错DLL load failed:找不到模块
|
11月前
|
JavaScript 前端开发 容器
jQuery二维码生成插件
jquery.qrcode.js二维码插件允许你在网页中容易的插入二维码,可以生成表格形式的二维码,或基于canvas的图片二维码
|
自然语言处理 算法 机器学习/深度学习
大语言模型 RAG 论文总结(2023~202404)(2)
大语言模型 RAG 论文总结(2023~202404)
660 0
|
人工智能 供应链 算法
量子计算机有什么用?
【8月更文挑战第5天】量子计算机有什么用?
605 3
|
Java 编译器 API
Android使用NDK(从java调用本地函数'JNI')
Android使用NDK(从java调用本地函数'JNI')
370 0
Android使用NDK(从java调用本地函数'JNI')