一文解析 ODPS SQL 任务优化方法原理

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,覆盖了部分调优方法的分析,从知道怎么优化,到为什么这样优化,以及还能怎样优化。

一、背景


使用ODPS SQL进行离线数据研发时,开发同学不可避免会碰到任务性能问题,需要经常对ODPS SQL执行任务进行调优,以对重点场景任务产出时效进行保障,避免资源浪费。调优过程需要参考相关优化文档资料,发现技术网站中有很多文章介绍到相关的优化方法,但从ODPS底层执行计划来解释为什么要这样做优化以及背后的依据是什么的介绍文章比较少。本文尝试从ODPS底层逻辑计划拆解部分优化方法对应的优化原理,从知道怎么优化,到为什么这样优化,以及还能怎样优化。

二、ODPS基础架构


本节直接略过MAXCOMPUTE基本信息介绍,直接进入相关架构描述。


2 ODPS架构

image.png



ODPS按照功能逻辑划分为接入层、逻辑层、存储/计算层,对应着集群功能则是接入层、控制集群、计算集群。

  • ODPS接入层的最上层是通过LVS实现负载均衡,把请求发送给HTTP Server,该请求包括用户的AccessID和MD5签名信息,HTTP Server在接收到请求后,会把AccessID和MD5签名发给云账号服务进行用户认证,认证通过后,云账号服务会返回该用户的唯一AccountID,在后续执行逻辑中,发送的请求都是包含该AccountID,而不是AccessID。
  • 逻辑层又称作控制层,是MaxCompute的核心部分。实现用户空间和对象的管理、命令的解析与执行逻辑、数据对象的访问控制与授权等功能。在逻辑层有Worker、Scheduler和Executor三个角色:
  • Worker处理所有的RESTful请求,它可以本地处理一些作业,如对用户空间、表、资源、作业等的管理;而对于需要执行分布式计算的作业,如SQL、MR等,Worker会进一步把它提交给Scheduler处理;
  • Scheduler负责instance的调度,它会维护一个Instance列表,并把Instance分解成各个Task,生成这些Task的工作流——DAG图(Directed Acyclic Graph,有向无环图),把可以运行的Task放到TaskPool中,TaskPool是个优先级队列,后台线程会定时对该优先级队列进行排序;此外,Scheduler还会查询计算集群的资源状况,向计算集群的Fuxi master询问资源占用情况以进行流控(Fuxi slot满的时候,停止响应Executor的task申请)。
  • Executor会判断自身资源情况,如CPU、内存、正在运行的Task数(不能超过上限),如果资源满足,则会主动轮询Scheduler的TaskPool请求获取下一个Task,TaskPool会根据Task的优先级和计算集群的资源情况,把相应Task提交给Executor,Executor获取到Task后,会生成计算层的分布式作业描述文件,提交给计算层,监控这些任务的运行状态,并定时把状态汇报给Scheduler。

简单地说,当用户提交一个ODPS作业请求时,接入层先进行用户认证,然后发送给控制层的Worker,Worker判断是否为同步请求,如果为同步请求,则本地执行并返回。如果是异步请求,Worker会先做些检查(如表是否存在,版本号是否最新等),生成InstanceID,把请求进一步发送给Scheduler,并返回给客户端。Scheduler把作业分解成各个Task,Executor主动轮询Scheduler,获取相应Task,提交给计算层执行,并定时将自己持有的Task的状态汇报给Scheduler。

  • 计算层就是飞天内核(Apsara Core),运行在和控制层相互独立的计算集群上。包括Pangu(分布式文件系统)、Fuxi(资源调度系统)、Nuwa/ZK(Naming服务)、Shennong(监控模块)等。MaxCompute中的元数据存储在阿里云计算的另一个开放服务OTS(Open Table Service,开放结构化数据服务)中,元数据内容主要包括用户空间元数据、Table/Partition Schema、ACL、Job元数据、安全体系等。

三、ODPS基础概念



3 ODPS作业概念


官方文档描述ODPS元数据模型:

image.png

image.png

通常情况下,一个odps job对应一个odps instance(会产生一个instance_id), 一个odps instance对应一个odps task, 一个odps task对应一个活多个fuxi job,一个fuxi job可以基于DAG被拆分为多个类型的task如map、reduce和joiner。一个odps instance对应两个fuxi job的case(小文件合并):

