Apache Spark 2.2中基于成本的优化器(CBO)

简介: Apache Spark 2.2中基于成本的优化器(CBO)

Apache Spark 2.2最近装备了高级的基于成本的优化器框架用于收集并均衡不同的列数据的统计工作 (例如., 基(cardinality)、唯一值的数量、空值、最大最小值、平均/最大长度,等等)来改进查询类作业的执行计划。均衡这些作业帮助Spark在选取最优查询计划时做出更好决定。这些优化的例子包括在做hash-join时选择正确的一方建hash,选择正确的join类型(广播hash join和全洗牌hash-join)或调整多路join的顺序,等等)


在该博客中,我们将深入讲解Spark的基于成本的优化器(CBO)并讨论Spark是如何收集并存储这些数据、优化查询,并在压力测试查询中展示所带来的性能影响。


一个启发性的例子


在Spark2.2核心,Catalyst优化器是一个统一的库,用于将查询计划表示成多颗树并依次使用多个优化规则来变换他们。大部门优化规则都基于启发式,例如,他们只负责查询的结构且不关心要处理数据的属性,这样严重限制了他们的可用性。让我们用一个简单的例子来演示。考虑以下的查询,该查询过滤大小为500GB的t1表并与另一张大小为20GB的t2表做join操作。Spark使用hash join,即选择小的join关系作为构建hash表的一方并选择大的join关系作为探测方。由于t2表比t1表小, Apache Spark 2.1 将会选择右方作为构建hash表的一方而不是对其进行过滤操作(在这个案例中就是会过滤出t1表的大部分数据)。选择错误那方做构建hash表经常会导致系统由于内存限制的原因去放弃快速hash join而使用排序-归并 join(sort-merge join)。

f1b5aa71cb61427962b3becac814e5a6.png

而Apache Spark 2.2却不这么做,它会收集每个操作的统计信息 并发现左方在过滤后大小只有100MB (1 百万条纪录) ,而过滤右方会有20GB (1亿条纪录)。有了两侧正确的表大小/基的信息,Spark 2.2会选择左方为构建方,这种选择会极大加快查询速度。

为了改进查询执行计划的质量,我们使用详细的统计信息加强了Spark SQL优化器。从详细的统计信息中,我们传播统计信息到别的操作子(因为我们从下往上遍历查询树)。传播结束,我们可以估计每个数据库操作子的输出记录数和输出纪录的大小,这样就可以得到一个高效的查询计划。


统计信息收集框架


ANALYZE TABLE 命令

CBO relies on detailed statistics to optimize a query plan. CBO依赖细节化的统计信息来优化查询计划。要收集这些统计信息,用户可以使用以下这些新的SQL命令:

ANALYZE TABLE table_name COMPUTE STATISTICS

上面的 SQL 语句可以收集表级的统计信息,例如记录数、表大小(单位是byte)。这里需要注意的是ANALYZE, COMPUTE, and STATISTICS都是保留的关键字,他们已特定的列名为入参,在metastore中保存表级的统计信息。

ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS column-name1, column-name2, ….

需要注意的是在ANALYZE 语句中没必要指定表的每个列-只要指定那些在过滤/join条件或group by等中涉及的列


统计信息类型


下表列出了所收集的统计信息的类型,包括数字类型、日期、时间戳和字符串、二进制数据类型

f92b9d638f4bc6ecfde47d2bc8ebbe84.png

由于CBO是以后续方式遍历Spark的逻辑计划树,我们可以自底向上地把这些统计信息传播到其他操作子。虽然我们要评估的统计信息及对应开销的操作子有很多,我们将讲解两个最复杂且有趣的操作子FILTER 和JOIN的评估统计信息的流程。


过滤选择


过滤条件是配置在SQL SELECT语句中的WHERE 子句的谓语表达式。谓语可以是包含了逻辑操作子AND、OR、NOT且包含了多个条件的复杂的逻辑表达式 。单个条件通常包含比较操作子,例如=, <, <=, >, >= or <=>。因此,根据全部过滤表达式来估计选择是非常复杂的。


