flow函数和collect函数浅析

简介: flow函数和collect函数浅析

flow方法:

public fun <T> flow(@BuilderInference block:suspend FlowCollector<T>.() -> Unit):
Flow<T> = SafeFlow(block)
public interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

会将flow传入的方法封装成一个FlowCollector的扩展函数,因此在flow代码块中使用emit是自然地。

第二个代码块中观察到FlowCollector的泛型是通过emit来推导的,这也就是为什么emit方法传入不同的类型flow所构造的FlowCollector的类型也不同。

总结:flow方法只是将传入的方法扩展成了一个FlowCollector的扩展函数,并可共享已有的emit方法。

collect方法:

public suspend inline fun <T> Flow<T>.collect(crossinline action: 
                suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

可以看到调用Flow的扩展函数collect时会手动构建一个FlowCollector,并重写emit方法的逻辑为执行collect中的代码块。

接下来看Flow的collect成员方法接收到FlowCollector对象后做什么处理。

public interface Flow<out T> {
    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

收集collect的具体行为默认是通过具体的flow构建时构造出来的。如默认上文构造出来的是SafeFlow,collect收集行为被封装在AbstractFlow中。

先看下AbstractFlow:

 abstract class AbstractFlow<T> : Flow<T> {
    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)

可以看到构建了一个SafeCollector调用collectSafely将改参数传进去。

SafeCollector会保存协程上下文(为了之后防止再次创建续体导致的浪费)和collect方法传进来的FlowCollector。

重写collectSafely方法的类也就是最上面的SafeFlow做的事情:

SafeFlow:

// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

可以看到最后是用SafeCollector调用的flow里面的代码块

注:这个collector就是collect中创建的SafeCollector而block方法则是之前flow中创建的扩展函数(也就是flow代码块)

所以这就是为什么是冷流的原因,只有调用collect才会构建这个SafeCollector对象并调用flow传进来的方法(flow代码块会添加到FlowCollector的扩展函数中,为了之后SafaCollector调用block)

到此flow代码块开始运行了,flow中的调用者this即为collect中创建的SafeCollector对象

SafeCollector中的emit方法

接下来看看SafeCollector中的emit方法:

 override suspend fun emit(value: T) {
 //该方法中会获取当前协程的续体,当执行不需要挂起时(不返回SUSPEND关键字),
 //会直接运行resumeWith并且不会执行拦截器,这也是为什么不能够在
 //flow中切换上下文的原因,不会执行intercept切换线程
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                //调用第二个emit方法
                emit(uCont, value)
            } catch (e: Throwable) {
                // Save the fact that exception from emit (or even check context) has been thrown
                lastEmissionContext = DownstreamExceptionElement(e)
                //emit
                throw e
            }
        }
    }
//带续体参数的emit方法:
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        val currentContext = uCont.context
        currentContext.ensureActive()
        // This check is triggered once per flow on happy path.
        val previousContext = lastEmissionContext
        if (previousContext !== currentContext) {
            checkContext(currentContext, previousContext, value)
        }
        completion = uCont
        //最终会调用到emitfun中。 
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }

再来看下emitfun

读者先记下这三个变量具体含义:

collect为FlowCollector对象(该对象的emit方法实现是执行collect代码块),value为emit的参数,最后一个代表续体

private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

这里科普下Kotlin中的::语法 和 表达式

Class::函数名/属性名 代表获取这个class的函数引用和属性引用。属性引用还好理解可以针对这个属性进行指定赋值/取值操作(静态属性不需要具体对象,成员属性需要具体的对象)。函数引用代表持有了这个方法的引用可以调用这个函数,由于是函数不区分对象。

=表达式 对应于上面的表达式则为emitfun返回了一个经强转后的方法。也就是emitfun返回的是一个方法,当调用表达式时该方法的invoke会自动执行(后面就会看到了)

可以看到将这个FlowCollector<Any?>的emit方法强转为了Function3<FlowCollector<Any?>, Any?, Continuation, Any?>方法,最直观的感受就是将原来只接受一个value的emit方法强转成了一个FlowCollector,value,和续体的方法。

看下反编译代码

   private static final Function3 emitFun = (Function3)TypeIntrinsics.beforeCheckcastToFunctionOfArity(new Function3() {
      // $FF: synthetic method
      // $FF: bridge method
      public Object invoke(Object var1, Object var2, Object var3) {
         return this.invoke((FlowCollector)var1, var2, (Continuation)var3);
      }
      @Nullable
      public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
         InlineMarker.mark(0);
         Object var10000 = p1.emit(p2, continuation);
         InlineMarker.mark(2);
         InlineMarker.mark(1);
         return var10000;
      }
      public final KDeclarationContainer getOwner() {
         return Reflection.getOrCreateKotlinClass(FlowCollector.class);
      }
      public final String getName() {
         return "emit";
      }
      public final String getSignature() {
         return "emit(Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;";
      }
   }, 3);

