flow方法:
public fun <T> flow(@BuilderInference block:suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
public interface FlowCollector<in T> { public suspend fun emit(value: T) }
会将flow传入的方法封装成一个FlowCollector的扩展函数,因此在flow代码块中使用emit是自然地。
第二个代码块中观察到FlowCollector的泛型是通过emit来推导的,这也就是为什么emit方法传入不同的类型flow所构造的FlowCollector的类型也不同。
总结:flow方法只是将传入的方法扩展成了一个FlowCollector的扩展函数,并可共享已有的emit方法。
collect方法:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> { override suspend fun emit(value: T) = action(value) })
可以看到调用Flow的扩展函数collect时会手动构建一个FlowCollector,并重写emit方法的逻辑为执行collect中的代码块。
接下来看Flow的collect成员方法接收到FlowCollector对象后做什么处理。
public interface Flow<out T> { @InternalCoroutinesApi public suspend fun collect(collector: FlowCollector<T>) }
收集collect的具体行为默认是通过具体的flow构建时构造出来的。如默认上文构造出来的是SafeFlow,collect收集行为被封装在AbstractFlow中。
先看下AbstractFlow:
abstract class AbstractFlow<T> : Flow<T> { public final override suspend fun collect(collector: FlowCollector<T>) { val safeCollector = SafeCollector(collector, coroutineContext) try { collectSafely(safeCollector) } finally { safeCollector.releaseIntercepted() } } public abstract suspend fun collectSafely(collector: FlowCollector<T>)
可以看到构建了一个SafeCollector调用collectSafely将改参数传进去。
SafeCollector会保存协程上下文(为了之后防止再次创建续体导致的浪费)和collect方法传进来的FlowCollector。
重写collectSafely方法的类也就是最上面的SafeFlow做的事情:
SafeFlow:
// Named anonymous object private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() { override suspend fun collectSafely(collector: FlowCollector<T>) { collector.block() } }
可以看到最后是用SafeCollector调用的flow里面的代码块
注:这个collector就是collect中创建的SafeCollector而block方法则是之前flow中创建的扩展函数(也就是flow代码块)。
所以这就是为什么是冷流的原因,只有调用collect才会构建这个SafeCollector对象并调用flow传进来的方法(flow代码块会添加到FlowCollector的扩展函数中,为了之后SafaCollector调用block)
到此flow代码块开始运行了,flow中的调用者this即为collect中创建的SafeCollector对象
SafeCollector中的emit方法
接下来看看SafeCollector中的emit方法:
override suspend fun emit(value: T) { //该方法中会获取当前协程的续体,当执行不需要挂起时(不返回SUSPEND关键字), //会直接运行resumeWith并且不会执行拦截器,这也是为什么不能够在 //flow中切换上下文的原因,不会执行intercept切换线程 return suspendCoroutineUninterceptedOrReturn sc@{ uCont -> try { //调用第二个emit方法 emit(uCont, value) } catch (e: Throwable) { // Save the fact that exception from emit (or even check context) has been thrown lastEmissionContext = DownstreamExceptionElement(e) //emit throw e } } } //带续体参数的emit方法: private fun emit(uCont: Continuation<Unit>, value: T): Any? { val currentContext = uCont.context currentContext.ensureActive() // This check is triggered once per flow on happy path. val previousContext = lastEmissionContext if (previousContext !== currentContext) { checkContext(currentContext, previousContext, value) } completion = uCont //最终会调用到emitfun中。 return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>) }
再来看下emitfun:
读者先记下这三个变量具体含义:
collect为FlowCollector对象(该对象的emit方法实现是执行collect代码块),value为emit的参数,最后一个代表续体
private val emitFun = FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
这里科普下Kotlin中的::语法 和 表达式:
Class::函数名/属性名 代表获取这个class的函数引用和属性引用。属性引用还好理解可以针对这个属性进行指定赋值/取值操作(静态属性不需要具体对象,成员属性需要具体的对象)。函数引用代表持有了这个方法的引用可以调用这个函数,由于是函数不区分对象。
=表达式 对应于上面的表达式则为emitfun返回了一个经强转后的方法。也就是emitfun返回的是一个方法,当调用表达式时该方法的invoke会自动执行(后面就会看到了)
可以看到将这个FlowCollector<Any?>的emit方法强转为了Function3<FlowCollector<Any?>, Any?, Continuation, Any?>方法,最直观的感受就是将原来只接受一个value的emit方法强转成了一个FlowCollector,value,和续体的方法。
看下反编译代码
private static final Function3 emitFun = (Function3)TypeIntrinsics.beforeCheckcastToFunctionOfArity(new Function3() { // $FF: synthetic method // $FF: bridge method public Object invoke(Object var1, Object var2, Object var3) { return this.invoke((FlowCollector)var1, var2, (Continuation)var3); } @Nullable public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) { InlineMarker.mark(0); Object var10000 = p1.emit(p2, continuation); InlineMarker.mark(2); InlineMarker.mark(1); return var10000; } public final KDeclarationContainer getOwner() { return Reflection.getOrCreateKotlinClass(FlowCollector.class); } public final String getName() { return "emit"; } public final String getSignature() { return "emit(Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;"; } }, 3);
讲到这里读者应该已经猜到了
参数讲解:
1.p1:对应于调用collect方法所构建的FlowCollector对象,重写emit方法执行collect代码块内容
2.p2:emit方法的参数,对应于发送的值
3.continuation:当前协程的续体
流程讲解:
1.当调用emifun表达式时,表达式所构建的Function3方法的invoke方法将会被调用
2.会调用到emit方法,该方法最终会调用collect代码块的内容也就是action方法,并把emit的参数传入。
至此,emit每调用一次,都会执行一次collect方法。
总结
flow方法的主要作用是将传入的方法参数变成FlowCollector的扩展函数。
collect方法首先会创建一个FlowCollector对象并重写其emit方法的逻辑用于执行传入的方法参数;
接着还会创建一个SafaCollector对象保存刚创建的重写emit方法的FlowCollector对象;
由于flow中的代码块其实是FlowCollector的扩展函数,所以会利用SafaCollector去调用扩展函数,从这里面可以得出一个结论flow中的this即为SafaCollector对象。
emit方法在SafaCollector中最终会执行collect函数