Kotlin Coroutines Flow 系列(二) Flow VS RxJava2

简介: Kotlin Coroutines Flow 系列(二) Flow VS RxJava2

三. Flow VS Sequences



每一个 Flow 其内部是按照顺序执行的,这一点跟 Sequences 很类似。


Flow 跟 Sequences 之间的区别是 Flow 不会阻塞主线程的运行,而 Sequences 会阻塞主线程的运行。


使用 flow:

fun main() = runBlocking {
    launch {
        for (j in 1..5) {
            delay(100)
            println("I'm not blocked $j")
        }
    }
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.collect { println(it) }
    println("Done")
}


执行结果:

1
I'm not blocked 1
2
I'm not blocked 2
3
I'm not blocked 3
4
I'm not blocked 4
5
Done
I'm not blocked 5


使用 sequence:

fun main() = runBlocking {
    launch {
        for (k in 1..5) {
            delay(100)
            println("I'm blocked $k")
        }
    }
    sequence {
        for (i in 1..5) {
            Thread.sleep(100)
            yield(i)
        }
    }.forEach { println(it) }
    println("Done")
}


执行结果:

1
2
3
4
5
Done
I'm blocked 1
I'm blocked 2
I'm blocked 3
I'm blocked 4
I'm blocked 5


由此,可以得出 Flow 在使用各个 suspend 函数时(本例子中使用了collect、emit函数)不会阻塞主线程的运行。


四. Flow VS RxJava



Kotlin 协程库的设计本身也参考了 RxJava ,下图展示了如何从 RxJava 迁移到 Kotlin 协程。(火和冰形象地表示了 Hot、Cold Stream)


image.png

migration from rxjava.jpeg


4.1 Cold Stream


flow 的代码块只有调用 collected() 才开始运行,正如 RxJava 创建的 Observables 只有调用 subscribe() 才开始运行一样。


4.2 Hot Stream


如图上所示,可以借助 Kotlin Channel 来实现 Hot Stream。


4.3. Completion


Flow 完成时(正常或出现异常时),如果需要执行一个操作,它可以通过两种方式完成:imperative、declarative。


4.3.1 imperative


通过使用 try ... finally 实现

fun main() = runBlocking {
    try {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect { println(it) }
    } finally {
        println("Done")
    }
}


4.3.2 declarative


通过 onCompletion() 函数实现

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.onCompletion { println("Done") }
        .collect { println(it) }
}


4.3.3 onCompleted (借助扩展函数实现)


借助扩展函数可以实现类似 RxJava 的 onCompleted() 功能,只有在正常结束时才会被调用:

fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
    collect { value -> emit(value) }
    action()
}


它的使用类似于 onCompletion()

fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
    collect { value -> emit(value) }
    action()
}
fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.onCompleted { println("Completed...") }
        .collect{println(it)}
}


但是假如 Flow 异常结束时,是不会执行 onCompleted() 函数的。


4.4 Backpressure


Backpressure 是响应式编程的功能之一。


RxJava2 Flowable 支持的 Backpressure 策略,包括:


  • MISSING:创建的 Flowable 没有指定背压策略,不会对通过 OnNext 发射的数据做缓存或丢弃处理。
  • ERROR:如果放入 Flowable 的异步缓存池中的数据超限了,则会抛出 MissingBackpressureException 异常。
  • BUFFER:Flowable 的异步缓存池同 Observable 的一样,没有固定大小,可以无限制添加数据,不会抛出 MissingBackpressureException 异常,但会导致 OOM。
  • DROP:如果 Flowable 的异步缓存池满了,会丢掉将要放入缓存池中的数据。
  • LATEST:如果缓存池满了,会丢掉将要放入缓存池中的数据。这一点跟 DROP 策略一样,不同的是,不管缓存池的状态如何,LATEST 策略会将最后一条数据强行放入缓存池中。


而 Flow 的 Backpressure 是通过 suspend 函数实现。


4.4.1 buffer() 对应 BUFFER 策略

fun currTime() = System.currentTimeMillis()
var start: Long = 0
fun main() = runBlocking {
    val time = measureTimeMillis {
        (1..5)
            .asFlow()
            .onStart { start = currTime() }
            .onEach {
                delay(100)
                println("Emit $it (${currTime() - start}ms) ")
            }
            .buffer()
            .collect {
                println("Collect $it starts (${currTime() - start}ms) ")
                delay(500)
                println("Collect $it ends (${currTime() - start}ms) ")
            }
    }
    println("Cost $time ms")
}


执行结果:

Emit 1 (104ms) 
Collect 1 starts (108ms) 
Emit 2 (207ms) 
Emit 3 (309ms) 
Emit 4 (411ms) 
Emit 5 (513ms) 
Collect 1 ends (613ms) 
Collect 2 starts (613ms) 
Collect 2 ends (1114ms) 
Collect 3 starts (1114ms) 
Collect 3 ends (1615ms) 
Collect 4 starts (1615ms) 
Collect 4 ends (2118ms) 
Collect 5 starts (2118ms) 
Collect 5 ends (2622ms) 
Collected in 2689 ms


4.4.2 conflate() 对应 LATEST 策略

fun main() = runBlocking {
    val time = measureTimeMillis {
        (1..5)
            .asFlow()
            .onStart { start = currTime() }
            .onEach {
                delay(100)
                println("Emit $it (${currTime() - start}ms) ")
            }
            .conflate()
            .collect {
                println("Collect $it starts (${currTime() - start}ms) ")
                delay(500)
                println("Collect $it ends (${currTime() - start}ms) ")
            }
    }
    println("Cost $time ms")
}


