面试题 | 异步任务的各种组合方式(一)

简介: 面试题 | 异步任务的各种组合方式(一)

引子


App 开发中,等待多个异步结果的场景很多见,


比如并发地在后台执行若干个运算,待所有运算执行完毕后归总结果。


比如并发地请求若干个接口,待所有结果返回后刷新界面。


比如统计相册页并发加载 20 张图片的耗时。


其实把若干异步任务串行化是最简单的解决办法,即前一个异步任务执行完毕后再执行下一个。但这样就无法利用多核性能,执行时间被拉长,此时的执行总时长 = 所有任务执行时长的和。


若允许任务并发,则执行总时长 = 执行时间最长任务的耗时。时间性能得以优化,但随之而来的一个复杂度是:“如何等待所有异步结果”。


本文会介绍几种解决方案,并将它们运用到不同的业务场景,比对一下哪个方案适用于哪个场景。


等待并发网络请求


布尔值


假设有如下两个网络请求:


// 拉取新闻
fun fetchNews() {
    newsApi.fetchNews().enqueue(object : Callback<List<News>> {
        override fun onFailure(call: Call<List<News>>, t: Throwable) { ... }
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { ... }
    })
}
// 拉取广告
fun fetchAd() {
    newsApi.fetchAd().enqueue(object : Callback<List<Ad>> {
        override fun onFailure(call: Call<List<Ad>>, t: Throwable) { ... }
        override fun onResponse(call: Call<List<Ad>>, response: Response<List<Ad>>) { ... }
    })
}


广告需要按一定规则插入到新闻列表中。


最简单的做法是,先请求新闻,待其返回后再请求广告。显然这会增加用户等待时间。而且会写出这样的代码:


// 拉取新闻
fun fetchNews() {
    newsApi.fetchNews().enqueue(object : Callback<News> {
        override fun onFailure(call: Call<News>, t: Throwable) { ... }
        override fun onResponse(call: Call<News>, response: Response<News>) {
                // 拉取广告
                newsApi.fetchAd().enqueue(object : Callback<Ad> {
                    override fun onFailure(call: Call<Ad>, t: Throwable) { ... }
                    override fun onResponse(call: Call<Ad>, response: Response<Ad>) { ... }
                })
        }
    })
}


嵌套回调,若再加一个接口,回调层次就会再加一层,不能忍。 用户和程序员的体验都不好,得想办法解决。


第一个想到的方案是布尔值:


