响应式编程的复杂度和简化

简介: 响应式系统不是今天的主题,我们要讨论更具体的话题,即响应式代码的编写会有哪些复杂度,应该如何简化。

什么是响应式编程


什么是响应式编程,它是一种编程范式?还是一种设计模式?抑或是其他?响应式系统和响应式编程有什么关系?又比如,响应式编程它适用于什么场景?解决什么问题?



微软于2011年率先建设了.Net上的Rx库,以简化容易出错的异步和事件驱动编程,迈出了响应式编程的第一步,随后业界为许多编程语言提供了对应的实现。


.Net上的Rx库地址:https://docs.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242985(v=vs.103)


什么是响应式,我们从一个例子开始,

image.png

在上面的表格中,建立了单元格之间的关系:`A1 = B1 + C1`,建立关系之后A1将响应任何对于B1和C1的变化,毫无疑问,这就是一种响应式行为。



我觉得这个例子很棒的地方在于,它显然很简单,同时又足够深刻,首先,它充分的体现了响应式的概念,其次,变化发生时,肯定触发了某些过程的执行,说明背后存在关系的建立和沿着关系传播的变化,再次,稍微深入一点看,B1和C1的变化可以是一系列的变化,可以很自然的引申到流的概念,最后,它有一个很高级的抽象,对使用方来说,整个过程是声明式的。



当然,举例子来说明一个概念的时候,本质上是用一个外延在解释一个概念的内涵,往往是会将内涵缩小的,所以我们可以尝试推广这个外延。列出这个例子的特征:



描述了一个单元格里的整数等于另外两个单元格里整数相加,当后者每次发生变化时,变化都会传播到第一个单元格,并进行求值。



关键词为整数、等于、相加、变化、每次、传播、求值。前三个关键词仅仅和例子相关,可以直接去掉。变化可以推广为数据,每次可以在逻辑上等价于流的概念,流可以有0个、1个或多个数据,传播可以推广为通信(在这个意义上,函数调用、RPC、socket、MQ都是通信),求值推广为执行一个过程。所以我们可以得出响应式编程的定义:



通过声明式的通信定义,将数据流与过程组合起来,从而实现数据驱动过程的一种复合编程范式。



时至今日,业界对于响应式的定义仍然是不统一的,因此这是我自己的理解。响应式的基础概念是数据流,理念是过程的执行是通过响应数据来驱动的核心是构造数据和过程的响应关系,并且能够让数据沿着关系传播驱动过程,因此响应式编程本质上是一种对通信的抽象,说它是一种编程范式,是因为它提供一种对于数据与过程组合方式的看法,说它是复合范式而不是基本范式,是因为它不像OOP或者FP一样提供的是对于数据和过程的看法,而是以两者为基础,所以可以有对象响应式和函数响应式。



