一文解析 ODPS SQL 任务优化方法原理

简介: 本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,覆盖了部分调优方法的分析,从知道怎么优化,到为什么这样优化,以及还能怎样优化。

一、背景


使用ODPS SQL进行离线数据研发时,开发同学不可避免会碰到任务性能问题,需要经常对ODPS SQL执行任务进行调优,以对重点场景任务产出时效进行保障,避免资源浪费。调优过程需要参考相关优化文档资料,发现技术网站中有很多文章介绍到相关的优化方法,但从ODPS底层执行计划来解释为什么要这样做优化以及背后的依据是什么的介绍文章比较少。本文尝试从ODPS底层逻辑计划拆解部分优化方法对应的优化原理,从知道怎么优化,到为什么这样优化,以及还能怎样优化。

二、ODPS基础架构


本节直接略过MAXCOMPUTE基本信息介绍,直接进入相关架构描述。


2 ODPS架构

image.png



ODPS按照功能逻辑划分为接入层、逻辑层、存储/计算层,对应着集群功能则是接入层、控制集群、计算集群。

  • ODPS接入层的最上层是通过LVS实现负载均衡,把请求发送给HTTP Server,该请求包括用户的AccessID和MD5签名信息,HTTP Server在接收到请求后,会把AccessID和MD5签名发给云账号服务进行用户认证,认证通过后,云账号服务会返回该用户的唯一AccountID,在后续执行逻辑中,发送的请求都是包含该AccountID,而不是AccessID。
  • 逻辑层又称作控制层,是MaxCompute的核心部分。实现用户空间和对象的管理、命令的解析与执行逻辑、数据对象的访问控制与授权等功能。在逻辑层有Worker、Scheduler和Executor三个角色:
  • Worker处理所有的RESTful请求,它可以本地处理一些作业,如对用户空间、表、资源、作业等的管理;而对于需要执行分布式计算的作业,如SQL、MR等,Worker会进一步把它提交给Scheduler处理;
  • Scheduler负责instance的调度,它会维护一个Instance列表,并把Instance分解成各个Task,生成这些Task的工作流——DAG图(Directed Acyclic Graph,有向无环图),把可以运行的Task放到TaskPool中,TaskPool是个优先级队列,后台线程会定时对该优先级队列进行排序;此外,Scheduler还会查询计算集群的资源状况,向计算集群的Fuxi master询问资源占用情况以进行流控(Fuxi slot满的时候,停止响应Executor的task申请)。
  • Executor会判断自身资源情况,如CPU、内存、正在运行的Task数(不能超过上限),如果资源满足,则会主动轮询Scheduler的TaskPool请求获取下一个Task,TaskPool会根据Task的优先级和计算集群的资源情况,把相应Task提交给Executor,Executor获取到Task后,会生成计算层的分布式作业描述文件,提交给计算层,监控这些任务的运行状态,并定时把状态汇报给Scheduler。

简单地说,当用户提交一个ODPS作业请求时,接入层先进行用户认证,然后发送给控制层的Worker,Worker判断是否为同步请求,如果为同步请求,则本地执行并返回。如果是异步请求,Worker会先做些检查(如表是否存在,版本号是否最新等),生成InstanceID,把请求进一步发送给Scheduler,并返回给客户端。Scheduler把作业分解成各个Task,Executor主动轮询Scheduler,获取相应Task,提交给计算层执行,并定时将自己持有的Task的状态汇报给Scheduler。

  • 计算层就是飞天内核(Apsara Core),运行在和控制层相互独立的计算集群上。包括Pangu(分布式文件系统)、Fuxi(资源调度系统)、Nuwa/ZK(Naming服务)、Shennong(监控模块)等。MaxCompute中的元数据存储在阿里云计算的另一个开放服务OTS(Open Table Service,开放结构化数据服务)中,元数据内容主要包括用户空间元数据、Table/Partition Schema、ACL、Job元数据、安全体系等。

三、ODPS基础概念



3 ODPS作业概念


官方文档描述ODPS元数据模型:

image.png

image.png