var isNewsDone = false
var isAdDone = false
var news = emptyList()
var ads = emptyList()
// 拉取新闻
fun fetchNews() {
    newsApi.fetchNews().enqueue(object : Callback<List<News>> {
        override fun onFailure(call: Call<List<News>>, t: Throwable) { 
            isNewsDone = true
            tryRefresh(news, ad)
        }
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { 
            isNewsDone = true
            news = response.body().result
            tryRefresh(news, ad)
        }
    })
}
// 拉取广告
fun fetchAd() {
    newsApi.fetchAd().enqueue(object : Callback<List<Ad>> {
        override fun onFailure(call: Call<List<Ad>>, t: Throwable) { 
            isAdDone = true
            tryRefresh(news, ad)
        }
        override fun onResponse(call: Call<List<Ad>>, response: Response<List<Ad>>) { 
            isAdDone = true
            ads = response.body().result
            tryRefresh(news, ad)
        }
    })
}
// 尝试刷新界面(只有当两个请求都返回时才刷新)
fun tryRefresh(news: List<News>, ads: List<Ad>) {
    if(isNewsDone && isAdDone){ //刷新界面 }
}


设置两个布尔值分别对应两个请求是否返回,并且在每个请求返回时检测两个布尔值,若都为 true 则进行刷新界面。


网络库通常会将请求成功的回调抛到主线程执行,所以这里没有线程安全问题。但如果不是网络请求,而是后台任务,此时需要将布尔值声明为volatile保证其可见性,关于 volatile 更详细的解释可以点击面试题 | 徒手写一个非阻塞线程安全队列 ConcurrentLinkedQueue?


这个方案能解决问题,但只适用于并发请求数量很少的请求,因为每个请求都要声明一个布尔值。而且每增加一个请求都要修改其余请求的代码,可维护性差。


CountdownLatch


更好的方案是CountDownLatch,它是java.util.concurrent包下的一个类,用来等待多个异步结果,用法如下:


val countdownLatch = CountDownLatch(2)//初始化,等待2个异步结果
var news = emptyList()
var ads = emptyList()
// 拉取新闻
fun fetchNews() {
    newsApi.fetchNews().enqueue(object : Callback<List<News>> {
        override fun onFailure(call: Call<List<News>>, t: Throwable) { 
            countdownLatch.countDown()
        }
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { 
            news = response.body().result
            countdownLatch.countDown()
        }
    })
}
// 拉取广告
fun fetchAd() {
    newsApi.fetchAd().enqueue(object : Callback<List<Ad>> {
        override fun onFailure(call: Call<List<Ad>>, t: Throwable) { 
            countdownLatch.countDown()
        }
        override fun onResponse(call: Call<List<Ad>>, response: Response<List<Ad>>) { 
            ads = response.body().result
            countdownLatch.countDown()
        }
    })
}
// countdownLatch 在新线程中等待
thread { 
    countdownLatch.await() // 阻塞线程等待两个请求返回
    liveData.postValue() // 抛数据到主线程刷刷新界面
}.start()


CountDownLatch 在构造时需传入一个数量,它的语义可以理解为一个计数器。countDown() 将计数器减一,而 await() 会阻塞当前线程直到计数器为 0 才被唤醒。


该计数器是一个 int 值,可能被多线程访问,为了保证线程安全,它被声明为 volatile,并且 countDown() 通过 CAS + 自旋的方式将其减一。


关于 CAS 的介绍可以点击面试题 | 徒手写一个非阻塞线程安全队列 ConcurrentLinkedQueue?


若新增一个接口,只需要将计数器的值加一,并在新接口返回时调用 countDown() 即可,可维护性陡增。


协程


Kotlin 是降低复杂度的大师,它对于这个问题的解决方案可以让代码看上去更简单。


在 Kotlin 的世界里异步操作应该被定义为suspend方法,retrofit 就支持这样的操作,比如:


interface NewsApi {
    @GET("/xxx")
    suspend fun fetchNews(): List<News>
    @GET("/xxx")
    suspend fun fetchAd(): List<Ad>
}


然后在协程中使用async启动异步任务:


scope.launch {
    // 并发地请求网络
    val newsDefered = async { fetchNews() }
    val adDefered = async { fetchAd() }
    // 等待两个网络请求返回
    val news = newsDefered.await()
    val ads = adDefered.await()
    // 刷新界面
    refreshUi(news, ads)
}


不管是写起来还是读起来,体验都非常好。因为协程把回调干掉了,逻辑不会跳来跳去。


其中的async()是 CoroutineScope 的扩展方法:


// 启动协程,并返回协程执行结果
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    ...
}


async() 和 launch() 唯一的不同是它的返回值是Defered,用于描述协程体执行的结果:


public interface Deferred<out T> : Job {
    // 挂起方法: 等待值的计算,但不会阻塞当前线程,计算完成后恢复当前协程执行
    public suspend fun await(): T
}


调用async()启动子协程不会挂起外层协程,而是立即返回一个Deferred对象,直到调用Deferred.await()协程的执行才会被挂起。当协程在多个Deferred对象上被挂起时,只有当它们都恢复后,协程才继续执行。这样就实现了“等待多个并行的异步结果”。


但这样写会问题:当广告拉取抛出异常时,新闻拉取也会被取消。


这是协程的一个默认设定,叫结构化并发,即并发是有结构性的。


Java 中线程的并发是没有结构的,所以做如下事情很困难:


  1. 结束一个线程时,如何一并结束它所有的子线程?


  1. 当某个子线程抛出异常时,如何结束和它同一层级的兄弟线程?


  1. 父线程如何等待所有子线程结束之后才结束?


