Kotlin Coroutines Flow 系列(二) Flow VS RxJava2

简介: Kotlin Coroutines Flow 系列(二) Flow VS RxJava2

三. Flow VS Sequences



每一个 Flow 其内部是按照顺序执行的,这一点跟 Sequences 很类似。


Flow 跟 Sequences 之间的区别是 Flow 不会阻塞主线程的运行,而 Sequences 会阻塞主线程的运行。


使用 flow:

fun main() = runBlocking {
    launch {
        for (j in 1..5) {
            delay(100)
            println("I'm not blocked $j")
        }
    }
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.collect { println(it) }
    println("Done")
}


执行结果:

1
I'm not blocked 1
2
I'm not blocked 2
3
I'm not blocked 3
4
I'm not blocked 4
5
Done
I'm not blocked 5


使用 sequence:

fun main() = runBlocking {
    launch {
        for (k in 1..5) {
            delay(100)
            println("I'm blocked $k")
        }
    }
    sequence {
        for (i in 1..5) {
            Thread.sleep(100)
            yield(i)
        }
    }.forEach { println(it) }
    println("Done")
}


执行结果:

1
2
3
4
5
Done
I'm blocked 1
I'm blocked 2
I'm blocked 3
I'm blocked 4
I'm blocked 5


由此,可以得出 Flow 在使用各个 suspend 函数时(本例子中使用了collect、emit函数)不会阻塞主线程的运行。


四. Flow VS RxJava



Kotlin 协程库的设计本身也参考了 RxJava ,下图展示了如何从 RxJava 迁移到 Kotlin 协程。(火和冰形象地表示了 Hot、Cold Stream)


image.png

migration from rxjava.jpeg


4.1 Cold Stream


flow 的代码块只有调用 collected() 才开始运行,正如 RxJava 创建的 Observables 只有调用 subscribe() 才开始运行一样。


4.2 Hot Stream


如图上所示,可以借助 Kotlin Channel 来实现 Hot Stream。


4.3. Completion


Flow 完成时(正常或出现异常时),如果需要执行一个操作,它可以通过两种方式完成:imperative、declarative。


4.3.1 imperative


通过使用 try ... finally 实现

fun main() = runBlocking {
    try {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect { println(it) }
    } finally {
        println("Done")
    }
}


4.3.2 declarative


通过 onCompletion() 函数实现

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.onCompletion { println("Done") }
        .collect { println(it) }
}


4.3.3 onCompleted (借助扩展函数实现)


借助扩展函数可以实现类似 RxJava 的 onCompleted() 功能,只有在正常结束时才会被调用:

fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
    collect { value -> emit(value) }
    action()
}


它的使用类似于 onCompletion()

fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
    collect { value -> emit(value) }
    action()
}
fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.onCompleted { println("Completed...") }
        .collect{println(it)}
}


但是假如 Flow 异常结束时,是不会执行 onCompleted() 函数的。


4.4 Backpressure


Backpressure 是响应式编程的功能之一。


RxJava2 Flowable 支持的 Backpressure 策略,包括:


  • MISSING:创建的 Flowable 没有指定背压策略,不会对通过 OnNext 发射的数据做缓存或丢弃处理。
  • ERROR:如果放入 Flowable 的异步缓存池中的数据超限了,则会抛出 MissingBackpressureException 异常。
  • BUFFER:Flowable 的异步缓存池同 Observable 的一样,没有固定大小,可以无限制添加数据,不会抛出 MissingBackpressureException 异常,但会导致 OOM。
  • DROP:如果 Flowable 的异步缓存池满了,会丢掉将要放入缓存池中的数据。
  • LATEST:如果缓存池满了,会丢掉将要放入缓存池中的数据。这一点跟 DROP 策略一样,不同的是,不管缓存池的状态如何,LATEST 策略会将最后一条数据强行放入缓存池中。


而 Flow 的 Backpressure 是通过 suspend 函数实现。


4.4.1 buffer() 对应 BUFFER 策略

