Kotlin 异步 | Flow 限流的应用场景及原理

简介: Kotlin 异步 | Flow 限流的应用场景及原理

异步数据流中的生产者可能会生产过多的数据,而消费者并不需要那么多,所以限流就有用武之地了。App 开发中有一些常见的限流场景,比如搜索框防抖、点击事件防抖、防过度刷新。这一篇就以这三个场景为线索探究一下如何实现及背后的原理


阅读本篇需要了解 Flow 的基础知识。关于这些知识的详细介绍可以点击Kotlin 异步 | Flow 应用场景及原理,现援引如下:


  1. 异步数据流可以理解为一条时间轴上按序产生的数据,它可用于表达多个连续的异步过程


  1. 异步数据流也可以用“生产者/消费者”模型来理解,生产者和消费者之间就好像有一条管道,生产者从管道的一头插入数据,消费者从另一头取数据。因为管道的存在,数据是有序的,遵循先进先出的原则。


  1. Kotlin 中的suspend方法用于表达一个异步过程,而Flow用于表达多连续个异步过程。Flow是冷流,冷流不会发射数据,直到它被收集的那一刻,所以冷流是“声明式的”。


  1. Flow被收集的瞬间,数据开始生产并被发射出去,通过流收集器FlowCollector将其传递给消费者。流和流收集器是成对出现的概念。流是一组按序产生的数据,数据的产生表现为通过流收集器发射数据,在这里流收集器像是流数据容器(虽然它不持有任何一条数据),它定义了如何将数据传递给消费者。


  1. 异步数据流中,生产者和消费者之间可以插入中间消费者。中间消费者建立了流上的拦截并转发机制:新建下游流,它生产数据的方式是通过收集上游数据,并转发到一个带有发射数据能力的 lambda 中。拥有多个中间消费者的流就像“套娃”一样,下游流套在上游流外面。中间消费者通过这种方式拦截了原始数据,就可以对其做任意变换再转发给下游消费者。


  1. 所有能触发收集数据动作的消费者称为终端消费者,它就像点燃鞭炮的星火,使得被若干个中间消费者套娃的流从外向内(从下游到上游)一个个的被收集,最终传导到原始流,触发数据的发射。


  1. 默认情况下,流中生产和消费数据是在同一个线程中进行的。但可以通过flowOn()改变上游流执行的线程,这并不影响下游流所执行的线程。


  1. Flow中生产和消费数据的操作都被包装在用 suspend 修饰的 lambda 中,用协程就可以轻松的实现异步生产,异步消费。


搜索框防抖


“在搜索框中输入内容,然后点击搜索按钮,经过一段等待,搜索结果以列表形式展现”。很久以前的 app 是这样进行搜索的。


现在搜索体验就要好很多了,不需要手动点击搜索按钮,输入内容后,搜索是自动触发。

为了实现这效果就得监听输入框内容的变化:


// 构建监听器
val textWatcher = object : android.text.TextWatcher {
    override fun afterTextChanged(s: Editable?) {}
    override fun beforeTextChanged(text: CharSequence?,start: Int,count: Int,after: Int) {}
    override fun onTextChanged(text: CharSequence?, start: Int, before: Int, count: Int) {
        search(text.toString())
    }
}
// 设置输入框内容监听器
editText.addTextChangedListener(textWatcher)
// 访问网络进行搜索
fun search(key: String) {}


这样实现有一个缺点,会进行多次无效的网络访问。比如搜索“kotlin flow”时,onTextChanged()会被回调 10 次,就触发了 10 次网络请求,而只有最后一次才是有效的。


优化方案也很容易想到,只有在用户停止输入时才进行请求。但并没有这样的回调通知业务层用户已经停止输入。。。


那就只能设置一个超时,即用户多久未输入内容后就判定已停止输入。


但实现起来还挺复杂的:得在每次输入框内容变化后启动超时倒计时,若倒计时归零时输入框内容没有发生新变化,则用输入框当前内容发起请求,否则将倒计时重置,重新开始倒计时。


