带你读《2022技术人的百宝黑皮书》——响应式编程的复杂度和简化(4)https://developer.aliyun.com/article/1339635?groupCode=taobaotech
响应式编程的同步形式
回头看最原始的代码,当我们调用orderService.listOrders时,传进去的callback,其实就相当于一个弱化版的continuation。这意味着,如果我们可以将使用continuation将数据表示为AsyncSequence,那么就可以将响应 式代码写成同步形式,从而大幅简化响应式编程。
Swift提供了continuation的概念,提供了AsyncStream和AsyncThrowingStream来实现这个过程,对上节Rx的实现稍作改动即可。
第一步,实现一个将流式函数转换成AsyncThrowingStream的工具类,这个是通用的:
func makeSequence<Data>(f : StreamFunc<Data>) -> AsyncThrowingStream<Data, Error> { AsyncThrowingStream<Data, Error>{ continuation in f { data in continuation.yield(data) } _: { error in continuation.finish(throwing: e) } _: { continuation.finish() } } }
第二步,由于AsyncSequence还不支持merge,需要自己实现一个merge工具方法来实现多个流的组合,这个也是通用的:
//多个AsyncSequence merge成一个AsyncSequence func mergeSequence<Seq : AsyncSequence>(seqs : [Seq]) -> AsyncThrowingStream<Seq.Element, Error> { makeSequence(f: mergeF(fs: seqs.map(makeLoadFunc))) } func makeLoadFunc<Seq : AsyncSequence>(ats : Seq) -> StreamFunc<Seq.Element>{ { onData, onError, onComplete in Task { do { for try await data in ats { onData(data) } onComplete() } catch { onError(error) } } } } func mergeF<Data>(fs : [StreamFunc<Data>]) -> StreamFunc<Data> { { onData, onError, onComplete in var finish = false var results = fs.map{_ in false} for (index, f) in fs.enumerated() { f { data in if (!finish) { onData(data) } } _: { e in // 如果是第一个错误,直接回调,同时标记为结束 if (!finish) { finish = true onError(e) } } _: { // 注意,即使所有的请求都结束了,回调成功if (!finish) { results[index] = true // 所有都结束,回调 if (!results.contains(false)) { finish = true onComplete() } } } } } }
第三步,将listOrder和queryUserInfo转换成StreamFunc形式,与Rx中的第二步实现完全相同;
第四步,这样就可以将load方法简化为:
func asLoad() -> AsyncThrowingStream<[OrderObject], Error> { let orderService = [OrderService("鹅鹅鹅"), OrderService("鸭鸭鸭")] 3 // 通过map构造AsyncSequence,通过flatMap对listOrder和queryUserInfo进行复合let streams = orderService.map { orderService in makeSequence(f: orderService.listOrders).flatMap { (orders) -> AsyncThrowingStream<[OrderOb- ject], Error> in makeSequence(f: makeLoadFunc(orders: orders, userInfoService: getUserService(site: order- Service.site)))
8 |
|
} |
9 |
|
} |
10 |
|
|
11 |
|
// merge两个平台的AsyncSequence |
12 |
|
return mergeSequence(seqs: streams) |
13 |
} |
|
可以发现,代码与RxSwift几乎是完全相同的,所以我们仍然有对于代码正确性的信心,不同的是,现在使用方也得以获得同样的信心:
for try await orderObject in asLoad() { print("async get orderObject \(orderObject.first?.order.orderId)") 3 }
带你读《2022技术人的百宝黑皮书》——响应式编程的复杂度和简化(6)https://developer.aliyun.com/article/1339633?groupCode=taobaotech