一文解析 ODPS SQL 任务优化方法原理

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,覆盖了部分调优方法的分析,从知道怎么优化,到为什么这样优化,以及还能怎样优化。

一、背景


使用ODPS SQL进行离线数据研发时,开发同学不可避免会碰到任务性能问题,需要经常对ODPS SQL执行任务进行调优,以对重点场景任务产出时效进行保障,避免资源浪费。调优过程需要参考相关优化文档资料,发现技术网站中有很多文章介绍到相关的优化方法,但从ODPS底层执行计划来解释为什么要这样做优化以及背后的依据是什么的介绍文章比较少。本文尝试从ODPS底层逻辑计划拆解部分优化方法对应的优化原理,从知道怎么优化,到为什么这样优化,以及还能怎样优化。

二、ODPS基础架构


本节直接略过MAXCOMPUTE基本信息介绍,直接进入相关架构描述。


2 ODPS架构

image.png



ODPS按照功能逻辑划分为接入层、逻辑层、存储/计算层,对应着集群功能则是接入层、控制集群、计算集群。

  • ODPS接入层的最上层是通过LVS实现负载均衡,把请求发送给HTTP Server,该请求包括用户的AccessID和MD5签名信息,HTTP Server在接收到请求后,会把AccessID和MD5签名发给云账号服务进行用户认证,认证通过后,云账号服务会返回该用户的唯一AccountID,在后续执行逻辑中,发送的请求都是包含该AccountID,而不是AccessID。
  • 逻辑层又称作控制层,是MaxCompute的核心部分。实现用户空间和对象的管理、命令的解析与执行逻辑、数据对象的访问控制与授权等功能。在逻辑层有Worker、Scheduler和Executor三个角色:
  • Worker处理所有的RESTful请求,它可以本地处理一些作业,如对用户空间、表、资源、作业等的管理;而对于需要执行分布式计算的作业,如SQL、MR等,Worker会进一步把它提交给Scheduler处理;
  • Scheduler负责instance的调度,它会维护一个Instance列表,并把Instance分解成各个Task,生成这些Task的工作流——DAG图(Directed Acyclic Graph,有向无环图),把可以运行的Task放到TaskPool中,TaskPool是个优先级队列,后台线程会定时对该优先级队列进行排序;此外,Scheduler还会查询计算集群的资源状况,向计算集群的Fuxi master询问资源占用情况以进行流控(Fuxi slot满的时候,停止响应Executor的task申请)。
  • Executor会判断自身资源情况,如CPU、内存、正在运行的Task数(不能超过上限),如果资源满足,则会主动轮询Scheduler的TaskPool请求获取下一个Task,TaskPool会根据Task的优先级和计算集群的资源情况,把相应Task提交给Executor,Executor获取到Task后,会生成计算层的分布式作业描述文件,提交给计算层,监控这些任务的运行状态,并定时把状态汇报给Scheduler。

简单地说,当用户提交一个ODPS作业请求时,接入层先进行用户认证,然后发送给控制层的Worker,Worker判断是否为同步请求,如果为同步请求,则本地执行并返回。如果是异步请求,Worker会先做些检查(如表是否存在,版本号是否最新等),生成InstanceID,把请求进一步发送给Scheduler,并返回给客户端。Scheduler把作业分解成各个Task,Executor主动轮询Scheduler,获取相应Task,提交给计算层执行,并定时将自己持有的Task的状态汇报给Scheduler。

  • 计算层就是飞天内核(Apsara Core),运行在和控制层相互独立的计算集群上。包括Pangu(分布式文件系统)、Fuxi(资源调度系统)、Nuwa/ZK(Naming服务)、Shennong(监控模块)等。MaxCompute中的元数据存储在阿里云计算的另一个开放服务OTS(Open Table Service,开放结构化数据服务)中,元数据内容主要包括用户空间元数据、Table/Partition Schema、ACL、Job元数据、安全体系等。

三、ODPS基础概念



3 ODPS作业概念


官方文档描述ODPS元数据模型:

image.png

image.png

通常情况下,一个odps job对应一个odps instance(会产生一个instance_id), 一个odps instance对应一个odps task, 一个odps task对应一个活多个fuxi job,一个fuxi job可以基于DAG被拆分为多个类型的task如map、reduce和joiner。一个odps instance对应两个fuxi job的case(小文件合并):

image.png

四、ODPS运行时监控



4 Logview2.0框架


参考链接:https://help.aliyun.com/zh/maxcompute/user-guide/use-logview-v2-0-to-view-job-information后续的任务调优都会基于logview2.0作业运行时监控进行。

