Android:随笔——RxJava的线程切换

简介: 转载请标明地址 QuincySx:[http://www.jianshu.com/p/d9da64774f7b]近期用到 RxJava ,线程切换的时候出了点小插曲,首先先上理论,在上实践,不喜理论可跳过,此篇文章适合会使用 RxJava 的人...

转载请标明地址 QuincySx:[http://www.jianshu.com/p/d9da64774f7b]


近期用到 RxJava ,线程切换的时候出了点小插曲,首先先上理论,在上实践,不喜理论可跳过,此篇文章适合会使用 RxJava 的人群,如果还没有接触过可以自学过后再来读这篇文章,这篇文章这几个例子其实代码都是基本都是一样的,我也不知道这样写是不是更清晰

理论

总所周知 RxJava 在切换线程时用到了两个方法 subscribeOn()observeOn() 下面来分别解释一下这两个方法

  • subscribeOn() : 影响的是最开始的被观察者所在的线程。当使用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用;
  • observeOn() : 影响的是跟在后面的操作(指定观察者运行的线程)。所以如果想要多次改变线程,可以多次使用 observeOn;

我之前还看到有人说 subscribeOn() 必须在 observeOn() 的前面,不过经过我测试他两个的位置并没有什么联系,就如上面所说 第一次出现 subscribeOn() 的地方是有效的,其他的无效


实践

实践一下,先来个基本的栗子


img_3faa948d9999311d91832caee7452f57.png
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.e(TAG, "===create: " + Thread.currentThread().getName());
                        subscriber.onNext("1");
                    }
                })
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
                        return Integer.valueOf(s);
                    }
                })
                .flatMap(new Func1<Integer, Observable<String>>() {
                    @Override
                    public Observable<String> call(final Integer integer) {
                        Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
                        return Observable.create(new Observable.OnSubscribe<String>() {
                            @Override
                            public void call(Subscriber<? super String> subscriber) {
                                Log.e(TAG, "===Observable<String> call: " + Thread.currentThread().getName());
                                for (int i = 0; i < integer; i++) {
                                    subscriber.onNext(i + "");
                                }
                                subscriber.onCompleted();
                            }
                        });
                    }
                })
                .map(new Func1<String, Long>() {
                    @Override
                    public Long call(String s) {
                        Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
                        return Long.parseLong(s);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
                    }
                });

这个例子呢就是简单的将 String 转为 Integer 然后转换发射源 发射 String 在然后 将 String 转换为 Long 然后打印
这个例子呢没有什么实际意义,只作为个示例
有没有看到

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

这两句代码 我切换了一下线程 ,接下来看一下我的运行结果

===create: RxIoScheduler-2
===String -> Integer: RxIoScheduler-2
===Integer->Observable: RxIoScheduler-2
===Observable<String> call: RxIoScheduler-2
===String->Long: RxIoScheduler-2
===onNext: main

接下来解释一下,因为subscribeOn(Schedulers.io())它指定了最开始的被观察者所在的线程所以后面的操作都是根据最开始的被观察者制定的线程运行的,又因为 .observeOn(AndroidSchedulers.mainThread()) 它指定了都面的操作符使用主线程运行。


接下来再写一个多 subscribeOn() observeOn()的情况

      Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.e(TAG, "===create: " + Thread.currentThread().getName());
                        subscriber.onNext("1");
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.io())
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
                        return Integer.valueOf(s);
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap(new Func1<Integer, Observable<String>>() {
                    @Override
                    public Observable<String> call(final Integer integer) {
                        Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
                        return Observable.create(new Observable.OnSubscribe<String>() {
                            @Override
                            public void call(Subscriber<? super String> subscriber) {
                                Log.e(TAG, "===Observable<String> call: " + Thread.currentThread().getName());
                                for (int i = 0; i < integer; i++) {
                                    subscriber.onNext(i + "");
                                }
                                subscriber.onCompleted();
                            }
                        });
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Func1<String, Long>() {
                    @Override
                    public Long call(String s) {
                        Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
                        return Long.parseLong(s);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
                    }
                });

运行结果

===create: main
===String -> Integer: RxIoScheduler-4
===Integer->Observable: main
===Observable<String> call: main
===String->Long: RxIoScheduler-3
===onNext: main

下面应该不用我解释就能知道结果为什么是这样的
简单解释一下 因为有两个subscribeOn()所以取第一个,所以最开始的被观察者所在的线程为主线程,接着使用observeOn()使用制定后面的操作符为 io 线程,接着observeOn()又指定后面的操作符为主线程...以此类推不再赘述


接下来到了我要说的重点了也是我受到迷惑的地方,我就简单的写一段栗子来重现一下,直接上代码

      Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.e(TAG, "===create: " + Thread.currentThread().getName());
                        subscriber.onNext("1");
                    }
                })
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
                        return Integer.valueOf(s);
                    }
                })
                .flatMap(new Func1<Integer, Observable<String>>() {
                    @Override
                    public Observable<String> call(final Integer integer) {
                        Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
                        return forEach(integer);
                    }
                })
                .map(new Func1<String, Long>() {
                    @Override
                    public Long call(String s) {
                        Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
                        return Long.parseLong(s);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
                    }
                });

