Android 架构之 MVI 初级体 | Flow 替换 LiveData 重构数据链路

简介: Android 架构之 MVI 初级体 | Flow 替换 LiveData 重构数据链路

引子


这也是《如何将业务代码越写越复杂》系列的第二篇。上一篇用实战代码演绎了 Feeds 流场景下,业务层代码如何从 “无架构”“MVP” 再到 “MVVM”,并使用LiveData承载整个数据链路。这一篇尝用 Kotlin Flow 替换掉 LiveData,看看会发生些什么变化及遇到哪些问题。


若对 Kotlin Flow 还很陌生,可先阅读下面两篇文章,这将有助于理解本文:


  1. Kotlin 异步 | Flow 应用场景及原理


  1. Kotlin 异步 | Flow 限流的应用场景及原理


业务场景是这样的:从网络拉取 Feeds 流并持久化在数据库中,以便下次启动时可先展示本地数据,待请求返回后再刷新 Feeds。


使用 LiveData 承载该业务数据链路的架构演进可以点击我是怎么把业务代码越写越复杂的 | MVP - MVVM - Clean Architecture


现援引上一篇的解决方案:


// 实现访问网络和数据库的细节
class NewsRepository(context: Context) {
    // 使用 Retrofit 构建请求访问网络
    private val retrofit = Retrofit.Builder()
            .baseUrl("https://api.apiopen.top")
            .addConverterFactory(MoshiConverterFactory.create())
            // 将返回数据组织成 LiveData
            .addCallAdapterFactory(LiveDataCallAdapterFactory())
            .client(OkHttpClient.Builder().build())
            .build()
    private val newsApi = retrofit.create(NewsApi::class.java)
    private var executor = Executors.newSingleThreadExecutor()
    // 使用 room 访问数据库
    private var newsDatabase = NewsDatabase.getInstance(context)
    private var newsDao = newsDatabase.newsDao()
    // 用于将新闻流传递给上层的 LiveData
    private var newsLiveData = MediatorLiveData<List<News>>()
    fun fetchNewsLiveData(): LiveData<List<News>?> {
        // 从数据库获取新闻
        val localNews = newsDao.queryNews()
        // 从网络获取新闻
        val remoteNews = newsApi.fetchNewsLiveData(
            mapOf("page" to "1", "count" to "4")
        )
        .let {
            Transformations.map(it) { response: ApiResponse<NewsBean>? ->
                when (response) {
                    is ApiSuccessResponse -> {
                        val news = response.body.result
                        // 将网络新闻入库
                        news?.let {executor.submit { newsDao.insertAll(it) }}
                        news
                    }
                    else -> null
                }
            }
        }
        // 将数据库和网络响应的 LiveData 合并
        newsLiveData.addSource(localNews) {newsLiveData.value = it}
        newsLiveData.addSource(remoteNews) {newsLiveData.value = it}
        return newsLiveData
    }
}


这是 Clean Architecture 中的 Repository,它提供数据访问能力,隐藏了访问网络和数据库的细节。


关于 Clean Architecture 的详细解释可以点击我是怎么把业务代码越写越复杂的 | MVP - MVVM - Clean Architecture


为了使用 LiveData 承载整个数据链路,Retrofit 增加了 LiveDataCallAdapterFactory,它使得接口能直接返回 LiveData:


interface NewsApi {
    @POST("/getWangYiNews")
    fun fetchNewsLiveData(
        @FieldMap map:Map<String,String>
    ):LiveData<ApiResponse<NewsBean>>
}


Room 也支持将数据库查询内容 LiveData 化:


@Dao
interface NewsDao {
    @Query("select * from news")
    fun queryNews(): LiveData<List<News>?>
}


网络 & 数据库 Flow 化


数据链路 Flow 化从链路源头开始。


Room 支持以 Flow 形式返回查询结果:


@Dao
interface NewsDao {
    @Query("select * from news")
    fun queryNewsFlow(): Flow<List<News>?>
}


