
问题回顾Shuffle是大数据计算中最为重要的算子。首先,覆盖率高,超过50%的作业都包含至少一个Shuffle[2]。其次,资源消耗大,阿里内部平台Shuffle的CPU占比超过20%,LinkedIn内部Shuffle Read导致的资源浪费高达15%[1],单Shuffle数据量超100T[2]。第三,不稳定,硬件资源的稳定性CPU>内存>磁盘≈网络,而Shuffle的资源消耗是倒序。OutOfMemory和Fetch Failure可能是Spark作业最常见的两种错误,前者可以通过调参解决,而后者需要系统性重构Shuffle。传统Shuffle如下图所示,Mapper把Shuffle数据按PartitionId排序写盘后交给External Shuffle Service(ESS)管理,Reducer从每个Mapper Output中读取属于自己的Block。传统Shuffle存在以下问题。本地盘依赖限制了存算分离。存算分离是近年来兴起的新型架构,它解耦了计算和存储,可以更灵活地做机型设计:计算节点强CPU弱磁盘,存储节点强磁盘强网络弱CPU。计算节点无状态,可根据负载弹性伸缩。存储端,随着对象存储(OSS, S3)+数据湖格式(Delta, Iceberg, Hudi)+本地/近地缓存等方案的成熟,可当作容量无限的存储服务。用户通过计算弹性+存储按量付费获得成本节约。然而,Shuffle对本地盘的依赖限制了存算分离。写放大。当Mapper Output数据量超过内存时触发外排,从而引入额外磁盘IO。大量随机读。Mapper Output属于某个Reducer的数据量很小,如Output 128M,Reducer并发2000,则每个Reducer只读64K,从而导致大量小粒度随机读。对于HDD,随机读性能极差;对于SSD,会快速消耗SSD寿命。高网络连接数,导致线程池消耗过多CPU,带来性能和稳定性问题。Shuffle数据单副本,大规模集群场景坏盘/坏节点很普遍,Shuffle数据丢失引发的Stage重算带来性能和稳定性问题。RSS发展历程针对Shuffle的问题,工业界尝试了各种方法,近两年逐渐收敛到Push Shuffle的方案。SailfishSailfish[3](2012)最早提出Push Shuffle + Partition数据聚合的方法,对大作业有20%-5倍的性能提升。Sailfish魔改了分布式文件系统KFS[4],不支持多副本。DataflowGoolge BigQuery和Cloud Dataflow[5](2018)实现了Shuffle跟计算的解耦,采用多层存储(内存+磁盘),除此之外没有披露更多技术细节。RiffleFacebook Riffle[2](2018)采用了在Mapper端Merge的方法,物理节点上部署的Riffle服务负责把此节点上的Shuffle数据按照PartitionId做Merge,从而一定程度把小粒度的随机读合并成较大粒度。CoscoFacebook Cosco[6][7](2019)采用了Sailfish的方法并做了重设计,保留了Push Shuffle + Parititon数据聚合的核心方法,但使用了独立服务。服务端采用Master-Worker架构,使用内存两副本,用DFS做持久化。Cosco基本上定义了RSS的标准架构,但受到DFS的拖累,性能上并没有显著提升。ZeusUber Zeus[8][9](2020)同样采用了去中心化的服务架构,但没有类似etcd的角色维护Worker状态,因此难以做状态管理。Zeus通过Client双推的方式做多副本,采用本地存储。RPMPIntel RPMP[10](2020)依靠RDMA和PMEM的新硬件来加速Shuffle,并没有做数据聚合。MagnetLinkedIn Magnet[1](2021)融合了本地Shuffle+Push Shuffle,其设计哲学是"尽力而为",Mapper的Output写完本地后,Push线程会把数据推给远端的ESS做聚合,且不保证所有数据都会聚合。受益于本地Shuffle,Magnet在容错和AE的支持上的表现更好(直接Fallback到传统Shuffle)。Magnet的局限包括依赖本地盘,不支持存算分离;数据合并依赖ESS,对NodeManager造成额外压力;Shuffle Write同时写本地和远端,性能达不到最优。Magnet方案已经被Apache Spark接纳,成为默认的开源方案。FireStormFireStorm[11](2021)混合了Cosco和Zeus的设计,服务端采用Master-Worker架构,通过Client多写实现多副本。FireStorm使用了本地盘+对象存储的多层存储,采用较大的PushBlock(默认3M)。FireStorm在存储端保留了PushBlock的元信息,并记录在索引文件中。FireStorm的Client缓存数据的内存由Spark MemoryManager进行管理,并通过细颗粒度的内存分配(默认3K)来尽量避免内存浪费。从上述描述可知,当前的方案基本收敛到Push Shuffle,但在一些关键设计上的选择各家不尽相同,主要体现在:集成到Spark内部还是独立服务。RSS服务侧架构,选项包括:Master-Worker,含轻量级状态管理的去中心化,完全去中心化。Shuffle数据的存储,选项包括:内存,本地盘,DFS,对象存储。多副本的实现,选项包括:Client多推,服务端做Replication。阿里云RSS[12][13]由2020年推出,核心设计参考了Sailfish和Cosco,并且在架构和实现层面做了改良,下文将详细介绍。阿里云RSS核心架构针对上一节的关键设计,阿里云RSS的选择如下:独立服务。考虑到将RSS集成到Spark内部无法满足存算分离架构,阿里云RSS将作为独立服务提供Shuffle服务。Master-Worker架构。通过Master节点做服务状态管理非常必要,基于etcd的状态状态管理能力受限。多种存储方式。目前支持本地盘/DFS等存储方式,主打本地盘,将来会往分层存储方向发展。服务端做Replication。Client多推会额外消耗计算节点的网络和计算资源,在独立部署或者服务化的场景下对计算集群不友好。下图展示了阿里云RSS的关键架构,包含Client(RSS Client, Meta Service),Master(Resource Manager)和Worker三个角色。Shuffle的过程如下:Mapper在首次PushData时请求Master分配Worker资源,Worker记录自己所需要服务的Partition列表。Mapper把Shuffle数据缓存到内存,超过阈值时触发Push。隶属同个Partition的数据被Push到同一个Worker做合并,主Worker内存接收到数据后立即向从Worker发起Replication,数据达成内存两副本后即向Client发送ACK,Flusher后台线程负责刷盘。Mapper Stage运行结束,MetaService向Worker发起CommitFiles命令,把残留在内存的数据全部刷盘并返回文件列表。Reducer从对应的文件列表中读取Shuffle数据。阿里云RSS的核心架构和容错方面的介绍详见[13],本文接下来介绍阿里云RSS近一年的架构演进以及不同于其他系统的特色。状态下沉RSS采用Master-Worker架构,最初的设计中Master统一负责集群状态管理和Shuffle生命周期管理。集群状态包括Worker的健康度和负载;生命周期包括每个Shuffle由哪些Worker服务,每个Worker所服务的Partition列表,Shuffle所处的状态(Shuffle Write,CommitFile,Shuffle Read),是否有数据丢失等。维护Shuffle生命周期需要较大数据量和复杂数据结构,给Master HA的实现造成阻力。同时大量生命周期管理的服务调用使Master易成为性能瓶颈,限制RSS的扩展性。为了缓解Master压力,我们把生命周期状态管理下沉到Driver,由Application管理自己的Shuffle,Master只需维护RSS集群本身的状态。这个优化大大降低Master的负载,并使得Master HA得以顺利实现。Adaptive Pusher在最初的设计中,阿里云RSS跟其他系统一样采用Hash-Based Pusher,即Client会为每个Partition维护一个(或多个[11])内存Buffer,当Buffer超过阈值时触发推送。这种设计在并发度适中的情况下没有问题,而在超大并发度的情况下会导致OOM。例如Reducer的并发5W,在小Buffer[13]的系统中(64K)极端内存消耗为64K*5W=3G,在大Buffer[11]的系统中(3M)极端内存消耗为3M*5W=146G,这是不可接受的。针对这个问题,我们开发了Sort-Based Pusher,缓存数据时不区分Partition,当总的数据超过阈值(i.e. 64M)时对当前数据按照PartitionId排序,然后把数据Batch后推送,从而解决内存消耗过大的问题。Sort-Based Pusher会额外引入一次排序,性能上比Hash-Based Pusher略差。我们在ShuffleWriter初始化阶段根据Reducer的并发度自动选择合适的Pusher。磁盘容错出于性能的考虑,阿里云RSS推荐本地盘存储,因此处理坏/慢盘是保证服务可靠性的前提。Worker节点的DeviceMonitor线程定时对磁盘进行检查,检查项包括IOHang,使用量,读写异常等。此外Worker在所有磁盘操作处(创建文件,刷盘)都会捕捉异常并上报。IOHang、读写异常被认为是Critical Error,磁盘将被隔离并终止该磁盘上的存储服务。慢盘、使用量超警戒线等异常仅将磁盘隔离,不再接受新的Partition存储请求,但已有的Partition服务保持正常。在磁盘被隔离后,Worker的容量和负载将发生变化,这些信息将通过心跳发送给Master。滚动升级RSS作为常驻服务,有永不停服的要求,而系统本身总在向前演进,因此滚动升级是必选的功能。尽管通过Sub-Cluster部署方式可以绕过,即部署多个子集群,对子集群做灰度,灰度的集群暂停服务,但这种方式依赖调度系统感知正在灰度的集群并动态修改作业配置。我们认为RSS应该把滚动升级闭环掉,核心设计如下图所示。Client向Master节点的Leader角色(Master实现了HA,见上文)发起滚动升级请求并把更新包上传给Leader,Leader通过Raft协议修改状态为滚动升级,并启动第一阶段的升级:升级Master节点。Leader首先升级所有的Follower,然后替换本地包并重启。在Leader节点改变的情况下,升级过程不会中断或异常。Master节点升级结束后进入第二阶段:Worker节点升级。RSS采用滑动窗口做升级,窗口内的Worker尽量优雅下线,即拒绝新的Partition请求,并等待本地Shuffle结束。为了避免等待时间过长,会设置超时时间。此外,窗口内的Worker选择会尽量避免同时包含主从两副本以降低数据丢失的概率。混乱测试框架对于服务来说,仅依靠UT、集成测试、e2e测试等无法保证服务可靠性,因为这些测试无法覆盖线上复杂环境,如坏盘、CPU过载、网络过载、机器挂掉等。RSS要求在出现这些复杂情况时保持服务稳定,为了模拟线上环境,我们开发了仿真(混乱)测试框架,在测试环境中模拟线上可能出现的异常,同时保证满足RSS运行的最小运行环境,即至少3个Master节点和2个Worker节点可用,并且每个Worker节点至少有一块盘。我们持续对RSS做此类压力测试。仿真测试框架架构如下图所示,首先定义测试Plan来描述事件类型、事件触发的顺序及持续时间,事件类型包括节点异常,磁盘异常,IO异常,CPU过载等。客户端将Plan提交给Scheduler,Scheduler根据Plan的描述给每个节点的Runner发送具体的Operation,Runner负责具体执行并汇报当前节点的状态。在触发Operation之前,Scheduler会推演该事件发生产生的后果,若导致无法满足RSS的最小可运行环境,将拒绝此事件。我们认为仿真测试框架的思路是通用设计,可以推广到更多的服务测试中。多引擎支持Shuffle是通用操作,不跟引擎绑定,因此我们尝试了多引擎支持。当前我们支持了Hive+RSS,同时也在探索跟流计算引擎(Flink),MPP引擎(Presto)结合的可能性。尽管Hive和Spark都是批计算引擎,但Shuffle的行为并不一致,最大的差异是Hive在Mapper端做排序,Reducer只做Merge,而Spark在Reducer端做排序。由于RSS暂未支持计算,因此需要改造Tez支持Reducer排序。此外,Spark有干净的Shuffle插件接口,RSS只需在外围扩展,而Tez没有类似抽象,在这方面也有一定侵入性。当前大多数引擎都没有Shuffle插件化的抽象,需要一定程度的引擎修改。此外,流计算和MPP都是上游即时Push给下游的模式,而RSS是上游Push,下游Pull的模式,这两者如何结合也是需要探索的。测试我们对比了阿里云RSS、Magent及开源系统X。由于大家的系统还在向前演进,因此测试结果仅代表当前。测试环境Header * 1: ecs.g6e.4xlarge, 16 * 2.5GHz/3.2GHz, 64GiB, 10GbpsWorker * 3: ecs.g6e.8xlarge, 32 * 2.5GHz/3.2GHz, 128GiB, 10Gbps阿里云RSS vs. Magnet5T Terasort的性能测试如下图所示,如上文描述,Magent的Shuffle Write有额外开销,差于RSS和传统做法。Magent的Shuffle Read有提升,但差于RSS。在这个Benchmark下,RSS明显优于另外两个,Magent的e2e时间略好于传统Shuffle。阿里云RSS vs. 开源系统XRSS跟开源系统X在TPCDS-3T的性能对比如下,总时间RSS快了20%。稳定性在稳定性方面,我们测试了Reducer大规模并发的场景,Magnet可以跑通但时间比RSS慢了数倍,System X在Shuffle Write阶段报错。阿里云RSS在小米的实践现状及痛点小米的离线集群以Yarn+HDFS为主,NodeManager和DataNode混合部署。Spark是主要的离线引擎,支撑着核心计算任务。Spark作业当前最大的痛点集中在Shuffle导致的稳定性差,性能差和对存算分离架构的限制。在进行资源保证和作业调优后,作业失败原因主要归结为Fetch Failure,如下图所示。由于大部分集群使用的是HDD,传统Shuffle的高随机读和高网络连接导致性能很差,低稳定性带来的Stage重算会进一步加剧性能回退。此外,小米一直在尝试利用存算分离架构的计算弹性降低成本,但Shuffle对本地盘的依赖造成了阻碍。RSS在小米的落地小米一直在关注Shuffle优化相关技术,21年1月份跟阿里云EMR团队就RSS项目建立了共创关系,3月份第一个生产集群上线,开始接入作业,6月份第一个HA集群上线,规模达100+节点,9月份第一个300+节点上线,集群默认开启RSS,后续规划会进一步扩展RSS的灰度规模。在落地的过程,小米主导了磁盘容错的开发,大大提高了RSS的服务稳定性,技术细节如上文所述。此外,在前期RSS还未完全稳定阶段,小米在多个环节对RSS的作业进行了容错。在调度端,若开启RSS的Spark作业因Shuffle报错,则Yarn的下次重试会回退到ESS。在ShuffleWriter初始化阶段,小米主导了自适应Fallback机制,根据当前RSS集群的负载和作业的特征(如Reducer并发是否过大)自动选择RSS或ESS,从而提升稳定性。效果接入RSS后,Spark作业的稳定性、性能都取得了显著提升。之前因Fetch Failure失败的作业几乎不再失败,性能平均有20%的提升。下图展示了接入RSS前后作业稳定性的对比。ESS:RSS:下图展示了接入RSS前后作业运行时间的对比。ESS:RSS:在存算分离方面,小米海外某集群接入RSS后,成功上线了1600+ Core的弹性集群,且作业运行稳定。在阿里云EMR团队及小米Spark团队的共同努力下,RSS带来的稳定性和性能提升得到了充分的验证。后续小米将会持续扩大RSS集群规模以及作业规模,并且在弹性资源伸缩场景下发挥更大的作用。开源重要的事说三遍:“阿里云RSS开源啦!” X 3git地址: https://github.com/alibaba/RemoteShuffleService开源代码包含核心功能及容错,满足生产要求。计划中的重要Feature:AESpark多版本支持Better 流控Better 监控Better HA多引擎支持欢迎各路开发者共建!Reference[1]Min Shen, Ye Zhou, Chandni Singh. Magnet: Push-based Shuffle Service for Large-scale Data Processing. VLDB 2020.[2]Haoyu Zhang, Brian Cho, Ergin Seyfe, Avery Ching, Michael J. Freedman. Riffle: Optimized Shuffle Service for Large-Scale Data Analytics. EuroSys 2018.[3]Sriram Rao, Raghu Ramakrishnan, Adam Silberstein. Sailfish: A Framework For Large Scale Data Processing. SoCC 2012.[4]KFS. http://code.google.com/p/kosmosfs/[5]Google Dataflow Shuffle. https://cloud.google.com/blog/products/data-analytics/how-distributed-shuffle-improves-scalability-and-performance-cloud-dataflow-pipelines[6]Cosco: An Efficient Facebook-Scale Shuffle Service. https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service[7]Flash for Apache Spark Shuffle with Cosco. https://databricks.com/session_na20/flash-for-apache-spark-shuffle-with-cosco[8]Uber Zeus. https://databricks.com/session_na20/zeus-ubers-highly-scalable-and-distributed-shuffle-as-a-service[9]Uber Zeus. https://github.com/uber/RemoteShuffleService[10]Intel RPMP. https://databricks.com/session_na20/accelerating-apache-spark-shuffle-for-data-analytics-on-the-cloud-with-remote-persistent-memory-pools[11]Tencent FireStorm. https://github.com/Tencent/Firestorm[12]Aliyun RSS在趣头条的实践. https://developer.aliyun.com/article/779686[13]Aliyun RSS架构. https://developer.aliyun.com/article/772329[14]京东 RSS. https://cloud.tencent.com/developer/article/1882205
背景介绍 SparkSQL的优越性能背后有两大技术支柱:Optimizer和Runtime。前者致力于寻找最优的执行计划,后者则致力于把既定的执行计划尽可能快地执行出来。Runtime的多种优化可概括为两个层面:1. 全局优化。从提升全局资源利用率、消除数据倾斜、降低IO等角度做优化,包括自适应执行(Adaptive Execution), Shuffle Removal等。2. 局部优化。优化具体的Task的执行效率,主要依赖Codegen技术,具体包括Expression级别和WholeStage级别的Codegen。本文介绍Spark Codegen的技术原理。 Case Study 本节通过两个具体case介绍Codegen的做法。 Expression级别 考虑下面的表达式计算:x + (1 + 2),用scala代码表达如下: Add(Attribute(x), Add(Literal(1), Literal(2))) 语法树如下: 递归求值这棵语法树的常规代码如下: tree.transformUp { case Attribute(idx) => Literal(row.getValue(idx)) case Add(Literal(c1),Literal(c2)) => Literal(c1+c2) case Literal(c) => Literal(c) } 执行上述代码需要做很多类型匹配、虚函数调用、对象创建等额外逻辑,这些overhead远超对表达式求值本身。为了消除这些overhead,Spark Codegen直接拼成求值表达式的java代码并进行即时编译。具体分为三个步骤:1. 代码生成。根据语法树生成java代码,封装在wrapper类中: ... // class wrapper row.getValue(idx) + (1 + 2) ... // class wrapper 2. 即时编译。使用Janino框架把生成代码编译成class文件。3. 加载执行。最后加载并执行。优化前后性能有数量级的提升。 WholeStage级别 考虑如下的sql语句: select count(*) from store_sales where ss_item_sk=1000; 生成的物理执行计划如下: 执行该计划的常规做法是使用火山模型(vocano model),每个Operator都继承了Iterator接口,其next()方法首先驱动上游执行拿到输入,然后执行自己的逻辑。代码示例如下: class Agg extends Iterator[Row] { def doAgg() { while (child.hasNext()) { val row = child.next(); // do aggregation ... } } def next(): Row { if (!doneAgg) { doAgg(); } return aggIter.next(); } } class Filter extends Iterator[Row] { def next(): Row { var current = child.next() while (current != null && !predicate(current)) { current = child.next() } return current; } } 从上述代码可知,火山模型会有大量类型转换和虚函数调用。虚函数调用会导致CPU分支预测失败,从而导致严重的性能回退。为了消除这些overhead,Spark WholestageCodegen会为该物理计划生成类型确定的java代码,然后类似Expression的做法即时编译和加载执行。本例生成的java代码示例如下(非真实代码,真实代码片段见后文): var count = 0 for (ss_item_sk in store_sales) { if (ss_item_sk == 1000) { count += 1 } } 优化前后性能提升数据如下: Spark Codegen框架 Spark Codegen框架有三个核心组成部分1. 核心接口/类2. CodegenContext3. Produce-Consume Pattern 接下来详细介绍。 接口/类 四个核心接口: 1. CodegenSupport(接口)实现该接口的Operator可以将自己的逻辑拼成java代码。重要方法: produce() // 输出本节点产出Row的java代码 consume() // 输出本节点消费上游节点输入的Row的java代码 实现类包括但不限于: ProjectExec, FilterExec, HashAggregateExec, SortMergeJoinExec。2. WholeStageCodegenExec(类)CodegenSupport的实现类之一,Stage内部所有相邻的实现CodegenSupport接口的Operator的融合,产出的代码把所有被融合的Operator的执行逻辑封装到一个Wrapper类中,该Wrapper类作为Janino即时compile的入参。3. InputAdapter(类)CodegenSupport的实现类之一,胶水类,用来连接WholeStageCodegenExec节点和未实现CodegenSupport的上游节点。4. BufferedRowIterator(接口)WholeStageCodegenExec生成的java代码的父类,重要方法: public InternalRow next() // 返回下一条Row public void append(InternalRow row) // append一条Row CodegenContext 管理生成代码的核心类。主要涵盖以下功能: 1.命名管理。保证同一Scope内无变量名冲突。2.变量管理。维护类变量,判断变量类型(应该声明为独立变量还是压缩到类型数组中),维护变量初始化逻辑等。3.方法管理。维护类方法。4.内部类管理。维护内部类。5.相同表达式管理。维护相同子表达式,避免重复计算。6.size管理。避免方法、类size过大,避免类变量数过多,进行比较拆分。如把表达式块拆分成多个函数;把函数、变量定义拆分到多个内部类。7.依赖管理。维护该类依赖的外部对象,如Broadcast对象、工具对象、度量对象等。8.通用模板管理。提供通用代码模板,如genComp, nullSafeExec等。 Produce-Consume Pattern 相邻Operator通过Produce-Consume模式生成代码。Produce生成整体处理的框架代码,例如aggregation生成的代码框架如下: if (!initialized) { # create a hash map, then build the aggregation hash map # call child.produce() initialized = true; } while (hashmap.hasNext()) { row = hashmap.next(); # build the aggregation results # create variables for results # call consume(), which will call parent.doConsume() if (shouldStop()) return; } Consume生成当前节点处理上游输入的Row的逻辑。如Filter生成代码如下: # code to evaluate the predicate expression, result is isNull1 and value2 if (!isNull1 && value2) { # call consume(), which will call parent.doConsume() } 下图比较清晰地展示了WholestageCodegen生成java代码的call graph: Case Study的示例,生成的真实代码如下: == Subtree 1 / 2 == *(2) HashAggregate(keys=[], functions=[count(1)], output=[count(1)#326L]) +- Exchange SinglePartition +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#329L]) +- *(1) Project +- *(1) Filter (isnotnull(ss_item_sk#13L) && (ss_item_sk#13L = 1000)) +- *(1) FileScan parquet [ss_item_sk#13L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/admin/zhoukeyong/workspace/tpc/tpcds/data/parquet/10/store_sales/par..., PartitionFilters: [], PushedFilters: [IsNotNull(ss_item_sk), EqualTo(ss_item_sk,1000)], ReadSchema: struct<ss_item_sk:bigint> Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage2(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=2 /* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private boolean agg_initAgg_0; /* 010 */ private boolean agg_bufIsNull_0; /* 011 */ private long agg_bufValue_0; /* 012 */ private scala.collection.Iterator inputadapter_input_0; /* 013 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] agg_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1]; /* 014 */ /* 015 */ public GeneratedIteratorForCodegenStage2(Object[] references) { /* 016 */ this.references = references; /* 017 */ } /* 018 */ /* 019 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 020 */ partitionIndex = index; /* 021 */ this.inputs = inputs; /* 022 */ /* 023 */ inputadapter_input_0 = inputs[0]; /* 024 */ agg_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 025 */ /* 026 */ } /* 027 */ /* 028 */ private void agg_doAggregateWithoutKey_0() throws java.io.IOException { /* 029 */ // initialize aggregation buffer /* 030 */ agg_bufIsNull_0 = false; /* 031 */ agg_bufValue_0 = 0L; /* 032 */ /* 033 */ while (inputadapter_input_0.hasNext() && !stopEarly()) { /* 034 */ InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next(); /* 035 */ long inputadapter_value_0 = inputadapter_row_0.getLong(0); /* 036 */ /* 037 */ agg_doConsume_0(inputadapter_row_0, inputadapter_value_0); /* 038 */ if (shouldStop()) return; /* 039 */ } /* 040 */ /* 041 */ } /* 042 */ /* 043 */ private void agg_doConsume_0(InternalRow inputadapter_row_0, long agg_expr_0_0) throws java.io.IOException { /* 044 */ // do aggregate /* 045 */ // common sub-expressions /* 046 */ /* 047 */ // evaluate aggregate function /* 048 */ long agg_value_3 = -1L; /* 049 */ agg_value_3 = agg_bufValue_0 + agg_expr_0_0; /* 050 */ // update aggregation buffer /* 051 */ agg_bufIsNull_0 = false; /* 052 */ agg_bufValue_0 = agg_value_3; /* 053 */ /* 054 */ } /* 055 */ /* 056 */ protected void processNext() throws java.io.IOException { /* 057 */ while (!agg_initAgg_0) { /* 058 */ agg_initAgg_0 = true; /* 059 */ long agg_beforeAgg_0 = System.nanoTime(); /* 060 */ agg_doAggregateWithoutKey_0(); /* 061 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* aggTime */).add((System.nanoTime() - agg_beforeAgg_0) / 1000000); /* 062 */ /* 063 */ // output the result /* 064 */ /* 065 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1); /* 066 */ agg_mutableStateArray_0[0].reset(); /* 067 */ /* 068 */ agg_mutableStateArray_0[0].zeroOutNullBytes(); /* 069 */ /* 070 */ agg_mutableStateArray_0[0].write(0, agg_bufValue_0); /* 071 */ append((agg_mutableStateArray_0[0].getRow())); /* 072 */ } /* 073 */ } /* 074 */ /* 075 */ } == Subtree 2 / 2 == *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#329L]) +- *(1) Project +- *(1) Filter (isnotnull(ss_item_sk#13L) && (ss_item_sk#13L = 1000)) +- *(1) FileScan parquet [ss_item_sk#13L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/admin/zhoukeyong/workspace/tpc/tpcds/data/parquet/10/store_sales/par..., PartitionFilters: [], PushedFilters: [IsNotNull(ss_item_sk), EqualTo(ss_item_sk,1000)], ReadSchema: struct<ss_item_sk:bigint> Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage1(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=1 /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private boolean agg_initAgg_0; /* 010 */ private boolean agg_bufIsNull_0; /* 011 */ private long agg_bufValue_0; /* 012 */ private long scan_scanTime_0; /* 013 */ private boolean outputMetaColumns; /* 014 */ private int scan_batchIdx_0; /* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] scan_mutableStateArray_3 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[3]; /* 016 */ private org.apache.spark.sql.vectorized.ColumnarBatch[] scan_mutableStateArray_1 = new org.apache.spark.sql.vectorized.ColumnarBatch[1]; /* 017 */ private scala.collection.Iterator[] scan_mutableStateArray_0 = new scala.collection.Iterator[1]; /* 018 */ private org.apache.spark.sql.execution.vectorized.OffHeapColumnVector[] scan_mutableStateArray_2 = new org.apache.spark.sql.execution.vectorized.OffHeapColumnVector[1]; /* 019 */ /* 020 */ public GeneratedIteratorForCodegenStage1(Object[] references) { /* 021 */ this.references = references; /* 022 */ } /* 023 */ /* 024 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 025 */ partitionIndex = index; /* 026 */ this.inputs = inputs; /* 027 */ /* 028 */ scan_mutableStateArray_0[0] = inputs[0]; /* 029 */ outputMetaColumns = false; /* 030 */ scan_mutableStateArray_3[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 031 */ scan_mutableStateArray_3[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 032 */ scan_mutableStateArray_3[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 033 */ /* 034 */ } /* 035 */ /* 036 */ private void agg_doAggregateWithoutKey_0() throws java.io.IOException { /* 037 */ // initialize aggregation buffer /* 038 */ agg_bufIsNull_0 = false; /* 039 */ agg_bufValue_0 = 0L; /* 040 */ /* 041 */ if (scan_mutableStateArray_1[0] == null) { /* 042 */ scan_nextBatch_0(); /* 043 */ } /* 044 */ while (scan_mutableStateArray_1[0] != null) { /* 045 */ int scan_numRows_0 = scan_mutableStateArray_1[0].numRows(); /* 046 */ int scan_localEnd_0 = scan_numRows_0 - scan_batchIdx_0; /* 047 */ for (int scan_localIdx_0 = 0; scan_localIdx_0 < scan_localEnd_0; scan_localIdx_0++) { /* 048 */ int scan_rowIdx_0 = scan_batchIdx_0 + scan_localIdx_0; /* 049 */ if (!scan_mutableStateArray_1[0].validAt(scan_rowIdx_0)) { continue; } /* 050 */ do { /* 051 */ boolean scan_isNull_0 = scan_mutableStateArray_2[0].isNullAt(scan_rowIdx_0); /* 052 */ long scan_value_0 = scan_isNull_0 ? -1L : (scan_mutableStateArray_2[0].getLong(scan_rowIdx_0)); /* 053 */ /* 054 */ if (!(!scan_isNull_0)) continue; /* 055 */ /* 056 */ boolean filter_value_2 = false; /* 057 */ filter_value_2 = scan_value_0 == 1000L; /* 058 */ if (!filter_value_2) continue; /* 059 */ /* 060 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1); /* 061 */ /* 062 */ agg_doConsume_0(); /* 063 */ /* 064 */ } while(false); /* 065 */ // shouldStop check is eliminated /* 066 */ } /* 067 */ scan_batchIdx_0 = scan_numRows_0; /* 068 */ scan_mutableStateArray_1[0] = null; /* 069 */ scan_nextBatch_0(); /* 070 */ } /* 071 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* scanTime */).add(scan_scanTime_0 / (1000 * 1000)); /* 072 */ scan_scanTime_0 = 0; /* 073 */ /* 074 */ } /* 075 */ /* 076 */ private void scan_nextBatch_0() throws java.io.IOException { /* 077 */ long getBatchStart = System.nanoTime(); /* 078 */ if (scan_mutableStateArray_0[0].hasNext()) { /* 079 */ scan_mutableStateArray_1[0] = (org.apache.spark.sql.vectorized.ColumnarBatch)scan_mutableStateArray_0[0].next(); /* 080 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(scan_mutableStateArray_1[0].numRows()); /* 081 */ scan_batchIdx_0 = 0; /* 082 */ scan_mutableStateArray_2[0] = (org.apache.spark.sql.execution.vectorized.OffHeapColumnVector) (outputMetaColumns ? /* 083 */ scan_mutableStateArray_1[0].column(0, true) : scan_mutableStateArray_1[0].column(0)); /* 084 */ /* 085 */ } /* 086 */ scan_scanTime_0 += System.nanoTime() - getBatchStart; /* 087 */ } /* 088 */ /* 089 */ private void agg_doConsume_0() throws java.io.IOException { /* 090 */ // do aggregate /* 091 */ // common sub-expressions /* 092 */ /* 093 */ // evaluate aggregate function /* 094 */ long agg_value_1 = -1L; /* 095 */ agg_value_1 = agg_bufValue_0 + 1L; /* 096 */ // update aggregation buffer /* 097 */ agg_bufIsNull_0 = false; /* 098 */ agg_bufValue_0 = agg_value_1; /* 099 */ /* 100 */ } /* 101 */ /* 102 */ protected void processNext() throws java.io.IOException { /* 103 */ while (!agg_initAgg_0) { /* 104 */ agg_initAgg_0 = true; /* 105 */ long agg_beforeAgg_0 = System.nanoTime(); /* 106 */ agg_doAggregateWithoutKey_0(); /* 107 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* aggTime */).add((System.nanoTime() - agg_beforeAgg_0) / 1000000); /* 108 */ /* 109 */ // output the result /* 110 */ /* 111 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[3] /* numOutputRows */).add(1); /* 112 */ scan_mutableStateArray_3[2].reset(); /* 113 */ /* 114 */ scan_mutableStateArray_3[2].zeroOutNullBytes(); /* 115 */ /* 116 */ scan_mutableStateArray_3[2].write(0, agg_bufValue_0); /* 117 */ append((scan_mutableStateArray_3[2].getRow())); /* 118 */ } /* 119 */ } /* 120 */ /* 121 */ }
2022年01月
2019年11月