RxJava2

简介: 函数式编程是一种编程范式。我们常见的编程范式有命令式编程、函数式编程和逻辑式编程。我们常见的面向对象编程是一种命令式编程。命令式编程是面向计算机硬件的抽象,有变量、赋值语句、表达式和控制语句。而函数式编程是面向数学的抽象,将计算描述为一种表达式求值,函数可以在任何地方定义,并且可以对函数进行组合。响应式编程是一种面向数据流和变化传播的编程范式,数据更新是相关联的。把函数式编程里的一套思路和响应式编程合起来就是函数响应式编程。函数响应式编程可以极大地简化项目,特别是处理嵌套回调的异步事件、复杂的列表过滤和变换或者时间相关问题。在Android开发中使用函数响应式编程的主要有两大框架:

函数式编程是一种编程范式。我们常见的编程范式有命令式编程、函数式编程和逻辑式编程。我们常见的面向对象编程是一种命令式编程。命令式编程是面向计算机硬件的抽象,有变量、赋值语句、表达式和控制语句。而函数式编程是面向数学的抽象,将计算描述为一种表达式求值,函数可以在任何地方定义,并且可以对函数进行组合。响应式编程是一种面向数据流和变化传播的编程范式,数据更新是相关联的。把函数式编程里的一套思路和响应式编程合起来就是函数响应式编程。函数响应式编程可以极大地简化项目,特别是处理嵌套回调的异步事件、复杂的列表过滤和变换或者时间相关问题。在Android开发中使用函数响应式编程的主要有两大框架:


一个是 RxJava,另一个是 Goodle 推出的 Agera。本章我们来学习 RxJava。


需要先下载两个东东


https://search.maven.org/remotecontent?filepath=org/reactivestreams/reactive-streams/1.0.2/reactive-streams-1.0.2.jar


https://search.maven.org/remotecontent?

filepath=io/reactivex/rxjava2/rxjava/2.2.3/rxjava-2.2.3.jar

// create a flowable
        Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> e) throws Exception {
                e.onNext("hello RxJava 2");
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER);
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
            @Override
            public void onError(Throwable t) {
            }
            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("onSubscribe");
                // 在onSubscribe中,我们需要调用request去请求资源,参数就是要请求的数量,一般如果不限制请求数量,可以
                // 写成Long.MAX_VALUE。如果你不调用request,Subscriber的onNext和onComplete方法将不会被调用。
                s.request(Long.MAX_VALUE);
            }
        };
        flowable.subscribe(subscriber);
        // 快捷的 Consumer只关心onNext方法
        Flowable.just("Hello world").subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println(s);
            }
        });
        // map 操作符,可以把一个事件转换成另一个事件。
        Flowable.just("map1").map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return s.hashCode();
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });


Create


create 操作符应该是最常见的操作符了,主要用于产生一个 Obserable 被观察者对象,为了方便大家的认知,以后的教程中统一把被观察者 Observable 称为发射器(上游事件),观察者 Observer 称为接收器(下游事件)。


image.png


private static void testCreate() {
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(3);
                emitter.onNext(5);
                emitter.onComplete();
            }
        });
        observable.subscribe(newObserver());
    }


just


它接收一至十个参数,返回一个按参数列表顺序发射这些数据的Observable。

private static void testJust() {
        Observable.just(1, 3, 5).subscribe(newObserver());
        Observable.fromArray("fork", "knife").subscribe(newObserver());
        List<String> list = new ArrayList<String>();
        list.add("I am stronger");
        list.add("I am healthy");
        list.add("goodbye !");
        Observable.fromIterable(list).subscribe(newObserver());
    }


fromArray 和 fromIterable


传入对应类型即可


Filter



观测序列中只有通过的数据才会被发射。

Observable.just(1, 2, 8, 9).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer t) throws Exception {
                return t > 3;
            }
        }).subscribe(newObserver());


take(N)、takeLast(N)


只发射前N个元素;


只发射最后N个元素


Skip、SkipLast


跳过前N个元素;


跳过最后N个元素;


firstElement、lastElement、elementAt(i)


返回的是Maybe<T>, 而非Observable<T>


image.png

image.png