通常情况下,一个odps job对应一个odps instance(会产生一个instance_id), 一个odps instance对应一个odps task, 一个odps task对应一个活多个fuxi job,一个fuxi job可以基于DAG被拆分为多个类型的task如map、reduce和joiner。一个odps instance对应两个fuxi job的case(小文件合并):

image.png

四、ODPS运行时监控



4 Logview2.0框架


参考链接:https://help.aliyun.com/zh/maxcompute/user-guide/use-logview-v2-0-to-view-job-information后续的任务调优都会基于logview2.0作业运行时监控进行。

五、ODPS执行计划



5.1SQL执行顺序


5.1.1SQL执行顺序

image.png

通用的SQL 逻辑算子:

image.png

5.1.2SHUFFLE概念

目前大部分的sql性能问题都会和Shuffle强相关,本节重点介绍shuffle基本概念。目前基本所有的SQL优化问题都会涉及到Shuffle过程,所以先来了解Shuffle的原理,参考Hadoop Shuffle过程原理(Hadoop权威指南):在Hadoop中数据从Map阶段传递给Reduce阶段的过程就叫Shuffle,Shuffle机制是整个MApReduce框架中最核心的部分。

image.png

image.png

5.1.3ODPS SQL逻辑执行计划算子

  • ODPS SQL Task Operator结构(截取自ODPS官方文档)


image.png

注:可在ODPS SQL前添加EXPLAIN 执行流程得到,EXPLAIN主要有以下的作用:

1、检查SQL语法;

2、检查读取的表和分区是否符合预期,这样可以排除掉很多分区读错的尴尬;

3、检查mapreduce运行结构是否符合预期,检查mapjoin等特性有没有生效;

  • 各operator算子含义

image.png


5.2离线ODPS SQL优化方法分析


基于上面的介绍的基本概念,本小节基于ODPS SQL的Explain功能查询静态SQL的逻辑执行计划,分析SQL任务优化前后的差异,结合任务实际运行过程中Logview的监控输出,分析给出任务优化生效的原因。

5.2.1Multi Distinct优化分析

image.png

技术网站文章中有大量介绍Multi-Distinct问题的优化方法,先从下面的执行计划来看下。

CASE1:不带Distinct的Count算子使用


EXPLAIN SELECT  app_id
        ,count(user_id)
FROM    xxx.table_vst_user_test
WHERE   dt = '20230816'
GROUP BY app_id;

逻辑执行计划:

image.png

分析:可以看到在Map Task输出阶段,会以app_id字段进行Hash分区传输,输出的临时结果是 app_id和__agg_0_count字段,数据已经预聚合,不存在带有user_id的明细数据shuffle传输,所以任务运行速度较快。

CASE2:带Distinct的Count算子使用


EXPLAIN SELECT  app_id
        ,count(DISTINCT user_id)
FROM    xxx.table_vst_user_test
WHERE   dt = '20230816'
GROUP BY app_id;

image.png

分析:可以看到在Map Task输出阶段,会以app_id字段进行Hash分区传输,输出的临时结果值是 app_id&user_id。Map Task输出的中间结果无法预聚合,需要将带有user_id的明细数据传输,所以运行速度较慢,如果某个小程序对应的访问用户量较大时,极易在Reducer阶段产生数据倾斜。

CASE3:带多Distinct的Count算子使用


EXPLAIN 
SELECT  app_id
        ,count(DISTINCT user_id)
        ,count(DISTINCT cy23_source_name_l1)
        ,count(DISTINCT cy23_source_name_l2)
        ,count(DISTINCT cy23_source_name_l3)
        ,count(DISTINCT cy23_source_name_l4)
FROM    xxx.table_vst_user_test
WHERE   dt = '20230816'
GROUP BY app_id;

image.png

分析:可以看到在Map Task输出阶段,还是会以app_id字段进行Hash分区传输,输出的临时结果值是 app_id&user_id&cy23_source_name_l1&cy23_source_name_l2&cy23_source_name_l3&cy23_source_name_l4。Map Task输出的中间结果无法预聚合,需要将带有user_id及其他的待去重字段的明细数据传输,字段越多,数据传输量越大,所以运行速度较慢,如果某个小程序对应的访问用户量较大时,极易在Reducer阶段产生数据倾斜。