之所以会很困难,是因为 Java 中的线程是没有级联关系的。而 Kotlin 通过协程域 CoroutineScope 以及协程上下文 CoroutineContext 实现级联关系。


在协程中启动的子协程会继承父协程的协程上下文,除了其中的 Job,一个新的 Job 会被创建并归属于父协程的子 Job。通过这套机制,协程和子协程之间有了级联关系,就能实现结构化并发。(以后会就结构化并发写一个系列,敬请期待~)


关于 CoroutineContext 内部结构的详细剖析可以点击Kotlin 协程 | CoroutineContext 为什么要设计成 indexed set?


但有些业务场景不需要子任务之间相互关联,比如当前场景,广告加载失败不应该影响新闻的拉取,大不了不展示广告。为此 kotlin 提供了supervisorScope


scope.launch {
    supervisorScope {
        // 并发地请求网络
        val newsDefered = async { fetchNews() }
        val adDefered = async { fetchAd() }
        // 等待两个网络请求返回
        val news = newsDefered.await()
        val ads = adDefered.await()
        // 刷新界面
        refreshUi(news, ads)
    }
}


supervisorScope 新建一个协程域继承父亲的协程上下文,但会将其中的 Job 重写为SupervisorJob,它的特点就是孩子的失败不会影响父亲,也不会影响兄弟。


现在广告和新闻加载互不影响,各自抛异常都不会影响对方。但就目前的业务场景来说,理想情况是这样的:“广告加载失败不应该影响新闻的加载。但新闻加载失败应该取消广告的加载(因为此时广告也没有展示的机会)”


稍改动下代码:


scope.launch {
    supervisorScope {
        // 并发地请求网络
        val adDefered = async { fetchAd() }
        val newsDefered = async { fetchNews() }
        // 当新闻请求抛异常时,取消广告请求
        newsDefered.invokeOnCompletion { throwable ->
            throwable?.let { adDefered.cancel() }
        }
        // 等待新闻
        val news = try {
            newsDefered.await()
        } catch (e: Exception) {
            emptyList()
        }
        // 等待广告
        val ads = try {
            adDefered.await()
        } catch (e: Exception) {
            emptyList()
        }
        // 刷新界面
        refreshUi(news, ads)
    }
}


invokeOnCompletion()相当于注册了一个回调,在异步任务结束时调用,不管是正常结束还是因异常而结束。在该回调中判断,若新闻因异常而结束则取消广告任务。


因为新闻和广告任务都可能抛出异常,且 async 启动的异步任务是在调用 await() 时才会抛出异常,所以它应该包裹在 try-catch 中。Kotlin 中的 try-catch 是一个表达式,即是有返回值的。这个特性让正常和异常情况的值聚合在一个表达式中。


若不使用 try-catch,程序也不会奔溃,因为 supervisorScope 中异常是不会向上传播的,即子协程的异常不会影响兄弟和父亲。但这样就少了异常情况的处理。


若现有代码都是 Callback 形式的,还能不能享受协程的简洁?


能!Kotlin 提供了suspendCoroutine(),专门用于将回调风格的代码转换成 suspend 方法,以拉取新闻为例:


// Callback 形式
fun fetchNews() {
    newsApi.fetchNews().enqueue(object : Callback<List<News>> {
        override fun onFailure(call: Call<List<News>>, t: Throwable) { ... }
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { ... }
    })
}
// suspend 形式
suspend fun fetchNews() = suspendCoroutine<List<News>> { continuation ->
    newsApi.fetchNews().enqueue(object : Callback<List<News>> {
        override fun onFailure(call: Call<List<News>>, t: Throwable) { 
            continuation.resumeWithException(t)
        }
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { 
            continuation.resume(response.body().result)
        }
    }) 
}


其中的Continuation剩余的计算,从形式上看,它就是一个回调:


public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>) // 开始剩余的计算
}