Retrofit 并未支持 Flow 形式的接口返回值,于是在 GitHub 上找了一遍,有是有,但 star 数都很少,不太敢用。正在犹豫之际,看到了下面 retrofit 官方的回复:[Feature Request] Support adapter for Kotlin Coroutine Flow · Issue #3497 · square/retrofit (github.com)


有人提 issue 希望 retrofit 官方支持接口 Flow 化,但作者回复说网络请求返回的是“一个异步结果”而不是“一串异步结果”,所以suspend就够用了。如果想要将接口 Flow 化,可以这样做:


flow {
  emit(getPosts())
}


作者接着说:“如果有机会重写 RxJava 的 call adapter,可能也不会支持接口 Observable 化。”


醍醐灌顶,立马照做:


interface NewsApi {
    @POST("/getWangYiNews")
    suspend fun fetchNews(@FieldMap map:Map<String,String>): NewsBean
}


将接口定义为suspend方法。查询数据库内容也应该这么改:


@Dao
interface NewsDao {
    @Query("select * from news")
    suspend fun queryNewsSuspend(): List<News>
}


其实若将查询数据库的结果定义为 Flow 的话,每当数据库内容发生增删,Flow 的订阅者都会收到通知。相较于“多个异步结果”,当前场景使用“单个异步结果”更合适。


将访问数据库及请求网络在 Repository 中转化成流:


class NewsRepo() {
    // 访问网络的 Flow(冷流:此时并未发生网络请求)
    fun remoteNewsFlow(page: Int, count: Int) = 
        suspend { newApi.fetchNews(mapOf("page" to page, "count" to count)) }
            .asFlow() // 将 suspend 代码块转换成流
            .map { newsBean ->
                if (newsBean.code == 200) {
                    // 请求成功,更新缓存
                    if (!newsBean.result.isNullOrEmpty()) {
                        newsDao.deleteAllNews()
                        newsDao.insertAll(newsBean.result.map { it.toNews() })
                        newsBean.result
                    } else {
                        emptyList()
                    }
                } else {
                    throw Exception(newsBean.message)
                }
            }
    // 访问数据库的 Flow(冷流:此时并未发生数据库查询)
    val localNewsOneShotFlow = flow {
        val news = newsDao.queryNewsSuspend()
        val newsList = news.map { it.convert() }// 将数据库数据统一为网络数据
        emit(newsList)
    }
}


在 Flow 数据链路的场景下,Repository 作为数据链路的起点,提供给上层的是“原始的冷流”。


代码中虽然调用了访问网络和查询数据库的方法,但是它们是被定义在“冷流”中的,若未发生订阅行为,就不会执行。订阅行为通常是在界面中进行。


变换 & 合流


当链路用 LiveData 表达时,访问数据库和网络的操作被定义在一个 Repository 的方法中:


class NewsRepository(context: Context) {
    fun fetchNewsLiveData(): LiveData<List<News>?> {
        // 1.从数据库获取新闻
        val localNews = newsDao.queryNews()
        // 2.从网络获取新闻
        val remoteNews = newsApi.fetchNewsLiveData(mapOf("page" to "1", "count" to "4"))
        // 3.将数据库和网络响应的 LiveData 合并
        newsLiveData.addSource(localNews) {newsLiveData.value = it}
        newsLiveData.addSource(remoteNews) {newsLiveData.value = it}
        return newsLiveData
    }
}


并且它们是串行的,即只有当数据库访问结束后才开始网络请求,最后再将它们通过 MediatorLiveData 合流。


而使用流时,数据库和网络操作被定义在不同的流中,这为它们提供了更灵活的合流方式。


串行合流


串行合流的思路是将多个流组织成“嵌套流”,然后将它们“展平”。


拿 List 举例,List.flat()提供了在列表上的展平操作,flat 即展平,为啥要展平?因为有嵌套,比如List<List<Int>>,即 List 中每个元素还是 List:


val lists = listOf(
    listOf(1,2,3),
    listOf(4,5,6)
)
Log.v("ttaylor","${lists.flatten()}") //[1, 2, 3, 4, 5, 6]
Log.v("ttaylor","${lists.flatMap { it.map { it+1 } }}") //[2, 3, 4, 5, 6, 7]