执行结果:

Emit 1 (106ms) 
Collect 1 starts (110ms) 
Emit 2 (213ms) 
Emit 3 (314ms) 
Emit 4 (419ms) 
Emit 5 (520ms) 
Collect 1 ends (613ms) 
Collect 5 starts (613ms) 
Collect 5 ends (1113ms) 
Cost 1162 ms


4.4.3 DROP 策略


RxJava 的 contributor:David Karnok, 他写了一个kotlin-flow-extensions库,其中包括:FlowOnBackpressureDrop.kt,这个类支持 DROP 策略。

/**
 * Drops items from the upstream when the downstream is not ready to receive them.
 */
@FlowPreview
fun <T> Flow<T>.onBackpressurureDrop() : Flow<T> = FlowOnBackpressureDrop(this)


使用这个库的话,可以通过使用 Flow 的扩展函数 onBackpressurureDrop() 来支持 DROP 策略。

相关文章
|
30天前
|
传感器 Android开发 开发者
构建高效Android应用:Kotlin的协程与Flow
【4月更文挑战第26天】随着移动应用开发的不断进步,开发者寻求更简洁高效的编码方式以应对复杂多变的业务需求。在众多技术方案中,Kotlin语言凭借其简洁性和强大的功能库逐渐成为Android开发的主流选择。特别是Kotlin的协程和Flow这两个特性,它们为处理异步任务和数据流提供了强大而灵活的工具。本文将深入探讨如何通过Kotlin协程和Flow来优化Android应用性能,实现更加流畅的用户体验,并展示在实际开发中的应用实例。
|
存储 缓存 API
Android Kotlin之Flow数据流
`Flow`是`google`官方提供的一套基于`kotlin`协程的响应式编程模型,它与`RxJava`的使用类似,但相比之下`Flow`使用起来更简单,另外`Flow`作用在协程内,可以与协程的生命周期绑定,当协程取消时,`Flow`也会被取消,避免了内存泄漏风险。
669 1
|
22天前
|
移动开发 数据处理 Android开发
构建高效Android应用:Kotlin的协程与Flow的使用
【5月更文挑战第23天】 在移动开发领域,性能优化和异步编程一直是核心议题。随着Kotlin语言在Android开发中的普及,其提供的协程(coroutines)和流式编程(Flow)功能为开发者带来了革命性的工具,以更简洁、高效的方式处理异步任务和数据流。本文将深入探讨Kotlin协程和Flow在Android应用中的实际应用,以及它们如何帮助开发者编写更加响应迅速且不阻塞用户界面的应用程序。我们将通过具体案例分析这两种技术的优势,并展示如何在现有项目中实现这些功能。
|
25天前
|
测试技术 Android开发 开发者
构建高效Android应用:Kotlin协程与Flow的完美融合
【5月更文挑战第20天】 在现代Android开发中,提升应用性能和用户体验是至关重要的任务。Kotlin作为一种现代化的编程语言,以其简洁、安全和易于理解的特点被广泛采用。特别是Kotlin协程和Flow这两个特性,它们为处理异步任务和数据流提供了强大而灵活的工具。通过深入探索Kotlin协程和Flow的结合使用,本文将揭示如何利用这些特性构建更加高效且响应迅速的Android应用。我们将探讨实现细节,以及如何通过这种技术堆栈来优化资源管理和用户界面的流畅度。
|
30天前
|
安全 Swift Android开发
构建移动应用:Swift vs Kotlin —— 两大主流语言的对决
【5月更文挑战第11天】Swift与Kotlin在移动应用开发中各有优势。Swift是iOS开发的首选,以其简洁语法、高性能和类型安全著称;而Kotlin是Android的官方推荐语言,以其与Java的无缝互操作、空安全特性和简洁代码受到青睐。两者在语法简洁性、性能和社区支持上表现优秀,但平台兼容性不同。开发者应根据项目需求和目标平台选择合适的语言。
|
30天前
|
Java Android开发 C++
Kotlin vs Java:选择最佳语言进行安卓开发
【4月更文挑战第13天】Java曾是安卓开发的主流语言,但Kotlin的崛起改变了这一局面。Google在2017年支持Kotlin,引发两者优劣讨论。Java以其成熟稳定、强大生态和跨平台能力占优,但代码冗长、开发效率低和语言特性过时是短板。Kotlin则以简洁语法、空安全设计和高度兼容Java脱颖而出,但社区和生态系统仍在发展中,可能存在学习曲线和性能问题。选择语言应考虑项目需求、团队熟悉度、维护性、性能和生态系统。无论选择哪种,理解其差异并适应新技术至关重要。
|
8月前
|
缓存 API Android开发
Kotlin 学习笔记(七)—— Flow 数据流学习实践指北(三)冷流转热流以及代码实例(下)
Kotlin 学习笔记(七)—— Flow 数据流学习实践指北(三)冷流转热流以及代码实例(下)
93 0
|
8月前
|
缓存 Java Kotlin
Kotlin 学习笔记(七)—— Flow 数据流学习实践指北(三)冷流转热流以及代码实例(上)
Kotlin 学习笔记(七)—— Flow 数据流学习实践指北(三)冷流转热流以及代码实例(上)
68 0
|
8月前
|
存储 缓存 Android开发
Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow(下)
Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow(下)
113 0
|
8月前
|
存储 缓存 人工智能
Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow(上)
Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow(上)
57 0