RxJava 实例

简介: RxJava 实例

1.定义

RxJava 是一个 基于事件流、实现异步操作的库

2.作用

用于实现异步操作,类似于 Android中的 AsyncTaskHandler+new Thread的作用

3. 特点

由于 RxJava的使用方式是:基于事件流的链式调用,所以使得 RxJava

  • 逻辑简洁
  • 实现优雅
  • 使用简单

更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁 & 优雅

  • Rxjava原理 基于 一种扩展的观察者模式
  • Rxjava的扩展观察者模式中有4个角色:
角色 作用 类比
被观察者(Observable) 产生事件 顾客
观察者(Observer) 接收事件,并给出响应动作 厨房
订阅(Subscribe) 连接 被观察者 & 观察者 服务员
事件(Event) 被观察者 & 观察者 沟通的载体 菜式

4.使用

导包

implementation 'io.reactivex.rxjava2:rxjava:2.1.3'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'com.jakewharton.rxbinding2:rxbinding:2.2.0'

1.定义被观察者

三种方式:Observable.create、Observable.just、Observable.fromArray

    public void buyFood(){
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    e.onNext("potato");
                    e.onNext("tomato");
                    e.onNext("noodles");
                    e.onComplete();
            }
        });
 
        Observable<String> observable1 = Observable.just("potato","tomato","noodles");
        String [] foods = new String[]{"potato","tomato","noodles"};
        Observable<String> observable2 = Observable.fromArray(foods);
    }

2.定义观察者

使用observer或者subscriber

    Observer<String> mObserver = new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
 
        }
 
        @Override
        public void onNext(@NonNull String s) {
 
        }
 
        @Override
        public void onError(@NonNull Throwable e) {
 
        }
 
        @Override
        public void onComplete() {
 
        }
    };
    Subscriber<String> mStringSubscriber = new Subscriber<String>() {
        @Override
        public void onSubscribe(Subscription s) {
            
        }
 
        @Override
        public void onNext(String s) {
 
        }
 
        @Override
        public void onError(Throwable t) {
 
        }
 
        @Override
        public void onComplete() {
 
        }
    };

特别注意:2种方法的区别,即Subscriber 抽象类与Observer 接口的区别

// 相同点:二者基本使用方式完全一致(实质上,在RxJava的 subscribe 过程中,Observer总是会先被转换成Subscriber再使用)

// 不同点:Subscriber抽象类对 Observer 接口进行了扩展,新增了两个方法:

   // 1\. onStart():在还未响应事件前调用,用于做一些初始化工作

   // 2\. unsubscribe():用于取消订阅。在该方法被调用后,观察者将不再接收 & 响应事件

   // 调用该方法前,先使用 isUnsubscribed() 判断状态,确定被观察者Observable是否还持有观察者Subscriber的引用,如果引用不能及时释放,就会出现内存泄露

3.订阅

observable.subscribe(observer);

5.优雅实现

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    e.onNext("potato");
                    e.onNext("tomato");
                    e.onNext("noodles");
                    e.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
 
            }
 
            @Override
            public void onNext(@NonNull String s) {
 
            }
 
            @Override
            public void onError(@NonNull Throwable e) {
 
            }
 
            @Override
            public void onComplete() {
 
            }
        });

其他方法

observeOn:观察者的执行线程

observeOn 作用于该操作符之后直到出现新的observeOn操作符

subscribeOn:被观察者的执行线程

subscribeOn 作用于该操作符之前的 Observable 的创建操符作以及 doOnSubscribe 操作符 ,换句话说就是 doOnSubscribe 以及 Observable 的创建操作符总是被其之后最近的 subscribeOn 控制

实例

    //将字符串转换成double类型
    String path = "12.3";
    public void test(){
        //输入事件
        Observable.just(path)
                //对事件进行处理
                .map(new Function<String, Double>() {
            @Override
            public Double apply(@NonNull String s) throws Exception {
                return Double.parseDouble(s);
            }
        })
                //指定被观察者执行线程
                .subscribeOn(Schedulers.io())
                //指定观察者执行线程
                .observeOn(AndroidSchedulers.mainThread())
                //订阅观察者
                .subscribe(
                new Observer<Double>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
 
                    }
 
                    @Override
                    public void onNext(@NonNull Double aDouble) {
 
                    }
 
                    @Override
                    public void onError(@NonNull Throwable e) {
 
                    }
 
                    @Override
                    public void onComplete() {
 
                    }
                });
    }

