RxJava 并行操作

简介: RxJava 并行操作

上一篇文章RxJava 线程模型分析详细介绍了RxJava的线程模型,被观察者(Observable、Flowable...)发射的数据流可以经历各种线程切换,但是数据流的各个元素之间不会产生并行执行的效果。我们知道并行并不是并发,不是同步,更不是异步。


Java 8新增了并行流来实现并行的效果,只需要在集合上调用parallelStream()即可。

List<Integer> result = new ArrayList();
        for(Integer i=1;i<=100;i++) {
            result.add(i);
        }
        result.parallelStream()
                .map(new java.util.function.Function<Integer, String>() {
            @Override
            public String apply(Integer integer) {
                return integer.toString();
            }
        }).forEach(new java.util.function.Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println(s);
            }
        });


如果要达到类似于 Java8 的 parallel 执行效果,可以借助 flatMap 操作符来实现并行的效果。

Observable.range(1,100)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.just(integer)
                                .subscribeOn(Schedulers.computation())
                                .map(new Function<Integer, String>() {
                                    @Override
                                    public String apply(Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {
                        System.out.println(str);
                    }
                });


flatMap操作符的原理是将这个Observable转化为多个以原Observable发射的数据作为源数据的Observable,然后再将这多个Observable发射的数据整合发射出来,需要注意的是最后的顺序可能会交错地发射出来。


image.png

flatMap.png


flatMap会对原始Observable发射的每一项数据执行变换操作。在这里,生成的每个Observable可以使用线程池(指定了computation作为Scheduler)并发的执行。


当然我们还可以使用ExecutorService来创建一个Scheduler。

int threadNum = Runtime.getRuntime().availableProcessors()+1;
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        final Scheduler scheduler = Schedulers.from(executor);
        Observable.range(1,100)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.just(integer)
                                .subscribeOn(scheduler)
                                .map(new Function<Integer, String>() {
                                    @Override
                                    public String apply(Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {
                        System.out.println(str);
                    }
                });


需要补充的是: 当完成所有的操作之后,ExecutorService需要执行shutdown()来关闭 ExecutorService。在这里,可以使用doFinally操作符来执行shutdown()。


doFinally操作符可以在onError或者onComplete之后调用指定的操作,或由下游处理。


增加了doFinally操作符之后,代码是这样的。

int threadNum = Runtime.getRuntime().availableProcessors()+1;
        final ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        final Scheduler scheduler = Schedulers.from(executor);
        Observable.range(1,100)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.just(integer)
                                .subscribeOn(scheduler)
                                .map(new Function<Integer, String>() {
                                    @Override
                                    public String apply(Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        executor.shutdown();
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {
                        System.out.println(str);
                    }
                });


Round-Robin 算法实现并行


Round-Robin算法是最简单的一种负载均衡算法。它的原理是把来自用户的请求轮流分配给内部的服务器:从服务器1开始,直到服务器N,然后重新开始循环。也被称为哈希取模法,在实际中是非常常用的数据分片方法。Round-Robin算法的优点是其简洁性,它无需记录当前所有连接的状态,所以它是一种无状态调度。


通过 Round-Robin 算法把数据分组, 按线程数分组,分成5组每组个数相同,一起发送处理。这样做的目的可以减少Observable的创建节省系统资源,但是会增加处理时间,Round-Robin 算法可以看成是对时间和空间的综合考虑。

final AtomicInteger batch = new AtomicInteger(0);
        Observable.range(1,100)
                .groupBy(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer integer) throws Exception {
                        return batch.getAndIncrement() % 5;
                    }
                })
                .flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
                        return integerIntegerGroupedObservable.observeOn(Schedulers.io())
                                .map(new Function<Integer, String>() {
                                    @Override
                                    public String apply(@NonNull Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(@NonNull Object o) throws Exception {
                        System.out.println(o);
                    }
                });


在这里,也可以使用ExecutorService创建Scheduler,来替代Schedulers.io()

final AtomicInteger batch = new AtomicInteger(0);
        int threadNum = 5;
        final ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        final Scheduler scheduler = Schedulers.from(executor);
        Observable.range(1,100)
                .groupBy(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer integer) throws Exception {
                        return batch.getAndIncrement() % threadNum;
                    }
                })
                .flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
                        return integerIntegerGroupedObservable.observeOn(scheduler)
                                .map(new Function<Integer, String>() {
                                    @Override
                                    public String apply(@NonNull Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(@NonNull Object o) throws Exception {
                        System.out.println(o);
                    }
                });


相关文章
|
3月前
|
安全 Android开发
你是否了解 RxJava 的 Disposable ?
你是否了解 RxJava 的 Disposable ?
75 0
RxJava2 中 doFinally 和 doAfterTerminate 的比较
RxJava2 中 doFinally 和 doAfterTerminate 的比较
236 0
|
数据处理
RxJava2实现RxBus
RxJava2实现RxBus
143 0
|
安全 Android开发
详解 RxJava 的 Disposable
RxJava2 的 Disposable,可以在适当时机取消订阅、截断数据流,避免 Android 中的内存泄露。
982 0
|
Java Go Android开发
RxJava2
函数式编程是一种编程范式。我们常见的编程范式有命令式编程、函数式编程和逻辑式编程。我们常见的面向对象编程是一种命令式编程。命令式编程是面向计算机硬件的抽象,有变量、赋值语句、表达式和控制语句。而函数式编程是面向数学的抽象,将计算描述为一种表达式求值,函数可以在任何地方定义,并且可以对函数进行组合。响应式编程是一种面向数据流和变化传播的编程范式,数据更新是相关联的。把函数式编程里的一套思路和响应式编程合起来就是函数响应式编程。函数响应式编程可以极大地简化项目,特别是处理嵌套回调的异步事件、复杂的列表过滤和变换或者时间相关问题。在Android开发中使用函数响应式编程的主要有两大框架:
144 0
RxJava2
|
设计模式 存储 Java
XTask与RxJava的使用对比
XTask与RxJava的使用对比
135 0
XTask与RxJava的使用对比
|
Java API
RxJava 之 ParallelFlowable
RxJava 之 ParallelFlowable
218 0
RxJava 之 ParallelFlowable
|
Java
冷饭热炒——RxJava
已经超过一个月没有写文章了,原因无非就是工作太忙。最近终于恢复以前的节奏,任务开始正常了起来。忙里偷闲,写一写人们写烂了的RxJava。这篇文章主要分析RxJava事件的产生以及变化的原理,Ok,let's go! 0. 前言 本次源码分析使用的是RxJava2,版本2.1.14。
1034 0
|
Java 数据库 Android开发
06.RxJava初探
基本使用 在build.gradle中加入配置,注意,rxJava和rxAndroid版本一定要相互兼容,不然可能会报错More than one file was found with OS independent path 'META-INF/rxjava.
1154 0
RxJava2学习笔记(3)
接上回继续,今天来学习下zip(打包)操作 一、zip操作 @Test public void zipTest() { Observable.zip(Observable.
1076 0