Spark SQL 性能优化再进一步:CBO 基于代价的优化

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文将介绍 CBO,它充分考虑了数据本身的特点(如大小、分布)以及操作算子的特点(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划,即 SparkPlan。

转载请务必将下面这段话置于文章开头处。

本文转发自技术世界原文链接 http://www.jasongj.com/spark/cbo/

Spark CBO 背景

上文Spark SQL 内部原理中介绍的 Optimizer 属于 RBO,实现简单有效。它属于 LogicalPlan 的优化,所有优化均基于 LogicalPlan 本身的特点,未考虑数据本身的特点,也未考虑算子本身的代价。

本文将介绍 CBO,它充分考虑了数据本身的特点(如大小、分布)以及操作算子的特点(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划,即 SparkPlan。

Spark CBO 原理

CBO 原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划。其核心在于评估一个给定的物理执行计划的代价。

物理执行计划是一个树状结构,其代价等于每个执行节点的代价总合,如下图所示。


CBO 总代价

而每个执行节点的代价,分为两个部分

  • 该执行节点对数据集的影响,或者说该节点输出数据集的大小与分布
  • 该执行节点操作算子的代价

每个操作算子的代价相对固定,可用规则来描述。而执行节点输出数据集的大小与分布,分为两个部分:1) 初始数据集,也即原始表,其数据集的大小与分布可直接通过统计得到;2)中间节点输出数据集的大小与分布可由其输入数据集的信息与操作本身的特点推算。

所以,最终主要需要解决两个问题

  • 如何获取原始数据集的统计信息
  • 如何根据输入数据集估算特定算子的输出数据集

Statistics 收集

通过如下 SQL 语句,可计算出整个表的记录总数以及总大小

1
ANALYZE TABLE table_name COMPUTE STATISTICS;

从如下示例中,Statistics 一行可见, customer 表数据总大小为 37026233 字节,即 35.3MB,总记录数为 28万,与事实相符。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
spark-sql> ANALYZE TABLE customer COMPUTE STATISTICS;
Time taken: 12.888 seconds

spark-sql> desc extended customer;
c_customer_sk bigint NULL
c_customer_id string NULL
c_current_cdemo_sk bigint NULL
c_current_hdemo_sk bigint NULL
c_current_addr_sk bigint NULL
c_first_shipto_date_sk bigint NULL
c_first_sales_date_sk bigint NULL
c_salutation string NULL
c_first_name string NULL
c_last_name string NULL
c_preferred_cust_flag string NULL
c_birth_day int NULL
c_birth_month int NULL
c_birth_year int NULL
c_birth_country string NULL
c_login string NULL
c_email_address string NULL
c_last_review_date string NULL

# Detailed Table Information
Database jason_tpc_ds
Table customer
Owner jason
Created Time Sat Sep 15 14:00:40 CST 2018
Last Access Thu Jan 01 08:00:00 CST 1970
Created By Spark 2.3.2
Type EXTERNAL
Provider hive
Table Properties [transient_lastDdlTime=1536997324]
Statistics 37026233 bytes, 280000 rows
Location hdfs://dw/tpc_ds/customer
Serde Library org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.TextInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Storage Properties [field.delim=|, serialization.format=|]
Partition Provider Catalog
Time taken: 1.691 seconds, Fetched 36 row(s)

通过如下 SQL 语句,可计算出指定列的统计信息

1
ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS [column1] [,column2] [,column3] [,column4] ... [,columnn];

从如下示例可见,customer 表的 c_customer_sk 列最小值为 1, 最大值为 280000,null 值个数为 0,不同值个数为 274368,平均列长度为 8,最大列长度为 8。

1
2
3
4
5
6
7
8
9
10
11
12
13
spark-sql> ANALYZE TABLE customer COMPUTE STATISTICS FOR COLUMNS c_customer_sk, c_customer_id, c_current_cdemo_sk;
Time taken: 9.139 seconds
spark-sql> desc extended customer c_customer_sk;
col_name c_customer_sk
data_type bigint
comment NULL
min 1
max 280000
num_nulls 0
distinct_count 274368
avg_col_len 8
max_col_len 8
histogram NULL

除上述示例中的统计信息外,Spark CBO 还直接等高直方图。在上例中,histogram 为 NULL。其原因是,spark.sql.statistics.histogram.enabled 默认值为 false,也即 ANALYZE 时默认不计算及存储 histogram。

