Flow
作为Coroutine版的RxJava
,同RxJava一样可以方便地进行线程切换。
本文针对两者在多线程场景中的使用区别进行一个简单对比。
1. RxJava
我们先来回顾一下RxJava中的线程切换
如上,RxJava使用subscriberOn
与observeOn
进行线程切换
subscribeOn
subscribeOn
用来决定在哪个线程进行订阅,对于Cold流来说即决定了数据的发射线程。使用中有两点注意:
- 当调用链上只有一个
subscribeOn
时,可以出现在任意位置
上面两种写法效果是一样的:都是在io线程订阅后发射数据
- 当调用链上有多个
subscribeOn
时,只有第一个生效:
上面第二个subscribeOn
没有意义
observeOn
observeOn
用来决定在哪个线程上响应:
observeOn
决定调用链上下游操作符执行的线程
上面绿线部分的代码将会运行在主线程
- 与
subscribeOn
不同,调用链上允许存在多个observeOn
且每个都有效
上面蓝色绿色部分因为observeOn
的存在分别切换到了不同线程执行
just
RxJava的初学者经常会犯的一个错误是在Observable.just(...)
里做耗时任务。 just并不是接受lambda,所以是立即执行的,不受subscribeOn
的影响
如上,loadDataSync()
不会在io
执行,
想要在io执行,需要使用Observable.deffer{}
flatMap
结合上面介绍的RxJava的线程切换,看下面这段代码
如果我们希望loadData(id)
并发执行,那么上面的写法是错误的。
subscribe(io())
意味着其上游的数据在单一线程中串行发射。因此虽然flatMap{}
返回多个Observable
, 都是都在单一线程中订阅,多个loadData
始终运行在同一线程。
代码经过一下修改后,可以达到并发执行的效果:
当订阅flatMap返回的Observable时,通过subscribeOn
分别指定订阅线程。
其他类似flatMap这种涉及多个Observable订阅的操作符(例如merge
、zip
等),需要留意各自的subscribeOn
的线程,以防不符合预期的行为出现。
2. Flow
接下来看一下 Flow的线程切换 。
Flow是基于CoroutineContext进行线程切换,所以这部分内容需要你对Croutine事先有基本的了解。
flowOn
类似于RxJava的subscribeOn,Flow中没有对应observeOn的操作符,因为collect
是一个suspend函数,必须在CoroutineScope
中执行,所以响应线程是由CoroutineContext
决定的。例如你在main中执行collect
,那么响应线程就是Dispatcher.Main
flowOn
说flowOn
类似于subscribeOn
,因为它们都可以用来决定上游线程
上面代码中,flowOn
前面代码将会在IO执行。
与subscribeOn
不同的是,flowOn允许出现多次,每个都会影响其前面的操作
上面代码,根据颜色可以看出来flowOn
影响的范围
launchIn
collect
是suspend函数,所以后续代码因为协程挂起不会继续执行
所以上面代码可能会不符合预期,因为第一个collect
不走完第二个走不到。
正确的写法是为每个collect
单独起一个协程
或者使用launchIn
,写法更加优雅launchIn
不会挂起协程,所以与RxJava的subscribe
更加接近。
通过名字可以感觉出来launchIn
只不过是之前例子中launch
的一个链式调用的语法糖。
flowOf
flowOf
类似于Observable.just()
,需要注意flowOf内的内容是立即执行的,不受flowOn
影响
希望calculate()
运行在IO,可以使用flow{ }
flatMapMerge
flatMapMerge
类似RxJava的flatMap
如上,2个item各自flatMap成2个item,即一共发射了4条数据,日志输出如下:
inner: pool-2-thread-2 @coroutine#4
inner: pool-2-thread-3 @coroutine#5
inner: pool-2-thread-3 @coroutine#5
inner: pool-2-thread-2 @coroutine#4
collect: pool-1-thread-2 @coroutine#2
collect: pool-1-thread-2 @coroutine#2
collect: pool-1-thread-2 @coroutine#2
collect: pool-1-thread-2 @coroutine#2
通过日志我们发现flowOn
虽然写在flatMapMerge
外面,inner
的日志却可以打印在多个线程上(都来自pool2线程池),这与flatMap
是不同的,同样场景下flatMap只能运行在线程池的固定线程上。
如果将flowOn
写在flatMapMerge
内部
结果如下:
inner: pool-2-thread-2 @coroutine#6
inner: pool-2-thread-1 @coroutine#7
inner: pool-2-thread-2 @coroutine#6
inner: pool-2-thread-1 @coroutine#7
collect: pool-1-thread-3 @coroutine#2
collect: pool-1-thread-3 @coroutine#2
collect: pool-1-thread-3 @coroutine#2
collect: pool-1-thread-3 @coroutine#2
inner
仍然打印在多个线程,flowOn
无论写在flatMapMerge
内部还是外部,对flatMapMerge内的处理没有区别。
但是flatMapMerge
之外还是有区别的,看下面两段代码
通过颜色可以知道flowOn
影响的范围,向上追溯到flowOf
为止
3. Summary
RxJava的Observable
与Coroutine的Flow
都支持线程切换,相关API的对比如下:
线程池调度 | 线程操作符 | 数据源同步创建 | 异步创建 | 并发执行 | |
---|---|---|---|---|---|
RxJava | Schedulers (io(), computation(), mainThread()) | subscribeOn, observeOn | just | deffer{} | flatMap(inner subscribeOn) |
Flow | Dispatchers (IO, Default, Main) | flowOn | flowOf | flow{} | flatMapMerge(inner or outer flowOn) |
最后通过一个例子看一下如何将代码从RxJava迁移到Flow
RxJava
RxJava代码如下:
使用到的Schedulers
定义如下:
代码执行结果:
1: pool-1-thread-1
1: pool-1-thread-1
1: pool-1-thread-1
2: pool-3-thread-1
2: pool-3-thread-1
2: pool-3-thread-1
inner 1: pool-4-thread-1
inner 1: pool-4-thread-2
inner 1: pool-4-thread-1
inner 1: pool-4-thread-1
inner 1: pool-4-thread-2
inner 1: pool-4-thread-2
inner 1: pool-4-thread-3
inner 2: pool-5-thread-1
inner 2: pool-5-thread-2
3: pool-5-thread-1
inner 2: pool-5-thread-2
inner 1: pool-4-thread-3
inner 2: pool-5-thread-2
inner 2: pool-5-thread-3
3: pool-5-thread-1
3: pool-5-thread-1
3: pool-5-thread-1
end: pool-6-thread-1
end: pool-6-thread-1
inner 1: pool-4-thread-3
end: pool-6-thread-1
3: pool-5-thread-1
inner 2: pool-5-thread-1
3: pool-5-thread-1
inner 2: pool-5-thread-3
inner 2: pool-5-thread-1
end: pool-6-thread-1
3: pool-5-thread-3
3: pool-5-thread-3
end: pool-6-thread-1
inner 2: pool-5-thread-3
3: pool-5-thread-3
end: pool-6-thread-1
end: pool-6-thread-1
end: pool-6-thread-1
end: pool-6-thread-1
代码较长,通过颜色标记法帮我们理清线程关系
上色后一目了然了,需要特别注意的是由于flatMap中切换了数据源的同时切换了线程,所以打印 3
的线程不是s2
而是 s4
Flow
首相创建对应的Dispatcher
然后将代码换成Flow的写法,主要遵循下列原则
- RxJava通过
observeOn
切换后续代码的线程 - Flow通过
flowOn
切换前置代码的线程
打印结果如下:
1: pool-1-thread-1 @coroutine#6
1: pool-1-thread-1 @coroutine#6
1: pool-1-thread-1 @coroutine#6
2: pool-2-thread-2 @coroutine#5
2: pool-2-thread-2 @coroutine#5
2: pool-2-thread-2 @coroutine#5
inner 1: pool-3-thread-1 @coroutine#10
inner 1: pool-3-thread-2 @coroutine#11
inner 1: pool-3-thread-3 @coroutine#12
inner 1: pool-3-thread-2 @coroutine#11
inner 1: pool-3-thread-3 @coroutine#12
inner 2: pool-4-thread-3 @coroutine#9
inner 1: pool-3-thread-1 @coroutine#10
inner 1: pool-3-thread-3 @coroutine#12
inner 1: pool-3-thread-2 @coroutine#11
inner 2: pool-4-thread-1 @coroutine#7
inner 2: pool-4-thread-2 @coroutine#8
inner 2: pool-4-thread-1 @coroutine#7
inner 2: pool-4-thread-3 @coroutine#9
inner 1: pool-3-thread-1 @coroutine#10
3: pool-4-thread-1 @coroutine#3
inner 2: pool-4-thread-3 @coroutine#9
inner 2: pool-4-thread-2 @coroutine#8
end: pool-5-thread-1 @coroutine#2
3: pool-4-thread-1 @coroutine#3
inner 2: pool-4-thread-2 @coroutine#8
3: pool-4-thread-1 @coroutine#3
end: pool-5-thread-1 @coroutine#2
3: pool-4-thread-1 @coroutine#3
end: pool-5-thread-1 @coroutine#2
end: pool-5-thread-1 @coroutine#2
3: pool-4-thread-1 @coroutine#3
3: pool-4-thread-1 @coroutine#3
end: pool-5-thread-1 @coroutine#2
end: pool-5-thread-1 @coroutine#2
3: pool-4-thread-1 @coroutine#3
3: pool-4-thread-1 @coroutine#3
end: pool-5-thread-1 @coroutine#2
end: pool-5-thread-1 @coroutine#2
inner 2: pool-4-thread-1 @coroutine#7
3: pool-4-thread-1 @coroutine#3
end: pool-5-thread-1 @coroutine#2
从日志可以看到,1
、2
、3
的时序性以及inner1
和inner2
的并发性与RxJava的一致。
4. FIN
Flow在线程切换方面可以完全取代RxJava的能力,而且将subscribeOn
和observeOn
两个操作符合二为一成flowOn
,学习成本更低。随着flow的操作符种类日趋完善,未来在Android/Kotlin开发中可以跟RxJava说再见了👋🏻