CASE4:带Distinct的Count算子的优化代码(该CASE是对CASE2的代码优化)


EXPLAIN 
SELECT  app_id
        ,COUNT(user_id)
FROM
(
    SELECT  app_id 
           ,user_id
    FROM    xxx.table_vst_user_test
    WHERE   dt = '${bizdate}'
    GROUP BY app_id
            ,user_id
)t
GROUP BY app_id

image.png

image.png

分析:优化后的代码,在逻辑计划里多增加了一个Reducer阶段,但在MAP Task的输出阶段,从原先的以app_id进行Hash分区改为了以app_id&user_id进行Hash分区,可以避免数据在传输到Reduce阶段因为热点数据导致的数据倾斜。在第一个Reducer执行阶段,会对Map段传输的数据进行预聚合,不存在带有明细字段的数据向下一个Reducer阶段传输,避免了数据倾斜的发生。整体来看,该优化方法,没有减少Shuffle过程中的明细数据传输,只是对于Map Task的Hash字段从app_id调整为app_id和user_id,减少了热点数据聚集的可能,通过增加计算阶段进行运行时间的优化。

5.2.2系统参数odps.sql.groupby.skewindata=True分析

image.png

CASE1:带Distinct的Count算子使用


EXPLAIN 
SELECT  app_id 
       ,COUNT(DISTINCT user_id)
FROM    xxx.table_vst_user_test
WHERE   dt = '${bizdate}'
GROUP BY app_id

image.png

分析:同5.2.1中的CASE2

CASE2:Case1代码前加入系统优化参数


SET odps.sql.groupby.skewindata = true;

EXPLAIN 
SELECT  app_id
        ,COUNT(DISTINCT user_id)
FROM    xxx.table_vst_user_test
WHERE   dt = '${bizdate}'
GROUP BY app_id

image.png

image.png

分析:可以看到加入系统优化参数后的逻辑执行计划同5.2.1中的Case4,优化后,Map阶段的输出,app_id进行Hash分区改为了以app_id&user_id进行Hash分区,避免热点数据的聚集,通过增加计算阶段进行运行时间的优化。

5.2.3.Join(Map Join/Inner Join/Left Join)

CASE1:大小表关联(SortMergeJoin)

EXPLAIN SELECT  mini_cat_name_l1
        ,COUNT(DISTINCT user_id)
FROM    (
            --主表
            SELECT  app_id
                    ,user_id
            FROM    xxx.table_vst_user_test
            WHERE   dt = '20230816'
            GROUP BY app_id
                    ,user_id
        ) t1
LEFT JOIN   (
                --维表
                SELECT  app_id
                        ,mini_cat_name_l1
                FROM    xxx.dim_category
                WHERE   dt = '20230816'
            ) t2
ON      t1.app_id = t2.app_id
GROUP BY mini_cat_name_l1;

image.png

image.png

image.png

image.png

image.png

下图来自Logview中的执行计划:

image.png

J4_1_3内部结构:

image.png

分析:逻辑执行计划中,M1阶段,主要针对右表小程序维表xxx.dim_category进行数据加工提取,由于左右表关联的Key是app_id,所以Hash分区的key也是app_id,输出的中间结果是app_id&mini_cate_name_l1。M2阶段,针对访问事件表xxx.table_vst_user_test进行数据加工,Hash分区的key是app_id&user_id,输出的中间结果是是app_id&user_id,因为Hash分区的key是app_id&user_id,所以在R3_2阶段执行时,不存在热点数据聚集导致的数据倾斜。但参看逻辑执行计划,R3_2的输出会以app_id作为Hash key进行数据传输,数据会在J4_1_3阶段进行整合,并跟M1阶段的小程序维表数据进行MergeJoin,存在数据倾斜的可能。同时在R5_4阶段,Hash分区key是mini_cate_name_l1,不同的行业类目下的用户量差异较大,也会存在可能的数据倾斜。基于Logview的执行计划,可以看到两表关联使用的是MergeJoin的算法(参考上图)。

Sort Merge Join算法原理:

image.png

image.png