五、ODPS执行计划



5.1SQL执行顺序


5.1.1SQL执行顺序

image.png

通用的SQL 逻辑算子:

image.png

5.1.2SHUFFLE概念

目前大部分的sql性能问题都会和Shuffle强相关,本节重点介绍shuffle基本概念。目前基本所有的SQL优化问题都会涉及到Shuffle过程,所以先来了解Shuffle的原理,参考Hadoop Shuffle过程原理(Hadoop权威指南):在Hadoop中数据从Map阶段传递给Reduce阶段的过程就叫Shuffle,Shuffle机制是整个MApReduce框架中最核心的部分。

image.png

image.png

5.1.3ODPS SQL逻辑执行计划算子

  • ODPS SQL Task Operator结构(截取自ODPS官方文档)


image.png

注:可在ODPS SQL前添加EXPLAIN 执行流程得到,EXPLAIN主要有以下的作用:

1、检查SQL语法;

2、检查读取的表和分区是否符合预期,这样可以排除掉很多分区读错的尴尬;

3、检查mapreduce运行结构是否符合预期,检查mapjoin等特性有没有生效;

  • 各operator算子含义

image.png


5.2离线ODPS SQL优化方法分析


基于上面的介绍的基本概念,本小节基于ODPS SQL的Explain功能查询静态SQL的逻辑执行计划,分析SQL任务优化前后的差异,结合任务实际运行过程中Logview的监控输出,分析给出任务优化生效的原因。

5.2.1Multi Distinct优化分析

image.png

技术网站文章中有大量介绍Multi-Distinct问题的优化方法,先从下面的执行计划来看下。

CASE1:不带Distinct的Count算子使用


EXPLAIN SELECT  app_id
        ,count(user_id)
FROM    xxx.table_vst_user_test
WHERE   dt = '20230816'
GROUP BY app_id;

逻辑执行计划:

image.png

分析:可以看到在Map Task输出阶段,会以app_id字段进行Hash分区传输,输出的临时结果是 app_id和__agg_0_count字段,数据已经预聚合,不存在带有user_id的明细数据shuffle传输,所以任务运行速度较快。

CASE2:带Distinct的Count算子使用


EXPLAIN SELECT  app_id
        ,count(DISTINCT user_id)
FROM    xxx.table_vst_user_test
WHERE   dt = '20230816'
GROUP BY app_id;

image.png

分析:可以看到在Map Task输出阶段,会以app_id字段进行Hash分区传输,输出的临时结果值是 app_id&user_id。Map Task输出的中间结果无法预聚合,需要将带有user_id的明细数据传输,所以运行速度较慢,如果某个小程序对应的访问用户量较大时,极易在Reducer阶段产生数据倾斜。

CASE3:带多Distinct的Count算子使用


EXPLAIN 
SELECT  app_id
        ,count(DISTINCT user_id)
        ,count(DISTINCT cy23_source_name_l1)
        ,count(DISTINCT cy23_source_name_l2)
        ,count(DISTINCT cy23_source_name_l3)
        ,count(DISTINCT cy23_source_name_l4)
FROM    xxx.table_vst_user_test
WHERE   dt = '20230816'
GROUP BY app_id;

image.png

分析:可以看到在Map Task输出阶段,还是会以app_id字段进行Hash分区传输,输出的临时结果值是 app_id&user_id&cy23_source_name_l1&cy23_source_name_l2&cy23_source_name_l3&cy23_source_name_l4。Map Task输出的中间结果无法预聚合,需要将带有user_id及其他的待去重字段的明细数据传输,字段越多,数据传输量越大,所以运行速度较慢,如果某个小程序对应的访问用户量较大时,极易在Reducer阶段产生数据倾斜。

CASE4:带Distinct的Count算子的优化代码(该CASE是对CASE2的代码优化)


EXPLAIN 
SELECT  app_id
        ,COUNT(user_id)
FROM
(
    SELECT  app_id 
           ,user_id
    FROM    xxx.table_vst_user_test
    WHERE   dt = '${bizdate}'
    GROUP BY app_id
            ,user_id
)t
GROUP BY app_id

image.png

image.png

分析:优化后的代码,在逻辑计划里多增加了一个Reducer阶段,但在MAP Task的输出阶段,从原先的以app_id进行Hash分区改为了以app_id&user_id进行Hash分区,可以避免数据在传输到Reduce阶段因为热点数据导致的数据倾斜。在第一个Reducer执行阶段,会对Map段传输的数据进行预聚合,不存在带有明细字段的数据向下一个Reducer阶段传输,避免了数据倾斜的发生。整体来看,该优化方法,没有减少Shuffle过程中的明细数据传输,只是对于Map Task的Hash字段从app_id调整为app_id和user_id,减少了热点数据聚集的可能,通过增加计算阶段进行运行时间的优化。

