一、背景
使用ODPS SQL进行离线数据研发时,开发同学不可避免会碰到任务性能问题,需要经常对ODPS SQL执行任务进行调优,以对重点场景任务产出时效进行保障,避免资源浪费。调优过程需要参考相关优化文档资料,发现技术网站中有很多文章介绍到相关的优化方法,但从ODPS底层执行计划来解释为什么要这样做优化以及背后的依据是什么的介绍文章比较少。本文尝试从ODPS底层逻辑计划拆解部分优化方法对应的优化原理,从知道怎么优化,到为什么这样优化,以及还能怎样优化。
二、ODPS基础架构
本节直接略过MAXCOMPUTE基本信息介绍,直接进入相关架构描述。
2 ODPS架构
ODPS按照功能逻辑划分为接入层、逻辑层、存储/计算层,对应着集群功能则是接入层、控制集群、计算集群。
- ODPS接入层的最上层是通过LVS实现负载均衡,把请求发送给HTTP Server,该请求包括用户的AccessID和MD5签名信息,HTTP Server在接收到请求后,会把AccessID和MD5签名发给云账号服务进行用户认证,认证通过后,云账号服务会返回该用户的唯一AccountID,在后续执行逻辑中,发送的请求都是包含该AccountID,而不是AccessID。
- 逻辑层又称作控制层,是MaxCompute的核心部分。实现用户空间和对象的管理、命令的解析与执行逻辑、数据对象的访问控制与授权等功能。在逻辑层有Worker、Scheduler和Executor三个角色:
- Worker处理所有的RESTful请求,它可以本地处理一些作业,如对用户空间、表、资源、作业等的管理;而对于需要执行分布式计算的作业,如SQL、MR等,Worker会进一步把它提交给Scheduler处理;
- Scheduler负责instance的调度,它会维护一个Instance列表,并把Instance分解成各个Task,生成这些Task的工作流——DAG图(Directed Acyclic Graph,有向无环图),把可以运行的Task放到TaskPool中,TaskPool是个优先级队列,后台线程会定时对该优先级队列进行排序;此外,Scheduler还会查询计算集群的资源状况,向计算集群的Fuxi master询问资源占用情况以进行流控(Fuxi slot满的时候,停止响应Executor的task申请)。
- Executor会判断自身资源情况,如CPU、内存、正在运行的Task数(不能超过上限),如果资源满足,则会主动轮询Scheduler的TaskPool请求获取下一个Task,TaskPool会根据Task的优先级和计算集群的资源情况,把相应Task提交给Executor,Executor获取到Task后,会生成计算层的分布式作业描述文件,提交给计算层,监控这些任务的运行状态,并定时把状态汇报给Scheduler。
简单地说,当用户提交一个ODPS作业请求时,接入层先进行用户认证,然后发送给控制层的Worker,Worker判断是否为同步请求,如果为同步请求,则本地执行并返回。如果是异步请求,Worker会先做些检查(如表是否存在,版本号是否最新等),生成InstanceID,把请求进一步发送给Scheduler,并返回给客户端。Scheduler把作业分解成各个Task,Executor主动轮询Scheduler,获取相应Task,提交给计算层执行,并定时将自己持有的Task的状态汇报给Scheduler。
- 计算层就是飞天内核(Apsara Core),运行在和控制层相互独立的计算集群上。包括Pangu(分布式文件系统)、Fuxi(资源调度系统)、Nuwa/ZK(Naming服务)、Shennong(监控模块)等。MaxCompute中的元数据存储在阿里云计算的另一个开放服务OTS(Open Table Service,开放结构化数据服务)中,元数据内容主要包括用户空间元数据、Table/Partition Schema、ACL、Job元数据、安全体系等。
三、ODPS基础概念
3 ODPS作业概念
官方文档描述ODPS元数据模型:
通常情况下,一个odps job对应一个odps instance(会产生一个instance_id), 一个odps instance对应一个odps task, 一个odps task对应一个活多个fuxi job,一个fuxi job可以基于DAG被拆分为多个类型的task如map、reduce和joiner。一个odps instance对应两个fuxi job的case(小文件合并):
四、ODPS运行时监控
4 Logview2.0框架
参考链接:https://help.aliyun.com/zh/maxcompute/user-guide/use-logview-v2-0-to-view-job-information后续的任务调优都会基于logview2.0作业运行时监控进行。
五、ODPS执行计划
5.1SQL执行顺序
5.1.1SQL执行顺序
通用的SQL 逻辑算子:
5.1.2SHUFFLE概念
目前大部分的sql性能问题都会和Shuffle强相关,本节重点介绍shuffle基本概念。目前基本所有的SQL优化问题都会涉及到Shuffle过程,所以先来了解Shuffle的原理,参考Hadoop Shuffle过程原理(Hadoop权威指南):在Hadoop中数据从Map阶段传递给Reduce阶段的过程就叫Shuffle,Shuffle机制是整个MApReduce框架中最核心的部分。
5.1.3ODPS SQL逻辑执行计划算子
- ODPS SQL Task Operator结构(截取自ODPS官方文档)
注:可在ODPS SQL前添加EXPLAIN 执行流程得到,EXPLAIN主要有以下的作用:
1、检查SQL语法;
2、检查读取的表和分区是否符合预期,这样可以排除掉很多分区读错的尴尬;
3、检查mapreduce运行结构是否符合预期,检查mapjoin等特性有没有生效;
- 各operator算子含义
5.2离线ODPS SQL优化方法分析
基于上面的介绍的基本概念,本小节基于ODPS SQL的Explain功能查询静态SQL的逻辑执行计划,分析SQL任务优化前后的差异,结合任务实际运行过程中Logview的监控输出,分析给出任务优化生效的原因。
5.2.1Multi Distinct优化分析
技术网站文章中有大量介绍Multi-Distinct问题的优化方法,先从下面的执行计划来看下。
CASE1:不带Distinct的Count算子使用
EXPLAIN SELECT app_id ,count(user_id) FROM xxx.table_vst_user_test WHERE dt = '20230816' GROUP BY app_id;
逻辑执行计划:
分析:可以看到在Map Task输出阶段,会以app_id字段进行Hash分区传输,输出的临时结果是 app_id和__agg_0_count字段,数据已经预聚合,不存在带有user_id的明细数据shuffle传输,所以任务运行速度较快。
CASE2:带Distinct的Count算子使用
EXPLAIN SELECT app_id ,count(DISTINCT user_id) FROM xxx.table_vst_user_test WHERE dt = '20230816' GROUP BY app_id;
分析:可以看到在Map Task输出阶段,会以app_id字段进行Hash分区传输,输出的临时结果值是 app_id&user_id。Map Task输出的中间结果无法预聚合,需要将带有user_id的明细数据传输,所以运行速度较慢,如果某个小程序对应的访问用户量较大时,极易在Reducer阶段产生数据倾斜。
CASE3:带多Distinct的Count算子使用
EXPLAIN SELECT app_id ,count(DISTINCT user_id) ,count(DISTINCT cy23_source_name_l1) ,count(DISTINCT cy23_source_name_l2) ,count(DISTINCT cy23_source_name_l3) ,count(DISTINCT cy23_source_name_l4) FROM xxx.table_vst_user_test WHERE dt = '20230816' GROUP BY app_id;
分析:可以看到在Map Task输出阶段,还是会以app_id字段进行Hash分区传输,输出的临时结果值是 app_id&user_id&cy23_source_name_l1&cy23_source_name_l2&cy23_source_name_l3&cy23_source_name_l4。Map Task输出的中间结果无法预聚合,需要将带有user_id及其他的待去重字段的明细数据传输,字段越多,数据传输量越大,所以运行速度较慢,如果某个小程序对应的访问用户量较大时,极易在Reducer阶段产生数据倾斜。
CASE4:带Distinct的Count算子的优化代码(该CASE是对CASE2的代码优化)
EXPLAIN SELECT app_id ,COUNT(user_id) FROM ( SELECT app_id ,user_id FROM xxx.table_vst_user_test WHERE dt = '${bizdate}' GROUP BY app_id ,user_id )t GROUP BY app_id
分析:优化后的代码,在逻辑计划里多增加了一个Reducer阶段,但在MAP Task的输出阶段,从原先的以app_id进行Hash分区改为了以app_id&user_id进行Hash分区,可以避免数据在传输到Reduce阶段因为热点数据导致的数据倾斜。在第一个Reducer执行阶段,会对Map段传输的数据进行预聚合,不存在带有明细字段的数据向下一个Reducer阶段传输,避免了数据倾斜的发生。整体来看,该优化方法,没有减少Shuffle过程中的明细数据传输,只是对于Map Task的Hash字段从app_id调整为app_id和user_id,减少了热点数据聚集的可能,通过增加计算阶段进行运行时间的优化。
5.2.2系统参数odps.sql.groupby.skewindata=True分析
CASE1:带Distinct的Count算子使用
EXPLAIN SELECT app_id ,COUNT(DISTINCT user_id) FROM xxx.table_vst_user_test WHERE dt = '${bizdate}' GROUP BY app_id
分析:同5.2.1中的CASE2
CASE2:Case1代码前加入系统优化参数
SET odps.sql.groupby.skewindata = true; EXPLAIN SELECT app_id ,COUNT(DISTINCT user_id) FROM xxx.table_vst_user_test WHERE dt = '${bizdate}' GROUP BY app_id
分析:可以看到加入系统优化参数后的逻辑执行计划同5.2.1中的Case4,优化后,Map阶段的输出,app_id进行Hash分区改为了以app_id&user_id进行Hash分区,避免热点数据的聚集,通过增加计算阶段进行运行时间的优化。
5.2.3.Join(Map Join/Inner Join/Left Join)
CASE1:大小表关联(SortMergeJoin)
EXPLAIN SELECT mini_cat_name_l1 ,COUNT(DISTINCT user_id) FROM ( --主表 SELECT app_id ,user_id FROM xxx.table_vst_user_test WHERE dt = '20230816' GROUP BY app_id ,user_id ) t1 LEFT JOIN ( --维表 SELECT app_id ,mini_cat_name_l1 FROM xxx.dim_category WHERE dt = '20230816' ) t2 ON t1.app_id = t2.app_id GROUP BY mini_cat_name_l1;
下图来自Logview中的执行计划:
J4_1_3内部结构:
分析:逻辑执行计划中,M1阶段,主要针对右表小程序维表xxx.dim_category进行数据加工提取,由于左右表关联的Key是app_id,所以Hash分区的key也是app_id,输出的中间结果是app_id&mini_cate_name_l1。M2阶段,针对访问事件表xxx.table_vst_user_test进行数据加工,Hash分区的key是app_id&user_id,输出的中间结果是是app_id&user_id,因为Hash分区的key是app_id&user_id,所以在R3_2阶段执行时,不存在热点数据聚集导致的数据倾斜。但参看逻辑执行计划,R3_2的输出会以app_id作为Hash key进行数据传输,数据会在J4_1_3阶段进行整合,并跟M1阶段的小程序维表数据进行MergeJoin,存在数据倾斜的可能。同时在R5_4阶段,Hash分区key是mini_cate_name_l1,不同的行业类目下的用户量差异较大,也会存在可能的数据倾斜。基于Logview的执行计划,可以看到两表关联使用的是MergeJoin的算法(参考上图)。
Sort Merge Join算法原理:
算法执行过程:1. Shuffle阶段:将两张表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理;2. Sort阶段:对单个分区节点的两表数据,分别进行排序;3. Merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,如果不同,左边小就继续取左边,反之取右边(即用即取即丢),见下图示意:
可以看出,无论分区有多大,Sort Merge Join都不用把某一侧的数据全部加载到内存中,而是即用即取即丢。
CASE2:大小表关联使用mapjoin hint(BroadcastHashJoin)
EXPLAIN SELECT /*+mapjoin(t2)*/mini_cat_name_l1 ,COUNT(DISTINCT user_id) FROM ( SELECT app_id ,user_id FROM xxx.table_vst_user_test WHERE dt = '20230816' GROUP BY app_id ,user_id ) t1 LEFT JOIN ( SELECT app_id ,mini_cat_name_l1 FROM xxx.dim_category WHERE dt = '20230816' ) t2 ON t1.app_id = t2.app_id GROUP BY mini_cat_name_l1;
分析:逻辑执行计划中,M1阶段,主要针对右表小程序维表xxx.dim_category进行数据加工提取,由于左右表关联的Key是app_id,但明确使用的是Mapjoin,所以不存在Hash分区字段,输出的中间结果是app_id&mini_cate_name_l1。M2阶段,针对访问事件表xxx.table_vst_user_test进行数据加工,Hash分区的key是app_id&user_id,输出的中间结果是是app_id&user_id,因为Hash分区的key是app_id&user_id,所以在J3_1_2阶段执行时,不存在热点数据聚集导致的数据倾斜。数据会在J3_1_2阶段进行整合,并跟M1阶段的小程序维表数据进行Broadcast Hash Join。同时在R4_3阶段,Hash分区key是mini_cate_name_l1,不同的行业类目下的用户量差异较大,也会存在可能的数据倾斜。基于逻辑执行计划和Logview的执行计划,可以看到两表关联使用的是BroadcastHashJoin的算法。可以看到针对Case1的代码进行优化后,两表关联算法从SortMergeJoin改为了BroadcastHashJoin,特定场景下,减少了可能的数据倾斜,利用资源空间换时间。下图来自Logview中的执行计划:
Broadcast Hash Join算法:
SparkSQL中broadcast hash join定义:是将其中一张小表广播分发到大表所在的所有节点上,供打标使用。executor存储小表的全部数据,一定程度上牺牲了空间,换区shuffle操作大量的耗时。
HashJoin的伪代码逻辑:
CASE3:大小表关联使用distributed mapjoin hint
EXPLAIN SELECT /*+mapjoin(t2)*/mini_cat_name_l1 ,COUNT(DISTINCT user_id) FROM ( SELECT app_id ,user_id FROM xxx.table_vst_user_test WHERE dt = '20230816' GROUP BY app_id ,user_id ) t1 LEFT JOIN ( SELECT app_id ,mini_cat_name_l1 FROM xxx.dim_category WHERE dt = '20230816' ) t2 ON t1.app_id = t2.app_id GROUP BY mini_cat_name_l1;
下图来自Logview中的执行计划:
J4_2_3内部结构
基于可以看到在Join Task中,使用的是DistributeMapJoin算法。分析:Case1中的执行计划为原执行计划,M1是小表,上图为使用Distributed MapJoin之后的Plan。
- 小表一侧分为M1,R2_1 两个Stage。M1阶段读表并进行Shuffle,Shuffle的过程将数据分片(shard=2),使得具有相同hash value的数据分发到同一个worker。R2_1(HashTableBuilder1)作为server端,完成HashTable的构建并常驻内存,接受client端(J4_2_3 DistributedMapJoin1)请求完成Lookup查询并返回values。多个shard共同组合成一个分布式的hash table services,shard数量可以手动调整。各shard的service一旦启动,需要等待client端(DistributedMapJoin1)完成所有的request请求后才stop。
- 大表一侧为Stage M3和J4_2_3。J4_2_3(DistributedMapJoin1)作为client端,通过网络传输方式将大表端的join keys,分batches往server端(HashTableBuilder1)发起request请求并获取返回values。由于server端的数据已经按照hash value分shard,client端可以根据数据的特征只请求特定的shard。
相比于原Query,使用Distributed MapJoin后,大表侧需要通过RPC建立网络通讯获取小表侧HashTable查询返回的数据,建议大表数据量应该远大于小表,否则带来的收益有限,甚至有可能因为网络的波动导致性能回退。从硬件发展趋势来看,相比于网络带宽,磁盘IO往往更容易成为瓶颈,所以长远看更有益,但是现阶段使用Distributed MapJoin时,要求大表应远大于小表数据量。注意,本case仅仅是为了对DistributedMapJoin的逻辑执行计划进行分析,与CASE1进行对比,该优化方法不一定适用该测试sql语句。具体适用场景及用法请查询参考资料章节中的DistributedMapJoin链接。
六、总结
本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,精力有限,仅覆盖了部分调优方法的分析,希望能给大家日常SQL优化工作带来一些启发。由于掌握的ODPS底层执行原理资料有限且线上生产环境HBO对于执行计划有影响,存在理解不完全正确的可能,望读者谅解。
参考资料:
- Sort-Merge Join:https://www.sparkcodehub.com/spark-what-is-a-sort-merge-join-in-spark-sql
- Join实现原理:https://www.jianshu.com/p/97e76dddcbfb
- SparkSQL中的三种Join及实现:https://blog.csdn.net/wlk_328909605/article/details/82933552
作者 | 博暄
来源 | 阿里云开发者公众号