讲到这里读者应该已经猜到了

参数讲解:

1.p1:对应于调用collect方法所构建的FlowCollector对象,重写emit方法执行collect代码块内容

2.p2:emit方法的参数,对应于发送的值

3.continuation:当前协程的续体

流程讲解:

1.当调用emifun表达式时,表达式所构建的Function3方法的invoke方法将会被调用

2.会调用到emit方法,该方法最终会调用collect代码块的内容也就是action方法,并把emit的参数传入。

至此,emit每调用一次,都会执行一次collect方法。

总结

flow方法的主要作用是将传入的方法参数变成FlowCollector的扩展函数。

collect方法首先会创建一个FlowCollector对象并重写其emit方法的逻辑用于执行传入的方法参数;

接着还会创建一个SafaCollector对象保存刚创建的重写emit方法的FlowCollector对象;

由于flow中的代码块其实是FlowCollector的扩展函数,所以会利用SafaCollector去调用扩展函数,从这里面可以得出一个结论flow中的this即为SafaCollector对象。

emit方法在SafaCollector中最终会执行collect函数


相关文章
|
缓存 小程序
微信小程序使用vant组件样式不生效的问题
微信小程序使用vant组件样式不生效的问题
1383 0
|
存储 XML 安全
Jetpack DataStore 你总要了解一下吧?
一、DataStore 介绍 DataStore 是 Android Jetpack 中的一个组件,它是一个数据存储的解决方案,跟 SharedPreferences 一样,采用key-value形式存储。 DataStore 保证原子性,一致性,隔离性,持久性。尤其是,它解决了 SharedPreferences API 的设计缺陷。 Jetpack DataStore 是经过改进的新版数据存储解决方案,旨在取代 SharedPreferences,让应用能够以异步、事务方式存储数据。
1155 0
Jetpack DataStore 你总要了解一下吧?
|
监控 NoSQL Java
云服务器Redis集群部署及客户端通过公网IP连接问题
目录 1、配置文件 2、启动服务并创建集群 (1)启动6个Redis服务 (2)通过客户端命令创建集群 3、客户端连接 (1)客户端配置 (2)测试用例 (3)错误日志分析 4、问题解决 (1)查redis.conf配置文件 (2)修改配置文件 (3)重新启动Redis服务并创建集群 5、故障转移期间Lettuce客户端连接问题 (1)测试用例 (2)停掉其中一个master节点,模拟宕机 (3)解决办法 1)更换Redis客户端 2)Lettuce客户端配置Redis集群拓扑刷新
|
9月前
|
供应链 搜索推荐 API
1688榜单商品详细信息API接口的开发、应用与收益
1688作为全球知名B2B电商平台,为企业提供丰富的商品信息和交易机会。随着电商发展,企业对数据需求增长,高效获取与利用数据成为提升竞争力的关键。1688榜单商品详细信息API接口应运而生,帮助企业批量获取商品详情,应用于信息采集、校验、同步与数据分析等领域,显著提高运营效率、优化库存管理、提升个性化推荐精准度,并助力制定更有效的市场策略,降低采购成本,最终提升客户满意度,推动企业可持续发展。
244 3
|
10月前
|
前端开发 开发者
如何理解 package.json 中的 proxy 字段?
`package.json` 中的 `proxy` 字段用于配置代理服务器,帮助前端开发中解决跨域问题及模拟后端响应。其基本概念、使用场景及配置方法将在本文中详细探讨,助力开发者高效调试与测试。
324 4
|
数据库 Kotlin
Kotlin中的冷流和热流以及如何让Flow停下来
本文介绍了Kotlin中`Flow`的概念及其类型,包括冷流(Cold Flow)、热流`SharedFlow`及具有最新值的`StateFlow`。文中详细描述了每种类型的特性与使用场景,并提供了停止`Flow`的方法,如取消协程、使用操作符过滤及异常处理。通过示例代码展示了如何运用这些概念。
384 2
|
12月前
|
数据可视化 搜索推荐 数据挖掘
mplcyberpunk库:探索未来主义风格
mplcyberpunk库:探索未来主义风格
153 1
|
存储 开发框架 前端开发
循序渐进介绍基于CommunityToolkit.Mvvm 和HandyControl的WPF应用端开发(9) -- 实现系统动态菜单的配置和权限分配
循序渐进介绍基于CommunityToolkit.Mvvm 和HandyControl的WPF应用端开发(9) -- 实现系统动态菜单的配置和权限分配
|
安全 Java 编译器
深入浅出:Kotlin 中的空安全机制
【8月更文挑战第31天】
223 0
|
编译器 C++ 开发者
【C++ 泛型编程 进阶篇】:用std::integral_constant和std::is_*系列深入理解模板元编程(二)
【C++ 泛型编程 进阶篇】:用std::integral_constant和std::is_*系列深入理解模板元编程
463 1