RxJava1 升级到 RxJava2 所踩过的坑

简介: RxJava1 升级到 RxJava2 所踩过的坑

RxJava2



RxJava2 发布已经有一段时间了,是对 RxJava 的一次重大的升级,由于我的一个库cv4j使用了 RxJava2 来尝鲜,但是 RxJava2 跟 RxJava1 是不能同时存在于一个项目中的,逼不得已我得把自己所有框架中使用 RxJava 的地方以及


App 中使用 RxJava 的地方都升级到最新版本。所以我整理并记录了一些已经填好的坑。


填坑记录



1. RxJava1 跟 RxJava2 不能共存


如果,在同一个module中同时使用RxJava1和RxJava2,类似如下:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'
compile 'io.reactivex:rxandroid:1.2.0'
compile 'io.reactivex:rxjava:1.1.5'


那么,很不幸你会遇到这样的错误

image.png

Rxjava1和Rxjava2无法共存.jpeg


同理,在 App 中如果使用了 Rxjava2,但是某个第三方的 library 还在使用 Rxjava1 也会遇到同样的错误。


上面的错误是因为 RxAndroid 2.0.1 本身依赖了 RxJava 2.0.1。我们尝试去掉对 RxJava 的依赖,只留下 RxAndroid 。还是会遇到问题。

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
//compile 'io.reactivex.rxjava2:rxjava:2.0.7'
compile 'io.reactivex:rxandroid:1.2.0'
//compile 'io.reactivex:rxjava:1.1.5'


image.png

去掉对Rxjava的依赖.jpeg


所以使用RxAndroid不能去掉对RxJava的依赖,我是这样使用的。

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'


官方也是这样解释的


Because RxAndroid releases are few and far between, it is recommended you also

explicitly depend on RxJava's latest version for bug fixes and new features.


最后,我建议要升级到 RxJava2 的时候必须所有使用的地方都要升级,并且用最新的版本。


2. 新增Flowable


RxJava1 中 Observable 不能很好地支持 backpressure ,会抛出MissingBackpressureException。所以在 RxJava2 中 Observable 不再支持 backpressure ,而使用新增的 Flowable 来支持 backpressure 。关于backpressure 的理解,可以看这篇文章


Flowable的用法跟原先的Observable是一样的。


3. ActionN 和 FuncN 改名


ActionN 和 FuncN 遵循Java 8的命名规则。


其中,Action0 改名成Action,Action1改名成Consumer,而Action2改名成了BiConsumer,而Action3 - Action9都不再使用了,ActionN变成了Consumer<Object[]>。


同样,Func改名成Function,Func2改名成BiFunction,Func3 - Func9 改名成 Function3 - Function9,FuncN 由 Function<Object[], R> 取代。


4. Observable.OnSubscribe 变成  

ObservableOnSubscribe


原先RxJava1的写法:

Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("hello");
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });


现在的写法:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("hello");
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println(s);
            }
        });


5. ObservableOnSubscribe 中使用 ObservableEmitter 发送数据给 Observer


结合上一条,ObservableOnSubscribe 不再使用 Subscriber 而是用 ObservableEmitter 替代。


ObservableEmitter 可以理解为发射器,是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)可以分别发出next事件、complete事件和error事件。 如果只关心next事件的话,只需单独使用onNext()即可。


需要特别注意,emitter的onComplete()调用后,Consumer不再接收任何next事件。


6. Observable.Transformer 变成 ObservableTransformer


原先RxJava1的写法:

/**
     * 跟compose()配合使用,比如ObservableUtils.wrap(obj).compose(toMain())
     * @param <T>
     * @return
     */
    public static <T> Observable.Transformer<T, T> toMain() {
        return new Observable.Transformer<T, T>() {
           @Override
            public Observable<T> call(Observable<T> tObservable) {
                return tObservable
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }


现在的写法:

/**
     * 跟compose()配合使用,比如ObservableUtils.wrap(obj).compose(toMain())
     * @param <T>
     * @return
     */
    public static <T> ObservableTransformer<T, T> toMain() {
        return new ObservableTransformer<T, T>() {
            @Override
            public ObservableSource<T> apply(Observable<T> upstream) {
                return upstream.subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }


由于新增了Flowable,同理也增加了FlowableTransformer

public static <T> FlowableTransformer<T, T> toMain() {
        return new FlowableTransformer<T, T>() {
            @Override
            public Publisher<T> apply(Flowable<T> upstream) {
                return upstream.subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }


7.  Subscription 改名为 Disposable


在 RxJava2 中,由于已经存在了 org.reactivestreams.subscription 这个类,为了避免名字冲突将原先的 rx.Subscription 改名为 io.reactivex.disposables.Disposable。


刚开始不知道,在升级 RxJava2 时发现 org.reactivestreams.subscription 这个类完全没法做原先 rx.Subscription 的事情:(


顺便说下,Disposable必须单次使用,用完就要销毁。


8. first() 用法改变


官方文档是这么描述的first()的用法


1.x 2.x
first() RC3 renamed to firstElement and returns Maybe<T>
first(Func1) dropped, use filter(predicate).first()
firstOrDefault(T) renamed to first(T) and RC3 returns Single<T>
firstOrDefault(Func1, T) renamed to first(T) and RC3 returns Single<T>


以first(Func1)为例,first(Func1)后面还使用了push(),原先 Rxjava1会这样写

ConnectableObservable<Data> connectableObservable = Observable
          .concat(Observable.from(list))
          .first(new Func1<Data, Boolean>() {
                @Override
                public Boolean call(Data data) {
                    return DataUtils.isAvailable(data);
                }
            }).publish();


RxJava2 改成这样

ConnectableObservable<Data> connectableObservable = Observable
          .concat(Observable.fromIterable(list))
          .filter(new Predicate<Data>() {
                @Override
                public boolean test(@NonNull Data data) throws Exception {
                    return DataUtils.isAvailable(data);
                }
            }).firstElement().toObservable().publish();


9. toBlocking().y 被 blockingY() 取代


在我的框架中存在着一个Optional类,它跟Java 8的Optional作用差不多,原先是使用RxJava1来编写的。

import rx.Observable;
/**
 * 使用方法:
 *      String s = null;
 *      Optional.ofNullable(s).orElse("default")); // 如果s为null,则显示default,否则显示s的值
 * @author Tony Shen
 *
 */
public class Optional<T> {
    Observable<T> obs;
    public Optional(Observable<T> obs) {
        this.obs = obs;
    }
    public static <T> Optional<T> of(T value) {
        if (value == null) {
            throw new NullPointerException();
        } else {
            return new Optional<T>(Observable.just(value));
        }
    }
    public static <T> Optional<T> ofNullable(T value) {
        if (value == null) {
            return new Optional<T>(Observable.<T>empty());
        } else {
            return new Optional<T>(Observable.just(value));
        }
    }
    public T get() {
        return obs.toBlocking().single();
    }
    public T orElse(T defaultValue) {
        return obs.defaultIfEmpty(defaultValue).toBlocking().single();
    }
}


升级到RxJava2之后,get() 和 orElse() 方法都会报错,修改之后是这样的。

import io.reactivex.Observable;
/**
 * 使用方法:
 *      String s = null;
 *      Optional.ofNullable(s).orElse("default"); // 如果s为null,则显示default,否则显示s的值
 * @author Tony Shen
 *
 */
public class Optional<T> {
    Observable<T> obs;
    public Optional(Observable<T> obs) {
        this.obs = obs;
    }
    public static <T> Optional<T> of(T value) {
        if (value == null) {
            throw new NullPointerException();
        } else {
            return new Optional<T>(Observable.just(value));
        }
    }
    public static <T> Optional<T> ofNullable(T value) {
        if (value == null) {
            return new Optional<T>(Observable.<T>empty());
        } else {
            return new Optional<T>(Observable.just(value));
        }
    }
    public T get() {
        return obs.blockingSingle();
    }
    public T orElse(T defaultValue) {
        return obs.defaultIfEmpty(defaultValue).blockingSingle();
    }
}


10. PublishSubject


包括 PublishSubject 以及各种 Subject(ReplaySubject、BehaviorSubject、AsyncSubject) 都不再支持backpressure。


总结



RxJava2 所带来的变化远远不止这些,以后遇到的话还会继续整理和总结,毕竟我使用的 RxJava2 还是很少的一部分内容。


RxJava2 最好到文档依然是官方文档。如果是新项目到话,可以毫不犹豫地使用RxJava2,如果是在线上已经成熟稳定的项目,可以再等等。对于新手的话,可以直接从 RxJava2 学起,RxJava1 就直接略过吧。对于老手,RxJava2 还是使用原来的思想,区别不大,从 RxJava1 迁移到 Rxjava2 也花不了多少工夫。

相关文章
|
6月前
|
安全 Android开发
你是否了解 RxJava 的 Disposable ?
你是否了解 RxJava 的 Disposable ?
234 0
RxJava2 中 doFinally 和 doAfterTerminate 的比较
RxJava2 中 doFinally 和 doAfterTerminate 的比较
295 0
|
Java Android开发
面试官:RxJava是如何做到响应式编程的?
RxJava已经出现很多个年头了,但是依然被很多公司使用,如果现在还对RxJava了解的不够透彻, 可以看这个系列对它的分析:相信看完后你对它会有个更全面的认识。 这个系列主要从下面几个方面来讲解: **RxJava基本操作符使用** **RxJava响应式编程是如何实现的** **RxJava的背压机制及Flowable是如何实现背压的** **RxJava的线程切换原理
|
安全 Android开发
详解 RxJava 的 Disposable
RxJava2 的 Disposable,可以在适当时机取消订阅、截断数据流,避免 Android 中的内存泄露。
1189 0
|
Java Go Android开发
RxJava2
函数式编程是一种编程范式。我们常见的编程范式有命令式编程、函数式编程和逻辑式编程。我们常见的面向对象编程是一种命令式编程。命令式编程是面向计算机硬件的抽象,有变量、赋值语句、表达式和控制语句。而函数式编程是面向数学的抽象,将计算描述为一种表达式求值,函数可以在任何地方定义,并且可以对函数进行组合。响应式编程是一种面向数据流和变化传播的编程范式,数据更新是相关联的。把函数式编程里的一套思路和响应式编程合起来就是函数响应式编程。函数响应式编程可以极大地简化项目,特别是处理嵌套回调的异步事件、复杂的列表过滤和变换或者时间相关问题。在Android开发中使用函数响应式编程的主要有两大框架:
169 0
RxJava2
|
设计模式 存储 Java
XTask与RxJava的使用对比
XTask与RxJava的使用对比
165 0
XTask与RxJava的使用对比
|
负载均衡 算法 Java
RxJava 并行操作
RxJava 并行操作
419 0
RxJava 并行操作
|
Java 调度 安全
Rxjava深入理解之自己动手编写Rxjava
Demo的源码地址在 mini-rxjava, 有兴趣的可以下载源码来看. 从观察者模式说起 观察者模式,是我们在平时使用的比较多的一种设计模式.观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。
1172 0
|
开发工具 git 容器
RxJava中几个常用但初学介绍不多的方法介绍
自己学习RxJava的知识点的总结,看了很多篇文章,到现在也算是略有心得;推荐大家如果想了解的可以看看这篇文章,RxJava综合教程(系列版) 不过再详细的讲解还是会有说的不到的地方,比如我最近在做安卓的课设,坑的老师,一开始说没有,最后几天在说,真的是加班加点的干。
954 0
|
Java
冷饭热炒——RxJava
已经超过一个月没有写文章了,原因无非就是工作太忙。最近终于恢复以前的节奏,任务开始正常了起来。忙里偷闲,写一写人们写烂了的RxJava。这篇文章主要分析RxJava事件的产生以及变化的原理,Ok,let's go! 0. 前言 本次源码分析使用的是RxJava2,版本2.1.14。
1070 0