算法执行过程:1. Shuffle阶段:将两张表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理;2. Sort阶段:对单个分区节点的两表数据,分别进行排序;3. Merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,如果不同,左边小就继续取左边,反之取右边(即用即取即丢),见下图示意:

image.png

可以看出,无论分区有多大,Sort Merge Join都不用把某一侧的数据全部加载到内存中,而是即用即取即丢。

CASE2:大小表关联使用mapjoin hint(BroadcastHashJoin)

EXPLAIN SELECT  /*+mapjoin(t2)*/mini_cat_name_l1
        ,COUNT(DISTINCT user_id)
FROM    (
            SELECT  app_id
                    ,user_id
            FROM    xxx.table_vst_user_test
            WHERE   dt = '20230816'
            GROUP BY app_id
                    ,user_id
        ) t1
LEFT JOIN   (
                SELECT  app_id
                        ,mini_cat_name_l1
                FROM    xxx.dim_category
                WHERE   dt = '20230816'
            ) t2
ON      t1.app_id = t2.app_id
GROUP BY mini_cat_name_l1;

image.png

image.png

image.png

image.png

image.png

分析:逻辑执行计划中,M1阶段,主要针对右表小程序维表xxx.dim_category进行数据加工提取,由于左右表关联的Key是app_id,但明确使用的是Mapjoin,所以不存在Hash分区字段,输出的中间结果是app_id&mini_cate_name_l1。M2阶段,针对访问事件表xxx.table_vst_user_test进行数据加工,Hash分区的key是app_id&user_id,输出的中间结果是是app_id&user_id,因为Hash分区的key是app_id&user_id,所以在J3_1_2阶段执行时,不存在热点数据聚集导致的数据倾斜。数据会在J3_1_2阶段进行整合,并跟M1阶段的小程序维表数据进行Broadcast Hash Join。同时在R4_3阶段,Hash分区key是mini_cate_name_l1,不同的行业类目下的用户量差异较大,也会存在可能的数据倾斜。基于逻辑执行计划和Logview的执行计划,可以看到两表关联使用的是BroadcastHashJoin的算法。可以看到针对Case1的代码进行优化后,两表关联算法从SortMergeJoin改为了BroadcastHashJoin,特定场景下,减少了可能的数据倾斜,利用资源空间换时间。下图来自Logview中的执行计划:

image.png

Broadcast Hash Join算法:

SparkSQL中broadcast hash join定义:是将其中一张小表广播分发到大表所在的所有节点上,供打标使用。executor存储小表的全部数据,一定程度上牺牲了空间,换区shuffle操作大量的耗时。

image.png

HashJoin的伪代码逻辑:

image.png

CASE3:大小表关联使用distributed mapjoin hint


EXPLAIN SELECT  /*+mapjoin(t2)*/mini_cat_name_l1
        ,COUNT(DISTINCT user_id)
FROM    (
            SELECT  app_id
                    ,user_id
            FROM    xxx.table_vst_user_test
            WHERE   dt = '20230816'
            GROUP BY app_id
                    ,user_id
        ) t1
LEFT JOIN   (
                SELECT  app_id
                        ,mini_cat_name_l1
                FROM    xxx.dim_category
                WHERE   dt = '20230816'
            ) t2
ON      t1.app_id = t2.app_id
GROUP BY mini_cat_name_l1;

image.png

image.png

image.png

image.png

下图来自Logview中的执行计划:

image.png

J4_2_3内部结构

image.png

基于可以看到在Join Task中,使用的是DistributeMapJoin算法。分析:Case1中的执行计划为原执行计划,M1是小表,上图为使用Distributed MapJoin之后的Plan。

  • 小表一侧分为M1,R2_1 两个Stage。M1阶段读表并进行Shuffle,Shuffle的过程将数据分片(shard=2),使得具有相同hash value的数据分发到同一个worker。R2_1(HashTableBuilder1)作为server端,完成HashTable的构建并常驻内存,接受client端(J4_2_3 DistributedMapJoin1)请求完成Lookup查询并返回values。多个shard共同组合成一个分布式的hash table services,shard数量可以手动调整。各shard的service一旦启动,需要等待client端(DistributedMapJoin1)完成所有的request请求后才stop。
  • 大表一侧为Stage M3和J4_2_3。J4_2_3(DistributedMapJoin1)作为client端,通过网络传输方式将大表端的join keys,分batches往server端(HashTableBuilder1)发起request请求并获取返回values。由于server端的数据已经按照hash value分shard,client端可以根据数据的特征只请求特定的shard。