每个 suspend 方法被编译成 java 之后,都会在原有方法参数表最后添加一个 Continuation 参数,用于表达这个挂起点之后“剩余的计算”,举个例子:


scope.launch {
    fun1() // 普通方法
    suspendFun1() // 挂起方法 
    // --------------------------
    fun2() // 普通方法
    suspendFun2() // 挂起方法
    // --------------------------
}


整个协程体中有四个方法,其中两个是挂起方法,每个挂起方法都是一道水平的分割线,分割线下方的代码就是当前执行点相对于整个协程体剩余的计算,这“剩余的计算”会被包装成 Continuation 并作为参数传入挂起方法。所以上述代码翻译成 java 就类似于:


scope.launch {
    fun1()
    suspendFun1(new Continuation() {
        @override
        public void resumeWith(Result<T> result) {
            fun2()
            suspendFun2(new Continuation() {
                @override
                public void resumeWith(Result<T> result) {
                }
            })
        }
    })
}


所以挂起方法无异于 java 中带回调的方法,它自然不会阻塞当前线程,它只是把协程体中剩下的代码当成回调,该回调会在将来某个时间点被执行。通过这种方式,挂起方法主动让出了 cpu 执行权。


题外话


从业务上讲,将 Callback 方法改造成挂起式可以降低业务复杂度。举个例子:用户可以通过若干动作触发拉取新闻,比如首次进入新闻页、下拉刷新新闻页、上拉加载更多新闻、切换分区。新闻页有一个埋点,当首次展示某分区时,上报此时的新闻。


若没有 suspend 方法,代码应该这样写:


// NewsViewModel.kt
fun fetchNews(isFirstLoad: Boolean, isChangeType: Boolean) {
    newsApi.fetchNews().enqueue(object : Callback<List<News>> {
        override fun onFailure(call: Call<List<News>>, t: Throwable) { ... }
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { 
            // 将新闻抛给界面刷新
            newsLiveData.value = response.body.result
            // 只有当首次加载或切换分区时时才埋点
            if(isFirstLoad || isChangeType) {
                reportNews(response.body.result)
            }
        }
    })
}
// NewsActivity.kt
// 分区切换监听
tab.setOnTabChangeListener { index ->
    newsViewModel.fetchNews(false, true)
}
// 首次加载新闻
fun init() {
    newsViewModel.fetchNews(true, false)
}
// 下拉刷新
refreshLayout.setOnRefreshListener {
    newsViewModel.fetchNews(false, false)
}
// 上拉加载更多
refreshLayout.setOnLoadMoreListener {
    newsViewModel.fetchNews(false, false)
}


因为埋点需要带上新闻列表,所以必须在请求返回之后上报。不同业务场景的拉取接口是同一个,所以只能在统一的 onResponse() 中分类讨论,分类讨论依赖于标记位,不得不为 fetchNews() 添加两个参数。


如果将拉取新闻的接口改成 suspend 方式就能化解这类复杂度:


// NewsViewModel.kt
suspend fun fetchNews() = suspendCoroutine<List<News>> { continuation ->
    newsApi.fetchNews().enqueue(object : Callback<List<News>> {
        override fun onFailure(call: Call<List<News>>, t: Throwable) { 
            continuation.resumeWithException(t)
        }
        override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { 
            val news = response.body.result
            newsLiveData.value = news
            continuation.resume(news)
        }
    }) 
}
// NewsActivity.kt
fun initNews() {
    scope.launch {
        val news = viewModel.fetchNews()
        reportNews(news)
    }
}
fun changeNewsType() {
    scope.launch {
        val news = viewModel.fetchNews()
        reportNews(news)
    }
}
fun loadMoreNews() {
    scope.launch { viewModel.fetchNews() }
}
fun refreshNews() {
    scope.launch { viewModel.fetchNews() }
}
newsViewModel.newsLiveData.observe {news ->
    showNews(news)
}


所有界面的刷新还是走 LiveData,但拉取新闻的方法被改造成挂起之后,也会将新闻列表用类似同步的方式返回,所以可以在相关业务点进行单独埋点。