distinct


仅处理一次,可以处理去除重复的数据


Map


map 基本作用就是将一个 Observable 通过某种函数关系,转换为另一种 Observable

Observable.just("Hello", "rxJavA").map(new Function<String, String>() {
            @Override
            public String apply(String t) throws Exception {
                return t.toUpperCase(Locale.CHINA);
            }
        }).subscribe(newObserver())


Zip


image.png


  • zip 组合事件的过程就是分别从发射器 A 和发射器 B 各取出一个事件来组合,并且一个事件只能被使用一次,组合的顺序是严格按照事件发送的顺序来进行的,所以上面截图中,可以看到,1 永远是和 A 结合的,2 永远是和 B 结合的。


  • 最终接收器收到的事件数量是和发送器发送事件最少的那个发送器的发送事件数目相同,所以如截图中,5 很孤单,没有人愿意和它交往,孤独终老。

private static void testZip() {
        Observable<String> source1 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> observableemitter) throws Exception {
                observableemitter.onNext("one");
                observableemitter.onNext("two");
                observableemitter.onNext("three");
            }
        });
        Observable<Integer> source2 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> observableemitter) throws Exception {
                observableemitter.onNext(1);
                observableemitter.onNext(2);
            }
        });
        Observable.zip(source1, source2, new BiFunction<String, Integer, String>() {
            @Override
            public String apply(String arg0, Integer arg1) throws Exception {
                return arg0 + arg1;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String arg0) throws Exception {
                System.out.println(arg0);
            }
        });
    }


Concat


对于单一的把两个发射器连接成一个发射器


image.png


FlatMap


FlatMap 是一个很有趣的东西,我坚信你在实际开发中会经常用到。它可以把一个发射器 Observable 通过某种方法转换为多个  Observables,然后再把这些分散的 Observables装进一个单一的发射器 Observable。但有个需要注意的是,flatMap 并不能保证事件的顺序,如果需要保证,需要用到我们下面要讲的 ConcatMap


image.png


concatMap


上面其实就说了,concatMapFlatMap 的唯一区别就是 concatMap 保证了顺序,所以,我们就直接把 flatMap 替换为 concatMap 验证吧。


实际的使用才是王道!


参考



  • ReactiveX/RxJava: RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

https://github.com/ReactiveX/RxJava







目录
相关文章
|
6月前
|
安全 Android开发
你是否了解 RxJava 的 Disposable ?
你是否了解 RxJava 的 Disposable ?
241 0
RxJava2 中 doFinally 和 doAfterTerminate 的比较
RxJava2 中 doFinally 和 doAfterTerminate 的比较
296 0
|
数据处理
RxJava2实现RxBus
RxJava2实现RxBus
174 0
|
安全 Android开发
详解 RxJava 的 Disposable
RxJava2 的 Disposable,可以在适当时机取消订阅、截断数据流,避免 Android 中的内存泄露。
1197 0
|
设计模式 存储 Java
XTask与RxJava的使用对比
XTask与RxJava的使用对比
167 0
XTask与RxJava的使用对比
|
负载均衡 算法 Java
RxJava 并行操作
RxJava 并行操作
421 0
RxJava 并行操作
|
Java API
RxJava 之 ParallelFlowable
RxJava 之 ParallelFlowable
269 0
RxJava 之 ParallelFlowable
|
Java
冷饭热炒——RxJava
已经超过一个月没有写文章了,原因无非就是工作太忙。最近终于恢复以前的节奏,任务开始正常了起来。忙里偷闲,写一写人们写烂了的RxJava。这篇文章主要分析RxJava事件的产生以及变化的原理,Ok,let's go! 0. 前言 本次源码分析使用的是RxJava2,版本2.1.14。
1071 0
|
Java 数据库 Android开发
06.RxJava初探
基本使用 在build.gradle中加入配置,注意,rxJava和rxAndroid版本一定要相互兼容,不然可能会报错More than one file was found with OS independent path 'META-INF/rxjava.
1182 0
RxJava2学习笔记(3)
接上回继续,今天来学习下zip(打包)操作 一、zip操作 @Test public void zipTest() { Observable.zip(Observable.
1103 0