可以看到 flatMap 那个操作符哪里使用了一个方法 forEach(int) 代码如下

 public Observable<String> forEach(final int integer) {
        return Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.e(TAG, "===Observable<String> call: " + Thread.currentThread().getName());
                        for (int i = 0; i < integer; i++) {
                            subscriber.onNext(i + "");
                        }
                        subscriber.onCompleted();
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
    }

再贴一下结果

===create: RxIoScheduler-2
===String -> Integer: RxIoScheduler-2
===Integer->Observable: RxIoScheduler-2
===Observable<String> call: RxIoScheduler-3
===String->Long: main
===onNext: main

解释一下结果:我想大家肯定发现了端倪,为什么 String -> Long 这个步骤为什么会在主线程里运行呢,原因呢很简单 flatMap 变换被监听者,这个被监听者使用observeOn切换了后边操作符的线程,影响到了 flatMap 后面的 map 操作符所以导致了如此结果

紧接着我们在看一个例子

Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.e(TAG, "===create: " + Thread.currentThread().getName());
                        subscriber.onNext("1");
                    }
                })
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
                        return Integer.valueOf(s);
                    }
                })
                .flatMap(new Func1<Integer, Observable<String>>() {
                    @Override
                    public Observable<String> call(final Integer integer) {
                        Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
                        return Observable.create(new Observable.OnSubscribe<String>() {
                            @Override
                            public void call(final Subscriber<? super String> subscriber) {
                                forEach(integer, new CallBack() {
                                    @Override
                                    public void call(String s) {
                                        Log.e(TAG, "===Subscriber: " + Thread.currentThread().getName());
                                        subscriber.onNext(s);
                                    }
                                });
                            }
                        });
                    }
                })
                .map(new Func1<String, Long>() {
                    @Override
                    public Long call(String s) {
                        Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
                        return Long.parseLong(s);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
                    }
                });
public void forEach(final int integer, final CallBack back) {
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.e(TAG, "===Observable<String> call: " + Thread.currentThread()
                                .getName());
                        for (int i = 0; i < integer; i++) {
                            subscriber.onNext(i + "");
                        }
                        subscriber.onCompleted();
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        back.call(s);
                    }
                });
    }

    interface CallBack {
        void call(String s);
    }

结果

===create: RxIoScheduler-4
===String -> Integer: RxIoScheduler-4
===Integer->Observable: RxIoScheduler-4
===Observable<String> call: RxIoScheduler-5
===Subscriber: main
===String->Long: main
===onNext: main

大家发现的现在还是 和上一个例子相仿,主要的改变还是在 flatMap() 这个方法里,现在我的做法是创建一个被监听者,然后里面是调用forEach()等待接口返回值,再往下发射数据。
注意以下几点:

  1. forEach()方法里面切换了线程,这个回调接口的线程为 主线程
  2. 我们在flatMap()这里新建了一个发射源(被监听者)、在这里我们并没有指定它的线程,所以flatMap()是什么线程,这个被监听者就是什么线程

看清以上两点,我们分析一下,flatMap()以上的结果大家肯定都能想出来,我们就直接从flatMap()开始分析了,都知道了回调的接口里面的线程为主线程,那么它作为被观察者它下面的操作符按照 RxJava 线程切换的基本原理来说,肯定也是主线程。

上个图理解一下

img_231588e2a52c4b3f99c4f8d46b9934a4.png
所以大家如果像我这样使用 flatMap() 的时候一定注意下面操作符的线程

总结

今天去调一个 Js 的方法,然后有一个接口回调,无意中看见一个 名字奇怪的 线程名称,一想 RxJava 的 io 线程名 应该是 RxIoScheduler 开头的啊,突然有点蒙,感觉自己一点都不了解 RxJava 的线程 切换了,结果分析一波 也是万变不离其宗,还是挺有意思的一个小插曲

欢迎各位前来拍砖