当我们基于响应式构建系统时,就是响应式系统,响应式系统的构建原则可以参考此处(地址:https://www.reactiveprinciples.org/patterns/communicate-facts.html,总的来说,系统会分割成一个一个的分区,分区内部对状态进行本地化,分区之间通过通信进行异步解耦,可以通过控制这个通信的过程,实现系统的弹性扩缩容和部分组件失败的回弹性。



响应式系统不是今天的主题,我们要讨论更具体的话题,即响应式代码的编写会有哪些复杂度,应该如何简化。


响应式编程的复杂度


响应式编程的复杂度来自于4个方面:


可以有0次、1次或多次数据产生,也就是数据流;

除了数据之外,还有能够标识错误和完成(正常结束);

数据流和数据流、数据流和过程的组合复杂度很高;

在上面的基础上,需要处理整个过程中线程切换、并发同步、数据缓冲等问题。


为了支持数据流的概念,可以产生0次、1次或多次数据产生,API设计需要把数据回调和结果回调分开,通常也会把错误回调和完成回调分开,这种接口被称为流式接口,一个标准的流式接口设计如下所示:

typealias Func = () -> ()
typealias OnData<Data> = (Data) -> ()
typealias OnError = (Error) -> ()
typealias OnComplete = Func
typealias StreamFunc<Data> = (@escaping OnData<Data>, @escaping OnError, @escaping OnComplete) -> ()

显然,流式接口是普通异步接口将一次结果向多次结果的推广,这种推广同时也增加了逻辑的复杂度我们可以通过一个逻辑上简单的例子来看一下流式接口的使用过程,为了关注于核心的复杂度,只会体现前3个方面,一方面是由于加入第4点的话会导致代码过于冗长混淆关注点,另一方面相信各位对第4点本身的复杂度和它引起的众多问题已经非常熟悉了。这个例子很简单,只有三步:

  1. 假设需要为一个店铺提供一个订单展示页面,这些订单来自两个不同的平台“鹅鹅鹅”和“鸭鸭鸭”,他们各自提供了查询的接口(listOrders,为了简单假设他们提供的模型和接口完全一致);  
  2. 订单列表需要展示用户的昵称等信息,需要通过对应平台的另外一个接口(queryUserInfo)查询;  
  3. 由于SDK缓存、持久化、网络请求策略,数据无法一次性获取,这两个接口可能存在多次数据回调。

进一步简化问题,我们忽略变更处理、UI渲染和用户交互处理,仅仅考虑数据加载,这需要组合2个阶段的4次接口调用,先分别请求两个平台的订单,使用订单请求对应平台的userInfo,最后合并成完整数据:

// 数据怎么回调,什么情况结束,onError和onComplete分别在什么情况回调,保证有且仅有一次回调
func load(onData : OnData<[OrderObject]>?, onError : OnError?, onComplete : OnComplete?) {
    let orderServices = [OrderService("鹅鹅鹅"), OrderService("鸭鸭鸭")]
    // 记录整体请求的完成状态
    var listOrderFinish = false
    var queryUserFinish = false
    // 记录各个请求的结果
    var listOrderResults = orderServices.map{_ in false}
    var queryUserResults = [Bool]()
    for (index, orderService) in orderServices.enumerated() {
        orderService.listOrders { orders in
            // 已结束不处理
            if (listOrderFinish) {
                return;
            }
            let index = queryUserResults.count
            queryUserResults[index] = false
            if let userService = getUserService(site: orderService.site){
                let userIds = orders.map { order in
                    order.userId
                }
                userService.queryUserInfo(userIds: userIds) { userInfoDict in
                    if (listOrderFinish && queryUserFinish) {
                        return;
                    }
                    let orderObjects = orders.map { order in
                        OrderObject(order: order, userInfo: userInfoDict[order.userId])
                    }
                    onData?(orderObjects)
                } onError: { error in
                    // 如果是第一个错误,直接回调,同时标记为结束
                    if (!listOrderFinish || !queryUserFinish) {
                        listOrderFinish = true
                        queryUserFinish = true
                        onError?(error)
                    }
                } onComplete: {
                    // 外层结束,内层也结束,才是最终结束
                    if (!listOrderFinish || !queryUserFinish) {
                        queryUserResults[index] = true
                        // 所有都结束,回调
                        if (listOrderFinish && !queryUserResults.contains(false)) {
                            listOrderFinish = true
                            onComplete?()
                        }
                    }
                }
            } else {
                let orderObjects = orders.map { order in
                    OrderObject(order: order)
                }
                onData?(orderObjects)
                queryUserResults[index] = true
                // 所有都结束,回调
                if (listOrderFinish && !queryUserResults.contains(false)) {
                    listOrderFinish = true
                    onComplete?()
                }
            }
        } onError: { error in
            // 如果是第一个错误,直接回调,同时标记为结束
            if (!listOrderFinish) {
                listOrderFinish = true
                onError?(error)
            }
        } onComplete: {
            // 注意,即使所有的请求都结束了,也不能回调结束,因为这里的结束只是代表Order请求结束,userInfo请求不一定结束
            if (!listOrderFinish) {
                listOrderResults[index] = true
                // 所有都结束,回调
                if (!listOrderResults.contains(false)) {
                    listOrderFinish = true
                }
            }
        }
    }
}

在这个接口的实现中,数据回调最简单,在没有结束的情况下,多次回调的数据可以直接回调,问题是如何保证错误和完成有且仅有一次回调,且结果回调后不再回调数据,即:




什么时候回调错误?什么时候回调完成?



如果我们认为一个接口出错,就回调错误,这是最简单的错误处理,只需要检查和设置结束状态,在没有结束时的第一个错误进行回调即可,注意,我们需要在userInfo的请求中也做类似的处理,并保证错误回调后不再执行任何回调。



完成的回调要比错误复杂的多,我们可以来思考一下:

  1. 首先,我们不能在listOrders的onComplete里面取回调完成,因为这里不能代表queryUserInfo这个接口也完成了;  
  2. 其次,我们也不能简单的通过所有queryUserInfo都完成了就回调完成,因为listOrders在完成前仍然有可能返回新的订单数据。



也就是说,这里的完成需要在queryUserInfo进行判断,并且也需要考虑外层请求的完成情况,比普通异步接口的级联要多了两个维度。这仅仅是2种接口4次请求,在真实的编程中,接口数量会多得多,并且需要把第4点加进来,线程/队列、并发、同步、缓冲区,还要处理新数据推送响应,再考虑调试、监控、排查,复杂度显然会继续大幅增长,保证这个过程的正确性是一件痛苦的事情。


响应式编程的复杂度使用Rx/Combine简化响应式编程


为了解决这些问题,业界搞出了Reactive Streams规范(地址:https://www.reactive-streams.org/),也出现了若干的实现,都以工具库的形式提供,包括Rx系列、Reactor,以及苹果功能类似的Combine。作为一个iOS开发,我对RxSwift和Combine比较了解,两者主要的区别在于Combine多了一个Subscription的抽象来协调Publisher和Subscriber之间的行为,尤其是Back Pressure相关的控制,但总的来说,都提供了对于异步数据流的抽象和组合能力,用法上也很类似,这里以RxSwift为例来重写上面的过程。



第一步,实现一个将流式函数转换成Observable的工具类,这个是通用的,非常直观:

func makeObservable<Data>(f : @escaping StreamFunc<Data>) -> Observable<Data> {
    Observable<Data>.create { observer in
        f { data in
            observer.onNext(data)
        } _: { error in
            observer.onError(error)
        } _: {
            observer.onCompleted()
        }
        return Disposables.create()
    }
}

第二步,针对这个例子,将listOrder和queryUserInfo转换成StreamFunc形式,listOrder本来就是StreamFunc,对queryUserInfo进行偏应用也可以转换为StreamFunc形式,这是具体接口相关的:

func makeStreamFunc(orders : [Order], userInfoService : UserService?) -> StreamFunc<[OrderObject]> {
    if let userInfoService = userInfoService {
        // 核心是对queryUserInfo的userIds参数进行偏应用
        let userInfoF : StreamFunc<[OrderObject]> = { onData, onError, onComplete in
            let userIds = orders.map{$0.userId}
            userInfoService.queryUserInfo(userIds: userIds, onData: { userInfoDict in
                let orderObjects = orders.map { order in
                    OrderObject(order: order, userInfo: userInfoDict[order.userId])
                }
                onData(orderObjects)
            }, onError: onError, onComplete: onComplete)
        }
        return userInfoF
    } else {
        return { onData, onError, onComplete in
            onData(orders.map{OrderObject(order: $0)})
            onComplete()
        }
    }
}

第三步,这样就可以将load方法简化为:

func rxLoad() -> Observable<[OrderObject]> {
    let orderService = [OrderService("鹅鹅鹅"), OrderService("鸭鸭鸭")]
    // 通过map构造Observable,通过flatMap对listOrder和queryUserInfo进行复合
    let observables = orderService.map { orderService in
        makeObservable(f: orderService.listOrders).flatMap { (orders) -> Observable<[OrderObject]> in
            let userLoadF = makeStreamFunc(orders: orders, userInfoService: getUserService(site: orderService.site))
            return makeObservable(f: userLoadF)
        }
    }
    // merge两个平台的Observable
    return Observable.merge(observables)
}

可以看到,第一步是通用的,实际代码中只需要做第二步和第三步,这就对上面的接口进行了大量的简化,并且库以统一的方式处理掉了合并、级联、多数据返回的复杂逻辑,我们有相当的把握来保证正确性。当然,除了学习成本较高以外,也还是有缺点的,主要是使用方式仍然是异步形式,在部分环节仍然需要处理异步带来的复杂度:

// 使用方调用
rxLoad().subscribe { orderObjects in
    // onNext闭包中处理数据
} onError: { error in
    // onError闭包中处理错误
} onCompleted: {
    // onCompleted闭包中处理完成
} onDisposed: {
}

Rx确实大大简化了异步编程,但是还不够,因为它的使用仍然是异步形式。


使用AsyncSequence简化响应式编程


 迭代器与序列


迭代器是很多语言都有的一个概念,一个迭代器的核心是next()函数,每次调用都会返回下一个数据,这些数据构成了一个序列(Sequence),迭代器也意味着序列可以被遍历。

 异步序列


如果让迭代器的next()方法支持异步,就产生了异步序列。Swift对此提供了一个AsyncSequence的协议,并对它提供了语言级别的支持,使得开发者可以以同步的形式遍历一个异步序列:

for try await data in asyncDataList {
    print("async get data : + \(data)")
}

实际上,Swift在Combine中支持了Publisher的同步遍历:

// Combine的同步调用
for try await data in publisher.values {
    print("async get publiser value \(data)")
}

不过这个特性需要iOS15才能支持,如果说iOS13还可以展望的话,iOS15就是遥遥无期了。


 CPS变换


如果能将流式接口转换为异步序列,那么就可以实现响应式代码的同步编写,这个转换过程可以通过CPS变换实现。



CPS变换全称Continuation-Pass-Style,这个概念来自Lisp语系,是一种显式传递控制流的编程风格,其传递控制流的载体就是continuation。continuation可以理解为当前代码执行的后续,如果一个函数f有一个continuation参数,我们就可以把当前的continuation传递进去,当函数产生结果时,通过continuation回到函数f外,继续执行,这种函数调用方式成为call/cc(call with current continuation)。



这种变换,称为CPS变换。



作为一个类比,我觉得可以将continuation理解为return的在两个方面的推广形式,首先,continuation是first-class的,可以作为变量存储,可以作为函数的参数和返回值,其次,continuation可以多次使用,而return只能有一次。



 响应式编程的同步形式


回头看最原始的代码,当我们调用orderService.listOrders时,传进去的callback,其实就相当于一个弱化版的continuation。这意味着,如果我们可以将使用continuation将数据表示为AsyncSequence,那么就可以将响应式代码写成同步形式,从而大幅简化响应式编程。Swift提供了continuation的概念,提供了AsyncStream和AsyncThrowingStream来实现这个过程,对上节Rx的实现稍作改动即可。第一步,实现一个将流式函数转换成AsyncThrowingStream的工具类,这个是通用的:

func makeSequence<Data>(f : StreamFunc<Data>) -> AsyncThrowingStream<Data, Error> {
    AsyncThrowingStream<Data, Error>{ continuation in
        f { data in
            continuation.yield(data)
        } _: { error in
            continuation.finish(throwing: e)
        } _: {
            continuation.finish()
        }
    }
}

第二步,由于AsyncSequence还不支持merge,需要自己实现一个merge工具方法来实现多个流的组合,这个也是通用的:

//多个AsyncSequence merge成一个AsyncSequence
func mergeSequence<Seq : AsyncSequence>(seqs : [Seq]) -> AsyncThrowingStream<Seq.Element, Error> {
    makeSequence(f: mergeF(fs: seqs.map(makeLoadFunc)))
}
func makeLoadFunc<Seq : AsyncSequence>(ats : Seq) -> StreamFunc<Seq.Element>{
    { onData, onError, onComplete in
        Task {
            do {
                for try await data in ats {
                    onData(data)
                }
                onComplete()
            } catch {
                onError(error)
            }
        }
    }
}
func mergeF<Data>(fs : [StreamFunc<Data>]) -> StreamFunc<Data> {
    { onData, onError, onComplete in
        var finish = false
        var results = fs.map{_ in false}
        for (index, f) in fs.enumerated() {
            f { data in
                if (!finish) {
                    onData(data)
                }
            } _: { e in
                // 如果是第一个错误,直接回调,同时标记为结束
                if (!finish) {
                    finish = true
                    onError(e)
                }
            } _: {
                // 注意,即使所有的请求都结束了,回调成功
                if (!finish) {
                    results[index] = true
                    // 所有都结束,回调
                    if (!results.contains(false)) {
                        finish = true
                        onComplete()
                    }
                }
            }
        }
    }
}

第三步,将listOrder和queryUserInfo转换成StreamFunc形式,与Rx中的第二步实现完全相同;


第四步,这样就可以将load方法简化为:

func asLoad() -> AsyncThrowingStream<[OrderObject], Error> {
    let orderService = [OrderService("鹅鹅鹅"), OrderService("鸭鸭鸭")]
    // 通过map构造AsyncSequence,通过flatMap对listOrder和queryUserInfo进行复合
    let streams = orderService.map { orderService in
        makeSequence(f: orderService.listOrders).flatMap { (orders) -> AsyncThrowingStream<[OrderObject], Error> in
            makeSequence(f: makeLoadFunc(orders: orders, userInfoService: getUserService(site: orderService.site)))
        }
    }
    // merge两个平台的AsyncSequence
    return mergeSequence(seqs: streams)
}

可以发现,代码与RxSwift几乎是完全相同的,所以我们仍然有对于代码正确性的信心,不同的是,现在使用方也得以获得同样的信心:

for try await orderObject in asLoad() {
    print("async get orderObject \(orderObject.first?.order.orderId)")
}

总结

同步是编程中的田园世界,而流式接口作为异步接口最复杂的形态,我们通过CPS变换的控制流技术,将流式接口表示为AsyncSequence,实现了对异步序列遍历的同步形式,从而将响应式编程在形式上统一回了田园世界。上面的第一步和第二步实现了AsyncSequence和StreamFunc的相互转换,所以实际上我们证明了它们是同构的,更进一步的,我们可以证明它们与Rx、Combine也是同构的。换言之,它们是同一个概念的不同形式,理论上它们的表达能力是等价的,这个概念就是数据流,这个概念在Rx中叫做Observable,在Combine中叫做Publisher。在实际实现上,Rx和Combine提供了大量的操作符,因此目前它们的能力远远强于AsyncSequence和StreamFunc,比如AsyncSequence居然不支持merge。AsyncSequence的优势是可以支持同步写法,在我看来这个优势是很大的。看到社区有过AsyncSequence替换Combine的相关的讨论,我认为逻辑上是讲得通的。

AsyncSequence替换Combine的 相关讨论地址:https://forums.swift.org/t/should-asyncsequence-replace-combine-in-the-future-or-should-they-coexist/53370


相关文章
|
3月前
|
安全 前端开发 Java
随着企业应用复杂度提升,Java Spring框架以其强大与灵活特性简化开发流程,成为构建高效、可维护应用的理想选择
随着企业应用复杂度提升,Java Spring框架以其强大与灵活特性简化开发流程,成为构建高效、可维护应用的理想选择。依赖注入使对象管理交由Spring容器处理,实现低耦合高内聚;AOP则分离横切关注点如事务管理,增强代码模块化。Spring还提供MVC、Data、Security等模块满足多样需求,并通过Spring Boot简化配置与部署,加速微服务架构构建。掌握这些核心概念与工具,开发者能更从容应对挑战,打造卓越应用。
42 1
|
1天前
|
前端开发 API UED
深入理解微前端架构:构建灵活、高效的前端应用
【10月更文挑战第23天】微前端架构是一种将前端应用分解为多个小型、独立、可复用的服务的方法。每个服务独立开发和部署,但共同提供一致的用户体验。本文探讨了微前端架构的核心概念、优势及实施方法,包括定义服务边界、建立通信机制、共享UI组件库和版本控制等。通过实际案例和职业心得,帮助读者更好地理解和应用微前端架构。
|
5月前
|
算法 Linux C++
C++框架设计中实现可扩展性的方法
在软件开发中,可扩展性至关重要,尤其对于C++这样的静态类型语言。本文探讨了在C++框架设计中实现可扩展性的方法:1) 模块化设计降低耦合;2) 使用继承和接口实现功能扩展;3) 通过插件机制动态添加功能;4) 利用模板和泛型提升代码复用;5) 遵循设计原则和最佳实践;6) 应用配置和策略模式以改变运行时行为;7) 使用工厂和抽象工厂模式创建可扩展的对象;8) 实现依赖注入增强灵活性。这些策略有助于构建适应变化、易于维护的C++框架。
357 2
|
3月前
|
开发框架 开发者 数据库管理
模块化开发和传统开发的优缺点有哪些
【8月更文挑战第26天】模块化开发和传统开发的优缺点有哪些
76 2
|
4月前
|
开发框架 前端开发 关系型数据库
Winform开发的快速、健壮、解耦的几点建议
Winform开发的快速、健壮、解耦的几点建议
|
6月前
|
前端开发 JavaScript 测试技术
第八章(应用场景篇) 中大型项目的解构:从单体应用到微前端
第八章(应用场景篇) 中大型项目的解构:从单体应用到微前端
|
6月前
|
消息中间件 开发者 微服务
构建高效代码:模块化设计原则的实践与思考
在软件开发的世界中,编写可维护、可扩展且高效的代码是每个开发者追求的目标。本文将探讨如何通过应用模块化设计原则来提升代码质量,分享一些实践中的经验教训以及对未来技术趋势的思考。
|
设计模式 算法
如何优雅地使用策略模式来实现更灵活、可扩展和易于维护的代码?
如何优雅地使用策略模式来实现更灵活、可扩展和易于维护的代码?
87 0
|
存储 前端开发 安全
Controller层代码技巧,开发人员可以编写出更高效、可维护的代码
Controller层代码技巧,开发人员可以编写出更高效、可维护的代码
163 0
|
程序员 测试技术
《重构2》第十章-简化条件逻辑
《重构2》第十章-简化条件逻辑
335 0