我们来演示对包含多个条件逻辑表达式的复杂逻辑表达式做过滤选择 的一些计算。

  • 对于逻辑表达式AND,他的过滤选择是左条件的选择乘以右条件选择,例如fs(a AND b) = fs(a) * fs (b)。
  • 对于逻辑表达式OR,他的过滤选择是左条件的选择加上右条件选择并减去左条件中逻辑表达式AND的选择,例如 fs (a OR b) = fs (a) + fs (b) - fs (a AND b) = fs (a) + fs (b) – (fs (a) * fs (b))
  • 对于逻辑表达式NOT,他的过滤因子是1.0 减去原表达式的选择,例如 fs (NOT a) = 1.0 - fs (a)

现在我们看下可能有多个操作子的单个逻辑条件例如 =, <, <=, >, >= or <=>。对于单个操作符作为列,另一个操作符为字符串的情况,我们先计算等于 (=) 和小于 (<) 算子的过滤选择。其他的比较操作符也是类似。

  • 等于操作符 (=) :我们检查条件中的字符串常量值是否落在列的当前最小值和最大值的区间内 。这步是必要的,因为如果先使用之前的条件可能会导致区间改变。如果常量值落在区间外,那么过滤选择就是 0.0。否则,就是去重后值的反转(注意:不包含额外的柱状图信息,我们仅仅估计列值的统一分布)。后面发布的版本将会均衡柱状图来优化估计的准确性。
  • 小于操作符 (<) :检查条件中的字符串常量值落在哪个区间。如果比当前列值的最小值还小,那么过滤选择就是 0.0(如果大于最大值,选择即为1.0)。否则,我们基于可用的信息计算过滤因子。如果没有柱状图,就传播并把过滤选择设置为: (常量值– 最小值) / (最大值 – 最小值)。另外,如果有柱状图,在计算过滤选择时就会加上在当前列最小值和常量值之间的柱状图桶密度 。同时,注意在条件右边的常量值此时变成了该列的最大值。

Join基数


我们已经讨论了过滤选择, 现在讨论join的输出基。在计算二路join的输出基之前,我们需要先有双方孩子节点的输出基 。每个join端的基都不会超过原表记录数的基。更准确的说,是在执行join操作子之前,执行所有操作后得到的有效纪录数。在此,我们偏好计算下內连接(inner join)操作的基因为它经常用于演化出其他join类型的基。我们计算下在 A.k = B.k 条件下A join B 的记录数 ,即

num(A IJ B) = num(A)*num(B)/max(distinct(A.k),distinct(B.k))

num(A) 是join操作上一步操作执行后表A的有效记录数, distinct是join列 k唯一值的数量。

如下所示,通过计算内连接基,我们可以大概演化出其他join类型的基:

  • 左外连接(Left-Outer Join): num(A LOJ B) = max(num(A IJ B),num(A)) 是指内连接输出基和左外连接端A的基之间较大的值。这是因为我们需要把外端的每条纪录计入,虽然他们没有出现在join输出纪录内。
  • Right-Outer Join: num(A ROJ B) = max(num(A IJ B),num(B))
  • Full-Outer Join: num(A FOJ B) = num(A LOJ B) + num(A ROJ B) - num(A IJ B)


最优计划选择


现在我们已经有了数据统计的中间结果,让我们讨论下如何使用这个信息来选择最佳的查询计划。早先我们解释了在hash join操作中根据精确的基和统计信息选择构建方。

同样,根据确定的基和join操作的前置所有操作的大小估计,我们可以更好的估计join测的大小来决定该测是否符合广播的条件。

