近期用到 RxJava ,线程切换的时候出了点小插曲,首先先上理论,在上实践,不喜理论可跳过,此篇文章适合会使用 RxJava 的人群,如果还没有接触过可以自学过后再来读这篇文章,这篇文章这几个例子其实代码都是基本都是一样的,我也不知道这样写是不是更清晰
理论
总所周知 RxJava 在切换线程时用到了两个方法 subscribeOn()
和 observeOn()
下面来分别解释一下这两个方法
- subscribeOn() : 影响的是最开始的被观察者所在的线程。当使用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用;
- observeOn() : 影响的是跟在后面的操作(指定观察者运行的线程)。所以如果想要多次改变线程,可以多次使用 observeOn;
我之前还看到有人说 subscribeOn()
必须在 observeOn()
的前面,不过经过我测试他两个的位置并没有什么联系,就如上面所说 第一次出现 subscribeOn()
的地方是有效的,其他的无效
实践
实践一下,先来个基本的栗子
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===create: " + Thread.currentThread().getName());
subscriber.onNext("1");
}
})
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
return Integer.valueOf(s);
}
})
.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer integer) {
Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===Observable<String> call: " + Thread.currentThread().getName());
for (int i = 0; i < integer; i++) {
subscriber.onNext(i + "");
}
subscriber.onCompleted();
}
});
}
})
.map(new Func1<String, Long>() {
@Override
public Long call(String s) {
Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
return Long.parseLong(s);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
}
});
这个例子呢就是简单的将 String 转为 Integer 然后转换发射源 发射 String 在然后 将 String 转换为 Long 然后打印
这个例子呢没有什么实际意义,只作为个示例
有没有看到
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
这两句代码 我切换了一下线程 ,接下来看一下我的运行结果
===create: RxIoScheduler-2
===String -> Integer: RxIoScheduler-2
===Integer->Observable: RxIoScheduler-2
===Observable<String> call: RxIoScheduler-2
===String->Long: RxIoScheduler-2
===onNext: main
接下来解释一下,因为subscribeOn(Schedulers.io())
它指定了最开始的被观察者所在的线程所以后面的操作都是根据最开始的被观察者制定的线程运行的,又因为 .observeOn(AndroidSchedulers.mainThread())
它指定了都面的操作符使用主线程运行。
接下来再写一个多 subscribeOn()
observeOn()
的情况
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===create: " + Thread.currentThread().getName());
subscriber.onNext("1");
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
return Integer.valueOf(s);
}
})
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer integer) {
Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===Observable<String> call: " + Thread.currentThread().getName());
for (int i = 0; i < integer; i++) {
subscriber.onNext(i + "");
}
subscriber.onCompleted();
}
});
}
})
.observeOn(Schedulers.io())
.map(new Func1<String, Long>() {
@Override
public Long call(String s) {
Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
return Long.parseLong(s);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
}
});
运行结果
===create: main
===String -> Integer: RxIoScheduler-4
===Integer->Observable: main
===Observable<String> call: main
===String->Long: RxIoScheduler-3
===onNext: main
下面应该不用我解释就能知道结果为什么是这样的
简单解释一下 因为有两个subscribeOn()
所以取第一个,所以最开始的被观察者所在的线程为主线程,接着使用observeOn()
使用制定后面的操作符为 io 线程,接着observeOn()
又指定后面的操作符为主线程...以此类推不再赘述
接下来到了我要说的重点了也是我受到迷惑的地方,我就简单的写一段栗子来重现一下,直接上代码
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===create: " + Thread.currentThread().getName());
subscriber.onNext("1");
}
})
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
return Integer.valueOf(s);
}
})
.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer integer) {
Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
return forEach(integer);
}
})
.map(new Func1<String, Long>() {
@Override
public Long call(String s) {
Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
return Long.parseLong(s);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
}
});
可以看到 flatMap
那个操作符哪里使用了一个方法 forEach(int)
代码如下
public Observable<String> forEach(final int integer) {
return Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===Observable<String> call: " + Thread.currentThread().getName());
for (int i = 0; i < integer; i++) {
subscriber.onNext(i + "");
}
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
再贴一下结果
===create: RxIoScheduler-2
===String -> Integer: RxIoScheduler-2
===Integer->Observable: RxIoScheduler-2
===Observable<String> call: RxIoScheduler-3
===String->Long: main
===onNext: main
解释一下结果:我想大家肯定发现了端倪,为什么 String -> Long 这个步骤为什么会在主线程里运行呢,原因呢很简单 flatMap 变换被监听者,这个被监听者使用observeOn
切换了后边操作符的线程,影响到了 flatMap 后面的 map 操作符所以导致了如此结果
紧接着我们在看一个例子
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===create: " + Thread.currentThread().getName());
subscriber.onNext("1");
}
})
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
return Integer.valueOf(s);
}
})
.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer integer) {
Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(final Subscriber<? super String> subscriber) {
forEach(integer, new CallBack() {
@Override
public void call(String s) {
Log.e(TAG, "===Subscriber: " + Thread.currentThread().getName());
subscriber.onNext(s);
}
});
}
});
}
})
.map(new Func1<String, Long>() {
@Override
public Long call(String s) {
Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
return Long.parseLong(s);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
}
});
public void forEach(final int integer, final CallBack back) {
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===Observable<String> call: " + Thread.currentThread()
.getName());
for (int i = 0; i < integer; i++) {
subscriber.onNext(i + "");
}
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
back.call(s);
}
});
}
interface CallBack {
void call(String s);
}
结果
===create: RxIoScheduler-4
===String -> Integer: RxIoScheduler-4
===Integer->Observable: RxIoScheduler-4
===Observable<String> call: RxIoScheduler-5
===Subscriber: main
===String->Long: main
===onNext: main
大家发现的现在还是 和上一个例子相仿,主要的改变还是在 flatMap()
这个方法里,现在我的做法是创建一个被监听者,然后里面是调用forEach()
等待接口返回值,再往下发射数据。
注意以下几点:
- 在
forEach()
方法里面切换了线程,这个回调接口的线程为 主线程 - 我们在
flatMap()
这里新建了一个发射源(被监听者)、在这里我们并没有指定它的线程,所以flatMap()
是什么线程,这个被监听者就是什么线程
看清以上两点,我们分析一下,flatMap()
以上的结果大家肯定都能想出来,我们就直接从flatMap()
开始分析了,都知道了回调的接口里面的线程为主线程,那么它作为被观察者它下面的操作符按照 RxJava 线程切换的基本原理来说,肯定也是主线程。
上个图理解一下
所以大家如果像我这样使用 flatMap()
的时候一定注意下面操作符的线程
总结
今天去调一个 Js 的方法,然后有一个接口回调,无意中看见一个 名字奇怪的 线程名称,一想 RxJava 的 io 线程名 应该是 RxIoScheduler 开头的啊,突然有点蒙,感觉自己一点都不了解 RxJava 的线程 切换了,结果分析一波 也是万变不离其宗,还是挺有意思的一个小插曲
欢迎各位前来拍砖