image.png

四、ODPS运行时监控



4 Logview2.0框架


参考链接:https://help.aliyun.com/zh/maxcompute/user-guide/use-logview-v2-0-to-view-job-information后续的任务调优都会基于logview2.0作业运行时监控进行。

五、ODPS执行计划



5.1SQL执行顺序


5.1.1SQL执行顺序

image.png

通用的SQL 逻辑算子:

image.png

5.1.2SHUFFLE概念

目前大部分的sql性能问题都会和Shuffle强相关,本节重点介绍shuffle基本概念。目前基本所有的SQL优化问题都会涉及到Shuffle过程,所以先来了解Shuffle的原理,参考Hadoop Shuffle过程原理(Hadoop权威指南):在Hadoop中数据从Map阶段传递给Reduce阶段的过程就叫Shuffle,Shuffle机制是整个MApReduce框架中最核心的部分。

image.png

image.png

5.1.3ODPS SQL逻辑执行计划算子

  • ODPS SQL Task Operator结构(截取自ODPS官方文档)


image.png

注:可在ODPS SQL前添加EXPLAIN 执行流程得到,EXPLAIN主要有以下的作用:

1、检查SQL语法;

2、检查读取的表和分区是否符合预期,这样可以排除掉很多分区读错的尴尬;

3、检查mapreduce运行结构是否符合预期,检查mapjoin等特性有没有生效;

  • 各operator算子含义

image.png


5.2离线ODPS SQL优化方法分析


基于上面的介绍的基本概念,本小节基于ODPS SQL的Explain功能查询静态SQL的逻辑执行计划,分析SQL任务优化前后的差异,结合任务实际运行过程中Logview的监控输出,分析给出任务优化生效的原因。

5.2.1Multi Distinct优化分析

image.png

技术网站文章中有大量介绍Multi-Distinct问题的优化方法,先从下面的执行计划来看下。

CASE1:不带Distinct的Count算子使用


EXPLAIN SELECT  app_id
        ,count(user_id)
FROM    xxx.table_vst_user_test
WHERE   dt = '20230816'
GROUP BY app_id;

逻辑执行计划:

image.png

分析:可以看到在Map Task输出阶段,会以app_id字段进行Hash分区传输,输出的临时结果是 app_id和__agg_0_count字段,数据已经预聚合,不存在带有user_id的明细数据shuffle传输,所以任务运行速度较快。

CASE2:带Distinct的Count算子使用


EXPLAIN SELECT  app_id
        ,count(DISTINCT user_id)
FROM    xxx.table_vst_user_test
WHERE   dt = '20230816'
GROUP BY app_id;

image.png

分析:可以看到在Map Task输出阶段,会以app_id字段进行Hash分区传输,输出的临时结果值是 app_id&user_id。Map Task输出的中间结果无法预聚合,需要将带有user_id的明细数据传输,所以运行速度较慢,如果某个小程序对应的访问用户量较大时,极易在Reducer阶段产生数据倾斜。

CASE3:带多Distinct的Count算子使用


EXPLAIN 
SELECT  app_id
        ,count(DISTINCT user_id)
        ,count(DISTINCT cy23_source_name_l1)
        ,count(DISTINCT cy23_source_name_l2)
        ,count(DISTINCT cy23_source_name_l3)
        ,count(DISTINCT cy23_source_name_l4)
FROM    xxx.table_vst_user_test
WHERE   dt = '20230816'
GROUP BY app_id;

image.png

分析:可以看到在Map Task输出阶段,还是会以app_id字段进行Hash分区传输,输出的临时结果值是 app_id&user_id&cy23_source_name_l1&cy23_source_name_l2&cy23_source_name_l3&cy23_source_name_l4。Map Task输出的中间结果无法预聚合,需要将带有user_id及其他的待去重字段的明细数据传输,字段越多,数据传输量越大,所以运行速度较慢,如果某个小程序对应的访问用户量较大时,极易在Reducer阶段产生数据倾斜。

CASE4:带Distinct的Count算子的优化代码(该CASE是对CASE2的代码优化)


EXPLAIN 
SELECT  app_id
        ,COUNT(user_id)
