SQL优化器原理 - Auto Hash Join

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 在MaxCompute中,Join操作符的实现算法之一名为"Hash Join",其实现原理是,把小表的数据全部读入内存中,并拷贝多份分发到大表数据所在机器,在 map 阶段直接扫描大表数据与内存中的小表数据进行匹配。

这是MaxCompute有关SQL优化器原理的系列文章之一。我们会陆续推出SQL优化器有关优化规则和框架的其他文章。添加钉钉群“关系代数优化技术”(群号11719083)可以获取最新文章发布动态(二维码在文章末尾)。

本文主要描述MaxCompute优化器实现的Auto Hash Join的功能。

简介

在MaxCompute中,Join操作符的实现算法之一名为"Hash Join",其实现原理是,把小表的数据全部读入内存中,并拷贝多份分发到大表数据所在机器,在 map 阶段直接扫描大表数据与内存中的小表数据进行匹配。Hash join执行方式效率很高,但是要求小表数据足够小以便放到内存中,假如小表数据太大,则任务在执行过程中会报OutOfMemory错误。

在MapCompute中,可以使用MapJoin关键字来实现Hash join,如下所示:

select /* + mapjoin(b) */  a.* from table1 a join table2 b on a.col1 = b.col2;
// b表为小表

但是这种通过使用hint的方式还是不够智能。另外对于query复杂的情况,用户很可能因为无法确定join的某一路数据量大小而放弃使用mapjoin。在最新的MaxCompute SQL 2.0中,基于代价的优化器(Cost Based Optimizer,CBO)包含了一个自动优化join为hash join的优化规则。

实现原理

在CBO中会对所有的operator的cost进行估计,这个cost包含rowcount、cpu、内存等等。有了各个operator的cost,就能估计其对应输出数据量的大小,公式可以简单的认为是: data_size = rowcount * averageRowSize。有了dataSize之后,就可以很容易知道这个任务是否适合使用HashJoin,其判定方法就是计算各个parent operator的data size之和是否小于某个阈值。假如估算出的data size在阈值范围之内,则会产生一个包含HashJoin的计划。同时对于Join,CBO也会产生一个普通的包含MergeJoin的计划,最后在这两个计划中选择cost最小的作为最优计划。

简单说来,在CBO中是否选择HashJoin作为最优计划的步骤有两个:

  • Step1:估算join的输入数据量大小,判定是否产生一个包含HashJoin的计划
  • Step2:对比HashJoin、MergeJoin相关计划的cost,选择cost最小的计划作为最优计划

举例,对如下sql进行优化:

select t1.name from
  (select dt_bad_linenum as name from bad_tpch_customer) t1
join
  (select c_name from tpch_customer) t2
on t1.name = t2.c_name;

上述sql在CBO中会翻译生成如下operator tree:

OdpsLogicalProject(name=[$0]): rowcount = 9000000.0, cumulative cost = {48000008.0 rows, 39000010.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 5
  LogicalJoin(condition=[EQ($0, $1)], joinType=[inner]): rowcount = 9000000.0, cumulative cost = {39000008.0 rows, 30000010.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 4
    OdpsLogicalProject(name=[$1]): rowcount = 4.0, cumulative cost = {8.0 rows, 9.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 1
      OdpsLogicalTableScan(table=[[tpch_100gb.bad_tpch_customer, dt_bad_file,dt_bad_linenum,dt_bad_msg,dt_bad_code,dt_bad_data(5) {0, 1, 2, 3, 4}]]): rowcount = 4.0, cumulative cost = {4.0 rows, 5.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 0
    OdpsLogicalProject(c_name=[$1]): rowcount = 1.5E7, cumulative cost = {3.0E+7 rows, 30000001 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 3
      OdpsLogicalTableScan(table=[[tpch_100gb.tpch_customer, c_custkey,c_name,c_address,c_nationkey,c_phone,c_acctbal,c_mktsegment,c_comment(8) {0, 1, 2, 3, 4, 5, 6, 7}]]): rowcount = 1.5E7, cumulative cost = {1.5E+7 rows, 15000001 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 2

从上可以看到,join的parent operator有两个:

OdpsLogicalProject(name=[$1]): rowcount = 4.0, cumulative cost = {8.0 rows, 9.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 1

OdpsLogicalProject(c_name=[$1]): rowcount = 1.5E7, cumulative cost = {3.0E+7 rows, 30000001 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 3
    

其中id为1的project其输出记录数是4行,且其输出列只有1列(bad_tpch_customer表中有5列),估算其输出数据量,认为其适合使用HashJoin,因此其产生的计划中包含两种:

  • 计划1:HashJoin
OdpsPhysicalProject(name=[$0]): rowcount = 3.24, cumulative cost = {28500024.88 rows, 28500013.222723687326862 cpu, 270001607.0 io, 496.0 memory, 378.0 network}, id = 109
  OdpsPhysicalHashJoin(type=[INNER], equi=[[($0,$1)]], mainstream=[1]): rowcount = 3.24, cumulative cost = {28500021.64 rows, 28500013.222723687326862 cpu, 270001548.0 io, 496.0 memory, 378.0 network}, id = 108
    OdpsPhysicalStreamlineRead(order=[[]]): rowcount = 3.6, cumulative cost = {18.4 rows, 13.222723687326862 cpu, 1170.0 io, 100.0 memory, 0.0 network}, id = 106
      OdpsPhysicalStreamlineWrite(shuffle=[broadcast], order=[[]]): rowcount = 3.6, cumulative cost = {14.8 rows, 8.611361843663431 cpu, 810.0 io, 100.0 memory, 0.0 network}, id = 105
        OdpsPhysicalProject(name=[$0]): rowcount = 3.6, cumulative cost = {11.2 rows, 4.0 cpu, 450.0 io, 100.0 memory, 0.0 network}, id = 104
          OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 3.6, cumulative cost = {7.6 rows, 4.0 cpu, 400.0 io, 100.0 memory, 0.0 network}, id = 103
            OdpsPhysicalTableScan(table=[[tpch_100gb.bad_tpch_customer, dt_bad_linenum(1) {1}]]): rowcount = 4.0, cumulative cost = {4.0 rows, 0.0 cpu, 400.0 io, 0.0 memory, 0.0 network}, id = 102
    OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 1.35E7, cumulative cost = {2.85E+7 rows, 15000000.0 cpu, 270000000.0 io, 18.0 memory, 0.0 network}, id = 107
      OdpsPhysicalTableScan(table=[[tpch_100gb.tpch_customer, c_name(1) {1}]]): rowcount = 1.5E7, cumulative cost = {1.5E+7 rows, 0.0 cpu, 2.7E+8 io, 0.0 memory, 0.0 network}, id = 99  
  • 计划2:MergeJoin
OdpsPhysicalProject(name=[$0]): rowcount = 3.24, cumulative cost = {55500024.88 rows, 471791423.394757487326862 cpu, 756001229.0 io, 336.0 memory, 270459000360.0 network}, id = 104
  OdpsPhysicalMergeJoin(type=[INNER], equi=[[($0,$1)]]): rowcount = 3.24, cumulative cost = {55500021.64 rows, 471791423.394757487326862 cpu, 756001170.0 io, 336.0 memory, 270459000360.0 network}, id = 103
    OdpsPhysicalStreamlineRead(order=[[0]]): rowcount = 3.6, cumulative cost = {18.4 rows, 13.222723687326862 cpu, 1170.0 io, 100.0 memory, 360.0 network}, id = 99
      OdpsPhysicalStreamlineWrite(shuffle=[hash[0],JoinHasher], order=[[0]]): rowcount = 3.6, cumulative cost = {14.8 rows, 8.611361843663431 cpu, 810.0 io, 100.0 memory, 0.0 network}, id = 98
        OdpsPhysicalProject(name=[$0]): rowcount = 3.6, cumulative cost = {11.2 rows, 4.0 cpu, 450.0 io, 100.0 memory, 0.0 network}, id = 97
          OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 3.6, cumulative cost = {7.6 rows, 4.0 cpu, 400.0 io, 100.0 memory, 0.0 network}, id = 96
            OdpsPhysicalTableScan(table=[[tpch_100gb.bad_tpch_customer, dt_bad_linenum(1) {1}]]): rowcount = 4.0, cumulative cost = {4.0 rows, 0.0 cpu, 400.0 io, 0.0 memory, 0.0 network}, id = 95
    OdpsPhysicalStreamlineRead(order=[[0]]): rowcount = 1.35E7, cumulative cost = {5.55E+7 rows, 458291406.5720338 cpu, 756000000.0 io, 18.0 memory, 270459000000.0 network}, id = 102
      OdpsPhysicalStreamlineWrite(shuffle=[hash[0],JoinHasher], order=[[0]]): rowcount = 1.35E7, cumulative cost = {4.20E+7 rows, 236645703.2860169 cpu, 513000000.0 io, 18.0 memory, 0.0 network}, id = 101
        OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 1.35E7, cumulative cost = {2.85E+7 rows, 15000000.0 cpu, 270000000.0 io, 18.0 memory, 0.0 network}, id = 100
          OdpsPhysicalTableScan(table=[[tpch_100gb.tpch_customer, c_name(1) {1}]]): rowcount = 1.5E7, cumulative cost = {1.5E+7 rows, 0.0 cpu, 2.7E+8 io, 0.0 memory, 0.0 network}, id = 92

比较上述两个计划的cost,明显计划1的cost更小,因此选择包含HashJoin的计划1作为最优计划。

总结

AutoHashJoin的一个很大的好处是能让用户免参与的进行这个优化,同时对于一些复杂的query也更有可能使用HashJoin。但是,因为CBO无法完美估计数据量,会出现误判从而导致任务OOM的情况。针对这种情况,MaxCompute也进行了相应的调整,对于CBO误判导致HashJoin OOM的任务会关闭HashJoin rule来重试。

目前CBO中使用HashJoin的阈值比较保守,默认是25MB。主要原因是CBO对于数据量的估计有偏差,无法完美估计数据量,而估计不准的原因有两个:

  • 数据是压缩存储的,CBO拿到的statistics不准
  • CBO的估计算法有偏差

这两个问题也是CBO致力解决的问题。

image.png

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
1天前
|
SQL 安全 关系型数据库
sql注入原理和sqlmap命令的基础命令以及使用手法
sql注入原理和sqlmap命令的基础命令以及使用手法
|
3月前
|
SQL 存储 安全
SQL数据库:核心原理、应用实践与未来展望
在电子商务领域,SQL数据库用于存储商品信息、用户信息、订单信息等。通过SQL数据库,电商平台可以实现商品的快速检索、用户行为的跟踪分析、订单状态的实时更新等功能,提升用户体验和运营效率。
|
3月前
|
SQL 算法 数据库
SQL优化器原理 - Join重排
保证等价性:不同的Join顺序可能产生相同的结果集,但执行成本可能不同。因此,在重排Join顺序时,必须确保结果集的等价性。
|
3月前
|
SQL 算法 数据库
SQL优化器原理 - Join重排。
保证等价性:不同的Join顺序可能产生相同的结果集,但执行成本可能不同。因此,在重排Join顺序时,必须确保结果集的等价性。
|
5月前
|
达摩院 开发者 容器
「达摩院MindOpt」优化形状切割问题(MILP)
在制造业,高效地利用材料不仅是节约成本的重要环节,也是可持续发展的关键因素。无论是在金属加工、家具制造还是纺织品生产中,原材料的有效利用都直接影响了整体效率和环境影响。
「达摩院MindOpt」优化形状切割问题(MILP)
|
5月前
|
人工智能 自然语言处理 达摩院
MindOpt 云上建模求解平台:多求解器协同优化
数学规划是一种数学优化方法,主要是寻找变量的取值在特定的约束情况下,使我们的决策目标得到一个最大或者最小值的决策。
|
3月前
|
人工智能 算法 调度
优化问题之如何选择合适的优化求解器
优化问题之如何选择合适的优化求解器
|
3月前
|
调度 决策智能
优化问题之优化求解器有哪些主要的评估特性
优化问题之优化求解器有哪些主要的评估特性
|
达摩院 调度
使用达摩院MindOpt优化交通调度_最大化通行量—线性规划问题
在数学规划中,网络流问题是指一类基于网络模型的流量分配问题。网络流问题的目标是在网络中分配资源,使得网络的流量满足一定的限制条件,并且使得某些目标函数最小或最大化。网络流问题通常涉及一个有向图,图中每个节点表示一个资源,每条边表示资源之间的关系。边上有一个容量值,表示该边上最多可以流动的资源数量。流量从源节点开始流出,经过一系列中间节点,最终到达汇节点。在这个过程中,需要遵守一定的流量守恒和容量限制条件。
|
5月前
|
存储 达摩院 调度
「达摩院MindOpt」优化FlowShop流水线作业排班问题
在企业在面临大量多样化的生产任务时,如何合理地安排流水线作业以提高生产效率及确保交货期成为了一个重要的问题。
「达摩院MindOpt」优化FlowShop流水线作业排班问题