RxJava2 / RxAndroid2的concat拼接多个Observable

简介: RxJava2 / RxAndroid2的concat拼接多个Observable concat操作符和merge类似,把多个Observable拼接成一个可以观察的输出,例如代码: package zhangphil.

RxJava2 / RxAndroid2的concat拼接多个Observable

 

concat操作符和merge类似,把多个Observable拼接成一个可以观察的输出,例如代码:

 

package zhangphil.app;

import android.os.Bundle;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;

import java.util.concurrent.Callable;

import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.schedulers.Schedulers;

public class MainActivity extends AppCompatActivity {
    private final String TAG = getClass().getSimpleName();
    private CompositeDisposable mCompositeDisposable = new CompositeDisposable();

    @Override
    public void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        test();
    }

    private void test() {

        DisposableObserver disposableObserver = new DisposableObserver<String>() {
            @Override
            public void onNext(String s) {
                Log.d(TAG, "#####开始#####");
                Log.d(TAG + "数据", String.valueOf(s));
                Log.d(TAG, "#####结束#####");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.toString(), e);
            }
        };

        mCompositeDisposable.add(
                Observable.concat(
                        getObservableA(null),
                        getObservableB(null),
                        getObservableA(null),
                        getObservableB(null))
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribeWith(disposableObserver));
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();

        // 如果退出程序,就清除后台任务
        mCompositeDisposable.clear();
    }

    private Observable<String> getObservableA(Object o) {
        return Observable.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                try {
                    Thread.sleep(500); // 假设此处是耗时操作
                } catch (Exception e) {
                    e.printStackTrace();
                }

                return "A";
            }
        });
    }

    private Observable<String> getObservableB(Object o) {
        return Observable.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                try {
                    Thread.sleep(1000); // 假设此处是耗时操作
                } catch (Exception e) {
                    e.printStackTrace();
                }

                return "B";
            }
        });
    }
}

 

输出:

 

05-15 14:39:18.667 14456-14456/zhangphil.app D/MainActivity: #####开始#####
05-15 14:39:18.667 14456-14456/zhangphil.app D/MainActivity数据: A
05-15 14:39:18.667 14456-14456/zhangphil.app D/MainActivity: #####结束#####
05-15 14:39:19.669 14456-14456/zhangphil.app D/MainActivity: #####开始#####
05-15 14:39:19.669 14456-14456/zhangphil.app D/MainActivity数据: B
05-15 14:39:19.669 14456-14456/zhangphil.app D/MainActivity: #####结束#####
05-15 14:39:20.170 14456-14456/zhangphil.app D/MainActivity: #####开始#####
05-15 14:39:20.170 14456-14456/zhangphil.app D/MainActivity数据: A
05-15 14:39:20.170 14456-14456/zhangphil.app D/MainActivity: #####结束#####
05-15 14:39:21.171 14456-14456/zhangphil.app D/MainActivity: #####开始#####
05-15 14:39:21.172 14456-14456/zhangphil.app D/MainActivity数据: B
05-15 14:39:21.172 14456-14456/zhangphil.app D/MainActivity: #####结束#####
05-15 14:39:21.172 14456-14456/zhangphil.app D/MainActivity: onComplete

 
相关文章
|
Java C++ Python
快讯:LeetCode中国正式上线《剑指Offer》题目,刷题真方便了!
近日,LeetCode中国[1]上线了一个全新的分类模块 LCOF “剑指 Offer[2]”。
7909 0
快讯:LeetCode中国正式上线《剑指Offer》题目,刷题真方便了!
|
11月前
|
缓存 监控 Java
如何运用JAVA开发API接口?
本文详细介绍了如何使用Java开发API接口,涵盖创建、实现、测试和部署接口的关键步骤。同时,讨论了接口的安全性设计和设计原则,帮助开发者构建高效、安全、易于维护的API接口。
906 4
|
微服务
微服务多机房部署大揭秘:全局单一实例、全局多实例,一文让你彻底解锁!
【8月更文挑战第25天】本文探讨了微服务架构中的多机房部署策略,包括全局单一与多实例、区域及机房多实例等方法,分析了它们在可用性、容错性、扩展性和成本上的差异。示例展示了如何利用AWS CloudFormation实现跨不同机房的微服务部署。这为实际应用场景提供了有价值的参考和指导。
462 2
|
算法 Python
Python函数的嵌套调用:深入理解与应用
Python函数的嵌套调用:深入理解与应用
350 1
|
JavaScript Java 测试技术
基于SpringBoot+Vue+uniapp的校园跑腿管理系统的详细设计和实现(源码+lw+部署文档+讲解等)
基于SpringBoot+Vue+uniapp的校园跑腿管理系统的详细设计和实现(源码+lw+部署文档+讲解等)
364 1
|
机器学习/深度学习 数据采集 人工智能
动手实践:从零开始训练AI模型的全面指南
【7月更文第14天】随着人工智能技术的飞速发展,训练AI模型已成为科研、工程乃至创业领域的热门技能。本文旨在为初学者提供一个清晰、实用的指南,带领大家从零开始,了解并实践如何训练一个人工智能模型。我们将以一个简单的线性回归任务为例,逐步深入,探讨数据预处理、模型构建、训练过程及评估方法,最后展示如何使用Python和深度学习库PyTorch实现这一过程。
6740 0
|
JavaScript Java 关系型数据库
Springboot+vue的员工绩效考核管理系统(有报告),Javaee项目,springboot vue前后端分离项目。
Springboot+vue的员工绩效考核管理系统(有报告),Javaee项目,springboot vue前后端分离项目。
|
负载均衡 监控 Java
微服务架构 | 4.2 基于 Feign 与 OpenFeign 的服务接口调用
Feign 是一个声明式的Web服务客户端,让编写 Web 服务客户端变得非常容易,只需创建一个接口并在接口上添加注解即可;
989 1
微服务架构 | 4.2 基于 Feign 与 OpenFeign 的服务接口调用
|
开发框架 JavaScript 前端开发
ASP.NET WebApi+Vue前后端分离之允许启用跨域请求
ASP.NET WebApi+Vue前后端分离之允许启用跨域请求
415 0
|
前端开发
细读 React | PureComponet
今天来聊一聊 React.Component、React.PureComponent、React.memo 的一些区别以及使用场景。
255 0