下例中,通过 SET spark.sql.statistics.histogram.enabled=true; 启用 histogram 后,完整的统计信息如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
spark-sql> ANALYZE TABLE customer COMPUTE STATISTICS FOR COLUMNS c_customer_sk,c_customer_id,c_current_cdemo_sk,c_current_hdemo_sk,c_current_addr_sk,c_first_shipto_date_sk,c_first_sales_date_sk,c_salutation,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_day,c_birth_month,c_birth_year,c_birth_country,c_login,c_email_address,c_last_review_date;
Time taken: 125.624 seconds

spark-sql> desc extended customer c_customer_sk;
col_name c_customer_sk
data_type bigint
comment NULL
min 1
max 280000
num_nulls 0
distinct_count 274368
avg_col_len 8
max_col_len 8
histogram height: 1102.3622047244094, num_of_bins: 254
bin_0 lower_bound: 1.0, upper_bound: 1090.0, distinct_count: 1089
bin_1 lower_bound: 1090.0, upper_bound: 2206.0, distinct_count: 1161
bin_2 lower_bound: 2206.0, upper_bound: 3286.0, distinct_count: 1124

...

bin_251 lower_bound: 276665.0, upper_bound: 277768.0, distinct_count: 1041
bin_252 lower_bound: 277768.0, upper_bound: 278870.0, distinct_count: 1098
bin_253 lower_bound: 278870.0, upper_bound: 280000.0, distinct_count: 1106

从上图可见,生成的 histogram 为 equal-height histogram,且高度为 1102.36,bin 数为 254。其中 bin 个数可由 spark.sql.statistics.histogram.numBins 配置。对于每个 bin,匀记录其最小值,最大值,以及 distinct count。

值得注意的是,这里的 distinct count 并不是精确值,而是通过 HyperLogLog 计算出来的近似值。使用 HyperLogLog 的原因有二

  • 使用 HyperLogLog 计算 distinct count 速度快速
  • HyperLogLog 计算出的 distinct count 可以合并。例如可以直接将两个 bin 的 HyperLogLog 值合并算出这两个 bin 总共的 distinct count,而无须从重新计算,且合并结果的误差可控

算子对数据集影响估计

对于中间算子,可以根据输入数据集的统计信息以及算子的特性,可以估算出输出数据集的统计结果。


Spark SQL CBO Operator Estimation

本节以 Filter 为例说明算子对数据集的影响。

对于常见的 Column A < value B Filter,可通过如下方式估算输出中间结果的统计信息

  • 若 B < A.min,则无数据被选中,输出结果为空
  • 若 B > A.max,则全部数据被选中,输出结果与 A 相同,且统计信息不变
  • 若 A.min < B < A.max,则被选中的数据占比为 (B.value - A.min) / (A.max - A.min),A.min 不变,A.max 更新为 B.value,A.ndv = A.ndv * (B.value - A.min) / (A.max - A.min)

Spark SQL CBO Filter Estimation




上述估算的前提是,字段 A 数据均匀分布。但很多时候,数据分布并不均匀,且当数据倾斜严重是,上述估算误差较大。此时,可充分利用 histogram 进行更精确的估算


Spark SQL CBO Filter Estimation with Histogram

启用 Historgram 后,Filter Column A < value B的估算方法为

  • 若 B < A.min,则无数据被选中,输出结果为空
  • 若 B > A.max,则全部数据被选中,输出结果与 A 相同,且统计信息不变
  • 若 A.min < B < A.max,则被选中的数据占比为 height(<B) / height(All),A.min 不变,A.max = B.value,A.ndv = ndv(<B)

在上图中,B.value = 15,A.min = 0,A.max = 32,bin 个数为 10。Filter 后 A.ndv = ndv(<B.value) = ndv(<15)。该值可根据 A < 15 的 5 个 bin 的 ndv 通过 HyperLogLog 合并而得,无须重新计算所有 A < 15 的数据。

算子代价估计

SQL 中常见的操作有 Selection(由 select 语句表示),Filter(由 where 语句表示)以及笛卡尔乘积(由 join 语句表示)。其中代价最高的是 join。

Spark SQL 的 CBO 通过如下方法估算 join 的代价

1
2
Cost = rows * weight + size * (1 - weight)
Cost = CostCPU * weight + CostIO * (1 - weight)

其中 rows 即记录行数代表了 CPU 代价,size 代表了 IO 代价。weight 由 spark.sql.cbo.joinReorder.card.weight 决定,其默认值为 0.7。

Build侧选择

对于两表Hash Join,一般选择小表作为build size,构建哈希表,另一边作为 probe side。未开启 CBO 时,根据表原始数据大小选择 t2 作为build side


Spark SQL build side without CBO


而开启 CBO 后,基于估计的代价选择 t1 作为 build side。更适合本例


Spark SQL build side with CBO


优化 Join 类型

