Android高级开发之【RxJava】详解(附项目源码)

简介: 文章大纲 一、什么是RxJava 二、为什么要用RxJava 三、RxJava使用详解 四、项目源码下载 一、什么是RxJava Rx(Reactive Extensions)是一个库,用来处理事件和异步任务,在很多语言上都有实现,RxJava是Rx在Java上的实现。

文章大纲

一、什么是RxJava
二、为什么要用RxJava
三、RxJava使用详解
四、项目源码下载

一、什么是RxJava

Rx(Reactive Extensions)是一个库,用来处理事件和异步任务,在很多语言上都有实现,RxJava是Rx在Java上的实现。简单来说,RxJava就是处理异步的一个库,最基本是基于观察者模式来实现的。通过Obserable和Observer的机制,实现所谓响应式的编程体验。

二、为什么要用RxJava

比如说一个庞大的项目,一个事件传递的整个过程可能要经历很多方法,方法套方法,每个方法的位置七零八落,一个个方法跳进去看,跳过去跳过来很容易把脑袋弄晕,不够直观。但是Rxjava可以把所有逻辑用链式加闭包的方式呈现,做了哪些操作,谁在前谁在后非常直观,逻辑清晰,维护就会非常轻松。就算不是你写的你也可以很快的了解,你可以把它看作一条河流,整个过程就是对里面的水流做进行加工。懂了这个特性我们才知道在复杂的逻辑中运用Rxjava是多么的重要。
假设有这样一个需求:界面上有一个自定义的视图 imageCollectorView ,它的作用是显示多张图片,并能使用 addImage(Bitmap) 方法来任意增加显示的图片。现在需要程序将一个给出的目录数组 File[] folders 中每个目录下的 png 图片都加载出来并显示在 imageCollectorView 中。需要注意的是,由于读取图片的这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。常用的实现方式有多种,我这里贴出其中一种:

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

而如果使用 RxJava ,实现方式是这样的:

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

三、RxJava使用详解

1. RxJava设计模式

RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。
观察者模式面向的需求是:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。举个例子,新闻里喜闻乐见的警察抓小偷,警察需要在小偷伸手作案的时候实施抓捕。在这个例子里,警察是观察者,小偷是被观察者,警察需要时刻盯着小偷的一举一动,才能保证不会漏过任何瞬间。程序的观察者模式和这种真正的『观察』略有不同,观察者不需要时刻盯着被观察者(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。 Android 开发中一个比较典型的例子是点击监听器 OnClickListener 。对设置 OnClickListener 来说, View 是被观察者, OnClickListener 是观察者,二者通过 setOnClickListener() 方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener 。采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。当然,这也得益于我们可以随意定制自己程序中的观察者和被观察者,而警察叔叔明显无法要求小偷『你在作案的时候务必通知我』。

OnClickListener 的模式大致如下图:

RxJava 的观察者模式
RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()

  • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted()方法作为标志。
  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
  • 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

RxJava 的观察者模式大致如下图:

开始接入RxJava之间,添加依赖

dependencies {
  compile 'io.reactivex:rxandroid:1.2.1'
  compile 'io.reactivex:rxjava:1.1.6'
  }

2. 创建RxJava几种方式

方式1:简单创建Rxjava

/**
     * 简单创建Rxjava
     *
     * Observable是被观察者,创建后传入一个OnSubscribe对象,当Observable(观察者)调用subscribe进行注册观察者时,OnSubscribe的call方法会触发。
     ObservableEmitter: Emitter 是发射器的意思,它可以发出三种类型的事件,与之对应的。
     Observer有三个回调方法:

     onNext:接受到一个事件
     onCompleted:接受完事件后调用,只会调用一次
     onError :发生错误时调用,并停止接受事件,调用一次

     注:onCompleted和onError不会同时调用,只会调用其中之一
     */
    public static void createOne() {

        //创建被观察者
        Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello");
                subscriber.onNext("吴");
                subscriber.onNext("晓畅");
                subscriber.onCompleted();
            }
        });

        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) {

                System.out.println("Item: " + s);
            }

            ////事件队列完结,RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
            @Override
            public void onCompleted() {

                System.out.println("Completed!");
            }

            ////事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
            @Override
            public void onError(Throwable e) {
                System.out.println("Error!");
            }
        };

        observable.subscribe(subscriber);

    }