在需求迭代中,会有时间去实现这么一个复杂的小功能?


还好 Kotlin 的 Flow 替我们封装了这个功能。


用流的思想重新理解上面的场景:输入框是流数据的生产者,其内容每变化一次,就是在流上生产了一个新数据。但并不是每一个数据都需要被消费,所以得做“限流”,即丢弃一切发射间隔过短的数据,直到生产出某个数据之后一段时间内不再有新数据。


Kotlin 预定义了一些限流方法,debounce()就非常契合当前场景。为了使用debounce(),得先把回调转换成流:


// 构建输入框文字变化流
fun EditText.textChangeFlow(): Flow<Editable> = callbackFlow {
    // 构建输入框监听器
    val watcher = object : TextWatcher {
        override fun afterTextChanged(s: Editable?) {} 
        override fun beforeTextChanged( s: CharSequence?, start: Int, count: Int, after: Int ) { }
        // 在文本变化后向流发射数据
        override fun onTextChanged( s: CharSequence?, start: Int, before: Int, count: Int ) { 
            s?.let { offer(it) }
        }
    }
    addTextChangedListener(watcher) // 设置输入框监听器
    awaitClose { removeTextChangedListener(watcher) } // 阻塞以保证流一直运行
}


为 EditText 扩展了一个方法,用于构建一个输入框文字变化流。


其中callbackFlow {}是系统预定义的顶层方法,它用于将回调组织成流。只需要在其内部构建回调实例并注册之,然后在生产数据的回调方法中调用offer()发射数据即可。当前场景中,将输入框每次文字变化作为流数据发射出去。


callbackFlow { lambda }中最后一句awaitClose {}是必不可少的,它阻塞了当前协程,保证流不会结束,即让流一直存活处于等待数据状态,否则 lambda 一执行完毕,流就会关闭。


然后就可以像这样使用:


editText.textChangeFlow() // 构建输入框文字变化流
    .filter { it.isNotEmpty() } // 过滤空内容,避免无效网络请求
    .debounce(300) // 300ms防抖
    .flatMapLatest { searchFlow(it.toString()) } // 新搜索覆盖旧搜索
    .flowOn(Dispatchers.IO) // 让搜索在异步线程中执行
    .onEach { updateUi(it) } // 获取搜索结果并更新界面
    .launchIn(mainScope) // 在主线程收集搜索结果
// 更新界面
fun updateUi(it: List<String>) {}
// 访问网络进行搜索
suspend fun search(key: String): List<String> {}
// 将搜索关键词转换成搜索结果流
fun searchFlow(key: String) = flow { emit(search(key)) }


其中filter()是流的中间消费者:


public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
    if (predicate(value)) return@transform emit(value)
}


filter() 利用transform()构建了一个下游流,它会收集上游数据,并且通过predicate过滤之,只有满足条件的数据才会被发射。关于transform()的详细解释可以点击Kotlin 进阶 | 异步数据流 Flow 的使用场景


其中的flatMapLatest()也是中间消费者,flatMap 的意思是将上游流中的一个数据转换成一个新的流,当前场景下即是将 key 通过网络请求转换成搜索结果Flow<List<String>>。lateest 的意思是如果一个新的搜索请求到来时,上一个请求还未返回,则取消之,即总是展示最新输入内容的搜索结果。


flatMapLatest()源码如下:


public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
    transformLatest { emitAll(transform(it)) }
public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> =
    ChannelFlowTransformLatest(transform, this)
internal class ChannelFlowTransformLatest<T, R>(
    private val transform: suspend FlowCollector<R>.(value: T) -> Unit,
    flow: Flow<T>,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, R>(flow, context, capacity, onBufferOverflow) {
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<R> =
        ChannelFlowTransformLatest(transform, flow, context, capacity, onBufferOverflow)
    override suspend fun flowCollect(collector: FlowCollector<R>) {
        assert { collector is SendingCollector }
        flowScope {
            var previousFlow: Job? = null
            // 收集上游数据
            flow.collect { value ->
                // 1. 若新数据到来,则取消上一次
                previousFlow?.apply {
                    cancel(ChildCancelledException())
                    join()
                }
                // 2. 启动协程处理当前数据
                previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
                    collector.transform(value)
                }
            }
        }
    }
}


