SQL优化器原理 - Auto Hash Join

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 在MaxCompute中,Join操作符的实现算法之一名为"Hash Join",其实现原理是,把小表的数据全部读入内存中,并拷贝多份分发到大表数据所在机器,在 map 阶段直接扫描大表数据与内存中的小表数据进行匹配。

这是MaxCompute有关SQL优化器原理的系列文章之一。我们会陆续推出SQL优化器有关优化规则和框架的其他文章。添加钉钉群“关系代数优化技术”(群号11719083)可以获取最新文章发布动态(二维码在文章末尾)。

本文主要描述MaxCompute优化器实现的Auto Hash Join的功能。

简介

在MaxCompute中,Join操作符的实现算法之一名为"Hash Join",其实现原理是,把小表的数据全部读入内存中,并拷贝多份分发到大表数据所在机器,在 map 阶段直接扫描大表数据与内存中的小表数据进行匹配。Hash join执行方式效率很高,但是要求小表数据足够小以便放到内存中,假如小表数据太大,则任务在执行过程中会报OutOfMemory错误。

在MapCompute中,可以使用MapJoin关键字来实现Hash join,如下所示:

select /* + mapjoin(b) */  a.* from table1 a join table2 b on a.col1 = b.col2;
// b表为小表

但是这种通过使用hint的方式还是不够智能。另外对于query复杂的情况,用户很可能因为无法确定join的某一路数据量大小而放弃使用mapjoin。在最新的MaxCompute SQL 2.0中,基于代价的优化器(Cost Based Optimizer,CBO)包含了一个自动优化join为hash join的优化规则。

实现原理

在CBO中会对所有的operator的cost进行估计,这个cost包含rowcount、cpu、内存等等。有了各个operator的cost,就能估计其对应输出数据量的大小,公式可以简单的认为是: data_size = rowcount * averageRowSize。有了dataSize之后,就可以很容易知道这个任务是否适合使用HashJoin,其判定方法就是计算各个parent operator的data size之和是否小于某个阈值。假如估算出的data size在阈值范围之内,则会产生一个包含HashJoin的计划。同时对于Join,CBO也会产生一个普通的包含MergeJoin的计划,最后在这两个计划中选择cost最小的作为最优计划。

简单说来,在CBO中是否选择HashJoin作为最优计划的步骤有两个:

  • Step1:估算join的输入数据量大小,判定是否产生一个包含HashJoin的计划
  • Step2:对比HashJoin、MergeJoin相关计划的cost,选择cost最小的计划作为最优计划

举例,对如下sql进行优化:

select t1.name from
  (select dt_bad_linenum as name from bad_tpch_customer) t1
join
  (select c_name from tpch_customer) t2
on t1.name = t2.c_name;

上述sql在CBO中会翻译生成如下operator tree:

OdpsLogicalProject(name=[$0]): rowcount = 9000000.0, cumulative cost = {48000008.0 rows, 39000010.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 5
  LogicalJoin(condition=[EQ($0, $1)], joinType=[inner]): rowcount = 9000000.0, cumulative cost = {39000008.0 rows, 30000010.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 4
    OdpsLogicalProject(name=[$1]): rowcount = 4.0, cumulative cost = {8.0 rows, 9.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 1
      OdpsLogicalTableScan(table=[[tpch_100gb.bad_tpch_customer, dt_bad_file,dt_bad_linenum,dt_bad_msg,dt_bad_code,dt_bad_data(5) {0, 1, 2, 3, 4}]]): rowcount = 4.0, cumulative cost = {4.0 rows, 5.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 0
    OdpsLogicalProject(c_name=[$1]): rowcount = 1.5E7, cumulative cost = {3.0E+7 rows, 30000001 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 3
      OdpsLogicalTableScan(table=[[tpch_100gb.tpch_customer, c_custkey,c_name,c_address,c_nationkey,c_phone,c_acctbal,c_mktsegment,c_comment(8) {0, 1, 2, 3, 4, 5, 6, 7}]]): rowcount = 1.5E7, cumulative cost = {1.5E+7 rows, 15000001 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 2

从上可以看到,join的parent operator有两个:

OdpsLogicalProject(name=[$1]): rowcount = 4.0, cumulative cost = {8.0 rows, 9.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 1

OdpsLogicalProject(c_name=[$1]): rowcount = 1.5E7, cumulative cost = {3.0E+7 rows, 30000001 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 3
    

其中id为1的project其输出记录数是4行,且其输出列只有1列(bad_tpch_customer表中有5列),估算其输出数据量,认为其适合使用HashJoin,因此其产生的计划中包含两种:

  • 计划1:HashJoin
OdpsPhysicalProject(name=[$0]): rowcount = 3.24, cumulative cost = {28500024.88 rows, 28500013.222723687326862 cpu, 270001607.0 io, 496.0 memory, 378.0 network}, id = 109
  OdpsPhysicalHashJoin(type=[INNER], equi=[[($0,$1)]], mainstream=[1]): rowcount = 3.24, cumulative cost = {28500021.64 rows, 28500013.222723687326862 cpu, 270001548.0 io, 496.0 memory, 378.0 network}, id = 108
    OdpsPhysicalStreamlineRead(order=[[]]): rowcount = 3.6, cumulative cost = {18.4 rows, 13.222723687326862 cpu, 1170.0 io, 100.0 memory, 0.0 network}, id = 106
      OdpsPhysicalStreamlineWrite(shuffle=[broadcast], order=[[]]): rowcount = 3.6, cumulative cost = {14.8 rows, 8.611361843663431 cpu, 810.0 io, 100.0 memory, 0.0 network}, id = 105
        OdpsPhysicalProject(name=[$0]): rowcount = 3.6, cumulative cost = {11.2 rows, 4.0 cpu, 450.0 io, 100.0 memory, 0.0 network}, id = 104
          OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 3.6, cumulative cost = {7.6 rows, 4.0 cpu, 400.0 io, 100.0 memory, 0.0 network}, id = 103
            OdpsPhysicalTableScan(table=[[tpch_100gb.bad_tpch_customer, dt_bad_linenum(1) {1}]]): rowcount = 4.0, cumulative cost = {4.0 rows, 0.0 cpu, 400.0 io, 0.0 memory, 0.0 network}, id = 102
    OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 1.35E7, cumulative cost = {2.85E+7 rows, 15000000.0 cpu, 270000000.0 io, 18.0 memory, 0.0 network}, id = 107
      OdpsPhysicalTableScan(table=[[tpch_100gb.tpch_customer, c_name(1) {1}]]): rowcount = 1.5E7, cumulative cost = {1.5E+7 rows, 0.0 cpu, 2.7E+8 io, 0.0 memory, 0.0 network}, id = 99  
  • 计划2:MergeJoin
OdpsPhysicalProject(name=[$0]): rowcount = 3.24, cumulative cost = {55500024.88 rows, 471791423.394757487326862 cpu, 756001229.0 io, 336.0 memory, 270459000360.0 network}, id = 104
  OdpsPhysicalMergeJoin(type=[INNER], equi=[[($0,$1)]]): rowcount = 3.24, cumulative cost = {55500021.64 rows, 471791423.394757487326862 cpu, 756001170.0 io, 336.0 memory, 270459000360.0 network}, id = 103
    OdpsPhysicalStreamlineRead(order=[[0]]): rowcount = 3.6, cumulative cost = {18.4 rows, 13.222723687326862 cpu, 1170.0 io, 100.0 memory, 360.0 network}, id = 99
      OdpsPhysicalStreamlineWrite(shuffle=[hash[0],JoinHasher], order=[[0]]): rowcount = 3.6, cumulative cost = {14.8 rows, 8.611361843663431 cpu, 810.0 io, 100.0 memory, 0.0 network}, id = 98
        OdpsPhysicalProject(name=[$0]): rowcount = 3.6, cumulative cost = {11.2 rows, 4.0 cpu, 450.0 io, 100.0 memory, 0.0 network}, id = 97
          OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 3.6, cumulative cost = {7.6 rows, 4.0 cpu, 400.0 io, 100.0 memory, 0.0 network}, id = 96
            OdpsPhysicalTableScan(table=[[tpch_100gb.bad_tpch_customer, dt_bad_linenum(1) {1}]]): rowcount = 4.0, cumulative cost = {4.0 rows, 0.0 cpu, 400.0 io, 0.0 memory, 0.0 network}, id = 95
    OdpsPhysicalStreamlineRead(order=[[0]]): rowcount = 1.35E7, cumulative cost = {5.55E+7 rows, 458291406.5720338 cpu, 756000000.0 io, 18.0 memory, 270459000000.0 network}, id = 102
      OdpsPhysicalStreamlineWrite(shuffle=[hash[0],JoinHasher], order=[[0]]): rowcount = 1.35E7, cumulative cost = {4.20E+7 rows, 236645703.2860169 cpu, 513000000.0 io, 18.0 memory, 0.0 network}, id = 101
        OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 1.35E7, cumulative cost = {2.85E+7 rows, 15000000.0 cpu, 270000000.0 io, 18.0 memory, 0.0 network}, id = 100
          OdpsPhysicalTableScan(table=[[tpch_100gb.tpch_customer, c_name(1) {1}]]): rowcount = 1.5E7, cumulative cost = {1.5E+7 rows, 0.0 cpu, 2.7E+8 io, 0.0 memory, 0.0 network}, id = 92

比较上述两个计划的cost,明显计划1的cost更小,因此选择包含HashJoin的计划1作为最优计划。

总结

AutoHashJoin的一个很大的好处是能让用户免参与的进行这个优化,同时对于一些复杂的query也更有可能使用HashJoin。但是,因为CBO无法完美估计数据量,会出现误判从而导致任务OOM的情况。针对这种情况,MaxCompute也进行了相应的调整,对于CBO误判导致HashJoin OOM的任务会关闭HashJoin rule来重试。

目前CBO中使用HashJoin的阈值比较保守,默认是25MB。主要原因是CBO对于数据量的估计有偏差,无法完美估计数据量,而估计不准的原因有两个:

  • 数据是压缩存储的,CBO拿到的statistics不准
  • CBO的估计算法有偏差

这两个问题也是CBO致力解决的问题。

image.png

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
15天前
|
SQL
SQL JOIN
【11月更文挑战第06天】
33 4
|
21天前
|
SQL 关系型数据库 MySQL
图解 SQL 里的各种 JOIN
用文氏图表示 SQL 里的各种 JOIN,一下子就理解了。
30 2
|
1月前
|
SQL 关系型数据库 数据库
SQL数据库:核心原理与应用实践
随着信息技术的飞速发展,数据库管理系统已成为各类组织和企业中不可或缺的核心组件。在众多数据库管理系统中,SQL(结构化查询语言)数据库以其强大的数据管理能力和灵活性,广泛应用于各类业务场景。本文将深入探讨SQL数据库的基本原理、核心特性以及实际应用。一、SQL数据库概述SQL数据库是一种关系型数据库
49 5
|
1月前
|
SQL 分布式计算 Java
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
31 3
|
1月前
|
SQL 监控 安全
SQL注入公鸡分类及原理
SQL注入公鸡分类及原理
|
1月前
|
SQL 关系型数据库 MySQL
sql注入原理与实战(三)数据库操作
sql注入原理与实战(三)数据库操作
sql注入原理与实战(三)数据库操作
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
42 0
|
1月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
76 0
|
1月前
|
SQL 关系型数据库 Serverless
sql注入原理与实战(四)数据表操作
sql注入原理与实战(四)数据表操作
|
1月前
|
SQL 存储 Java
sql注入原理与实战(二)数据库原理
sql注入原理与实战(二)数据库原理