FROM
(
    SELECT  app_id 
           ,user_id
    FROM    xxx.table_vst_user_test
    WHERE   dt = '${bizdate}'
    GROUP BY app_id
            ,user_id
)t
GROUP BY app_id

image.png

image.png

分析:优化后的代码,在逻辑计划里多增加了一个Reducer阶段,但在MAP Task的输出阶段,从原先的以app_id进行Hash分区改为了以app_id&user_id进行Hash分区,可以避免数据在传输到Reduce阶段因为热点数据导致的数据倾斜。在第一个Reducer执行阶段,会对Map段传输的数据进行预聚合,不存在带有明细字段的数据向下一个Reducer阶段传输,避免了数据倾斜的发生。整体来看,该优化方法,没有减少Shuffle过程中的明细数据传输,只是对于Map Task的Hash字段从app_id调整为app_id和user_id,减少了热点数据聚集的可能,通过增加计算阶段进行运行时间的优化。

5.2.2系统参数odps.sql.groupby.skewindata=True分析

image.png

CASE1:带Distinct的Count算子使用


EXPLAIN 
SELECT  app_id 
       ,COUNT(DISTINCT user_id)
FROM    xxx.table_vst_user_test
WHERE   dt = '${bizdate}'
GROUP BY app_id

image.png

分析:同5.2.1中的CASE2

CASE2:Case1代码前加入系统优化参数


SET odps.sql.groupby.skewindata = true;

EXPLAIN 
SELECT  app_id
        ,COUNT(DISTINCT user_id)
FROM    xxx.table_vst_user_test
WHERE   dt = '${bizdate}'
GROUP BY app_id

image.png

image.png

分析:可以看到加入系统优化参数后的逻辑执行计划同5.2.1中的Case4,优化后,Map阶段的输出,app_id进行Hash分区改为了以app_id&user_id进行Hash分区,避免热点数据的聚集,通过增加计算阶段进行运行时间的优化。

5.2.3.Join(Map Join/Inner Join/Left Join)

CASE1:大小表关联(SortMergeJoin)

EXPLAIN SELECT  mini_cat_name_l1
        ,COUNT(DISTINCT user_id)
FROM    (
            --主表
            SELECT  app_id
                    ,user_id
            FROM    xxx.table_vst_user_test
            WHERE   dt = '20230816'
            GROUP BY app_id
                    ,user_id
        ) t1
LEFT JOIN   (
                --维表
                SELECT  app_id
                        ,mini_cat_name_l1
                FROM    xxx.dim_category
                WHERE   dt = '20230816'
            ) t2
ON      t1.app_id = t2.app_id
GROUP BY mini_cat_name_l1;

image.png

image.png

image.png

image.png

image.png

下图来自Logview中的执行计划:

image.png

J4_1_3内部结构:

image.png

分析:逻辑执行计划中,M1阶段,主要针对右表小程序维表xxx.dim_category进行数据加工提取,由于左右表关联的Key是app_id,所以Hash分区的key也是app_id,输出的中间结果是app_id&mini_cate_name_l1。M2阶段,针对访问事件表xxx.table_vst_user_test进行数据加工,Hash分区的key是app_id&user_id,输出的中间结果是是app_id&user_id,因为Hash分区的key是app_id&user_id,所以在R3_2阶段执行时,不存在热点数据聚集导致的数据倾斜。但参看逻辑执行计划,R3_2的输出会以app_id作为Hash key进行数据传输,数据会在J4_1_3阶段进行整合,并跟M1阶段的小程序维表数据进行MergeJoin,存在数据倾斜的可能。同时在R5_4阶段,Hash分区key是mini_cate_name_l1,不同的行业类目下的用户量差异较大,也会存在可能的数据倾斜。基于Logview的执行计划,可以看到两表关联使用的是MergeJoin的算法(参考上图)。

Sort Merge Join算法原理:

image.png

image.png

算法执行过程:1. Shuffle阶段:将两张表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理;2. Sort阶段:对单个分区节点的两表数据,分别进行排序;3. Merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,如果不同,左边小就继续取左边,反之取右边(即用即取即丢),见下图示意:

image.png

可以看出,无论分区有多大,Sort Merge Join都不用把某一侧的数据全部加载到内存中,而是即用即取即丢。

CASE2:大小表关联使用mapjoin hint(BroadcastHashJoin)