这些统计信息同时也有助于我们均衡基于成本的join的重排序优化。我们适配了动态编程算法[Selinger 1979]3 来选取最佳的多路join顺序。更确切的说,在构建多路join时候,我们仅考虑同个集合(包含m个元素)的最佳方案(成本最低)。举个例子,对于3路join,我们根据元素  {A, B, C} 可能的排列顺序,即 (A join B) join C, (A join C) join B 和(B join C) join A来考虑最佳join方案。我们适配 的算法考虑了所有的组合,包括左线性树(left-deep trees),浓密树(bushy trees)和右线性树(right-deep-trees)。我们还修剪笛卡儿积(cartesian product )用于在构建新的计划时如果左右子树都没有join条件包含的引用需要情况。这个修剪策略显著减少了搜索范围。

大部分数据库优化器将CPU和I/O计入考虑因素,分开考虑成本来估计总共的操作开销。在Spark中,我们用简单的公式估计join操作的成本:

cost = weight * cardinality + (1.0 - weight) * size 4

公式的第一部分对应CPU成本粗略值,第二部分对应IO。一颗join树的成本是所有中间join成本的总和。


查询的性能测试和分析


我们使用非侵入式方法把这些基于成本的优化加入到Spark,通过加入全局配置spark.sql.cbo.enabled来开关这个特性。在Spark 2.2, 这个参数默认是false 。短期内有意设置该特性默认为关闭,因为的Spark被上千家公司用于生产环境,默认开启该特性可能会导致生产环境压力变大从而导致不良后果。


配置及方法学


在四个节点 (单台配置:Huawei FusionServer RH2288 , 40 核和384 GB 内存) 的集群用TPC-DS来测试Apache Spark 2.2查询性能。在四个节点的集群运行测试查询性能的语句并设比例因子为1000(大概1TB数据)。收集全部24张表(总共245列)的统计信息大概要14分钟。

在校验端到端的结果前,我们先看一条查询语句TPC-DS(Q25; 如下所示)来更好了解基于成本的join排序带来的威力。这句查询语句包括三张事实表: store_sales (29 亿行纪录), store_returns (2.88 亿行纪录) 和catalog_sales (14.4 亿行纪录). 同时也包括三张维度表: date_dim(7.3万行纪录), store (1K 行纪录) 和 item (300K 行纪录).

SELECT i_item_id, i_item_desc, s_store_id, s_store_name, sum(ss_net_profit) AS store_sales_profit, sum(sr_net_loss) AS store_returns_loss, sum(cs_net_profit) AS catalog_sales_profitFROM store_sales, store_returns, catalog_sales, date_dim d1, date_dim d2, date_dim d3, store, itemWHERE d1.d_moy = 4   AND d1.d_year = 2001   AND d1.d_date_sk = ss_sold_date_sk   AND i_item_sk = ss_item_sk   AND s_store_sk = ss_store_sk   AND ss_customer_sk = sr_customer_sk   AND ss_item_sk = sr_item_sk   AND ss_ticket_number = sr_ticket_number   AND sr_returned_date_sk = d2.d_date_sk   AND d2.d_moy BETWEEN 4 AND 10   AND d2.d_year = 2001   AND sr_customer_sk = cs_bill_customer_sk   AND sr_item_sk = cs_item_sk   AND cs_sold_date_sk = d3.d_date_sk   AND d3.d_moy BETWEEN 4 AND 10   AND d3.d_year = 2001GROUP BY i_item_id, i_item_desc, s_store_id, s_store_nameORDER BY i_item_id, i_item_desc, s_store_id, s_store_nameLIMIT 100


没使用CBO的Q25


我们先看下没使用基于成本优化的Q25的join树(如下)。一般这种树也叫做左线性树。这里, join #1 和 #2 是大的事实表与事实表join,join了3张事实表store_sales, store_returns, 和catalog_sales,并产生大的中间结果表。这两个join都以shuffle join的方式执行并会产生大的输出,其中join #1输出了1.99亿行纪录。总之,关闭CBO,查询花费了241秒。

31db29842973de9b29cc7430f7594411.png


使用了CBO的Q25


