Android:随笔——RxJava的线程切换

简介: 转载请标明地址 QuincySx:[http://www.jianshu.com/p/d9da64774f7b]近期用到 RxJava ,线程切换的时候出了点小插曲,首先先上理论,在上实践,不喜理论可跳过,此篇文章适合会使用 RxJava 的人...

转载请标明地址 QuincySx:[http://www.jianshu.com/p/d9da64774f7b]


近期用到 RxJava ,线程切换的时候出了点小插曲,首先先上理论,在上实践,不喜理论可跳过,此篇文章适合会使用 RxJava 的人群,如果还没有接触过可以自学过后再来读这篇文章,这篇文章这几个例子其实代码都是基本都是一样的,我也不知道这样写是不是更清晰

理论

总所周知 RxJava 在切换线程时用到了两个方法 subscribeOn()observeOn() 下面来分别解释一下这两个方法

  • subscribeOn() : 影响的是最开始的被观察者所在的线程。当使用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用;
  • observeOn() : 影响的是跟在后面的操作(指定观察者运行的线程)。所以如果想要多次改变线程,可以多次使用 observeOn;

我之前还看到有人说 subscribeOn() 必须在 observeOn() 的前面,不过经过我测试他两个的位置并没有什么联系,就如上面所说 第一次出现 subscribeOn() 的地方是有效的,其他的无效


实践

实践一下,先来个基本的栗子


img_3faa948d9999311d91832caee7452f57.png
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.e(TAG, "===create: " + Thread.currentThread().getName());
                        subscriber.onNext("1");
                    }
                })
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
                        return Integer.valueOf(s);
                    }
                })
                .flatMap(new Func1<Integer, Observable<String>>() {
                    @Override
                    public Observable<String> call(final Integer integer) {
                        Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
                        return Observable.create(new Observable.OnSubscribe<String>() {
                            @Override
                            public void call(Subscriber<? super String> subscriber) {
                                Log.e(TAG, "===Observable<String> call: " + Thread.currentThread().getName());
                                for (int i = 0; i < integer; i++) {
                                    subscriber.onNext(i + "");
                                }
                                subscriber.onCompleted();
                            }
                        });
                    }
                })
                .map(new Func1<String, Long>() {
                    @Override
                    public Long call(String s) {
                        Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
                        return Long.parseLong(s);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
                    }
                });

这个例子呢就是简单的将 String 转为 Integer 然后转换发射源 发射 String 在然后 将 String 转换为 Long 然后打印
这个例子呢没有什么实际意义,只作为个示例
有没有看到

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

这两句代码 我切换了一下线程 ,接下来看一下我的运行结果

===create: RxIoScheduler-2
===String -> Integer: RxIoScheduler-2
===Integer->Observable: RxIoScheduler-2
===Observable<String> call: RxIoScheduler-2
===String->Long: RxIoScheduler-2
===onNext: main

接下来解释一下,因为subscribeOn(Schedulers.io())它指定了最开始的被观察者所在的线程所以后面的操作都是根据最开始的被观察者制定的线程运行的,又因为 .observeOn(AndroidSchedulers.mainThread()) 它指定了都面的操作符使用主线程运行。