List.flat() 将两层嵌套结构变成单层结构,而List.flatMap()在展平的同时提供了变换内部 List 的机会。


流也提供了类似的展平方法flattenConcat()


flowOf(
    flow {
        emit(1)
        emit(2)
    },
    flow { emit(3) },
    flow { emit(4) },
).flattenConcat().collect {
    Log.v("ttaylor", "${it}") // 1,2,3,4
}


flattenConcat() 的合流是串行的,即只有消费了前一个流中所有的数据后才会消费后一个流。


在 ViewModel 层对原始数据流进行合流:


// 新闻 ViewModel 持有 repo
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    fun newsFlow(type: Int, count: Int) =
        flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
            .flattenConcat() // 串行合流
            .map { NewsModel(it, false) }
}
// 通过 ViewModelProvider.Factory 定义构建 ViewModel 的细节(注入Repository)
class NewsViewModelFactory(private val newsRepo: NewsRepo) : ViewModelProvider.Factory {
    override fun <T : ViewModel> create(modelClass: Class<T>): T {
        return NewsViewModel(newsRepo) as T
    }
}


在 Repository + Flow 的加持下,ViewModel 变得异常简单,它持有原始数据流并对其进行合流以及变换。


两个原始数据流分别是数据库流和网络流,使用flowOf()将它们组织成Flow<Flow<News>>嵌套结构,然后调用 flattenConcat() 将它们串行合流并展平变成一个流,即先查询数据库,待查询完毕后才请求网络。合流之后还进行了数据变换,以将网络数据转换为界面数据 NewsModel:


data class NewsModel(
    val news: List<News>, // 新闻列表
    val loading: Boolean, // 是否正在加载
    val errorMessage: String = "" // 错误信息
)


将新闻列表进行这样包装的目的是实现“唯一可信数据源”,这是 MVI 的关键词之一。关于它的详细介绍可以点击Android 架构最新进展 | MVI = 响应式编程 + 单向数据流 + 唯一可信数据源(该篇和本文同时发布,若链接无法跳转,可能是还未过审,请稍等~)


并行合流


串行合流中网络请求必须等待数据库查询,若两者能并行,则性能就会更好一点。


flattenMerge()方法就用于多流并发的场景:


class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    fun newsFlow(type: Int, count: Int) =
        flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
            .flattenMerge() // 并行合流
            .map { NewsModel(it, false) }
}


此时数据库和网络流会并发启动,性能是好了,但也产生了新问题。


每个流生成的数据会合成到一个流中并通知界面刷新。若数据库流先生成数据,让用户先看到缓存新闻,然后网络流再生成数据,用新数据把老数据刷掉。这个流程是符合预期的。但万一数据库抽风了,比网络还慢咋办?这就会发生老数据刷掉新数据的 bug。


解决方案是:当接收到网络流的数据时,就丢弃流上后面的数据。


在 RxJava 中有一个操作符叫takeUntil()就是用来描述这个场景的。


但 Kotlin Flow 并未提供这个方法。。。于是我开始在网上找。。。直到我发现了这个官方回复:Flow.transformWhile operator · Issue #2065 · Kotlin/kotlinx.cor…


官方说不会提供 takeUntil() 方法。因为 Kotlin Flow 的设计原则是“简单”,只提供必要的和高度灵活性的方法,以便自定义。Kotlin Flow 中以transform开头的方法都是高度灵活的,它们通常用来定义其他操作符。在Kotlin 异步 | Flow 应用场景及原理中分析过Flow.transform()方法的灵活性。现在来看下transformWhile()


public fun <T, R> Flow<T>.transformWhile(
    transform: suspend FlowCollector<R>.(value: T) -> Boolean // 这 lambda 带有数据发射能力
): Flow<R> =
    safeFlow {
        // 进行有条件的转发流数据,条件即是 transform
        return@safeFlow collectWhile { value ->
            transform(value)
        }
    }