5.2.2系统参数odps.sql.groupby.skewindata=True分析

image.png

CASE1:带Distinct的Count算子使用


EXPLAIN 
SELECT  app_id 
       ,COUNT(DISTINCT user_id)
FROM    xxx.table_vst_user_test
WHERE   dt = '${bizdate}'
GROUP BY app_id

image.png

分析:同5.2.1中的CASE2

CASE2:Case1代码前加入系统优化参数


SET odps.sql.groupby.skewindata = true;

EXPLAIN 
SELECT  app_id
        ,COUNT(DISTINCT user_id)
FROM    xxx.table_vst_user_test
WHERE   dt = '${bizdate}'
GROUP BY app_id

image.png

image.png

分析:可以看到加入系统优化参数后的逻辑执行计划同5.2.1中的Case4,优化后,Map阶段的输出,app_id进行Hash分区改为了以app_id&user_id进行Hash分区,避免热点数据的聚集,通过增加计算阶段进行运行时间的优化。

5.2.3.Join(Map Join/Inner Join/Left Join)

CASE1:大小表关联(SortMergeJoin)

EXPLAIN SELECT  mini_cat_name_l1
        ,COUNT(DISTINCT user_id)
FROM    (
            --主表
            SELECT  app_id
                    ,user_id
            FROM    xxx.table_vst_user_test
            WHERE   dt = '20230816'
            GROUP BY app_id
                    ,user_id
        ) t1
LEFT JOIN   (
                --维表
                SELECT  app_id
                        ,mini_cat_name_l1
                FROM    xxx.dim_category
                WHERE   dt = '20230816'
            ) t2
ON      t1.app_id = t2.app_id
GROUP BY mini_cat_name_l1;

image.png

image.png

image.png

image.png

image.png

下图来自Logview中的执行计划:

image.png

J4_1_3内部结构:

image.png

分析:逻辑执行计划中,M1阶段,主要针对右表小程序维表xxx.dim_category进行数据加工提取,由于左右表关联的Key是app_id,所以Hash分区的key也是app_id,输出的中间结果是app_id&mini_cate_name_l1。M2阶段,针对访问事件表xxx.table_vst_user_test进行数据加工,Hash分区的key是app_id&user_id,输出的中间结果是是app_id&user_id,因为Hash分区的key是app_id&user_id,所以在R3_2阶段执行时,不存在热点数据聚集导致的数据倾斜。但参看逻辑执行计划,R3_2的输出会以app_id作为Hash key进行数据传输,数据会在J4_1_3阶段进行整合,并跟M1阶段的小程序维表数据进行MergeJoin,存在数据倾斜的可能。同时在R5_4阶段,Hash分区key是mini_cate_name_l1,不同的行业类目下的用户量差异较大,也会存在可能的数据倾斜。基于Logview的执行计划,可以看到两表关联使用的是MergeJoin的算法(参考上图)。

Sort Merge Join算法原理:

image.png

image.png

算法执行过程:1. Shuffle阶段:将两张表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理;2. Sort阶段:对单个分区节点的两表数据,分别进行排序;3. Merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,如果不同,左边小就继续取左边,反之取右边(即用即取即丢),见下图示意:

image.png

可以看出,无论分区有多大,Sort Merge Join都不用把某一侧的数据全部加载到内存中,而是即用即取即丢。

CASE2:大小表关联使用mapjoin hint(BroadcastHashJoin)

EXPLAIN SELECT  /*+mapjoin(t2)*/mini_cat_name_l1
        ,COUNT(DISTINCT user_id)
FROM    (
            SELECT  app_id
                    ,user_id
            FROM    xxx.table_vst_user_test
            WHERE   dt = '20230816'
            GROUP BY app_id
                    ,user_id
        ) t1
LEFT JOIN   (
                SELECT  app_id
                        ,mini_cat_name_l1
                FROM    xxx.dim_category
                WHERE   dt = '20230816'
            ) t2
ON      t1.app_id = t2.app_id
GROUP BY mini_cat_name_l1;

image.png

image.png

image.png

image.png

image.png