接下来再写一个多 subscribeOn() observeOn()的情况

      Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.e(TAG, "===create: " + Thread.currentThread().getName());
                        subscriber.onNext("1");
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.io())
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
                        return Integer.valueOf(s);
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap(new Func1<Integer, Observable<String>>() {
                    @Override
                    public Observable<String> call(final Integer integer) {
                        Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
                        return Observable.create(new Observable.OnSubscribe<String>() {
                            @Override
                            public void call(Subscriber<? super String> subscriber) {
                                Log.e(TAG, "===Observable<String> call: " + Thread.currentThread().getName());
                                for (int i = 0; i < integer; i++) {
                                    subscriber.onNext(i + "");
                                }
                                subscriber.onCompleted();
                            }
                        });
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Func1<String, Long>() {
                    @Override
                    public Long call(String s) {
                        Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
                        return Long.parseLong(s);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
                    }
                });

运行结果

===create: main
===String -> Integer: RxIoScheduler-4
===Integer->Observable: main
===Observable<String> call: main
===String->Long: RxIoScheduler-3
===onNext: main

下面应该不用我解释就能知道结果为什么是这样的
简单解释一下 因为有两个subscribeOn()所以取第一个,所以最开始的被观察者所在的线程为主线程,接着使用observeOn()使用制定后面的操作符为 io 线程,接着observeOn()又指定后面的操作符为主线程...以此类推不再赘述


接下来到了我要说的重点了也是我受到迷惑的地方,我就简单的写一段栗子来重现一下,直接上代码

      Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.e(TAG, "===create: " + Thread.currentThread().getName());
                        subscriber.onNext("1");
                    }
                })
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
                        return Integer.valueOf(s);
                    }
                })
                .flatMap(new Func1<Integer, Observable<String>>() {
                    @Override
                    public Observable<String> call(final Integer integer) {
                        Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
                        return forEach(integer);
                    }
                })
                .map(new Func1<String, Long>() {
                    @Override
                    public Long call(String s) {
                        Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
                        return Long.parseLong(s);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
                    }
                });

可以看到 flatMap 那个操作符哪里使用了一个方法 forEach(int) 代码如下

 public Observable<String> forEach(final int integer) {
        return Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.e(TAG, "===Observable<String> call: " + Thread.currentThread().getName());
                        for (int i = 0; i < integer; i++) {
                            subscriber.onNext(i + "");
                        }
                        subscriber.onCompleted();
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
    }

再贴一下结果

===create: RxIoScheduler-2
===String -> Integer: RxIoScheduler-2
===Integer->Observable: RxIoScheduler-2
===Observable<String> call: RxIoScheduler-3
===String->Long: main
===onNext: main

解释一下结果:我想大家肯定发现了端倪,为什么 String -> Long 这个步骤为什么会在主线程里运行呢,原因呢很简单 flatMap 变换被监听者,这个被监听者使用observeOn切换了后边操作符的线程,影响到了 flatMap 后面的 map 操作符所以导致了如此结果

紧接着我们在看一个例子

Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.e(TAG, "===create: " + Thread.currentThread().getName());
                        subscriber.onNext("1");
                    }
                })
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
                        return Integer.valueOf(s);
                    }
                })
                .flatMap(new Func1<Integer, Observable<String>>() {
                    @Override
                    public Observable<String> call(final Integer integer) {
                        Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
                        return Observable.create(new Observable.OnSubscribe<String>() {
                            @Override
                            public void call(final Subscriber<? super String> subscriber) {
                                forEach(integer, new CallBack() {
                                    @Override
                                    public void call(String s) {
                                        Log.e(TAG, "===Subscriber: " + Thread.currentThread().getName());
                                        subscriber.onNext(s);
                                    }
                                });
                            }
                        });
                    }
                })
                .map(new Func1<String, Long>() {
                    @Override
                    public Long call(String s) {
                        Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
                        return Long.parseLong(s);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
                    }
                });
public void forEach(final int integer, final CallBack back) {
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.e(TAG, "===Observable<String> call: " + Thread.currentThread()
                                .getName());
                        for (int i = 0; i < integer; i++) {
                            subscriber.onNext(i + "");
                        }
                        subscriber.onCompleted();
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        back.call(s);
                    }
                });
    }

    interface CallBack {
        void call(String s);
    }

结果

===create: RxIoScheduler-4
===String -> Integer: RxIoScheduler-4
===Integer->Observable: RxIoScheduler-4
===Observable<String> call: RxIoScheduler-5
===Subscriber: main
===String->Long: main
===onNext: main

大家发现的现在还是 和上一个例子相仿,主要的改变还是在 flatMap() 这个方法里,现在我的做法是创建一个被监听者,然后里面是调用forEach()等待接口返回值,再往下发射数据。
注意以下几点:

  1. forEach()方法里面切换了线程,这个回调接口的线程为 主线程
  2. 我们在flatMap()这里新建了一个发射源(被监听者)、在这里我们并没有指定它的线程,所以flatMap()是什么线程,这个被监听者就是什么线程

