RxJava1 升级到 RxJava2 所踩过的坑

简介: RxJava1 升级到 RxJava2 所踩过的坑

RxJava2



RxJava2 发布已经有一段时间了,是对 RxJava 的一次重大的升级,由于我的一个库cv4j使用了 RxJava2 来尝鲜,但是 RxJava2 跟 RxJava1 是不能同时存在于一个项目中的,逼不得已我得把自己所有框架中使用 RxJava 的地方以及


App 中使用 RxJava 的地方都升级到最新版本。所以我整理并记录了一些已经填好的坑。


填坑记录



1. RxJava1 跟 RxJava2 不能共存


如果,在同一个module中同时使用RxJava1和RxJava2,类似如下:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'
compile 'io.reactivex:rxandroid:1.2.0'
compile 'io.reactivex:rxjava:1.1.5'


那么,很不幸你会遇到这样的错误

image.png

Rxjava1和Rxjava2无法共存.jpeg


同理,在 App 中如果使用了 Rxjava2,但是某个第三方的 library 还在使用 Rxjava1 也会遇到同样的错误。


上面的错误是因为 RxAndroid 2.0.1 本身依赖了 RxJava 2.0.1。我们尝试去掉对 RxJava 的依赖,只留下 RxAndroid 。还是会遇到问题。

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
//compile 'io.reactivex.rxjava2:rxjava:2.0.7'
compile 'io.reactivex:rxandroid:1.2.0'
//compile 'io.reactivex:rxjava:1.1.5'


image.png

去掉对Rxjava的依赖.jpeg


所以使用RxAndroid不能去掉对RxJava的依赖,我是这样使用的。

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'


官方也是这样解释的


Because RxAndroid releases are few and far between, it is recommended you also

explicitly depend on RxJava's latest version for bug fixes and new features.


最后,我建议要升级到 RxJava2 的时候必须所有使用的地方都要升级,并且用最新的版本。


2. 新增Flowable


RxJava1 中 Observable 不能很好地支持 backpressure ,会抛出MissingBackpressureException。所以在 RxJava2 中 Observable 不再支持 backpressure ,而使用新增的 Flowable 来支持 backpressure 。关于backpressure 的理解,可以看这篇文章


Flowable的用法跟原先的Observable是一样的。


3. ActionN 和 FuncN 改名


ActionN 和 FuncN 遵循Java 8的命名规则。


其中,Action0 改名成Action,Action1改名成Consumer,而Action2改名成了BiConsumer,而Action3 - Action9都不再使用了,ActionN变成了Consumer<Object[]>。


同样,Func改名成Function,Func2改名成BiFunction,Func3 - Func9 改名成 Function3 - Function9,FuncN 由 Function<Object[], R> 取代。


4. Observable.OnSubscribe 变成  

ObservableOnSubscribe


原先RxJava1的写法:

Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("hello");
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });


现在的写法:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("hello");
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println(s);
            }
        });


5. ObservableOnSubscribe 中使用 ObservableEmitter 发送数据给 Observer


结合上一条,ObservableOnSubscribe 不再使用 Subscriber 而是用 ObservableEmitter 替代。


ObservableEmitter 可以理解为发射器,是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)可以分别发出next事件、complete事件和error事件。 如果只关心next事件的话,只需单独使用onNext()即可。


需要特别注意,emitter的onComplete()调用后,Consumer不再接收任何next事件。


6. Observable.Transformer 变成 ObservableTransformer


原先RxJava1的写法:

/**
     * 跟compose()配合使用,比如ObservableUtils.wrap(obj).compose(toMain())
     * @param <T>
     * @return
     */
    public static <T> Observable.Transformer<T, T> toMain() {
        return new Observable.Transformer<T, T>() {
           @Override
            public Observable<T> call(Observable<T> tObservable) {
                return tObservable
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }


现在的写法:

/**
     * 跟compose()配合使用,比如ObservableUtils.wrap(obj).compose(toMain())
     * @param <T>
     * @return
     */
    public static <T> ObservableTransformer<T, T> toMain() {
        return new ObservableTransformer<T, T>() {
            @Override
            public ObservableSource<T> apply(Observable<T> upstream) {
                return upstream.subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }


由于新增了Flowable,同理也增加了FlowableTransformer

public static <T> FlowableTransformer<T, T> toMain() {
        return new FlowableTransformer<T, T>() {
            @Override
            public Publisher<T> apply(Flowable<T> upstream) {
                return upstream.subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }


7.  Subscription 改名为 Disposable


在 RxJava2 中,由于已经存在了 org.reactivestreams.subscription 这个类,为了避免名字冲突将原先的 rx.Subscription 改名为 io.reactivex.disposables.Disposable。


刚开始不知道,在升级 RxJava2 时发现 org.reactivestreams.subscription 这个类完全没法做原先 rx.Subscription 的事情:(


顺便说下,Disposable必须单次使用,用完就要销毁。


8. first() 用法改变


官方文档是这么描述的first()的用法


1.x 2.x
first() RC3 renamed to firstElement and returns Maybe<T>
first(Func1) dropped, use filter(predicate).first()
firstOrDefault(T) renamed to first(T) and RC3 returns Single<T>
firstOrDefault(Func1, T) renamed to first(T) and RC3 returns Single<T>


以first(Func1)为例,first(Func1)后面还使用了push(),原先 Rxjava1会这样写

ConnectableObservable<Data> connectableObservable = Observable
          .concat(Observable.from(list))
          .first(new Func1<Data, Boolean>() {
                @Override
                public Boolean call(Data data) {
                    return DataUtils.isAvailable(data);
                }
            }).publish();


RxJava2 改成这样

ConnectableObservable<Data> connectableObservable = Observable
          .concat(Observable.fromIterable(list))
          .filter(new Predicate<Data>() {
                @Override
                public boolean test(@NonNull Data data) throws Exception {
                    return DataUtils.isAvailable(data);
                }
            }).firstElement().toObservable().publish();


9. toBlocking().y 被 blockingY() 取代


在我的框架中存在着一个Optional类,它跟Java 8的Optional作用差不多,原先是使用RxJava1来编写的。

import rx.Observable;
/**
 * 使用方法:
 *      String s = null;
 *      Optional.ofNullable(s).orElse("default")); // 如果s为null,则显示default,否则显示s的值
 * @author Tony Shen
 *
 */
public class Optional<T> {
    Observable<T> obs;
    public Optional(Observable<T> obs) {
        this.obs = obs;
    }
    public static <T> Optional<T> of(T value) {
        if (value == null) {
            throw new NullPointerException();
        } else {
            return new Optional<T>(Observable.just(value));
        }
    }
    public static <T> Optional<T> ofNullable(T value) {
        if (value == null) {
            return new Optional<T>(Observable.<T>empty());
        } else {
            return new Optional<T>(Observable.just(value));
        }
    }
    public T get() {
        return obs.toBlocking().single();
    }
    public T orElse(T defaultValue) {
        return obs.defaultIfEmpty(defaultValue).toBlocking().single();
    }
}


升级到RxJava2之后,get() 和 orElse() 方法都会报错,修改之后是这样的。

import io.reactivex.Observable;
/**
 * 使用方法:
 *      String s = null;
 *      Optional.ofNullable(s).orElse("default"); // 如果s为null,则显示default,否则显示s的值
 * @author Tony Shen
 *
 */
public class Optional<T> {
    Observable<T> obs;
    public Optional(Observable<T> obs) {
        this.obs = obs;
    }
    public static <T> Optional<T> of(T value) {
        if (value == null) {
            throw new NullPointerException();
        } else {
            return new Optional<T>(Observable.just(value));
        }
    }
    public static <T> Optional<T> ofNullable(T value) {
        if (value == null) {
            return new Optional<T>(Observable.<T>empty());
        } else {
            return new Optional<T>(Observable.just(value));
        }
    }
    public T get() {
        return obs.blockingSingle();
    }
    public T orElse(T defaultValue) {
        return obs.defaultIfEmpty(defaultValue).blockingSingle();
    }
}


10. PublishSubject


包括 PublishSubject 以及各种 Subject(ReplaySubject、BehaviorSubject、AsyncSubject) 都不再支持backpressure。


总结



RxJava2 所带来的变化远远不止这些,以后遇到的话还会继续整理和总结,毕竟我使用的 RxJava2 还是很少的一部分内容。


RxJava2 最好到文档依然是官方文档。如果是新项目到话,可以毫不犹豫地使用RxJava2,如果是在线上已经成熟稳定的项目,可以再等等。对于新手的话,可以直接从 RxJava2 学起,RxJava1 就直接略过吧。对于老手,RxJava2 还是使用原来的思想,区别不大,从 RxJava1 迁移到 Rxjava2 也花不了多少工夫。

相关文章
|
Java Android开发
面试官:RxJava是如何做到响应式编程的?
RxJava已经出现很多个年头了,但是依然被很多公司使用,如果现在还对RxJava了解的不够透彻, 可以看这个系列对它的分析:相信看完后你对它会有个更全面的认识。 这个系列主要从下面几个方面来讲解: **RxJava基本操作符使用** **RxJava响应式编程是如何实现的** **RxJava的背压机制及Flowable是如何实现背压的** **RxJava的线程切换原理
|
Java Android开发
关于Rxjava的简单使用
本篇只是讲一下Rxjava的简单入门使用,想要详解的请移步其他博主文章,关于RxJava详解的文章网上一大堆,本片文章内容适合小白学习。
174 1
|
数据处理
RxJava2实现RxBus
RxJava2实现RxBus
177 0
|
Java 数据库 UED
RxJava的简介
RxJava的简介
307 0
RxJava的简介
|
缓存 Java Android开发
一文详解 RxJava2 使用及实现原理
RxJava—一个可以在JVM上运行的,基于观察者模式 实现异步操作的java库。其英文描述为:RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java
552 0
一文详解 RxJava2 使用及实现原理
|
Java 调度 安全
Rxjava深入理解之自己动手编写Rxjava
Demo的源码地址在 mini-rxjava, 有兴趣的可以下载源码来看. 从观察者模式说起 观察者模式,是我们在平时使用的比较多的一种设计模式.观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。
1176 0
|
开发者
RxJava2.0——从放弃到入门
前言 终于到了讲RxJava这一期,RxJava是笔者个人非常喜欢的一个开源库,它很好的将链式编程风格和异步结合在一起。RxJava其实已经推出很久了,可以说是已经很火了,但是目前仍然还有相当一部分Android开发者没有使用过,甚至说是想用,却不知道怎么用,或者不知道自己的项目哪里可以用到,本着让广大开发者理解并且上手项目,从放弃到入门,故推出这边文章。
1265 0
RxJava2学习笔记(3)
接上回继续,今天来学习下zip(打包)操作 一、zip操作 @Test public void zipTest() { Observable.zip(Observable.
1110 0
|
iOS开发
RxJava2学习笔记(2)
上一篇已经熟悉了Observable的基本用法,但是如果仅仅只是“生产-消费”的模型,这就体现不出优势了,java有100种办法可以玩这个:) 一、更简单的多线程 正常情况下,生产者与消费者都在同一个线程里处理,参考下面的代码: final long start = System.
1016 0
RxJava2学习笔记(1)
作为github上star数极高的响应式编程java扩展类库,rxjava是啥就不多说了,网上能查到一堆介绍,下面是一些学习记录: 前提依赖: compile 'io.reactivex.rxjava2:rxjava:2.1.9' 一、Observable 1.1 hello world rxjava中的核心思路是“生产者-消费者”模型,生产者的java类通常用xxxEmitter命名,字面意思:发射器,可以想象为一个机关枪,一直biu biu biu的向外发射信息,另一端则是靶子(也就是消费者),在不停的接收。
1355 0