相比于原Query,使用Distributed MapJoin后,大表侧需要通过RPC建立网络通讯获取小表侧HashTable查询返回的数据,建议大表数据量应该远大于小表,否则带来的收益有限,甚至有可能因为网络的波动导致性能回退。从硬件发展趋势来看,相比于网络带宽,磁盘IO往往更容易成为瓶颈,所以长远看更有益,但是现阶段使用Distributed MapJoin时,要求大表应远大于小表数据量。注意,本case仅仅是为了对DistributedMapJoin的逻辑执行计划进行分析,与CASE1进行对比,该优化方法不一定适用该测试sql语句。具体适用场景及用法请查询参考资料章节中的DistributedMapJoin链接。

六、总结


本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,精力有限,仅覆盖了部分调优方法的分析,希望能给大家日常SQL优化工作带来一些启发。由于掌握的ODPS底层执行原理资料有限且线上生产环境HBO对于执行计划有影响,存在理解不完全正确的可能,望读者谅解。

参考资料:

作者 | 博暄

来源 | 阿里云开发者公众号

相关文章
|
1天前
|
域名解析 存储 缓存
HTTP请求流程概览:浏览器构建请求行含方法、URL和版本;检查缓存;解析IP与端口
【6月更文挑战第23天】 HTTP请求流程概览:浏览器构建请求行含方法、URL和版本;检查缓存;解析IP与端口;TCP连接(HTTP/1.1可能需排队);三次握手;发送请求头与体;服务器处理并返回响应;TCP连接可能关闭或保持;浏览器接收并显示响应,更新缓存。HTTP版本间有差异。
12 5
|
1天前
|
SQL 分布式计算 大数据
MaxCompute产品使用问题之等待上游执行的任务是否会占用资源组的资源
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1天前
|
机器学习/深度学习 分布式计算 大数据
MaxCompute产品使用问题之ods层离线同步任务,数据源的一张表新增了字段。如何更改可以不影响当前节点和下游任务的运行
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1天前
|
SQL 分布式计算 DataWorks
MaxCompute产品使用问题之任务修改后提交,会什么时候生效
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1天前
|
分布式计算 DataWorks 大数据
MaxCompute产品使用问题之DataWorks整库全增量同步任务的源库如果新增了表,如何能将这个表快速同步进maxcompute
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1天前
|
分布式计算 大数据 关系型数据库
MaxCompute产品使用问题之同步任务为什么默认访问的是生产环境
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1天前
|
分布式计算 DataWorks 大数据
MaxCompute产品使用问题之如何设置超时就自动结束一个任务
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
20小时前
|
机器学习/深度学习 算法 数据挖掘
算法金 | K-均值、层次、DBSCAN聚类方法解析
**摘要:** 这篇文章介绍了聚类分析的基本概念和几种主要的聚类算法。聚类是无监督学习中用于发现数据内在结构的技术,常用于市场分析、图像分割等场景。K-均值是一种基于划分的算法,简单高效但易受初始值影响;层次聚类包括凝聚和分裂方式,形成层次结构但计算复杂;DBSCAN基于密度,能处理任意形状的簇,但参数选择敏感。文章还讨论了这些算法的优缺点和适用场景,并提供了相关资源链接和Python实现。
19 9
算法金 | K-均值、层次、DBSCAN聚类方法解析
|
10天前
|
存储 安全 Java
深入理解Java中的ThreadLocal机制:原理、方法与使用场景解析
深入理解Java中的ThreadLocal机制:原理、方法与使用场景解析
20 2
|
22小时前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之出现无法在 information_schema.TASKS_HISTORY 表中查询到特定类型的 DI 上线任务记录,该怎么办?
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。

相关产品

  • 云原生大数据计算服务 MaxCompute
  • 推荐镜像

    更多