看清以上两点,我们分析一下,flatMap()以上的结果大家肯定都能想出来,我们就直接从flatMap()开始分析了,都知道了回调的接口里面的线程为主线程,那么它作为被观察者它下面的操作符按照 RxJava 线程切换的基本原理来说,肯定也是主线程。

上个图理解一下

img_231588e2a52c4b3f99c4f8d46b9934a4.png
所以大家如果像我这样使用 flatMap() 的时候一定注意下面操作符的线程

总结

今天去调一个 Js 的方法,然后有一个接口回调,无意中看见一个 名字奇怪的 线程名称,一想 RxJava 的 io 线程名 应该是 RxIoScheduler 开头的啊,突然有点蒙,感觉自己一点都不了解 RxJava 的线程 切换了,结果分析一波 也是万变不离其宗,还是挺有意思的一个小插曲

欢迎各位前来拍砖

目录
相关文章
|
1月前
|
Java 调度 Android开发
构建高效Android应用:探究Kotlin多线程编程
【2月更文挑战第17天】 在现代移动开发领域,性能优化一直是开发者关注的焦点。特别是在Android平台上,合理利用多线程技术可以显著提升应用程序的响应性和用户体验。本文将深入探讨使用Kotlin进行Android多线程编程的策略与实践,旨在为开发者提供系统化的解决方案和性能提升技巧。我们将从基础概念入手,逐步介绍高级特性,并通过实际案例分析如何有效利用Kotlin协程、线程池以及异步任务处理机制来构建一个更加高效的Android应用。
39 4
|
1月前
|
API 数据库 Android开发
构建高效Android应用:探究Kotlin多线程优化策略
【2月更文挑战第14天】随着移动设备性能的日益强大,用户对应用程序的响应速度和流畅性要求越来越高。在Android开发中,合理利用多线程技术是提升应用性能的关键手段之一。Kotlin作为一种现代的编程语言,其协程特性为开发者提供了更为简洁高效的多线程处理方式。本文将深入探讨使用Kotlin进行Android多线程编程的最佳实践,包括协程的基本概念、优势以及在实际项目中的应用场景和性能优化技巧,旨在帮助开发者构建更加高效稳定的Android应用。
|
3月前
|
Java 调度 数据库
Android 性能优化: 如何进行多线程编程以提高应用性能?
Android 性能优化: 如何进行多线程编程以提高应用性能?
46 0
|
7月前
|
存储 SQL 安全
Android面试中问的线程相关问题
Android面试中问的线程相关问题
40 0
|
6天前
|
Java API 调度
安卓多线程和并发处理:提高应用效率
【4月更文挑战第13天】本文探讨了安卓应用中多线程和并发处理的优化方法,包括使用Thread、AsyncTask、Loader、IntentService、JobScheduler、WorkManager以及线程池。此外,还介绍了RxJava和Kotlin协程作为异步编程工具。理解并恰当运用这些技术能提升应用效率,避免UI卡顿,确保良好用户体验。随着安卓技术发展,更高级的异步处理工具将助力开发者构建高性能应用。
|
17天前
|
安全 Linux API
Android进程与线程
Android进程与线程
18 0
|
1月前
|
Java Android开发 开发者
构建高效Android应用:探究Kotlin多线程优化策略
【2月更文挑战第17天】 随着移动设备性能的不断提升,用户对应用的响应速度和稳定性要求越来越高。在Android开发中,Kotlin语言以其简洁、安全的特点受到开发者青睐。然而,面对复杂的多线程任务,如何有效利用Kotlin进行优化,以提升应用性能,是本文探讨的重点。通过分析Kotlin并发工具的使用场景与限制,结合实例演示其在Android开发中的实践,旨在为开发者提供实用的多线程处理指南。
|
7月前
|
算法 Java Android开发
Android rxjava和LiveData中的内存泄漏
Android rxjava和LiveData中的内存泄漏
118 0
|
8月前
|
Android开发
Android 中ProgressDialog进度条对话框的使用(使用子线程模拟更新进度)
Android 中ProgressDialog进度条对话框的使用(使用子线程模拟更新进度)
100 0
|
8月前
|
安全 Java Android开发
Android 中AsyncTask后台线程,异步任务的理解
Android 中AsyncTask后台线程,异步任务的理解
100 0