// 有条件的收集流数据
internal suspend inline fun <T> Flow<T>.collectWhile(
    crossinline predicate: suspend (value: T) -> Boolean
) {
    // 自定义流收集器,描述如何发射数据
    val collector = object : FlowCollector<T> {
        override suspend fun emit(value: T) {
            // 当满足条件时才发射数据,否则丢弃流往后的数据
            if (!predicate(value)) {
                throw AbortFlowException(this)
            }
        }
    }
    try {
        collect(collector)// 收集上游流并通过自定义的方式转发给下游
    } catch (e: AbortFlowException) {
        e.checkOwnership(collector)
    }
}


transformWhile() 的套路依然是拦截转发机制,即新建下游流,它生产数据的方式是通过收集上游数据,并将数据转发到一个带有发射数据能力的 lambda 中,当前这个 lambda 需要有一个返回值,该值决定了是否要终止上游流数据的生产。


现在的问题转化为,如何让网络流告诉数据库流“我已经生成数据了你歇菜吧~”


“流的通信”,听上去有点高大上,但转念一想,是我把问题想复杂了。因为网络和数据库流已经在 ViewModel 层合流了,它们并成一个流了,流动的是List<News>,在这个数据结构上套一层就能实现所谓的“流通信”:


// 新闻流包装类
data class NewsFlowWrapper(
    val news: List<News>,// 新闻列表
    val abort: Boolean // 是否中断流
)


用 NewsFlowWrapper 改造下 NewsRepo:


class NewsRepo(context: Context) {
    val localNewsFlow = flow {
        val news = newsDao.queryNewsSuspend()
        val newsList = news.map { it.convert() }
        // 使用 NewsFlowWrapper 包装数据库流
        emit(NewsFlowWrapper(newsList, false))
    }
    fun remoteNewsFlow(page: Int, count: Int) = 
        suspend { newApi.fetchNews(mapOf("page" to page, "count" to count)) }
            .asFlow()
            .map { newsBean ->
                if (newsBean.code == 200) {
                    if (!newsBean.result.isNullOrEmpty()) {
                        newsDao.deleteAllNews()
                        newsDao.insertAll(newsBean.result.map { it.toNews() })
                        // 网络请求成功时,中断流
                        NewsFlowWrapper(newsBean.result, true)
                    } else {
                        NewsFlowWrapper(emptyList(), false)
                    }
                } else {
                    throw Exception(newsBean.message)
                }
            }
}


接着用 transformWhile() 改造一下 ViewModel 层的合流:


class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    fun newsFlow(type: Int, count: Int) =
        flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
            .flattenMerge()
            .transformWhile {
                emit(it.news)// 总是直接转发上游数据 ,直到 abort 为 true
                !it.abort
            }
            .map { NewsModel(it, false) }
}


就这样自定义了一个新操作符用于流通信。


在讨论到用 Kotlin Flow 取代 RxJava 的时候,有一种声音说“相比 RxJava,Kotlin Flow 的操作符还很匮乏,有待丰富~”。我倒是觉得这是 RxJava 的劣势,Kotlin Flow 的优势。RxJava 让人最望而却步的正是因为复杂性,品种繁多的“流”、琳琅满目的操作符、以及 Rx 版的回调地狱。Kotlin Flow 的策略是简单 + 高灵活性。


这样一来,用 Flow 重构的数据链路上,Repository 和 ViewModel 的界限就很清晰了:Repository 提供原始的数据流,以供 ViewModel 用各种自己喜欢的方式进行合流及变换。


异步化


若直接在界面中收集上述新闻流的话,程序会 crash,提示不能在主线程操作数据库。


所有在流中的操作,默认情况下都是执行在主线程的。


将流中的操作异步化也很简单:


class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    fun newsFlow(type: Int, count: Int) =
        flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
            .flattenMerge()
            .transformWhile {
                emit(it.news)
                !it.abort
            }
            .map { NewsModel(it, false) }
            .flowOn(Dispatchers.IO) // 将所有上游操作都分发到 IO 线程执行
}


