Kotlin Coroutines Flow 系列(四) 线程操作

简介: Kotlin Coroutines Flow 系列(四) 线程操作

七. Flow 线程操作



7.1 更为简化的线程切换


相对于 RxJava 多线程的学习曲线,Flow 对线程的切换友好地多。


在之前的 Kotlin Coroutines Flow 系列(一) Flow 基本使用 一文中曾经介绍过 Flow 的切换线程,以及 flowOn 操作符。


Flow 只需使用 flowOn 操作符,而不必像 RxJava 需要去深入理解 observeOn、subscribeOn 之间的区别。


7.2 flowOn VS RxJava 的 observeOn


RxJava 的 observeOn 操作符,接收一个 Scheduler 参数,用来指定下游操作运行在特定的线程调度器 Scheduler 上。


Flow 的 flowOn 操作符,接收一个 CoroutineContext 参数,影响的是上游的操作。

例如:

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
            it * it
        }.flowOn(Dispatchers.IO)
        .collect {
            println("${Thread.currentThread().name}: $it")
        }
}


flow builder 和 map 操作符都会受到flowOn的影响,并使用 Dispatchers.io 线程池。

再例如:

val customerDispatcher = Executors.newFixedThreadPool(5).asCoroutineDispatcher()
fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
            it * it
        }.flowOn(Dispatchers.IO)
        .map {
            it+1
        }
        .flowOn(customerDispatcher)
        .collect {
            println("${Thread.currentThread().name}: $it")
        }
}


flow builder  和两个 map 操作符都会受到两个flowOn的影响,其中 flow builder  和第一个 map 操作符跟上面的例子一样,第二个 map 操作符会切换到指定的 customerDispatcher 线程池。


7.3 buffer 实现并发操作


Kotlin Coroutines Flow 系列(二) Flow VS RxJava2 一文中,曾介绍  buffer 操作符对应 RxJava Backpressure 中的 BUFFER 策略。


事实上 buffer 操作符也可以并发地执行任务,它是除了使用 flowOn 操作符之外的另一种方式,只是不能显示地指定 Dispatchers。


例如:

fun main() = runBlocking {
    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }
        .buffer()
        .collect { value ->
            delay(300)
            println(value)
        }
    }
    println("Collected in $time ms")
}


执行结果:

1
2
3
4
5
Collected in 1676 ms


在上述例子中,所有的 delay 所花费的时间是2000ms。然而通过 buffer 操作符并发地执行 emit,再顺序地执行 collect 函数后,所花费的时间在 1700ms 左右。


如果去掉 buffer 操作符。

fun main() = runBlocking {
    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }
        .collect { value ->
            delay(300)
            println(value)
        }
    }
    println("Collected in $time ms")
}


执行结果:

1
2
3
4
5
Collected in 2039 ms


所花费的时间比刚才多了300多ms。


7.4 并行操作


在讲解并行操作之前,先来了解一下并发和并行的区别。


并发(concurrency):是指一个处理器同时处理多个任务。

并行(parallelism):是多个处理器或者是多核的处理器同时处理多个不同的任务。并行是同时发生的多个并发事件,具有并发的含义,而并发则不一定是并行。


RxJava 可以借助 flatMap 操作符实现并行,亦可以使用 ParallelFlowable 类实现并行操作。


下面,以 flatMap 操作符为例实现 RxJava 的并行:

Observable.range(1,100)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.just(integer)
                                .subscribeOn(Schedulers.io())
                                .map(new Function<Integer, String>() {
                                    @Override
                                    public String apply(Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {
                        System.out.println(str);
                    }
                });


Flow 也有相应的操作符 flatMapMerge 可以实现并行。

fun main() = runBlocking {
    val result = arrayListOf<Int>()
    for (index in 1..100){
        result.add(index)
    }
    result.asFlow()
        .flatMapMerge {
            flow {
                emit(it)
            }
            .flowOn(Dispatchers.IO)
        }
        .collect { println("$it") }
}


总体而言,Flow 相比于 RxJava 更加简洁一些。

相关文章
|
6月前
|
Java 数据库 Android开发
【专栏】Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理
【4月更文挑战第27天】本文探讨了Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理。通过案例分析展示了网络请求、图像处理和数据库操作的优化实践。同时,文章指出并发编程的挑战,如性能评估、调试及兼容性问题,并强调了多线程优化对提升应用性能的重要性。开发者应持续学习和探索新的优化策略,以适应移动应用市场的竞争需求。
145 5
|
3月前
|
缓存 数据处理 Android开发
Android经典实战之Kotlin常用的 Flow 操作符
本文介绍 Kotlin 中 `Flow` 的多种实用操作符,包括转换、过滤、聚合等,通过简洁易懂的例子展示了每个操作符的功能,如 `map`、`filter` 和 `fold` 等,帮助开发者更好地理解和运用 `Flow` 来处理异步数据流。
117 4
|
19天前
|
调度 Android开发 开发者
构建高效Android应用:探究Kotlin多线程优化策略
【10月更文挑战第11天】本文探讨了如何在Kotlin中实现高效的多线程方案,特别是在Android应用开发中。通过介绍Kotlin协程的基础知识、异步数据加载的实际案例,以及合理使用不同调度器的方法,帮助开发者提升应用性能和用户体验。
36 4
|
2月前
|
数据处理 开发者 Kotlin
利用Kotlin Flow简化数据流管理
随着移动端应用的复杂化,数据流管理成为一大挑战。Kotlin Flow作为一种基于协程的响应式编程框架,可简化数据流处理并支持背压机制,有效避免应用崩溃。本文通过解答四个常见问题,详细介绍Kotlin Flow的基本概念、创建方法及复杂数据流处理技巧,帮助开发者轻松上手,提升应用性能。
67 16
|
2月前
|
存储 API 数据库
Kotlin协程与Flow的魅力——打造高效数据管道的不二法门!
在现代Android开发中,Kotlin协程与Flow框架助力高效管理异步操作和数据流。协程采用轻量级线程管理,使异步代码保持同步风格,适合I/O密集型任务。Flow则用于处理数据流,支持按需生成数据和自动处理背压。结合两者,可构建复杂数据管道,简化操作流程,提高代码可读性和可维护性。本文通过示例代码详细介绍其应用方法。
38 2
|
2月前
|
数据处理 Kotlin
掌握这项Kotlin技能,让你的数据流管理不再头疼!Flow的秘密你解锁了吗?
【9月更文挑战第12天】随着移动应用发展,数据流管理日益复杂。Kotlin Flow作为一种基于协程的异步数据流处理框架应运而生,它可解耦数据的生产和消费过程,简化数据流管理,并支持背压机制以防应用崩溃。本文通过四个问题解析Kotlin Flow的基础概念、创建方式、复杂数据流处理及背压实现方法,助您轻松掌握这一高效工具,在实际开发中更从容地应对各种数据流挑战,提升应用性能。
47 8
|
2月前
|
数据处理 API 数据库
揭秘Kotlin Flow:迈向响应式编程的黄金钥匙
【9月更文挑战第11天】在现代软件开发中,异步编程与数据处理对于构建高性能应用至关重要。Kotlin Flow作为协程库的一部分,提供了简洁高效的API来处理数据流。本文将通过实例引导你从零开始学习Kotlin Flow,掌握构建响应式应用的方法。Flow是一种冷流,仅在订阅时才开始执行,支持map、filter等操作符,简化数据处理。
41 7
|
2月前
|
存储 数据处理 Kotlin
Kotlin Flow背后的神秘力量:背压、缓冲与合并策略的终极揭秘!
【9月更文挑战第13天】Kotlin Flow 是 Kotlin 协程库中处理异步数据流的强大工具,本文通过对比传统方法,深入探讨 Flow 的背压、缓冲及合并策略。背压通过 `buffer` 函数控制生产者和消费者的速率,避免过载;缓冲则允许数据暂存,使消费者按需消费;合并策略如 `merge`、`combine` 和 `zip` 则帮助处理多数据源的整合。通过这些功能,Flow 能更高效地应对复杂数据处理场景。
67 2
|
2月前
|
移动开发 定位技术 Android开发
「揭秘高效App的秘密武器」:Kotlin Flow携手ViewModel,打造极致响应式UI体验,你不可不知的技术革新!
【9月更文挑战第12天】随着移动开发领域对响应式编程的需求增加,管理应用程序状态变得至关重要。Jetpack Compose 和 Kotlin Flow 的组合提供了一种优雅的方式处理 UI 状态变化,简化了状态管理。本文探讨如何利用 Kotlin Flow 增强 ViewModel 功能,构建简洁强大的响应式 UI。
42 3
|
2月前
|
数据库 Kotlin
Kotlin中的冷流和热流以及如何让Flow停下来
本文介绍了Kotlin中`Flow`的概念及其类型,包括冷流(Cold Flow)、热流`SharedFlow`及具有最新值的`StateFlow`。文中详细描述了每种类型的特性与使用场景,并提供了停止`Flow`的方法,如取消协程、使用操作符过滤及异常处理。通过示例代码展示了如何运用这些概念。
88 2