另一方面,用了CBO,Spark创建了优化方案可以减小中间结果(如下)。在该案例中,Spark创建了浓密树而不是左-深度树。在CBO规则下,Spark 先join 的是事实表对应的维度表 (在尝试直接join事实表前)。避免大表join意味着避免了大开销的shuffle。在这次查询中,中间结果大小缩小到原来的1/6(相比之前)。最后,Q25只花了71秒,性能提升了3.4倍。

77354d2654b893f043e2276a683a145e.jpg


TPC-DS 查询性能


现在我们对性能提升的原因有了直观感受,我们再看下端到端的TPC-DS查询结果。下表展示了使用CBO或没使用CBO下所有TPC-DS查询花费的:

0afe3828e83e5e781499d9dc40191a7a.png

首先,要注意的是一半TPC-DS性能查询没有性能的改变。这是因为使用或没使用CBO的查询计划没有不同 (例如,即使没有CBO,  Spark’s Catalyst 优化器的柱状图也可以优化这些查询。剩下的查询性能都有提升,最有意思的其中16个查询,CBO对查询计划进行巨大改变并带来了超过30%的性能提升(如下)总的来说,我们观察的图标说明16个查询大概加速了2.2倍,其中Q72 加速最大,达到了8倍。

29841c581f7862c2435c0b2457573829.png


结论


回顾前文,该博客展示了Apache Spark 2.2新的CBO不同的高光层面的。我们讨论了统计信息收集框架的细节、过滤和join时的基传播、CBO开启(选择构建方和多路重排序)以及TPC-DS查询性能的提升。

去年,我们针对CBO umbrella JIRA SPARK-16026总共处理了32个子任务,涉及到50多个补丁和7000多行代码。也就是说,在分布式数据库 均衡CBO是非常困难的而这也是向这个方向迈出的一小步。在以后的版本中,我们计划继续往这个方向做下去,继续加入更复杂的统计信息(直方图、总记录数-最小粗略估计、统计信息分区程度,等等)并改进我们的公式。

我们对已经取得的进展感到十分兴奋并希望你们喜欢这些改进。我们希望你们能在Apache Spark 2.2中尝试新的CBO!


延伸阅读


可以查看Spark 2017(峰会) 演讲: Cost Based Optimizer in Spark 2.2

  • 原理就是较小的关系更容易放到内存
  • <=> 表示‘安全的空值相等’ ,如果两边的结果都是null就返回true,如果只有一边是null就返回false
  • P. Griffiths Selinger, M. M. Astrahan, D. D. Chamberlin, R. A. Lorie, T. G. Price, “Access Path Selection in a Relational Database Management System”, Proceedings of ACM SIGMOD conference, 1979
  • weight(权值)是调优参数,可以通过配置 spark.sql.cbo.joinReorder.card.weight (默认是0.7)


目录
相关文章
|
1月前
|
存储 自然语言处理 BI
|
22天前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
32 1
|
3月前
|
存储 消息中间件 运维
招联金融基于 Apache Doris 数仓升级:单集群 QPS 超 10w,存储成本降低 70%
招联内部已有 40+ 个项目使用 Apache Doris ,拥有超百台集群节点,个别集群峰值 QPS 可达 10w+ 。通过应用 Doris ,招联金融在多场景中均有显著的收益,比如标签关联计算效率相较之前有 6 倍的提升,同等规模数据存储成本节省超 2/3,真正实现了降本提效。
招联金融基于 Apache Doris 数仓升级:单集群 QPS 超 10w,存储成本降低 70%
|
4月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
147 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
3月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
62 0
|
3月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
196 0
|
4月前
|
分布式计算 Apache Spark
|
5月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
130 6
|
5月前
|
消息中间件 分布式计算 关系型数据库
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
|
6月前
|
达摩院 开发者 容器
「达摩院MindOpt」优化形状切割问题(MILP)
在制造业,高效地利用材料不仅是节约成本的重要环节,也是可持续发展的关键因素。无论是在金属加工、家具制造还是纺织品生产中,原材料的有效利用都直接影响了整体效率和环境影响。
「达摩院MindOpt」优化形状切割问题(MILP)

推荐镜像

更多
下一篇
无影云桌面