在 LiveData 承载数据链路的版本中,需自行启动线程池执行数据库操作(网络操作的异步化由OkHttp实现)。


当用 Flow 组织数据库流和网络流时,只需一个方法就能实现异步化,无疑大大地降低了复杂度。


捕获异常


使用catch()可以捕获所有上游抛出的异常:


class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
    fun newsFlow(type: Int, count: Int) =
        flowOf(newsRepo.localNewsOneShotFlow, newsRepo.remoteNewsFlow(type, count))
            .flattenMerge()
            .transformWhile {
                emit(it.news)
                !it.abort
            }
            .map { NewsModel(it, false) }
            .flowOn(Dispatchers.IO)
            .catch {
                // 捕获自定义异常并向流发送消息
                if (it is YourException)
                    emit(NewsModel(emptyList(),false,"network error,show old news"))
            }


灵活的是,在捕获异常之后还可以继续向流发送数据。比如当网络异常时,向界面发送一个带有 errorMessage 的 Model,界面根据此字段决定是否展示错误 toast。也可以在这里处理和服务端约定的特殊错误码。


感知生命周期


流准备地差不多了,下一步就是让界面收集流并刷新:


class NewsActivity : AppCompatActivity() {
    private val newsViewModel by lazy {
        ViewModelProvider(
            this,
            NewsViewModelFactory(NewsRepo(this))
        )[NewsViewModel::class.java]
    }
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // 收集新闻流并展示
        lifecycleScope.launch { 
            newsViewModel.newsFlow(1, 8).collect { showNews(it) }
        }
    }
    // 这样刷新界面是 MVI 提倡的
    private fun showNews(newsModel: NewsModel) {
        when {
            // 展示 loading
            newsModel.loading -> {
                showLoading()
            }
            newsModel.errorMessage.isEmpty() -> {
                dismissLoading()
                // 将新闻展示在 RecyclerView 上
            }
            // 展示错误提示
            else -> {
                dismissLoading()
                showErrorMessage(newsModel.errorMessage)
            }
        }
    }
}


其中展示/解散 loading 的方法定义如下:


// 展示 loading
fun Activity.showLoading() {
    contentView()?.apply {
        ProgressBar {
            layout_id = "pb"
            layout_width = 50
            layout_height = 50
            layout_gravity = gravity_center
        }
    }
}
// 解散 loading
fun Activity.dismissLoading() {
    val pb = contentView()?.find<ProgressBar>("pb")
    pb?.let { contentView()?.removeView(it) }
}
// 获取 Activity 的 content view
fun Activity.contentView(): FrameLayout? =
    takeIf { !isFinishing && !isDestroyed }?.window?.decorView?.findViewById(android.R.id.content)


展示 loading 即向当前 Activity 的 contentView 添加一个子 View,解散 loading 即是移除该子 View。其中使用了 DSL 声明式地构建了界面,详细介绍可以点击Android性能优化 | 把构建布局用时缩短 20 倍(下)


这样写会有一个坑,若新闻流因为各种原因迟迟未生成新闻列表,此时用户切换到另一个页面,不久后新闻流有数据了,数据被推到界面,就发生了 crash,因为要刷新的界面已不再前台。