fun currTime() = System.currentTimeMillis()
var start: Long = 0
fun main() = runBlocking {
    val time = measureTimeMillis {
        (1..5)
            .asFlow()
            .onStart { start = currTime() }
            .onEach {
                delay(100)
                println("Emit $it (${currTime() - start}ms) ")
            }
            .buffer()
            .collect {
                println("Collect $it starts (${currTime() - start}ms) ")
                delay(500)
                println("Collect $it ends (${currTime() - start}ms) ")
            }
    }
    println("Cost $time ms")
}


执行结果:

Emit 1 (104ms) 
Collect 1 starts (108ms) 
Emit 2 (207ms) 
Emit 3 (309ms) 
Emit 4 (411ms) 
Emit 5 (513ms) 
Collect 1 ends (613ms) 
Collect 2 starts (613ms) 
Collect 2 ends (1114ms) 
Collect 3 starts (1114ms) 
Collect 3 ends (1615ms) 
Collect 4 starts (1615ms) 
Collect 4 ends (2118ms) 
Collect 5 starts (2118ms) 
Collect 5 ends (2622ms) 
Collected in 2689 ms


4.4.2 conflate() 对应 LATEST 策略

fun main() = runBlocking {
    val time = measureTimeMillis {
        (1..5)
            .asFlow()
            .onStart { start = currTime() }
            .onEach {
                delay(100)
                println("Emit $it (${currTime() - start}ms) ")
            }
            .conflate()
            .collect {
                println("Collect $it starts (${currTime() - start}ms) ")
                delay(500)
                println("Collect $it ends (${currTime() - start}ms) ")
            }
    }
    println("Cost $time ms")
}


执行结果:

Emit 1 (106ms) 
Collect 1 starts (110ms) 
Emit 2 (213ms) 
Emit 3 (314ms) 
Emit 4 (419ms) 
Emit 5 (520ms) 
Collect 1 ends (613ms) 
Collect 5 starts (613ms) 
Collect 5 ends (1113ms) 
Cost 1162 ms


4.4.3 DROP 策略


RxJava 的 contributor:David Karnok, 他写了一个kotlin-flow-extensions库,其中包括:FlowOnBackpressureDrop.kt,这个类支持 DROP 策略。

/**
 * Drops items from the upstream when the downstream is not ready to receive them.
 */
@FlowPreview
fun <T> Flow<T>.onBackpressurureDrop() : Flow<T> = FlowOnBackpressureDrop(this)


使用这个库的话,可以通过使用 Flow 的扩展函数 onBackpressurureDrop() 来支持 DROP 策略。