分析:逻辑执行计划中,M1阶段,主要针对右表小程序维表xxx.dim_category进行数据加工提取,由于左右表关联的Key是app_id,但明确使用的是Mapjoin,所以不存在Hash分区字段,输出的中间结果是app_id&mini_cate_name_l1。M2阶段,针对访问事件表xxx.table_vst_user_test进行数据加工,Hash分区的key是app_id&user_id,输出的中间结果是是app_id&user_id,因为Hash分区的key是app_id&user_id,所以在J3_1_2阶段执行时,不存在热点数据聚集导致的数据倾斜。数据会在J3_1_2阶段进行整合,并跟M1阶段的小程序维表数据进行Broadcast Hash Join。同时在R4_3阶段,Hash分区key是mini_cate_name_l1,不同的行业类目下的用户量差异较大,也会存在可能的数据倾斜。基于逻辑执行计划和Logview的执行计划,可以看到两表关联使用的是BroadcastHashJoin的算法。可以看到针对Case1的代码进行优化后,两表关联算法从SortMergeJoin改为了BroadcastHashJoin,特定场景下,减少了可能的数据倾斜,利用资源空间换时间。下图来自Logview中的执行计划:

image.png

Broadcast Hash Join算法:

SparkSQL中broadcast hash join定义:是将其中一张小表广播分发到大表所在的所有节点上,供打标使用。executor存储小表的全部数据,一定程度上牺牲了空间,换区shuffle操作大量的耗时。

image.png

HashJoin的伪代码逻辑:

image.png

CASE3:大小表关联使用distributed mapjoin hint


EXPLAIN SELECT  /*+mapjoin(t2)*/mini_cat_name_l1
        ,COUNT(DISTINCT user_id)
FROM    (
            SELECT  app_id
                    ,user_id
            FROM    xxx.table_vst_user_test
            WHERE   dt = '20230816'
            GROUP BY app_id
                    ,user_id
        ) t1
LEFT JOIN   (
                SELECT  app_id
                        ,mini_cat_name_l1
                FROM    xxx.dim_category
                WHERE   dt = '20230816'
            ) t2
ON      t1.app_id = t2.app_id
GROUP BY mini_cat_name_l1;

image.png

image.png

image.png

image.png

下图来自Logview中的执行计划:

image.png

J4_2_3内部结构

image.png

基于可以看到在Join Task中,使用的是DistributeMapJoin算法。分析:Case1中的执行计划为原执行计划,M1是小表,上图为使用Distributed MapJoin之后的Plan。

  • 小表一侧分为M1,R2_1 两个Stage。M1阶段读表并进行Shuffle,Shuffle的过程将数据分片(shard=2),使得具有相同hash value的数据分发到同一个worker。R2_1(HashTableBuilder1)作为server端,完成HashTable的构建并常驻内存,接受client端(J4_2_3 DistributedMapJoin1)请求完成Lookup查询并返回values。多个shard共同组合成一个分布式的hash table services,shard数量可以手动调整。各shard的service一旦启动,需要等待client端(DistributedMapJoin1)完成所有的request请求后才stop。
  • 大表一侧为Stage M3和J4_2_3。J4_2_3(DistributedMapJoin1)作为client端,通过网络传输方式将大表端的join keys,分batches往server端(HashTableBuilder1)发起request请求并获取返回values。由于server端的数据已经按照hash value分shard,client端可以根据数据的特征只请求特定的shard。

相比于原Query,使用Distributed MapJoin后,大表侧需要通过RPC建立网络通讯获取小表侧HashTable查询返回的数据,建议大表数据量应该远大于小表,否则带来的收益有限,甚至有可能因为网络的波动导致性能回退。从硬件发展趋势来看,相比于网络带宽,磁盘IO往往更容易成为瓶颈,所以长远看更有益,但是现阶段使用Distributed MapJoin时,要求大表应远大于小表数据量。注意,本case仅仅是为了对DistributedMapJoin的逻辑执行计划进行分析,与CASE1进行对比,该优化方法不一定适用该测试sql语句。具体适用场景及用法请查询参考资料章节中的DistributedMapJoin链接。

六、总结


本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,精力有限,仅覆盖了部分调优方法的分析,希望能给大家日常SQL优化工作带来一些启发。由于掌握的ODPS底层执行原理资料有限且线上生产环境HBO对于执行计划有影响,存在理解不完全正确的可能,望读者谅解。

参考资料:

作者 | 博暄

来源 | 阿里云开发者公众号