统计相册加载图片耗时


再通过一个更高并发数的场景比对下各个方案代码上的差异,场景如下:


image.png


https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/4d5a1265bbee4707a9240b825fe5c847~tplv-k3u1fbpfcp-zoom-in-crop-mark:1512:0:0:0.awebp?


测试并发加载 20 张网络图片的总耗时。该场景下已经无法使用布尔值,因为并发数太多。


CountdownLatch


var start = SystemClock.elapsedRealtime()
var imageUrls = listOf(...)
val countdownLatch = CountDownLatch(imageUrls.size)
// 另起线程等待 CountDownLatch 并输出耗时
scope.launch(Dispatchers.IO) {
    countdownLatch.await()
    Log.d( "test", "time-consume=${SystemClock.elapsedRealtime() - start}" )
}
// 遍历 20 张图片 url
imageUrls.forEach { img ->
        ImageView {// 动态构建 ImageView
            layout_width = 100
            layout_height = 100
            Glide.with(this@GlideActivity)
                .load(img)
                .listener(object : RequestListener<Drawable> {
                    override fun onLoadFailed(
                        e: GlideException?,
                        model: Any?,
                        target: Target<Drawable>?,
                        isFirstResource: Boolean
                    ): Boolean {
                        countdownLatch.countDown() // 加载完一张
                        return false
                    }
                    override fun onResourceReady(
                        resource: Drawable?,
                        model: Any?,
                        target: Target<Drawable>?,
                        dataSource: DataSource?,
                        isFirstResource: Boolean
                    ): Boolean {
                        countdownLatch.countDown() // 加载完一张
                        return false
                    }
                })
               .into(this)
        }
}


协程


var imageUrls = listOf(...)
scope.launch {
    val start = SystemClock.elapsedRealtime()
    // 将每个 url 都变换为一个 Defered
    val defers = imageUrls.map { img ->
            val imageView = ImageView {
                layout_width = 100
                layout_height = 100
            }
            async { imageView.loadImage(img) }
    }
    defers.awaitAll()//等待所有的异步任务结束
    Log.d( "test", "time-consume=${SystemClock.elapsedRealtime() - start}" )
}
// 将 Callback 方式的加载转换为挂起方式
private suspend fun ImageView.loadImage(img: String) = suspendCoroutine<String> { continuation ->
    Glide.with(this@GlideActivity)
        .load(img)
        .listener(object : RequestListener<Drawable> {
            override fun onLoadFailed(
                e: GlideException?,
                model: Any?,
                target: Target<Drawable>?,
                isFirstResource: Boolean
            ): Boolean {
                continuation.resume("")
                return false
            }
            override fun onResourceReady(
                resource: Drawable?,
                model: Any?,
                target: Target<Drawable>?,
                dataSource: DataSource?,
                isFirstResource: Boolean
            ): Boolean {
                continuation.resume("")
                return false
            }
        })
        .into(this)
}


你更喜欢哪种方式?


参考


Multiple Concurrent Asynchronous calls using Kotlin coroutines (async-await and suspendCoroutine) | by Priya Sindkar Shah | MindOrks | Medium