EXPLAIN SELECT  /*+mapjoin(t2)*/mini_cat_name_l1
        ,COUNT(DISTINCT user_id)
FROM    (
            SELECT  app_id
                    ,user_id
            FROM    xxx.table_vst_user_test
            WHERE   dt = '20230816'
            GROUP BY app_id
                    ,user_id
        ) t1
LEFT JOIN   (
                SELECT  app_id
                        ,mini_cat_name_l1
                FROM    xxx.dim_category
                WHERE   dt = '20230816'
            ) t2
ON      t1.app_id = t2.app_id
GROUP BY mini_cat_name_l1;

image.png

image.png

image.png

image.png

image.png

分析:逻辑执行计划中,M1阶段,主要针对右表小程序维表xxx.dim_category进行数据加工提取,由于左右表关联的Key是app_id,但明确使用的是Mapjoin,所以不存在Hash分区字段,输出的中间结果是app_id&mini_cate_name_l1。M2阶段,针对访问事件表xxx.table_vst_user_test进行数据加工,Hash分区的key是app_id&user_id,输出的中间结果是是app_id&user_id,因为Hash分区的key是app_id&user_id,所以在J3_1_2阶段执行时,不存在热点数据聚集导致的数据倾斜。数据会在J3_1_2阶段进行整合,并跟M1阶段的小程序维表数据进行Broadcast Hash Join。同时在R4_3阶段,Hash分区key是mini_cate_name_l1,不同的行业类目下的用户量差异较大,也会存在可能的数据倾斜。基于逻辑执行计划和Logview的执行计划,可以看到两表关联使用的是BroadcastHashJoin的算法。可以看到针对Case1的代码进行优化后,两表关联算法从SortMergeJoin改为了BroadcastHashJoin,特定场景下,减少了可能的数据倾斜,利用资源空间换时间。下图来自Logview中的执行计划:

image.png

Broadcast Hash Join算法:

SparkSQL中broadcast hash join定义:是将其中一张小表广播分发到大表所在的所有节点上,供打标使用。executor存储小表的全部数据,一定程度上牺牲了空间,换区shuffle操作大量的耗时。

image.png

HashJoin的伪代码逻辑:

image.png

CASE3:大小表关联使用distributed mapjoin hint


EXPLAIN SELECT  /*+mapjoin(t2)*/mini_cat_name_l1
        ,COUNT(DISTINCT user_id)
FROM    (
            SELECT  app_id
                    ,user_id
            FROM    xxx.table_vst_user_test
            WHERE   dt = '20230816'
            GROUP BY app_id
                    ,user_id
        ) t1
LEFT JOIN   (
                SELECT  app_id
                        ,mini_cat_name_l1
                FROM    xxx.dim_category
                WHERE   dt = '20230816'
            ) t2
ON      t1.app_id = t2.app_id
GROUP BY mini_cat_name_l1;

image.png

image.png

image.png

image.png

下图来自Logview中的执行计划:

image.png

J4_2_3内部结构

image.png

基于可以看到在Join Task中,使用的是DistributeMapJoin算法。分析:Case1中的执行计划为原执行计划,M1是小表,上图为使用Distributed MapJoin之后的Plan。

  • 小表一侧分为M1,R2_1 两个Stage。M1阶段读表并进行Shuffle,Shuffle的过程将数据分片(shard=2),使得具有相同hash value的数据分发到同一个worker。R2_1(HashTableBuilder1)作为server端,完成HashTable的构建并常驻内存,接受client端(J4_2_3 DistributedMapJoin1)请求完成Lookup查询并返回values。多个shard共同组合成一个分布式的hash table services,shard数量可以手动调整。各shard的service一旦启动,需要等待client端(DistributedMapJoin1)完成所有的request请求后才stop。
  • 大表一侧为Stage M3和J4_2_3。J4_2_3(DistributedMapJoin1)作为client端,通过网络传输方式将大表端的join keys,分batches往server端(HashTableBuilder1)发起request请求并获取返回values。由于server端的数据已经按照hash value分shard,client端可以根据数据的特征只请求特定的shard。

