通常而言,Rx如果遇到if条件语句、switch case语句时需要先选择分支条件,然后再进行链式调用。RxCondition产生的目的就是为了在这些情况下也能顺利地使用链式调用。
我在查找RxJava的条件、布尔操作符时,没有找到符合我需求的操作符。于是,我在网上找到了RxJavaComputationExpressions, 做了一些修改将RxJava1升级到RxJava2,增加了对Flowable的支持。
下载安装
下载地址:
https://github.com/fengzhizi715/RxCondition
使用方法:
1.ifThen用法
if条件语句传统的写法:
Observable<String> observable = null; if (flag) { observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { e.onNext("this is true"); } }); } else { observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { e.onNext("this is false"); } }); } observable.subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { System.out.println("s="+s); } });
使用了ifThen()以后的写法:
Statement.ifThen(new BooleanSupplier() { @Override public boolean getAsBoolean() throws Exception { return flag; } }, Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { e.onNext("this is true"); } }),Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { e.onNext("this is false"); } })).subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { System.out.println("s="+s); } });
ifThen(BooleanSupplier condition, Observable<? extends R> then,Observable<? extends R> orElse),其中第一个Observable是条件为true时执行的,第二个Observable则是条件为false时执行。
使用lambda表达式,进一步简化写法:
Statement.ifThen(()->{ return flag; },Observable.create((e)->{ e.onNext("this is true"); }),Observable.create((e)->{ e.onNext("this is false"); })).subscribe((Consume) (s) -> {System.out.println("s="+s)});
当然,ifThen也支持Flowable:
Statement.ifThen(()->{ return flag; },Flowable.just("this is true"), Flowable.just("this is false")) .subscribe((Consume) (s) -> {System.out.println("s="+s)});
2.switchCase用法
switch case语句传统的写法:
Flowable<String> flowable = null; switch(type) { case 0: flowable = Flowable.just("this is 0"); break; case 1: flowable = Flowable.just("this is 1"); break; case 2: flowable = Flowable.just("this is 2"); break; case 3: flowable = Flowable.just("this is 3"); break; default: flowable = Flowable.just("this is default"); break; } flowable.subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { System.out.println("s="+s); } });
使用了switchCase()以后的写法:
Map<Integer,Flowable<String>> maps = new HashMap<>(); maps.put(0,Flowable.just("this is 0")); maps.put(1,Flowable.just("this is 1")); maps.put(2,Flowable.just("this is 2")); maps.put(3,Flowable.just("this is 3")); Statement.switchCase(new Callable<Integer>() { @Override public Integer call() throws Exception { return type; } },maps,Flowable.just("this is default")) .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { System.out.println("s="+s); } });
首先,将各个分支情况放入maps中。
其次,switchCase()的第一个参数是caseSelector,用于返回maps的key。最后一个参数是defaultCase,相当于switch case语句中的default语句。
改成lambda表达:
Map<Integer,Flowable<String>> maps = new HashMap<>(); maps.put(0,Flowable.just("this is 0")); maps.put(1,Flowable.just("this is 1")); maps.put(2,Flowable.just("this is 2")); maps.put(3,Flowable.just("this is 3")); Statement.switchCase((Callable)()-> {return type;},maps,Flowable.just("this is default")) .subscribe((Consumer)(s) -> {System.out.println("s="+s);});
switchCase()中,第一个参数返回的是Map<K, R>中的key,它支持范型,所以switchCase()相对于switch case语句而已能够支持更多种类型。
原理
以ifThen为例,看看它某个方法是怎么运行的。
public static <R> Observable<R> ifThen(BooleanSupplier condition, Observable<? extends R> then, Observable<? extends R> orElse) { return RxJavaPlugins.onAssembly(new ObservableIfThen<R>(condition, then, orElse)); }
ifThen静态方法借助RxJavaPlugins.onAssembly()来生成一个新的Observable并返回。
ObservableIfThen是一个Observable的实现类。它的subscribeActual()方法是被订阅时真正执行的方法,用来衔接Observable和Observer(Subscriber)。在这里,subscribeActual()根据condition返回的bool值来判断是使用then还是使用orElse来做Observable。
import io.reactivex.Observable; import io.reactivex.ObservableSource; import io.reactivex.Observer; import io.reactivex.functions.BooleanSupplier; import io.reactivex.internal.disposables.EmptyDisposable; /** * Created by Tony Shen on 2017/5/9. */ final class ObservableIfThen<T> extends Observable<T> { final BooleanSupplier condition; final ObservableSource<? extends T> then; final ObservableSource<? extends T> orElse; ObservableIfThen(BooleanSupplier condition, ObservableSource<? extends T> then, ObservableSource<? extends T> orElse) { this.condition = condition; this.then = then; this.orElse = orElse; } @Override protected void subscribeActual(Observer<? super T> observer) { boolean b; try { b = condition.getAsBoolean(); } catch (Throwable ex) { EmptyDisposable.error(ex, observer); return; } if (b) { then.subscribe(observer); } else { orElse.subscribe(observer); } } }
switchCase的原理也是大同小异,区别在于订阅时需要根据key从map中取到对应的Observable/Flowable,取不到则使用defaultCase。
总结
这个库其实很简单,编写它的同时我加深了对RxJava中Observable/Flowable原理的认识。