目录
相关文章
|
7月前
|
消息中间件 前端开发 Java
美团面试:如何实现线程任务编排?
线程任务编排指的是对多个线程任务按照一定的逻辑顺序或条件进行组织和安排,以实现协同工作、顺序执行或并行执行的一种机制。 ## 1.线程任务编排 VS 线程通讯 有同学可能会想:那线程的任务编排是不是问的就是线程间通讯啊? 线程间通讯我知道了,它的实现方式总共有以下几种方式: 1. Object 类下的 wait()、notify() 和 notifyAll() 方法; 2. Condition 类下的 await()、signal() 和 signalAll() 方法; 3. LockSupport 类下的 park() 和 unpark() 方法。 但是,**线程通讯和线程的任务编排是
73 1
|
2月前
|
NoSQL Java API
美团面试:Redis锁如何续期?Redis锁超时,任务没完怎么办?
在40岁老架构师尼恩的读者交流群中,近期有小伙伴在面试一线互联网企业时遇到了关于Redis分布式锁过期及自动续期的问题。尼恩对此进行了系统化的梳理,介绍了两种核心解决方案:一是通过增加版本号实现乐观锁,二是利用watch dog自动续期机制。后者通过后台线程定期检查锁的状态并在必要时延长锁的过期时间,确保锁不会因超时而意外释放。尼恩还分享了详细的代码实现和原理分析,帮助读者深入理解并掌握这些技术点,以便在面试中自信应对相关问题。更多技术细节和面试准备资料可在尼恩的技术文章和《尼恩Java面试宝典》中获取。
美团面试:Redis锁如何续期?Redis锁超时,任务没完怎么办?
|
2月前
|
Android开发 Kotlin
Android面试题之Kotlin中如何实现串行和并行任务?
本文介绍了 Kotlin 中 `async` 和 `await` 在并发编程中的应用,包括并行与串行任务的处理方法。并通过示例代码展示了如何启动并收集异步任务的结果。
34 0
|
4月前
|
Go 数据库 UED
[go 面试] 同步与异步:程序执行方式的不同之处
[go 面试] 同步与异步:程序执行方式的不同之处
|
5月前
|
设计模式 安全 Java
Java面试题:设计模式如单例模式、工厂模式、观察者模式等在多线程环境下线程安全问题,Java内存模型定义了线程如何与内存交互,包括原子性、可见性、有序性,并发框架提供了更高层次的并发任务处理能力
Java面试题:设计模式如单例模式、工厂模式、观察者模式等在多线程环境下线程安全问题,Java内存模型定义了线程如何与内存交互,包括原子性、可见性、有序性,并发框架提供了更高层次的并发任务处理能力
84 1
|
6月前
|
消息中间件 算法 Java
抖音面试:说说延迟任务的调度算法?
Netty 框架是以性能著称的框架,因此在它的框架中使用了大量提升性能的机制,例如 Netty 用于实现延迟队列的时间轮调度算法就是一个典型的例子。使用时间轮调度算法可以实现海量任务新增和取消任务的时间度为 O(1),那么什么是时间轮调度算法呢?接下来我们一起来看。 ## 1.延迟任务实现 在 Netty 中,我们需要使用 HashedWheelTimer 类来实现延迟任务,例如以下代码: ```java public class DelayTaskExample { public static void main(String[] args) { System.ou
60 5
抖音面试:说说延迟任务的调度算法?
|
5月前
|
设计模式 安全 Java
Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
74 0
|
5月前
|
设计模式 SQL 安全
Java面试题:设计一个线程安全的内存管理器,使用观察者模式来通知所有线程内存使用情况的变化。如何确保在添加和移除内存块时的线程安全?如何确保任务的顺序执行和调度器的线程安全?
Java面试题:设计一个线程安全的内存管理器,使用观察者模式来通知所有线程内存使用情况的变化。如何确保在添加和移除内存块时的线程安全?如何确保任务的顺序执行和调度器的线程安全?
43 0
|
5月前
|
设计模式 并行计算 安全
Java面试题:如何使用设计模式优化多线程环境下的资源管理?Java内存模型与并发工具类的协同工作,描述ForkJoinPool的工作机制,并解释其在并行计算中的优势。如何根据任务特性调整线程池参数
Java面试题:如何使用设计模式优化多线程环境下的资源管理?Java内存模型与并发工具类的协同工作,描述ForkJoinPool的工作机制,并解释其在并行计算中的优势。如何根据任务特性调整线程池参数
52 0
|
5月前
|
存储 安全 Java
Java面试题:如何在Java应用中实现有效的内存优化?在多线程环境下,如何确保数据的线程安全?如何设计并实现一个基于ExecutorService的任务处理流程?
Java面试题:如何在Java应用中实现有效的内存优化?在多线程环境下,如何确保数据的线程安全?如何设计并实现一个基于ExecutorService的任务处理流程?
48 0