目录
打赏
0
0
1
0
2528
分享
相关文章
HashMap深度解析:从原理到实战
HashMap,作为Java集合框架中的一个核心组件,以其高效的键值对存储和检索机制,在软件开发中扮演着举足轻重的角色。作为一名资深的AI工程师,深入理解HashMap的原理、历史、业务场景以及实战应用,对于提升数据处理和算法实现的效率至关重要。本文将通过手绘结构图、流程图,结合Java代码示例,全方位解析HashMap,帮助读者从理论到实践全面掌握这一关键技术。
72 13
深入解析MySQL的EXPLAIN:指标详解与索引优化
MySQL 中的 `EXPLAIN` 语句用于分析和优化 SQL 查询,帮助你了解查询优化器的执行计划。本文详细介绍了 `EXPLAIN` 输出的各项指标,如 `id`、`select_type`、`table`、`type`、`key` 等,并提供了如何利用这些指标优化索引结构和 SQL 语句的具体方法。通过实战案例,展示了如何通过创建合适索引和调整查询语句来提升查询性能。
121 9
深入解析云计算中的微服务架构:原理、优势与实践
深入解析云计算中的微服务架构:原理、优势与实践
81 1
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
50 14
使用访问指导(SQL Access Advisor)优化数据库业务负载
本文介绍了Oracle的SQL访问指导(SQL Access Advisor)的应用场景及其使用方法。访问指导通过分析给定的工作负载,提供索引、物化视图和分区等方面的优化建议,帮助DBA提升数据库性能。具体步骤包括创建访问指导任务、创建工作负载、连接工作负载至访问指导、设置任务参数、运行访问指导、查看和应用优化建议。访问指导不仅针对单条SQL语句,还能综合考虑多条SQL语句的优化效果,为DBA提供全面的决策支持。
59 11
Transformer模型变长序列优化:解析PyTorch上的FlashAttention2与xFormers
本文探讨了Transformer模型中变长输入序列的优化策略,旨在解决深度学习中常见的计算效率问题。文章首先介绍了批处理变长输入的技术挑战,特别是填充方法导致的资源浪费。随后,提出了多种优化技术,包括动态填充、PyTorch NestedTensors、FlashAttention2和XFormers的memory_efficient_attention。这些技术通过减少冗余计算、优化内存管理和改进计算模式,显著提升了模型的性能。实验结果显示,使用FlashAttention2和无填充策略的组合可以将步骤时间减少至323毫秒,相比未优化版本提升了约2.5倍。
58 3
Transformer模型变长序列优化:解析PyTorch上的FlashAttention2与xFormers
探索网络模型与协议:从OSI到HTTPs的原理解析
OSI七层网络模型和TCP/IP四层模型是理解和设计计算机网络的框架。OSI模型包括物理层、数据链路层、网络层、传输层、会话层、表示层和应用层,而TCP/IP模型则简化为链路层、网络层、传输层和 HTTPS协议基于HTTP并通过TLS/SSL加密数据,确保安全传输。其连接过程涉及TCP三次握手、SSL证书验证、对称密钥交换等步骤,以保障通信的安全性和完整性。数字信封技术使用非对称加密和数字证书确保数据的机密性和身份认证。 浏览器通过Https访问网站的过程包括输入网址、DNS解析、建立TCP连接、发送HTTPS请求、接收响应、验证证书和解析网页内容等步骤,确保用户与服务器之间的安全通信。
74 1
React 文本区域组件 Textarea:深入解析与优化
本文介绍了 React 中 Textarea 组件的基础用法、常见问题及优化方法,包括状态绑定、初始值设置、样式自定义、性能优化和跨浏览器兼容性处理,并提供了代码案例。
60 8
千万级电商线上无阻塞双buffer缓冲优化ID生成机制深度解析
【11月更文挑战第30天】在千万级电商系统中,ID生成机制是核心基础设施之一。一个高效、可靠的ID生成系统对于保障系统的稳定性和性能至关重要。本文将深入探讨一种在千万级电商线上广泛应用的ID生成机制——无阻塞双buffer缓冲优化方案。本文从概述、功能点、背景、业务点、底层原理等多个维度进行解析,并通过Java语言实现多个示例,指出各自实践的优缺点。希望给需要的同学提供一些参考。
52 7
Java虚拟机(JVM)垃圾回收机制深度解析与优化策略####
本文旨在深入探讨Java虚拟机(JVM)的垃圾回收机制,揭示其工作原理、常见算法及参数调优方法。通过剖析垃圾回收的生命周期、内存区域划分以及GC日志分析,为开发者提供一套实用的JVM垃圾回收优化指南,助力提升Java应用的性能与稳定性。 ####

相关产品

  • 云原生大数据计算服务 MaxCompute
  • 推荐镜像

    更多
    AI助理

    你好,我是AI助理

    可以解答问题、推荐解决方案等