运行结果如下所示:

方式2:just(T...): 将传入的参数依次发送出来

public static void createTwo()
    {

        //相当于
        // 将会依次调用:
        // onNext("Hello");
        // onNext("Hi");
        // onNext("Aloha");
        // onCompleted();
        Observable observable = Observable.just("Hello", "wu", "xiaochang");

        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) {

                System.out.println("Item: " + s);
            }

            ////事件队列完结,RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
            @Override
            public void onCompleted() {

                System.out.println("Completed!");
            }

            ////事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
            @Override
            public void onError(Throwable e) {
                System.out.println("Error!");
            }
        };

        observable.subscribe(subscriber);

    }

运行结果如下所示:

方式3:将传入的数组或 Iterable 拆分成具体对象后,依次发送出来

 public static void createThree()
    {

        String[] words = {"Hello", "wu", "xiaochang"};

        //相当于
        // 将会依次调用:
        // onNext("Hello");
        // onNext("Hi");
        // onNext("Aloha");
        // onCompleted();
        Observable observable = Observable.from(words);

        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) {

                System.out.println("Item: " + s);
            }

            ////事件队列完结,RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
            @Override
            public void onCompleted() {

                System.out.println("Completed!");
            }

            ////事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
            @Override
            public void onError(Throwable e) {
                System.out.println("Error!");
            }
        };

        observable.subscribe(subscriber);
    }

运行结果如下图所示:

方式4:发送多种类型参数

/**
     *发送多种类型参数
     */
    public static void createFour()
    {
        //Just类似于From,但是From会将数组或Iterable的元素具取出然后逐个发射,而Just只是简单的原样发射,将数组或Iterable当做单个数据。
        //Just接受一至九个参数,返回一个按参数列表顺序发射这些数据的Observable
        Observable justObservable = Observable.just(1, "someThing", false, 3.256f, "NewYork");
        justObservable.subscribe(new Subscriber() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted!");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onNext(Object o) {

                System.out.println(o);
            }
        });
    }

运行结果如下所示:

方式5:自定义Subscriber

   /**
     * 自定义Subscriber
     */
    public static void createFive()
    {

        Observable observable = Observable.just("Hello", "Hi", "Aloha");

        Action1<String> onNextAction = new Action1<String>() {
            // onNext()
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        };
        Action1<Throwable> onErrorAction = new Action1<Throwable>() {
            // onError()
            @Override
            public void call(Throwable throwable) {
                // Error handling
            }
        };
        Action0 onCompletedAction = new Action0() {
            // onCompleted()
            @Override
            public void call() {
                System.out.println("completed");
            }
        };

        // 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
                observable.subscribe(onNextAction);
        // 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
                observable.subscribe(onNextAction, onErrorAction);
        // 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
                observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
    }

运行结果如下图所示:

3. 创建观察者方法

创建方式如下:

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onNext(String s) {
                Log.d(tag, "Item: " + s);
            }

            @Override
            public void onCompleted() {
                Log.d(tag, "Completed!");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(tag, "Error!");
            }
        };

        //创建方式2
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                Log.d("MainActivity", "Item: " + s);
            }

            @Override
            public void onCompleted() {
                Log.d("MainActivity", "Completed!");
            }

            @Override
            public void onError(Throwable e) {
                Log.d("MainActivity", "Error!");
            }
        };

实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 Observer 和 Subscriber 是完全一样的。它们的区别对于使用者来说主要有两点:
onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。
unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

3. 线程Scheduler (调度器)

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。
在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 * subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

public class RxJavaScheduler {

    public static void showScheduler()
    {
        Observable.just(1, 2, 3, 4)
//                .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程

//                .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程

                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer number) {

                        System.out.println("number:" + number);
                    }
                });

    }

    public static void main(String[] args) {

        showScheduler();
    }

}

四、项目源码下载

链接:https://pan.baidu.com/s/1Na7DH_N2rf-pXEadmQxgUQ
密码:xvr2

Android前沿技术

关于RXJava的全部学习内容,我们这边都有系统的知识体系以及进阶视频资料,有需要的朋友可以加群免费领取安卓进阶视频教程,源码,面试资料,群内有大牛一起交流讨论技术;818520403
(包括自定义控件、NDK、架构设计、混合式开发工程师(React native,Weex)、性能优化、完整商业项目开发等)