目录
相关文章
|
5月前
|
Java Android开发 UED
🧠Android多线程与异步编程实战!告别卡顿,让应用响应如丝般顺滑!🧵
【7月更文挑战第28天】在Android开发中,确保UI流畅性至关重要。多线程与异步编程技术可将耗时操作移至后台,避免阻塞主线程。我们通常采用`Thread`类、`Handler`与`Looper`、`AsyncTask`及`ExecutorService`等进行多线程编程。
61 2
|
7天前
|
API Android开发 iOS开发
深入探索Android与iOS的多线程编程差异
在移动应用开发领域,多线程编程是提高应用性能和响应性的关键。本文将对比分析Android和iOS两大平台在多线程处理上的不同实现机制,探讨它们各自的优势与局限性,并通过实例展示如何在这两个平台上进行有效的多线程编程。通过深入了解这些差异,开发者可以更好地选择适合自己项目需求的技术和策略,从而优化应用的性能和用户体验。
|
5月前
|
Java 调度 Android开发
深入解析Android应用开发中的响应式编程与RxJava应用
在现代Android应用开发中,响应式编程及其核心框架RxJava正逐渐成为开发者的首选。本文将深入探讨响应式编程的基本概念、RxJava的核心特性以及如何在Android应用中利用RxJava提升代码的可读性和性能。 【7月更文挑战第7天】
45 1
|
5月前
|
Java Android开发
Android面试题经典之Glide取消加载以及线程池优化
Glide通过生命周期管理在`onStop`时暂停请求,`onDestroy`时取消请求,减少资源浪费。在`EngineJob`和`DecodeJob`中使用`cancel`方法标记任务并中断数据获取。当网络请求被取消时,`HttpUrlFetcher`的`cancel`方法设置标志,之后的数据获取会返回`null`,中断加载流程。Glide还使用定制的线程池,如AnimationExecutor、diskCacheExecutor、sourceExecutor和newUnlimitedSourceExecutor,其中某些禁止网络访问,并根据CPU核心数动态调整线程数。
150 2
|
2月前
|
缓存 数据处理 Android开发
在 Android 中使用 RxJava 更新 View
【10月更文挑战第20天】使用 RxJava 来更新 View 可以提供更优雅、更高效的解决方案。通过合理地运用操作符和订阅机制,我们能够轻松地处理异步数据并在主线程中进行 View 的更新。在实际应用中,需要根据具体情况进行灵活运用,并注意相关的注意事项和性能优化,以确保应用的稳定性和流畅性。可以通过不断的实践和探索,进一步掌握在 Android 中使用 RxJava 更新 View 的技巧和方法,为开发高质量的 Android 应用提供有力支持。
|
2月前
|
调度 Android开发 开发者
构建高效Android应用:探究Kotlin多线程优化策略
【10月更文挑战第11天】本文探讨了如何在Kotlin中实现高效的多线程方案,特别是在Android应用开发中。通过介绍Kotlin协程的基础知识、异步数据加载的实际案例,以及合理使用不同调度器的方法,帮助开发者提升应用性能和用户体验。
55 4
|
3月前
|
Java 数据库 Android开发
一个Android App最少有几个线程?实现多线程的方式有哪些?
本文介绍了Android多线程编程的重要性及其实现方法,涵盖了基本概念、常见线程类型(如主线程、工作线程)以及多种多线程实现方式(如`Thread`、`HandlerThread`、`Executors`、Kotlin协程等)。通过合理的多线程管理,可大幅提升应用性能和用户体验。
134 15
一个Android App最少有几个线程?实现多线程的方式有哪些?
|
3月前
|
Java Android开发 UED
🧠Android多线程与异步编程实战!告别卡顿,让应用响应如丝般顺滑!🧵
在Android开发中,为应对复杂应用场景和繁重计算任务,多线程与异步编程成为保证UI流畅性的关键。本文将介绍Android中的多线程基础,包括Thread、Handler、Looper、AsyncTask及ExecutorService等,并通过示例代码展示其实用性。AsyncTask适用于简单后台操作,而ExecutorService则能更好地管理复杂并发任务。合理运用这些技术,可显著提升应用性能和用户体验,避免内存泄漏和线程安全问题,确保UI更新顺畅。
109 5
|
3月前
|
Java 数据库 Android开发
一个Android App最少有几个线程?实现多线程的方式有哪些?
本文介绍了Android应用开发中的多线程编程,涵盖基本概念、常见实现方式及最佳实践。主要内容包括主线程与工作线程的作用、多线程的多种实现方法(如 `Thread`、`HandlerThread`、`Executors` 和 Kotlin 协程),以及如何避免内存泄漏和合理使用线程池。通过有效的多线程管理,可以显著提升应用性能和用户体验。
79 10
|
3月前
|
API Android开发 iOS开发
安卓与iOS开发中的线程管理对比
【9月更文挑战第12天】在移动应用的世界中,安卓和iOS平台各自拥有庞大的用户群体。开发者们在这两个平台上构建应用时,线程管理是他们必须面对的关键挑战之一。本文将深入探讨两大平台在线程管理方面的异同,通过直观的代码示例,揭示它们各自的设计理念和实现方式,帮助读者更好地理解如何在安卓与iOS开发中高效地处理多线程任务。