Rxjava中的Scheduler相当于线程控制器,Rxjava通过它来指定每一段代码应该运行在什么样的线程。Rxjava提供了5种调度器:

  • .io()
  • .computation()
  • .immediate()
  • .newThread()
  • .trampoline()


另外,Android还有一个专用的AndroidSchedulers.mainThread()指定操作将在Android主线程运行。Rxjava通过subscribeOn()observeOn()两个方法来对线程进行控制,subscribeOn()指定subscribe()时间发生的线程,也就是事件产生的线程,observeOn()指定Subscriber做运行的线程,也就是消费事件的线程。


Schedulers.io() 用于I/O操作,比如:读写文件,数据库,网络交互等等。行为模式和newThread()差不多,重点需要注意的是线程池是无限制的,大量的I/O调度操作将创建许多个线程并占用内存


Schedulers.computation()计算工作默认的调度器,这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。


Schedulers.immediate()  这个调度器允许你立即在当前线程执行你指定的工作。这是默认的Scheduler


Schedulers.newThread()    它为指定任务启动一个新的线程。


Schedulers.trampoline()

当我们想在当前线程执行一个任务时,并不是立即,我们可以用trampoline() 将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。

RXView 防抖

    private void test3() {
        RxView.clicks(控件).throttleFirst(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
 
            }
 
            @Override
            public void onNext(@NonNull Object o) {
                Log.e(TAG, "onNext: 响应事件....." );
            }
 
            @Override
            public void onError(@NonNull Throwable e) {
 
            }
 
            @Override
            public void onComplete() {
 
            }
        });

RxJava篇(应用场景) - 简书 (jianshu.com)


目录
相关文章
Cocos Creator3.8 项目实战(五)背景无限滚屏效果如何实现
Cocos Creator3.8 项目实战(五)背景无限滚屏效果如何实现
753 0
|
消息中间件 Java 关系型数据库
10道不得不会的Docker面试题
10道不得不会的Docker面试题,10道不得不会的Docker面试题
9691 1
10道不得不会的Docker面试题
|
Java Spring 前端开发
Spring Boot 中文乱码问题解决方案汇总
使用 Spring Boot 开发,对外开发接口供调用,传入参数中有中文,出现中文乱码,查了好多资料,总结解决方法如下: 第一步,约定传参编码格式 不管是使用httpclient,还是okhttp,都要设置传参的编码,为了统一,这里全部设置为utf-8 第二步,修改application.
11032 4
|
Java
Java编程:基于socket实现局域网双人联机对战五子棋
Java编程:基于socket实现局域网双人联机对战五子棋
1307 0
Java编程:基于socket实现局域网双人联机对战五子棋
|
监控 小程序 前端开发
C#医院预约挂号小程序源码(前端+后台)
线上预约挂号系统构建了医院和患者的连接,通过改善患者院内的就医服务流程,以微信公众号、支付宝小程序为患者服务入口,为居民提供导诊、预约、支付、报告查询等线上线下一体化的就医服务,缩短患者就诊环节,提高医疗机构服务效率。
419 0
|
Cloud Native Go Docker
云原生之使用Docker部署caddy网站服务器
云原生之使用Docker部署caddy网站服务器
821 2
云原生之使用Docker部署caddy网站服务器
|
Arthas 监控 IDE
Arthas(Java 应用诊断利器)
Arthas(Java 应用诊断利器)
Arthas(Java 应用诊断利器)
|
Java Maven 数据安全/隐私保护
SpringBoot接口中如何直接返回图片数据
SpringBoot接口中如何直接返回图片数据
|
人工智能 Java C++
python入门(五) vscode配置Anaconda 环境,代码自动提示
python入门(五) vscode配置Anaconda 环境,代码自动提示
1223 0
|
网络协议 Ubuntu Linux
nmap流量特征修改
nmap流量特征修改
nmap流量特征修改