在收集数据时,每次都会启动新协程执行数据变换操作,并记录协程的 Job,待下一个数据到来时,取消上一次的 Job。


demo 场景中的launchIn()是一个终端消费者:


// 启动协程并在其中收集数据
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() 
}
// 用空收集器收集数据
public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)
// 空收集器是一个不会再向下游发射数据的 FlowCollector
internal object NopCollector : FlowCollector<Any?> {
    override suspend fun emit(value: Any?) {
        // does nothing
    }
}


使用 launchIn() 将启动协程收集数据这一细节隐藏在了内部,所以就可以使外部代码保持简洁的链式调用。下面这两段代码是等价的:


mainScope.launch {
    editText.textChangeFlow() 
        .filter { it.isNotEmpty() } 
        .debounce(300) 
        .flatMapLatest { searchFlow(it.toString()) } 
        .flowOn(Dispatchers.IO) 
        .collect { updateUi(it) }
}
editText.textChangeFlow() 
    .filter { it.isNotEmpty() } 
    .debounce(300) 
    .flatMapLatest { searchFlow(it.toString()) }
    .flowOn(Dispatchers.IO) 
    .onEach { updateUi(it) } 
    .launchIn(mainScope) 


但由于 launchIn() 不会再向下游发射数据,所以它一般配合onEach {}一起使用来完成消费数据。


点击事件防抖


app 中点击事件响应逻辑一般是弹出界面或是网络请求。


如果用飞快的速度连续点击两次,就会弹出两个界面或是请求了两次网络。


为了避免这种情况的方法,需要做点击事件防抖,即在一定时间间隔内只响应第一次点击事件。可以这样实现:


val FAST_CLICK_THRSHOLD = 300
fun View.onDebounceClickListener( block: (T) -> Unit ) {
    // 如果不是快速点击,则响应点击逻辑
    setOnClickListener { if (!it.isFastClick) block() }
}
// 判断是否快速点击
fun View.isFastClick(): Boolean {
    val currentTime = SystemClock.elapsedRealtime()
    if (currentTime - this.triggerTime >= FAST_CLICK_THRSHOLD) {
        this.triggerTime = currentTime
        return false
    } else {
        return true
    }
}
// 记录上次点击时间
private var View.triggerTime: Long
    get() = getTag(R.id.click_trigger) as? Long ?: 0L
    set(value) = setTag(R.id.click_trigger, value)


做了 3 个扩展,完成了点击事件防抖。将每次有效点击的时间保存在 View 的 tag 中,每次点击时都判断当前时间和上次时间差,如果超过阈值则允许点击。


用流的思想重新样理解这个场景:每个点击事件都是流上的新数据。要对流做限流,即发射第一个数据,然后抛弃时间窗口中紧跟其后的所有数据,直到新的时间窗口到来。


很遗憾,Kotlin 未提供系统级实现,但自定义一个也很简单:


fun <T> Flow<T>.throttleFirst(thresholdMillis: Long): Flow<T> = flow {
    var lastTime = 0L // 上次发射数据的时间
    // 收集数据
    collect { upstream ->
        // 当前时间
        val currentTime = System.currentTimeMillis()
        // 时间差超过阈值则发送数据并记录时间
        if (currentTime - lastTime > thresholdMillis) {
            lastTime = currentTime
            emit(upstream)
        }
    }


throttleFirst() 使用flow {}构建了一个下游流并且收集了上游数据,只有当两次数据时间差超过阈值时,才发射数据。


然后将点击事件组织成流:


fun View.clickFlow() = callbackFlow {
    setOnClickListener { offer(Unit) }
    awaitClose { setOnClickListener(null) }
}


就可以像这样使用:


view.clickFlow()
    .throttleFirst(300)
    .onEach { // 点击事件响应 }
    .launchIn(mainScope)


防过度刷新


想象这样一个场景:百万级别的直播间,有一个展示最近加入观众的列表。每个新观众加入,都通过回调 onUserIn(uid: String) 通知,需通过 uid 请求网络拉取用户信息并更新在观众列表中。


对于百万级别的直播间,每一秒可能有成百上千的观众加入,若不做限制,每秒几百上千次的网络访问就很离谱。


产品端给出的限流方案:每一秒钟刷新一次列表,且只展示这一秒内最后加入直播间的那个人。


用流重新理解这个场景:onUserIn() 回调是流数据的生产者。要做限流,即在每个固定时间间隔内,只发射最后的 1 个数据,并丢弃其余的数据。


kotlin 提供了系统级别的实现sample()


// 将回调转换成流
fun userInFlow() = callbackFlow {
    val callback = object : UserCallback() {
        override fun onUserIn(uid: String) { offer(uid) }
    }
    setCallback(callback)
    awaitClose { setCallback(null) }
}
// 观众列表限流
userInFlow()
    .sample(1000)
    .onEach { fetchUser(it) }
    .flowOn(Dispatchers.IO)
    .onEach { updateAudienceList() }
    .launchIn(mainScope)


(这个实现犯了一个和上篇倒计时 Flow 同样的错误,看出来了吗?后续篇章会详细分析)


总结


用异步数据流的思想理解下面这些场景,使得问题求解变得简单:


  1. 搜索框防抖:丢弃一切发射间隔过短的数据,直到生产出某个数据之后一段时间内不再有新数据。


  1. 点击事件防抖:发射第一个数据,然后抛弃时间窗口中紧跟其后的所有数据,直到新的时间窗口到来。


  1. 在每个固定时间间隔内,只发射最后的 n 个数据,并丢弃其余的数据。


可以从两个维度区别上述限流方案:


  1. 发射数据是否有固定时间间隔。


  1. 新的数据是否会导致重启倒计时。


限流方案 固定间隔 重启倒计时
搜索框防抖 false true
点击事件防抖 false false
防过度刷新 true false


  • 只要输入连续不停止,则永远也不会发送数据。所以输入框防抖发射数据是没有固定时间间隔的。搜索框防抖会重启倒计时,而且是每一个新数据的到来都会触发重新倒计时。


  • 只要不发生点击事件,数据就不会发射。所以点击事件防抖发射数据是没有固定时间间隔的。点击事件防抖中,第一个数据产生时,倒计时开始,它并不会因为后续事件的到来而重新倒计时,在倒计时内除第一个数据外的其他数据都被抛弃。


  • 不管有没有新数据,每个固定的时间间隔内都会发射一个新数据,防过度刷新时有固定时间间隔的。


推荐阅读












目录
相关文章
|
8天前
|
数据处理 开发者 Kotlin
利用Kotlin Flow简化数据流管理
随着移动端应用的复杂化,数据流管理成为一大挑战。Kotlin Flow作为一种基于协程的响应式编程框架,可简化数据流处理并支持背压机制,有效避免应用崩溃。本文通过解答四个常见问题,详细介绍Kotlin Flow的基本概念、创建方法及复杂数据流处理技巧,帮助开发者轻松上手,提升应用性能。
36 16
|
1天前
|
存储 API 数据库
Kotlin协程与Flow的魅力——打造高效数据管道的不二法门!
在现代Android开发中,Kotlin协程与Flow框架助力高效管理异步操作和数据流。协程采用轻量级线程管理,使异步代码保持同步风格,适合I/O密集型任务。Flow则用于处理数据流,支持按需生成数据和自动处理背压。结合两者,可构建复杂数据管道,简化操作流程,提高代码可读性和可维护性。本文通过示例代码详细介绍其应用方法。
8 2
|
8天前
|
数据处理 Kotlin
掌握这项Kotlin技能,让你的数据流管理不再头疼!Flow的秘密你解锁了吗?
【9月更文挑战第12天】随着移动应用发展,数据流管理日益复杂。Kotlin Flow作为一种基于协程的异步数据流处理框架应运而生,它可解耦数据的生产和消费过程,简化数据流管理,并支持背压机制以防应用崩溃。本文通过四个问题解析Kotlin Flow的基础概念、创建方式、复杂数据流处理及背压实现方法,助您轻松掌握这一高效工具,在实际开发中更从容地应对各种数据流挑战,提升应用性能。
28 8
|
9天前
|
数据处理 API 数据库
揭秘Kotlin Flow:迈向响应式编程的黄金钥匙
【9月更文挑战第11天】在现代软件开发中,异步编程与数据处理对于构建高性能应用至关重要。Kotlin Flow作为协程库的一部分,提供了简洁高效的API来处理数据流。本文将通过实例引导你从零开始学习Kotlin Flow,掌握构建响应式应用的方法。Flow是一种冷流,仅在订阅时才开始执行,支持map、filter等操作符,简化数据处理。
24 7
|
8天前
|
开发者 Kotlin
揭秘Kotlin协程:如何在异步风暴中稳握错误处理之舵?
【9月更文挑战第12天】本文深入探讨了Kotlin协程框架下的错误处理机制,通过实例分析展示了如何利用`CoroutineExceptionHandler`进行结构化异常处理。文章详细介绍了全局与局部异常处理器的使用方法,并展示了如何在挂起函数中使用`try`表达式优雅地处理异常,以提高程序的健壮性和可维护性。
24 4
|
7天前
|
存储 数据处理 Kotlin
Kotlin Flow背后的神秘力量:背压、缓冲与合并策略的终极揭秘!
【9月更文挑战第13天】Kotlin Flow 是 Kotlin 协程库中处理异步数据流的强大工具,本文通过对比传统方法,深入探讨 Flow 的背压、缓冲及合并策略。背压通过 `buffer` 函数控制生产者和消费者的速率,避免过载;缓冲则允许数据暂存,使消费者按需消费;合并策略如 `merge`、`combine` 和 `zip` 则帮助处理多数据源的整合。通过这些功能,Flow 能更高效地应对复杂数据处理场景。
18 2
|
8天前
|
移动开发 定位技术 Android开发
「揭秘高效App的秘密武器」:Kotlin Flow携手ViewModel,打造极致响应式UI体验,你不可不知的技术革新!
【9月更文挑战第12天】随着移动开发领域对响应式编程的需求增加,管理应用程序状态变得至关重要。Jetpack Compose 和 Kotlin Flow 的组合提供了一种优雅的方式处理 UI 状态变化,简化了状态管理。本文探讨如何利用 Kotlin Flow 增强 ViewModel 功能,构建简洁强大的响应式 UI。
21 3
|
7天前
|
Android开发 开发者 Kotlin
告别AsyncTask:一招教你用Kotlin协程重构Android应用,流畅度飙升的秘密武器
【9月更文挑战第13天】随着Android应用复杂度的增加,有效管理异步任务成为关键。Kotlin协程提供了一种优雅的并发操作处理方式,使异步编程更简单直观。本文通过具体示例介绍如何使用Kotlin协程优化Android应用性能,包括网络数据加载和UI更新。首先需在`build.gradle`中添加coroutines依赖。接着,通过定义挂起函数执行网络请求,并在`ViewModel`中使用`viewModelScope`启动协程,结合`Dispatchers.Main`更新UI,避免内存泄漏。使用协程不仅简化代码,还提升了程序健壮性。
19 1
|
8天前
|
数据库 Kotlin
Kotlin中的冷流和热流以及如何让Flow停下来
本文介绍了Kotlin中`Flow`的概念及其类型,包括冷流(Cold Flow)、热流`SharedFlow`及具有最新值的`StateFlow`。文中详细描述了每种类型的特性与使用场景,并提供了停止`Flow`的方法,如取消协程、使用操作符过滤及异常处理。通过示例代码展示了如何运用这些概念。
17 2
|
14天前
|
设计模式 开发者 UED
深入理解Kotlin中的异步网络请求处理
深入理解Kotlin中的异步网络请求处理