Android高级进阶视频教程
相关文章
|
7天前
|
开发框架 前端开发 Android开发
安卓与iOS开发中的跨平台策略
在移动应用开发的战场上,安卓和iOS两大阵营各据一方。随着技术的演进,跨平台开发框架成为开发者的新宠,旨在实现一次编码、多平台部署的梦想。本文将探讨跨平台开发的优势与挑战,并分享实用的开发技巧,帮助开发者在安卓和iOS的世界中游刃有余。
|
12天前
|
搜索推荐 Android开发 开发者
探索安卓开发中的自定义视图:打造个性化UI组件
【10月更文挑战第39天】在安卓开发的世界中,自定义视图是实现独特界面设计的关键。本文将引导你理解自定义视图的概念、创建流程,以及如何通过它们增强应用的用户体验。我们将从基础出发,逐步深入,最终让你能够自信地设计和实现专属的UI组件。
|
14天前
|
Android开发 Swift iOS开发
探索安卓与iOS开发的差异和挑战
【10月更文挑战第37天】在移动应用开发的广阔舞台上,安卓和iOS这两大操作系统扮演着主角。它们各自拥有独特的特性、优势以及面临的开发挑战。本文将深入探讨这两个平台在开发过程中的主要差异,从编程语言到用户界面设计,再到市场分布的不同影响,旨在为开发者提供一个全面的视角,帮助他们更好地理解并应对在不同平台上进行应用开发时可能遇到的难题和机遇。
|
16天前
|
XML 存储 Java
探索安卓开发之旅:从新手到专家
【10月更文挑战第35天】在数字化时代,安卓应用的开发成为了一个热门话题。本文旨在通过浅显易懂的语言,带领初学者了解安卓开发的基础知识,同时为有一定经验的开发者提供进阶技巧。我们将一起探讨如何从零开始构建第一个安卓应用,并逐步深入到性能优化和高级功能的实现。无论你是编程新手还是希望提升技能的开发者,这篇文章都将为你提供有价值的指导和灵感。
|
14天前
|
存储 API 开发工具
探索安卓开发:从基础到进阶
【10月更文挑战第37天】在这篇文章中,我们将一起探索安卓开发的奥秘。无论你是初学者还是有经验的开发者,这篇文章都将为你提供有价值的信息和建议。我们将从安卓开发的基础开始,逐步深入到更复杂的主题,如自定义组件、性能优化等。最后,我们将通过一个代码示例来展示如何实现一个简单的安卓应用。让我们一起开始吧!
|
15天前
|
存储 XML JSON
探索安卓开发:从新手到专家的旅程
【10月更文挑战第36天】在这篇文章中,我们将一起踏上一段激动人心的旅程,从零基础开始,逐步深入安卓开发的奥秘。无论你是编程新手,还是希望扩展技能的老手,这里都有适合你的知识宝藏等待发掘。通过实际的代码示例和深入浅出的解释,我们将解锁安卓开发的关键技能,让你能够构建自己的应用程序,甚至贡献于开源社区。准备好了吗?让我们开始吧!
25 2
|
16天前
|
Android开发
布谷语音软件开发:android端语音软件搭建开发教程
语音软件搭建android端语音软件开发教程!
|
Android开发 数据格式 JSON
【Android架构】基于MVP模式的Retrofit2+RXjava封装之常见问题(四)
先回顾下之前的 【Android架构】基于MVP模式的Retrofit2+RXjava封装(一)【Android架构】基于MVP模式的Retrofit2+RXjava封装之文件下载(二)【Android架构】基于MVP模式的Retrofit2+RXjava封装之文件上传(三) 今天要说的是使用Retrofit2和Okhttp 过程中遇到的一些问题。
1087 0
|
Android开发 流计算
【Android架构】基于MVP模式的Retrofit2+RXjava封装之文件下载(二)
上篇中我们介绍了基于MVP的Retrofit2+RXjava封装,还没有看的点击这里,这一篇我们来说说文件下载的实现。 首先,我们先在ApiServer定义好调用的接口 @GET Observable downloadFile(@Url St...
1378 0
下一篇
无影云桌面