目录
相关文章
|
26天前
|
开发框架 前端开发 Android开发
Flutter 与原生模块(Android 和 iOS)之间的通信机制,包括方法调用、事件传递等,分析了通信的必要性、主要方式、数据传递、性能优化及错误处理,并通过实际案例展示了其应用效果,展望了未来的发展趋势
本文深入探讨了 Flutter 与原生模块(Android 和 iOS)之间的通信机制,包括方法调用、事件传递等,分析了通信的必要性、主要方式、数据传递、性能优化及错误处理,并通过实际案例展示了其应用效果,展望了未来的发展趋势。这对于实现高效的跨平台移动应用开发具有重要指导意义。
105 4
|
1月前
|
消息中间件 存储 缓存
十万订单每秒热点数据架构优化实践深度解析
【11月更文挑战第20天】随着互联网技术的飞速发展,电子商务平台在高峰时段需要处理海量订单,这对系统的性能、稳定性和扩展性提出了极高的要求。尤其是在“双十一”、“618”等大型促销活动中,每秒需要处理数万甚至数十万笔订单,这对系统的热点数据处理能力构成了严峻挑战。本文将深入探讨如何优化架构以应对每秒十万订单级别的热点数据处理,从历史背景、功能点、业务场景、底层原理以及使用Java模拟示例等多个维度进行剖析。
53 8
|
1月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
287 7
|
1月前
|
数据采集 搜索推荐 数据管理
数据架构 CDP 是什么?
数据架构 CDP 是什么?
52 2
|
4月前
|
机器学习/深度学习 数据采集 人工智能
揭秘!47页文档拆解苹果智能,从架构、数据到训练和优化
【8月更文挑战第23天】苹果公司发布了一份47页的研究文档,深入解析了其在智能基础语言模型领域的探索与突破。文档揭示了苹果在此领域的雄厚实力,并分享了其独特的混合架构设计,该设计融合了Transformer与RNN的优势,显著提高了模型处理序列数据的效能与表现力。然而,这种架构也带来了诸如权重平衡与资源消耗等挑战。苹果利用海量、多样的高质量数据集训练模型,但确保数据质量及处理噪声仍需克服。此外,苹果采取了自监督与无监督学习相结合的高效训练策略,以增强模型的泛化与稳健性,但仍需解决预训练任务选择及超参数调优等问题。
159 66
|
4月前
|
缓存 数据处理 Android开发
Android经典实战之Kotlin常用的 Flow 操作符
本文介绍 Kotlin 中 `Flow` 的多种实用操作符,包括转换、过滤、聚合等,通过简洁易懂的例子展示了每个操作符的功能,如 `map`、`filter` 和 `fold` 等,帮助开发者更好地理解和运用 `Flow` 来处理异步数据流。
157 4
|
2月前
|
存储 大数据 数据库
Android经典面试题之Intent传递数据大小为什么限制是1M?
在 Android 中,使用 Intent 传递数据时存在约 1MB 的大小限制,这是由于 Binder 机制的事务缓冲区限制、Intent 的设计初衷以及内存消耗和性能问题所致。推荐使用文件存储、SharedPreferences、数据库存储或 ContentProvider 等方式传递大数据。
88 0
|
3月前
|
存储 搜索推荐 数据库
MarkLogic在微服务架构中的应用:提供服务间通信和数据共享的机制
随着微服务架构的发展,服务间通信和数据共享成为关键挑战。本文介绍MarkLogic数据库在微服务架构中的应用,阐述其多模型支持、索引搜索、事务处理及高可用性等优势,以及如何利用MarkLogic实现数据共享、服务间通信、事件驱动架构和数据分析,提升系统的可伸缩性和可靠性。
55 5
|
2月前
|
存储 大数据 数据处理
洞察未来:数据治理中的数据架构新思维
数据治理中的数据架构新思维对于应对未来挑战、提高数据处理效率、加强数据安全与隐私保护以及促进数据驱动的业务创新具有重要意义。企业需要紧跟时代步伐,不断探索和实践新型数据架构,以洞察未来发展趋势,为企业的长远发展奠定坚实基础。
|
3月前
|
Android开发 开发者 Kotlin
告别AsyncTask:一招教你用Kotlin协程重构Android应用,流畅度飙升的秘密武器
【9月更文挑战第13天】随着Android应用复杂度的增加,有效管理异步任务成为关键。Kotlin协程提供了一种优雅的并发操作处理方式,使异步编程更简单直观。本文通过具体示例介绍如何使用Kotlin协程优化Android应用性能,包括网络数据加载和UI更新。首先需在`build.gradle`中添加coroutines依赖。接着,通过定义挂起函数执行网络请求,并在`ViewModel`中使用`viewModelScope`启动协程,结合`Dispatchers.Main`更新UI,避免内存泄漏。使用协程不仅简化代码,还提升了程序健壮性。
96 1