1.简介
日前,在加拿大温哥华召开的数据库领域顶会VLDB 2023上,来自阿里云瑶池数据库团队的论文《Anser: Adaptive Information Sharing Framework of AnalyticDB》,成功入选VLDB Industrial Track。
论文提出了一种在分布式计算引擎中高效传递动态执行信息的框架Anser,是业界首个系统地支持基于执行中信息进行动态优化的动态信息传递框架;用于系统地优化复杂长查询的响应时间和资源使用;并实现了基于Anser的自适应调度器,一方面避免长查询占满集群资源,另一方面在用满CPU资源的前提下尽量满足Anser对执行信息收集依赖关系的要求。
2.背景
混合负载是实时数仓面临的一大业务挑战,我们观察到,复杂的交互式分析、大吞吐数据处理等场景的ETL查询,虽然流量不大(超过10s的查询仅有10%),但占用了集群超过50%的计算资源。这些复杂的大查询,不仅本身执行时间长、消耗资源大,在并发较高时,还可能出现和小查询争抢资源,拖慢小查询,影响集群整体稳定性。因此,优化这些大查询的资源分配和执行计划的优化就尤为重要。
由于大查询计划复杂,传统的基于代价的优化策略,其代价估算的准确度会随着关联次数的增多而下降,极端情况下会生成灾难性计划,影响查询效率和集群稳定性。业界常见的优化思路是基于执行中的统计信息,对执行计划进行动态优化,这类优化统称为Adaptive Query Processing (AQP)。另一方面,一些执行中的信息也可以用于对结果集进行提前过滤,从而减少后续需要参与计算的数据量,这类优化是通过旁路(非执行计划上下游)进行信息传递,学术界称为Sideways Information Passing(SIP)。然而,在实际应用中,仍然存在一些挑战:
业界的AQP方案大都是定制化的。独立支持单独功能,分别进行执行信息的收集和应用。而执行信息的收集和使用本身是可以解耦的。耦合的设计无法做到信息收集的复用和应用场景的扩展,当执行信息的收集开销不可忽略时,信息收集的复用可以极大减少优化链路本身的开销。
单独进行信息收集并扩展应用场景时,信息的种类可能变多、缓存的开销可能变大,因此需要一个高效的管理模块来控制信息的注册、收集、分发、缓存、清理等生命周期。这个链路需要独立于查询链路,避免影响集群的正常业务。并且在分布式计算引擎中,根据应用场景的不同,我们可能需要分区的、或者全量的执行信息。业界没有一套系统的框架,来进行动态信息的抽象定义和管理。
要能够应用执行中的信息,信息的分布者就需要先于订阅者执行;同时,如果订阅者强依赖于信息的产生,就可能退化为串行执行,反而使查询的响应时间变长。我们需要在能够并行用满CPU资源的前提下,尽量让订阅者可以收到发布者传递的信息。这就需要执行调度模块能够感知信息的依赖关系,同时作为一个中心化的管理模块感知集群资源水位并进行动态分配。
基于以上挑战,论文提出了一个动态信息传递框架,及一个基于信息流依赖的自适应调度器,来进行执行中长查询的智能优化。
3.动态信息传递框架Anser
论文设计了一个统一的动态的信息传递框架,能够支持各种自适应优化的应用场景。
3.1整体架构
论文将信息的类型和粒度做了定义和分类。他与信息的发布订阅者无关,是一个对信息本身的描述。
类型:分为primitive和non-primitive。primitive是收集开销可以忽略的基础统计信息,如行数,最大最小值等;non-primitive是需要从基础统计信息推导(如NDV)或有额外收集开销(如BloomFilter)的统计信息;
粒度:在分布式计算引擎中,数据通过一定的算法划分为不同的分片并行执行。在执行中,每个算子可以获取一个部分的统计信息,如果是传递给具有相同分布属性的算子,可以直接利用分片的统计信息进行一些优化;如果是传递给不同分布属性的算子,或全局的优化器、调度器等模块,就需要讲所有信息聚合在一起生成全局的信息。分片、全局描述的就是信息的粒度。
整个信息传递框架是一个发布订阅者模型,具体实现包括:
1. 发布者 Publisher - 信息收集者:算子是发布者
所有算子都收集primitive信息,收集开销可忽略;
部分算子本身就会计算non-primitive信息,如hash agg在计算时会构建hash表,就可以直接估算出NDV的值;
如无算子可以提供需要的non-primitive信息,会在算子链中插入一个PubOperator。
2.订阅者 Subscriber - 信息消费者:根据应用场景不同,订阅者者可能是算子、调度器、优化器
对于订阅者非算子的情况,会在算子链中插入一个SubOperator,用来接收信息,并和调度器/优化器交互;
订阅者对发布者弱依赖,SubOperator中会定义一个阻塞事件,收到信息/收到信息被取消(在执行异常情况下或在自适应调度器要求下)时取消阻塞,和订阅者交互,然后开始执行算子本身的逻辑或驱动下游算子。
3.管道 Channel - 连接发布订阅者的桥梁
Channel Manager - 逻辑桥梁,确定发布者和订阅者对应关系、构建管道,并进行信息的生命周期管理;
Channel Service - 物理桥梁,进行数据合并和跨机器的网路传输。
3.2工程实现
Anser的实现需要考虑平衡 应用运行时信息进行优化带来的收益,和信息收集、传递、消费时的开销,并且需要考虑网络和缓存开销给系统带来的压力对整体稳定性的影响。在通用性上,希望运行时信息能被更广泛的应用,一方面可以rule-based扩展不同的应用场景,而不需要定制化的进行信息收集和传输;另一方面每个信息可以被一个查询内多个模块或算子复用,利用全局上下文扩展生效范围。
贪婪算法与全局上下文
考虑到增加运行时的信息传递有一定的开销,我们希望通过开销and/or收益评估,将生效场景限制在有正收益的场景。业界一般有两种做法。一种是在计划阶段、基于估算的、局部生效的规则;另一种是在执行阶段、基于运行信息的、全局生效的规则。
论文提出贪婪算法,一种在计划阶段生成等价关系、基于运行信息判断是否生效的、全局的规则。首先,在计划阶段,基于全局等价关系推导,针对不同应用场景,rule-based找到发布和订阅者并通过管道进行匹配,这里除了插入发布node外,不对执行计划进行改变;在执行阶段,基于运行时信息计算发布订阅的开销,对有收益的管道,才进行信息的收集和传递。
开销优化
论文提出了一些工程实现上的优化和兜底逻辑的限制来降低传递开销并限制开销上限:
1. 共享:支持一个信息被多个订阅者共享,这个信息只需要产生并传递一次到对应节点;
2.合并:多个信息如果同时产生,通过信息合并,在节点间传递时只发送一次请求,从而减少网络连接;
3. 短路:当信息粒度为分片时,直接在节点内部传递给对应订阅者,短路信息合并和节点间传递的开销;
4.兜底:通过限制信息内存使用等兜底逻辑保证开销可控。
4.基于Anser的自适应调度器
在分布式数据库中,我们将每个查询拆分为多个子任务,在执行过程中并行处理。调度器,就起到了决定子任务执行顺序和时机的作用,使系统能尽可能充分利用整个分布式系统的资源。
4.1算法实现
自适应调度器需要以下4个能力:
1. 整个执行计划可以按序执行,不出现查询死锁;
2. 具备分批调度的能力,从而保证执行任务有动态调整的能力;
3. 感知Anser定义的弱依赖关系,尽量先调度发布者,后调度订阅者;
4. 平衡单批次执行资源利用率低导致RT上升,和每次执行批次个数过多导致重优化机会变少的问 题。
批次划分
论文利用执行计划中的阻塞点,将执行计划划分为更小的批次。阻塞点包括三种情况。一种是攒数据算子,在计算未完成时不输出数据。另一种是落盘算子,在资源不足时,需要将数据落盘,完成计算后再读盘继续执行下游。在这两种情况下,执行流水线都会被打断,下游还未执行,可以利用这个时机进行执行中的动态优化。第三种是自定义一个阻塞算子,初始化时就不处理数据,满足一定条件才开始执行。
论文分3步划分出Stage Group为调度的最小粒度,前两步保证能力1,第三步保证能力2:
1.定义Stage间的强依赖:图a中箭头的指向 A -> B 表示A需要消费B的数据,A强依赖于B,否则会出现查询死锁;
2. 找出强连通分量:找到图a中的强连通分量,一次性调度每个强连通分量里的所有Stage;
3. 划出批次:在找到强连通分量的基础上,要求每个批次至少有一个源头stage或阻塞点。
优先级定义
为满足能力3,根据Anser定义的订阅者对发布者弱依赖关系,和DAG定义的强依赖关系,给每个批次定义了一个优先级priority score。优先级的取值范围在0-5之间,0包括没有任何强依赖的批次,会首先被调度。有强依赖的批次,通过动态规划,按以下优先级决定调度的先后顺序:包含订阅者且对应的发布者已完成 > 只包含发布者 > 既没有发布者也没有订阅者 > 包含订阅者且对应发布者执行中 > 包含订阅者且对应发布者未执行。目标是在尽可能先调度发布者后调度订阅者的同时,减少订阅者的等待时间。
调度流程
整体调度流程分为四步。其中,第三步是为满足能力4:
1. 划分批次并定义优先级;
2. 构建优先级队列:维护两个优先级队列,队列1为非阻塞队列,包含priority score=0的批次,由批次树的叶子节点向根节点调度;队列2为阻塞队列,包含其他批次,调度器会对批次执行状态进行监控,实时更新priority score或将批次挪到非阻塞队列;
3. 并行调度因子N:随资源规格自适应调整N的具体值,同时跑N个批次,保证并行度优先;
4. 批次的执行:每个批次中的所有stage会一次性下发。
4.2收益分析
论文是用一套算法来调度在线和离线的场景。对于响应时间敏感的在线场景,定义Stage个数小于等于5个的查询为短查询,会短路Anser并跳过批次的划分,整个查询作为一个批次进行下发,也即AllAtOnce地执行。
自适应调度器还能降低系统的整体压力,减少不必要的资源竞争和等待开销。
减少分布式开销和资源等待开销:AllAtOnce一次性下发后,会开始申请计算资源并在有数据交换的地方建立网络连接。但由于本身下游就需要上游有数据输出后才开始计算,下游申请好了资源也需要等待,就会造成资源的竞争和浪费。在生产环境中,使用自适应调度器跑复杂查询的大集群中,对比AllAtOnce,网络连接减少了50%-95%,峰值内存使用减少了50%,等待task个数最多减少99%
减少存储扫描热点:AllAtOnce一次性下发后,查询会从源头数据扫描开始执行,会带来存储扫描热点。自适应调度器会分批次下发带有源头数据扫描的Stage,有效打散热点
5.实验
论文分别在Benchmark测试集TPC-DS和线上生产环境进行了实验验证。
1. E2E Benchmark测试
Benchmark测试对比了AnalyticDB MySQL打开和关闭Anser的整体收益,并和Spark进行了对比。打开Anser,AnalyticDB MySQL整体有60%+的提升。
2. 以Runtime Filter场景为例的消融实验
消融实验选取了TPC-DS 99条查询中的22条在实验1中执行时间超过2s的长查询,单看这些长查询,并且只打开Runtime Filter,有平均81%的性能提升。接下来通过控制变量,实验了一些关键设计的有效性。
对比了全局推导生效(greedy)vs 仅在同一个Join生效(baseline)的有效应用场景:计划阶段生成Runtime Filter个数,baseline下61个,greedy下 170个;在greedy中生成的170个Filter中,104个在执行中有效。
对比了基于执行前推导 vs 基于执行中推导,减少的数据扫描量:在实验场景中,基于执行前推导的信息应用于源头数据扫描,可以提前过滤掉超过96%的数据,极大的减少网络和I/O的开销;而基于执行中推导的实现,无法用到执行中的信息下推到存储提前过滤数据。
对比了自适应调度器 vs AllAtOnce调度器,对查询响应时间的影响:设置了集群资源空闲/较满/打满三种测试场景,并在订阅端算子设置了等待时间。自适应调度器的响应时间在不同资源配置、不同等待时间下都比AllAtOnce的不同等待时间的性能更优。
3. 其他应用场景在生产集群上的效果验证
6.总结
论文提出了业界首个系统地支持基于执行中信息进行动态优化的动态信息传递框架,实现了执行中信息的场景化注册和生命周期的高效管理,并实现了基于该框架的自适应调度器。该优化框架已经在AnalyticDB MySQL中上线,并且在生产环境中验证了对于复杂查询的稳定性能收益。
点击「链接」即可下载论文原文