相比于原Query,使用Distributed MapJoin后,大表侧需要通过RPC建立网络通讯获取小表侧HashTable查询返回的数据,建议大表数据量应该远大于小表,否则带来的收益有限,甚至有可能因为网络的波动导致性能回退。从硬件发展趋势来看,相比于网络带宽,磁盘IO往往更容易成为瓶颈,所以长远看更有益,但是现阶段使用Distributed MapJoin时,要求大表应远大于小表数据量。注意,本case仅仅是为了对DistributedMapJoin的逻辑执行计划进行分析,与CASE1进行对比,该优化方法不一定适用该测试sql语句。具体适用场景及用法请查询参考资料章节中的DistributedMapJoin链接。

六、总结


本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,精力有限,仅覆盖了部分调优方法的分析,希望能给大家日常SQL优化工作带来一些启发。由于掌握的ODPS底层执行原理资料有限且线上生产环境HBO对于执行计划有影响,存在理解不完全正确的可能,望读者谅解。

参考资料:

作者 | 博暄

来源 | 阿里云开发者公众号

相关文章
|
17天前
|
人工智能
歌词结构的巧妙安排:写歌词的方法与技巧解析,妙笔生词AI智能写歌词软件
歌词创作是一门艺术,关键在于巧妙的结构安排。开头需迅速吸引听众,主体部分要坚实且富有逻辑,结尾则应留下深刻印象。《妙笔生词智能写歌词软件》提供多种 AI 功能,帮助创作者找到灵感,优化歌词结构,写出打动人心的作品。
|
23天前
|
存储 算法 Java
解析HashSet的工作原理,揭示Set如何利用哈希算法和equals()方法确保元素唯一性,并通过示例代码展示了其“无重复”特性的具体应用
在Java中,Set接口以其独特的“无重复”特性脱颖而出。本文通过解析HashSet的工作原理,揭示Set如何利用哈希算法和equals()方法确保元素唯一性,并通过示例代码展示了其“无重复”特性的具体应用。
38 3
|
18天前
|
人工智能
写歌词的技巧和方法全解析:开启你的音乐创作之旅,妙笔生词智能写歌词软件
怀揣音乐梦想,渴望用歌词抒发情感?掌握关键技巧,你也能踏上创作之旅。灵感来自生活点滴,主题明确,语言简洁,韵律和谐。借助“妙笔生词智能写歌词软件”,AI辅助创作,轻松写出动人歌词,实现音乐梦想。
|
3天前
|
JSON PHP 数据格式
PHP解析配置文件的常用方法
INI文件是最常见的配置文件格式之一。
|
9天前
|
机器学习/深度学习 人工智能 安全
TPAMI:安全强化学习方法、理论与应用综述,慕工大、同济、伯克利等深度解析
【10月更文挑战第27天】强化学习(RL)在实际应用中展现出巨大潜力,但其安全性问题日益凸显。为此,安全强化学习(SRL)应运而生。近日,来自慕尼黑工业大学、同济大学和加州大学伯克利分校的研究人员在《IEEE模式分析与机器智能汇刊》上发表了一篇综述论文,系统介绍了SRL的方法、理论和应用。SRL主要面临安全性定义模糊、探索与利用平衡以及鲁棒性与可靠性等挑战。研究人员提出了基于约束、基于风险和基于监督学习等多种方法来应对这些挑战。
21 2
|
17天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
13 1
|
2天前
|
SQL 监控 安全
员工上网行为监控软件:SQL 在数据查询监控中的应用解析
在数字化办公环境中,员工上网行为监控软件对企业网络安全和管理至关重要。通过 SQL 查询和分析数据库中的数据,企业可以精准了解员工的上网行为,包括基础查询、复杂条件查询、数据统计与分析等,从而提高网络管理和安全防护的效率。
10 0
|
24天前
|
SQL 数据可视化 BI
SQL语句及查询结果解析:技巧与方法
在数据库管理和数据分析中,SQL语句扮演着至关重要的角色
|
1月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
11天前
|
数据采集 监控 数据管理
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第26天】随着信息技术的发展,数据成为企业核心资源。本文探讨大数据平台的搭建与数据质量管理,包括选择合适架构、数据处理与分析能力、数据质量标准与监控机制、数据清洗与校验及元数据管理,为企业数据治理提供参考。
48 1

相关产品

  • 云原生大数据计算服务 MaxCompute
  • 推荐镜像

    更多
    下一篇
    无影云桌面