在 Spark SQL 中,Join 可分为 Shuffle based Join 和 BroadcastJoin。Shuffle based Join 需要引入 Shuffle,代价相对较高。BroadcastJoin 无须 Join,但要求至少有一张表足够小,能通过 Spark 的 Broadcast 机制广播到每个 Executor 中。

在不开启 CBO 中,Spark SQL 通过 spark.sql.autoBroadcastJoinThreshold 判断是否启用 BroadcastJoin。其默认值为 10485760 即 10 MB。

并且该判断基于参与 Join 的表的原始大小。

在下图示例中,Table 1 大小为 1 TB,Table 2 大小为 20 GB,因此在对二者进行 join 时,由于二者都远大于自动 BroatcastJoin 的阈值,因此 Spark SQL 在未开启 CBO 时选用 SortMergeJoin 对二者进行 Join。

而开启 CBO 后,由于 Table 1 经过 Filter 1 后结果集大小为 500 GB,Table 2 经过 Filter 2 后结果集大小为 10 MB 低于自动 BroatcastJoin 阈值,因此 Spark SQL 选用 BroadcastJoin。


Spark SQL join type selection with CBO


优化多表 Join 顺序

未开启 CBO 时,Spark SQL 按 SQL 中 join 顺序进行 Join。极端情况下,整个 Join 可能是 left-deep tree。在下图所示 TPC-DS Q25 中,多路 Join 存在如下问题,因此耗时 241 秒。

  • left-deep tree,因此所有后续 Join 都依赖于前面的 Join 结果,各 Join 间无法并行进行
  • 前面的两次 Join 输入输出数据量均非常大,属于大 Join,执行时间较长


Spark SQL multi join


开启 CBO 后, Spark SQL 将执行计划优化如下


Spark SQL multi join reorder with CBO


优化后的 Join 有如下优势,因此执行时间降至 71 秒

  • Join 树不再是 left-deep tree,因此 Join 3 与 Join 4 可并行进行,Join 5 与 Join 6 可并行进行
  • 最大的 Join 5 输出数据只有两百万条结果,Join 6 有 1.49 亿条结果,Join 7相当于小 Join


相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
1月前
|
SQL Oracle 数据库
使用访问指导(SQL Access Advisor)优化数据库业务负载
本文介绍了Oracle的SQL访问指导(SQL Access Advisor)的应用场景及其使用方法。访问指导通过分析给定的工作负载,提供索引、物化视图和分区等方面的优化建议,帮助DBA提升数据库性能。具体步骤包括创建访问指导任务、创建工作负载、连接工作负载至访问指导、设置任务参数、运行访问指导、查看和应用优化建议。访问指导不仅针对单条SQL语句,还能综合考虑多条SQL语句的优化效果,为DBA提供全面的决策支持。
72 11
|
4天前
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
|
2月前
|
SQL 缓存 数据库
SQL慢查询优化策略
在数据库管理和应用开发中,SQL查询的性能优化至关重要。慢查询优化不仅可以提高应用的响应速度,还能降低服务器负载,提升用户体验。本文将详细介绍针对SQL慢查询的优化策略。
|
2月前
|
SQL 存储 BI
gbase 8a 数据库 SQL合并类优化——不同数据统计周期合并为一条SQL语句
gbase 8a 数据库 SQL合并类优化——不同数据统计周期合并为一条SQL语句
|
2月前
|
SQL 数据库
gbase 8a 数据库 SQL优化案例-关联顺序优化
gbase 8a 数据库 SQL优化案例-关联顺序优化
|
2月前
|
SQL 数据库 UED
SQL性能提升秘籍:5步优化法与10个实战案例
在数据库管理和应用开发中,SQL查询的性能优化至关重要。高效的SQL查询不仅可以提高应用的响应速度,还能降低服务器负载,提升用户体验。本文将分享SQL优化的五大步骤和十个实战案例,帮助构建高效、稳定的数据库应用。
138 3
|
2月前
|
SQL 缓存 监控
SQL性能提升指南:五大优化策略与十个实战案例
在数据库性能优化的世界里,SQL优化是提升查询效率的关键。一个高效的SQL查询可以显著减少数据库的负载,提高应用响应速度,甚至影响整个系统的稳定性和扩展性。本文将介绍SQL优化的五大步骤,并结合十个实战案例,为你提供一份详尽的性能提升指南。
70 0
|
8月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
343 0
|
8月前
|
SQL 分布式计算 大数据
【大数据技术Hadoop+Spark】Spark SQL、DataFrame、Dataset的讲解及操作演示(图文解释)
【大数据技术Hadoop+Spark】Spark SQL、DataFrame、Dataset的讲解及操作演示(图文解释)
187 0
|
2月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。