仿Flow构建器创建数据流

简介: 仿Flow构建器创建数据流


前两篇文章讲了flow,collect和中间操作符map的实现原理及方式,但是仅仅是看还是有点头晕,不得不说这个函数式编程太绕了,所以现在让我们自己定义一个Flow数据流,也是仅实现上述三个方法的功能。

第一步

首先回顾下前面的知识点:

flow提供的只是一个扩展函数返回的是一个保存了这个方法的类实例,并且该类提供emit方法以供flow中调用

构建Flow

flow方法

object Flow {
    fun <T> flow(collect: Collector<T>.() -> Unit): SafeFlowCollector<T> {
        return SafeFlowCollector(collect)
    }
}

定义一个Flow类,内部提供flow方法。

SafeCollector类:

class SafeFlowCollector<T>(val collect: Collector<T>.() -> Unit) {
    //将该Function保存在调用flow后创建的实例中获取实例创建FlowCollector
    fun collectFunction(a: (T) -> Unit) {
        val co = Collector(a)
        co.collect()
    }

Collector类

class Collector<T>(val action: (a: T) -> Unit) {
    fun emit(value: T) {
        action(value)
    }
}

这是flow方法需要创建的三个类,虽然功能不多,但是对于简单的构建流还是绰绰有余的。

分析

可以看到flow方法传入的方法参数collect被定义为了Collector的扩展函数,并且保存在了刚创建的SafeCollector的类中用collect函数表示。

第一个功能:flow参数提交的类型和collect中收到的类型一致,我采用了更加直接的形式定义flow时需要设置传输的类型,在emit和collect中都是对应的类型。

实现

flow定义类型和emit类型保持一致:通过Collector< T >实现

flow定义类型和收集到的类型一致:通过SafeFlowCollector< T >实现

第二步

构建collect收集器

第一步发射器设置好后,我们限制了发送的类型和接受的类型,并且将发送逻辑保存在了实例中。接下来我们需要调用该实例对象以触发发送逻辑,在发送逻辑中还需要调用到我们收集的逻辑。

因此收集逻辑需要单独存放,因此需要单独构建一个类,这个类还必须可以调用到发送逻辑。

注:Flow中采用的是collect收集触发flow流发送逻辑,本人使用时是按照collectFunction定义的。两者逻辑一样只是名称不同

flow要构造成哪个类的构造函数,该类就需要持有collect传入的方法。这也是Collector的功能

SafeFlowCollector和Collector

Collector保存collect传入的方法,flow扩展Collector以使他可以触发发射逻辑也就是flow传入的参数。

class SafeFlowCollector<T>(val collect: Collector<T>.() -> Unit)  //扩展函数
//发送逻辑emit,这个action是从哪里来的呢?
class Collector<T>(val action: (a: T) -> Unit) {
    fun emit(value: T) {
        action(value)
    }
}

再解collectFunction方法:

上面说到action方法,接着看:

fun collectFunction(a: (T) -> Unit) {
    //可以看到调用collectFunction方法将传入的方法参数保存到了Collector中也就是action方法
    val co = Collector(a)
    //触发collect,collect也就是flow中传入的方法。
    co.collect()
}

原生flow,collect的demo:

注:将上述三个类拷贝到您的项目中即可调用

Flow.flow<String> {
    Log.i(TAG, "emit before")
    emit("1")
    Log.i(TAG, "emit after")
}.collectFunction {
    Log.i(TAG, "collectionFunction is : $it")
}

总结

因此可以看到调用collectFunction方法会调用到flow传入的方法中,在flow传入的方法中调用emit又会执行collectFunction传入的方法。

ps:collectFunction类比于原生Flow中的collect方法即可

扩展中间转换符

flow和collect我们支持了,现在我们来扩展转换操作符。这里还是简单实现,原生Flow的实现看的很绕。

map操作符

map方法接受的是一个方法。并且该方法的参数是原数据,经过转换后返回的值是collect接受的值。

首先我们需要确定几个点:

1、map的参数如何确定?

map的参数即上一个Flow emit的值,而在我们这个里面emit的值是通过flow< T >指定的,所以参数直接写T就可以。

2.map的返回值如何确定?

map的返回值要经过两个阶段,收到上一个flow发送的值调用转换函数把值传入得到结果,因此map中最后一行即为返回值。

3.支持链式调用map

map之后还需要再次经历map或者collectFunction方法,因此他返回的也必须是一个flowSafeCollector。参数是map传入方法的返回值。

确定好这些来实现,由于需要调用到上一个flow的collect方法,原生中是扩展函数this即代表上一个Flow。本demo异曲同工,直接定义为该类的函数。于是实现如下,修改SafeFlowCollector类即可:

class SafeFlowCollector<T>(val collect: Collector<T>.() -> Unit) {
    fun collectFunction(a: (T) -> Unit) {
        val co = Collector(a)
        co.collect()
    }
    //对比发现只是扩展了这个map方法
    fun <R> map(mapFunction: (T) -> R): SafeFlowCollector<R> {
                //外层调用经map转换后的collectionFunction会走到flow里面
        return Flow.flow {
                //this@SafeFlowCollector是为了保证调用到调用map函数的flow触发这个flow的收集
                this@SafeFlowCollector.collectFunction {
                    //当最内层flow函数调用emit,此collectFunction会收到回调,进行转换后调用flow返回给最外层的collectFunction方法。
                    emit(mapFunction(it))
                }
        }
    }
}

读者可以直接将上面的SafeFlowCollector换成这个即可。

可支持map转换Flow的demo

Flow.flow<String> {
    emit("2")
}.map {
    it.toInt()
}.map {
    it.toString()
}.collectFunction {
    Log.i(TAG, "onCreate: it   $it")
}

最后还是放出这张图片:

扩展中间操作符

还是像map一样,直接把该方法添加到SafeFlowCollector类中即可

//新增扩展zip操作流,用于多个流发射,就近原则谁先返回用谁
//tips1:收集器类型及·返回流类型,由于返回值相同。因此复用调用方流的泛型即可
//2:开启收集后触发多个流的收集,利用标志位进行判断是否发射。目前采用这种方式。缺点:流发射后应该关闭但是此处只是限制了流的发射逻辑
fun zip(twoFlow: SafeFlowCollector<T>): SafeFlowCollector<T> {
    return Flow.flow<T> {
        var a = false //通过定义condition判断,不支持协程需要手动切换线程测试效果
        //开启两个刘的收集,谁先收集到谁先返回
        this@SafeFlowCollector.collectFunction {
            if (!a) {
                a = true
                this.emit(it)
            }
        }
        twoFlow.collectFunction {
            if (!a) {
                a = true
                this.emit(it)
            }
        }
    }
}

该种方案可支持多个Flow发射,zip中将参数变为可变参数即可。SafeCollector中扩展

zip操作符Demo

val flow1 = Flow.flow<Int> {
        emit(2)
}
Flow.flow<String> {
    Thread{
        emit("111")
    }.interrupt()
}.map {
    it.toInt()
}.zip(flow1).collectFunction {
    Log.i(TAG, "onCreate: ZIP操作符下的Flow收集器收集到的数据位 $it")
}

上述方案采用标志位实现不太优雅,有更好的方式可以提出~~

不支持协程~~~,需要调度手动切换线程

总结

转换操作符只是中间操作符的一种,其他中间操作符原理大致都一样。都是经过在封装一次flow然后触发上级flow的收集,最后调用到最里层的flow,调用emit在一层层经过中间操作符处理给到最外层

使用过程中出现问题或有更好的解决方式辛苦在评论区提出,感谢~~


相关文章
|
存储 Shell Linux
【Shell 命令集合 文档编辑】Linux 文本统计 wc命令使用指南
【Shell 命令集合 文档编辑】Linux 文本统计 wc命令使用指南
320 0
|
11月前
|
Java 测试技术 编译器
🎯Java零基础-Switch条件语句详解 🎯
【10月更文挑战第8天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
187 6
|
前端开发 JavaScript
详细说明 BootStrap整合 BootStrap 【整合V3版本的,需要依赖JQuery】
这篇文章详细说明了如何在项目中整合Bootstrap V3版本,包括下载Bootstrap和jQuery、将文件复制到静态资源目录、项目中离线引用这些文件,并提供了完整页面代码示例。
详细说明 BootStrap整合 BootStrap 【整合V3版本的,需要依赖JQuery】
|
11月前
|
iOS开发 开发者 MacOS
在线创建ios打包证书无需mac
这个文件并不一定需要使用mac OS去创建,在苹果开发者中心,生成了cer格式的证书后,导出p12证书这个过程,其实也并不一定需要mac电脑来完成。
142 0
|
消息中间件 存储 负载均衡
"RabbitMQ集群大揭秘!让你的消息传递系统秒变超级英雄,轻松应对亿级并发挑战!"
【8月更文挑战第24天】RabbitMQ是一款基于AMQP的开源消息中间件,以其高可靠性、扩展性和易用性闻名。面对高并发和大数据挑战时,可通过构建集群提升性能。本文深入探讨RabbitMQ集群配置、工作原理,并提供示例代码。集群由多个通过网络连接的节点组成,共享消息队列,确保高可用性和负载均衡。搭建集群需准备多台服务器,安装Erlang和RabbitMQ,并确保节点间通信顺畅。核心步骤包括配置.erlang.cookie文件、使用rabbitmqctl命令加入集群。消息发布至任一节点时,通过集群机制同步至其他节点;消费者可从任一节点获取消息。
173 2
|
小程序 API
uniapp 蓝牙小程序
uniapp 蓝牙小程序
576 0
|
存储 运维 自然语言处理
研发视角:一个需求应该怎么拆解与实现?
研发过程中,开发同学在接到一个需求后,必须要回答两个问题:做什么(WHAT)、怎么做(HOW)。本文就开发与测试在拆解需求时面临的共性问题,结合自己过往的经验,总结的一个实用的方法。本文不讨论技术选型,仅从思考逻辑上总结应该如何拆解与实现一个给定的需求。欢迎讨论。理解需求拆解的关注点以带UI的需求为示例,来看拆解需求过程中的关注点。看下图,停留20秒,思考两个问题:(1)从无到有实现以下需求对应的
75658 10
研发视角:一个需求应该怎么拆解与实现?
|
Java 数据库连接 Apache
JavaWeb基础第二章(Maven项目与MyBatis 的快速入门与配置)
JavaWeb基础第二章(Maven项目与MyBatis 的快速入门与配置)
|
Scala
scala-模式匹配(字符串、数组、元组、集合、类、偏函数)
scala-模式匹配(字符串、数组、元组、集合、类、偏函数)
77 0