引子
App 开发中,等待多个异步结果的场景很多见,
比如并发地在后台执行若干个运算,待所有运算执行完毕后归总结果。
比如并发地请求若干个接口,待所有结果返回后刷新界面。
比如统计相册页并发加载 20 张图片的耗时。
其实把若干异步任务串行化是最简单的解决办法,即前一个异步任务执行完毕后再执行下一个。但这样就无法利用多核性能,执行时间被拉长,此时的执行总时长 = 所有任务执行时长的和。
若允许任务并发,则执行总时长 = 执行时间最长任务的耗时。时间性能得以优化,但随之而来的一个复杂度是:“如何等待所有异步结果”。
本文会介绍几种解决方案,并将它们运用到不同的业务场景,比对一下哪个方案适用于哪个场景。
等待并发网络请求
布尔值
假设有如下两个网络请求:
// 拉取新闻 fun fetchNews() { newsApi.fetchNews().enqueue(object : Callback<List<News>> { override fun onFailure(call: Call<List<News>>, t: Throwable) { ... } override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { ... } }) } // 拉取广告 fun fetchAd() { newsApi.fetchAd().enqueue(object : Callback<List<Ad>> { override fun onFailure(call: Call<List<Ad>>, t: Throwable) { ... } override fun onResponse(call: Call<List<Ad>>, response: Response<List<Ad>>) { ... } }) }
广告需要按一定规则插入到新闻列表中。
最简单的做法是,先请求新闻,待其返回后再请求广告。显然这会增加用户等待时间。而且会写出这样的代码:
// 拉取新闻 fun fetchNews() { newsApi.fetchNews().enqueue(object : Callback<News> { override fun onFailure(call: Call<News>, t: Throwable) { ... } override fun onResponse(call: Call<News>, response: Response<News>) { // 拉取广告 newsApi.fetchAd().enqueue(object : Callback<Ad> { override fun onFailure(call: Call<Ad>, t: Throwable) { ... } override fun onResponse(call: Call<Ad>, response: Response<Ad>) { ... } }) } }) }
嵌套回调,若再加一个接口,回调层次就会再加一层,不能忍。 用户和程序员的体验都不好,得想办法解决。
第一个想到的方案是布尔值:
var isNewsDone = false var isAdDone = false var news = emptyList() var ads = emptyList() // 拉取新闻 fun fetchNews() { newsApi.fetchNews().enqueue(object : Callback<List<News>> { override fun onFailure(call: Call<List<News>>, t: Throwable) { isNewsDone = true tryRefresh(news, ad) } override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { isNewsDone = true news = response.body().result tryRefresh(news, ad) } }) } // 拉取广告 fun fetchAd() { newsApi.fetchAd().enqueue(object : Callback<List<Ad>> { override fun onFailure(call: Call<List<Ad>>, t: Throwable) { isAdDone = true tryRefresh(news, ad) } override fun onResponse(call: Call<List<Ad>>, response: Response<List<Ad>>) { isAdDone = true ads = response.body().result tryRefresh(news, ad) } }) } // 尝试刷新界面(只有当两个请求都返回时才刷新) fun tryRefresh(news: List<News>, ads: List<Ad>) { if(isNewsDone && isAdDone){ //刷新界面 } }
设置两个布尔值分别对应两个请求是否返回,并且在每个请求返回时检测两个布尔值,若都为 true 则进行刷新界面。
网络库通常会将请求成功的回调抛到主线程执行,所以这里没有线程安全问题。但如果不是网络请求,而是后台任务,此时需要将布尔值声明为volatile
保证其可见性,关于 volatile 更详细的解释可以点击面试题 | 徒手写一个非阻塞线程安全队列 ConcurrentLinkedQueue?。
这个方案能解决问题,但只适用于并发请求数量很少的请求,因为每个请求都要声明一个布尔值。而且每增加一个请求都要修改其余请求的代码,可维护性差。
CountdownLatch
更好的方案是CountDownLatch
,它是java.util.concurrent
包下的一个类,用来等待多个异步结果,用法如下:
val countdownLatch = CountDownLatch(2)//初始化,等待2个异步结果 var news = emptyList() var ads = emptyList() // 拉取新闻 fun fetchNews() { newsApi.fetchNews().enqueue(object : Callback<List<News>> { override fun onFailure(call: Call<List<News>>, t: Throwable) { countdownLatch.countDown() } override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { news = response.body().result countdownLatch.countDown() } }) } // 拉取广告 fun fetchAd() { newsApi.fetchAd().enqueue(object : Callback<List<Ad>> { override fun onFailure(call: Call<List<Ad>>, t: Throwable) { countdownLatch.countDown() } override fun onResponse(call: Call<List<Ad>>, response: Response<List<Ad>>) { ads = response.body().result countdownLatch.countDown() } }) } // countdownLatch 在新线程中等待 thread { countdownLatch.await() // 阻塞线程等待两个请求返回 liveData.postValue() // 抛数据到主线程刷刷新界面 }.start()
CountDownLatch 在构造时需传入一个数量,它的语义可以理解为一个计数器。countDown() 将计数器减一,而 await() 会阻塞当前线程直到计数器为 0 才被唤醒。
该计数器是一个 int 值,可能被多线程访问,为了保证线程安全,它被声明为 volatile,并且 countDown() 通过 CAS + 自旋的方式将其减一。
关于 CAS 的介绍可以点击面试题 | 徒手写一个非阻塞线程安全队列 ConcurrentLinkedQueue?。
若新增一个接口,只需要将计数器的值加一,并在新接口返回时调用 countDown() 即可,可维护性陡增。
协程
Kotlin 是降低复杂度的大师,它对于这个问题的解决方案可以让代码看上去更简单。
在 Kotlin 的世界里异步操作应该被定义为suspend
方法,retrofit 就支持这样的操作,比如:
interface NewsApi { @GET("/xxx") suspend fun fetchNews(): List<News> @GET("/xxx") suspend fun fetchAd(): List<Ad> }
然后在协程中使用async
启动异步任务:
scope.launch { // 并发地请求网络 val newsDefered = async { fetchNews() } val adDefered = async { fetchAd() } // 等待两个网络请求返回 val news = newsDefered.await() val ads = adDefered.await() // 刷新界面 refreshUi(news, ads) }
不管是写起来还是读起来,体验都非常好。因为协程把回调干掉了,逻辑不会跳来跳去。
其中的async()
是 CoroutineScope 的扩展方法:
// 启动协程,并返回协程执行结果 public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T> { ... }
async() 和 launch() 唯一的不同是它的返回值是Defered
,用于描述协程体执行的结果:
public interface Deferred<out T> : Job { // 挂起方法: 等待值的计算,但不会阻塞当前线程,计算完成后恢复当前协程执行 public suspend fun await(): T }
调用async()
启动子协程不会挂起外层协程,而是立即返回一个Deferred
对象,直到调用Deferred.await()
协程的执行才会被挂起。当协程在多个Deferred
对象上被挂起时,只有当它们都恢复后,协程才继续执行。这样就实现了“等待多个并行的异步结果”。
但这样写会问题:当广告拉取抛出异常时,新闻拉取也会被取消。
这是协程的一个默认设定,叫结构化并发,即并发是有结构性的。
Java 中线程的并发是没有结构的,所以做如下事情很困难:
- 结束一个线程时,如何一并结束它所有的子线程?
- 当某个子线程抛出异常时,如何结束和它同一层级的兄弟线程?
- 父线程如何等待所有子线程结束之后才结束?
之所以会很困难,是因为 Java 中的线程是没有级联关系的。而 Kotlin 通过协程域 CoroutineScope 以及协程上下文 CoroutineContext 实现级联关系。
在协程中启动的子协程会继承父协程的协程上下文,除了其中的 Job,一个新的 Job 会被创建并归属于父协程的子 Job。通过这套机制,协程和子协程之间有了级联关系,就能实现结构化并发。(以后会就结构化并发写一个系列,敬请期待~)
关于 CoroutineContext 内部结构的详细剖析可以点击Kotlin 协程 | CoroutineContext 为什么要设计成 indexed set?
但有些业务场景不需要子任务之间相互关联,比如当前场景,广告加载失败不应该影响新闻的拉取,大不了不展示广告。为此 kotlin 提供了supervisorScope
:
scope.launch { supervisorScope { // 并发地请求网络 val newsDefered = async { fetchNews() } val adDefered = async { fetchAd() } // 等待两个网络请求返回 val news = newsDefered.await() val ads = adDefered.await() // 刷新界面 refreshUi(news, ads) } }
supervisorScope 新建一个协程域继承父亲的协程上下文,但会将其中的 Job 重写为SupervisorJob
,它的特点就是孩子的失败不会影响父亲,也不会影响兄弟。
现在广告和新闻加载互不影响,各自抛异常都不会影响对方。但就目前的业务场景来说,理想情况是这样的:“广告加载失败不应该影响新闻的加载。但新闻加载失败应该取消广告的加载(因为此时广告也没有展示的机会)”
稍改动下代码:
scope.launch { supervisorScope { // 并发地请求网络 val adDefered = async { fetchAd() } val newsDefered = async { fetchNews() } // 当新闻请求抛异常时,取消广告请求 newsDefered.invokeOnCompletion { throwable -> throwable?.let { adDefered.cancel() } } // 等待新闻 val news = try { newsDefered.await() } catch (e: Exception) { emptyList() } // 等待广告 val ads = try { adDefered.await() } catch (e: Exception) { emptyList() } // 刷新界面 refreshUi(news, ads) } }
invokeOnCompletion()
相当于注册了一个回调,在异步任务结束时调用,不管是正常结束还是因异常而结束。在该回调中判断,若新闻因异常而结束则取消广告任务。
因为新闻和广告任务都可能抛出异常,且 async 启动的异步任务是在调用 await() 时才会抛出异常,所以它应该包裹在 try-catch 中。Kotlin 中的 try-catch 是一个表达式,即是有返回值的。这个特性让正常和异常情况的值聚合在一个表达式中。
若不使用 try-catch,程序也不会奔溃,因为 supervisorScope 中异常是不会向上传播的,即子协程的异常不会影响兄弟和父亲。但这样就少了异常情况的处理。
若现有代码都是 Callback 形式的,还能不能享受协程的简洁?
能!Kotlin 提供了suspendCoroutine()
,专门用于将回调风格的代码转换成 suspend 方法,以拉取新闻为例:
// Callback 形式 fun fetchNews() { newsApi.fetchNews().enqueue(object : Callback<List<News>> { override fun onFailure(call: Call<List<News>>, t: Throwable) { ... } override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { ... } }) } // suspend 形式 suspend fun fetchNews() = suspendCoroutine<List<News>> { continuation -> newsApi.fetchNews().enqueue(object : Callback<List<News>> { override fun onFailure(call: Call<List<News>>, t: Throwable) { continuation.resumeWithException(t) } override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { continuation.resume(response.body().result) } }) }
其中的Continuation
是剩余的计算,从形式上看,它就是一个回调:
public interface Continuation<in T> { public val context: CoroutineContext public fun resumeWith(result: Result<T>) // 开始剩余的计算 }
每个 suspend 方法被编译成 java 之后,都会在原有方法参数表最后添加一个 Continuation 参数,用于表达这个挂起点之后“剩余的计算”,举个例子:
scope.launch { fun1() // 普通方法 suspendFun1() // 挂起方法 // -------------------------- fun2() // 普通方法 suspendFun2() // 挂起方法 // -------------------------- }
整个协程体中有四个方法,其中两个是挂起方法,每个挂起方法都是一道水平的分割线,分割线下方的代码就是当前执行点相对于整个协程体剩余的计算,这“剩余的计算”会被包装成 Continuation 并作为参数传入挂起方法。所以上述代码翻译成 java 就类似于:
scope.launch { fun1() suspendFun1(new Continuation() { @override public void resumeWith(Result<T> result) { fun2() suspendFun2(new Continuation() { @override public void resumeWith(Result<T> result) { } }) } }) }
所以挂起方法无异于 java 中带回调的方法,它自然不会阻塞当前线程,它只是把协程体中剩下的代码当成回调,该回调会在将来某个时间点被执行。通过这种方式,挂起方法主动让出了 cpu 执行权。
题外话
从业务上讲,将 Callback 方法改造成挂起式可以降低业务复杂度。举个例子:用户可以通过若干动作触发拉取新闻,比如首次进入新闻页、下拉刷新新闻页、上拉加载更多新闻、切换分区。新闻页有一个埋点,当首次展示某分区时,上报此时的新闻。
若没有 suspend 方法,代码应该这样写:
// NewsViewModel.kt fun fetchNews(isFirstLoad: Boolean, isChangeType: Boolean) { newsApi.fetchNews().enqueue(object : Callback<List<News>> { override fun onFailure(call: Call<List<News>>, t: Throwable) { ... } override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { // 将新闻抛给界面刷新 newsLiveData.value = response.body.result // 只有当首次加载或切换分区时时才埋点 if(isFirstLoad || isChangeType) { reportNews(response.body.result) } } }) } // NewsActivity.kt // 分区切换监听 tab.setOnTabChangeListener { index -> newsViewModel.fetchNews(false, true) } // 首次加载新闻 fun init() { newsViewModel.fetchNews(true, false) } // 下拉刷新 refreshLayout.setOnRefreshListener { newsViewModel.fetchNews(false, false) } // 上拉加载更多 refreshLayout.setOnLoadMoreListener { newsViewModel.fetchNews(false, false) }
因为埋点需要带上新闻列表,所以必须在请求返回之后上报。不同业务场景的拉取接口是同一个,所以只能在统一的 onResponse() 中分类讨论,分类讨论依赖于标记位,不得不为 fetchNews() 添加两个参数。
如果将拉取新闻的接口改成 suspend 方式就能化解这类复杂度:
// NewsViewModel.kt suspend fun fetchNews() = suspendCoroutine<List<News>> { continuation -> newsApi.fetchNews().enqueue(object : Callback<List<News>> { override fun onFailure(call: Call<List<News>>, t: Throwable) { continuation.resumeWithException(t) } override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { val news = response.body.result newsLiveData.value = news continuation.resume(news) } }) } // NewsActivity.kt fun initNews() { scope.launch { val news = viewModel.fetchNews() reportNews(news) } } fun changeNewsType() { scope.launch { val news = viewModel.fetchNews() reportNews(news) } } fun loadMoreNews() { scope.launch { viewModel.fetchNews() } } fun refreshNews() { scope.launch { viewModel.fetchNews() } } newsViewModel.newsLiveData.observe {news -> showNews(news) }
所有界面的刷新还是走 LiveData,但拉取新闻的方法被改造成挂起之后,也会将新闻列表用类似同步的方式返回,所以可以在相关业务点进行单独埋点。
统计相册加载图片耗时
再通过一个更高并发数的场景比对下各个方案代码上的差异,场景如下:
测试并发加载 20 张网络图片的总耗时。该场景下已经无法使用布尔值,因为并发数太多。
CountdownLatch
var start = SystemClock.elapsedRealtime() var imageUrls = listOf(...) val countdownLatch = CountDownLatch(imageUrls.size) // 另起线程等待 CountDownLatch 并输出耗时 scope.launch(Dispatchers.IO) { countdownLatch.await() Log.d( "test", "time-consume=${SystemClock.elapsedRealtime() - start}" ) } // 遍历 20 张图片 url imageUrls.forEach { img -> ImageView {// 动态构建 ImageView layout_width = 100 layout_height = 100 Glide.with(this@GlideActivity) .load(img) .listener(object : RequestListener<Drawable> { override fun onLoadFailed( e: GlideException?, model: Any?, target: Target<Drawable>?, isFirstResource: Boolean ): Boolean { countdownLatch.countDown() // 加载完一张 return false } override fun onResourceReady( resource: Drawable?, model: Any?, target: Target<Drawable>?, dataSource: DataSource?, isFirstResource: Boolean ): Boolean { countdownLatch.countDown() // 加载完一张 return false } }) .into(this) } }
协程
var imageUrls = listOf(...) scope.launch { val start = SystemClock.elapsedRealtime() // 将每个 url 都变换为一个 Defered val defers = imageUrls.map { img -> val imageView = ImageView { layout_width = 100 layout_height = 100 } async { imageView.loadImage(img) } } defers.awaitAll()//等待所有的异步任务结束 Log.d( "test", "time-consume=${SystemClock.elapsedRealtime() - start}" ) } // 将 Callback 方式的加载转换为挂起方式 private suspend fun ImageView.loadImage(img: String) = suspendCoroutine<String> { continuation -> Glide.with(this@GlideActivity) .load(img) .listener(object : RequestListener<Drawable> { override fun onLoadFailed( e: GlideException?, model: Any?, target: Target<Drawable>?, isFirstResource: Boolean ): Boolean { continuation.resume("") return false } override fun onResourceReady( resource: Drawable?, model: Any?, target: Target<Drawable>?, dataSource: DataSource?, isFirstResource: Boolean ): Boolean { continuation.resume("") return false } }) .into(this) }
你更喜欢哪种方式?