相关文章
|
8月前
|
传感器 Android开发 开发者
构建高效Android应用:Kotlin的协程与Flow
【4月更文挑战第26天】随着移动应用开发的不断进步,开发者寻求更简洁高效的编码方式以应对复杂多变的业务需求。在众多技术方案中,Kotlin语言凭借其简洁性和强大的功能库逐渐成为Android开发的主流选择。特别是Kotlin的协程和Flow这两个特性,它们为处理异步任务和数据流提供了强大而灵活的工具。本文将深入探讨如何通过Kotlin协程和Flow来优化Android应用性能,实现更加流畅的用户体验,并展示在实际开发中的应用实例。
|
5月前
|
缓存 数据处理 Android开发
Android经典实战之Kotlin常用的 Flow 操作符
本文介绍 Kotlin 中 `Flow` 的多种实用操作符,包括转换、过滤、聚合等,通过简洁易懂的例子展示了每个操作符的功能,如 `map`、`filter` 和 `fold` 等,帮助开发者更好地理解和运用 `Flow` 来处理异步数据流。
172 4
|
4月前
|
数据处理 开发者 Kotlin
利用Kotlin Flow简化数据流管理
随着移动端应用的复杂化,数据流管理成为一大挑战。Kotlin Flow作为一种基于协程的响应式编程框架,可简化数据流处理并支持背压机制,有效避免应用崩溃。本文通过解答四个常见问题,详细介绍Kotlin Flow的基本概念、创建方法及复杂数据流处理技巧,帮助开发者轻松上手,提升应用性能。
75 16
|
4月前
|
存储 API 数据库
Kotlin协程与Flow的魅力——打造高效数据管道的不二法门!
在现代Android开发中,Kotlin协程与Flow框架助力高效管理异步操作和数据流。协程采用轻量级线程管理,使异步代码保持同步风格,适合I/O密集型任务。Flow则用于处理数据流,支持按需生成数据和自动处理背压。结合两者,可构建复杂数据管道,简化操作流程,提高代码可读性和可维护性。本文通过示例代码详细介绍其应用方法。
74 2
|
4月前
|
数据处理 Kotlin
掌握这项Kotlin技能,让你的数据流管理不再头疼!Flow的秘密你解锁了吗?
【9月更文挑战第12天】随着移动应用发展,数据流管理日益复杂。Kotlin Flow作为一种基于协程的异步数据流处理框架应运而生,它可解耦数据的生产和消费过程,简化数据流管理,并支持背压机制以防应用崩溃。本文通过四个问题解析Kotlin Flow的基础概念、创建方式、复杂数据流处理及背压实现方法,助您轻松掌握这一高效工具,在实际开发中更从容地应对各种数据流挑战,提升应用性能。
66 8
|
4月前
|
数据处理 API 数据库
揭秘Kotlin Flow:迈向响应式编程的黄金钥匙
【9月更文挑战第11天】在现代软件开发中,异步编程与数据处理对于构建高性能应用至关重要。Kotlin Flow作为协程库的一部分,提供了简洁高效的API来处理数据流。本文将通过实例引导你从零开始学习Kotlin Flow,掌握构建响应式应用的方法。Flow是一种冷流,仅在订阅时才开始执行,支持map、filter等操作符,简化数据处理。
53 7
|
4月前
|
移动开发 定位技术 Android开发
「揭秘高效App的秘密武器」:Kotlin Flow携手ViewModel,打造极致响应式UI体验,你不可不知的技术革新!
【9月更文挑战第12天】随着移动开发领域对响应式编程的需求增加,管理应用程序状态变得至关重要。Jetpack Compose 和 Kotlin Flow 的组合提供了一种优雅的方式处理 UI 状态变化,简化了状态管理。本文探讨如何利用 Kotlin Flow 增强 ViewModel 功能,构建简洁强大的响应式 UI。
67 3
|
4月前
|
存储 数据处理 Kotlin
Kotlin Flow背后的神秘力量:背压、缓冲与合并策略的终极揭秘!
【9月更文挑战第13天】Kotlin Flow 是 Kotlin 协程库中处理异步数据流的强大工具,本文通过对比传统方法,深入探讨 Flow 的背压、缓冲及合并策略。背压通过 `buffer` 函数控制生产者和消费者的速率,避免过载;缓冲则允许数据暂存,使消费者按需消费;合并策略如 `merge`、`combine` 和 `zip` 则帮助处理多数据源的整合。通过这些功能,Flow 能更高效地应对复杂数据处理场景。
141 2
|
4月前
|
数据库 Kotlin
Kotlin中的冷流和热流以及如何让Flow停下来
本文介绍了Kotlin中`Flow`的概念及其类型,包括冷流(Cold Flow)、热流`SharedFlow`及具有最新值的`StateFlow`。文中详细描述了每种类型的特性与使用场景,并提供了停止`Flow`的方法,如取消协程、使用操作符过滤及异常处理。通过示例代码展示了如何运用这些概念。
143 2
|
4月前
|
API 数据处理 数据库
掌握 Kotlin Flow 的艺术:让无限数据流处理变得优雅且高效 —— 实战教程揭秘如何在数据洪流中保持代码的健壮与灵活
Kotlin Flow 是一个强大的协程 API,专为处理异步数据流设计。它适合处理网络请求数据、监听数据库变化等场景。本文通过示例代码展示如何使用 Kotlin Flow 管理无限流,如实时数据流。首先定义了一个生成无限整数的流 `infiniteNumbers()`,然后结合多种操作符(如 `buffer`、`onEach`、`scan`、`filter`、`takeWhile` 和 `collectLatest`),实现对无限流的优雅处理,例如计算随机数的平均值并在超过阈值时停止接收新数据。这展示了 Flow 在资源管理和逻辑清晰性方面的优势。
84 0