Spark SQL中掌控sql语句的执行 - 了解你的查询计划

简介: Spark SQL中掌控sql语句的执行 - 了解你的查询计划

背景


spark 2.x 的sql以及申明行DataFrame APi以来,在spark查询数据越来越方便。仅仅用几行代码就能表达出复杂的查询逻辑以及实现复杂的操作。 这个api最大的优势在于用户不需要考虑太多的执行情况,自动有优化器优化出最有效率的执行方式去执行此次查询。而且有效的查询语句执行不仅是因为能够节约资源,而且能够减少终端用户等待结果的时间。

Spark SQL 优化器实际上是很成熟的,尤其是随着3.0的到来,该版本会引入一些新特性,比如动态分支裁剪以及动态查询执行。 优化器是工作在查询计划内部的并且能够应用各种规则去优化查询计划。 例如能够改变transformation的执行顺序或者对于不影响最终结果的直接丢弃。虽然有很多优秀的优化,但是有些场景人是能够做的更好的。在这篇文章里,我们就来看一下特例,并且使用一些技巧来更好的执行查询计划。


例子


首先让我们来引入一个例子。加入我们有下列json格式的数据:

{"id": 1, "user_id": 100, "price": 50}
{"id": 2, "user_id": 100, "price": 200}
{"id": 3, "user_id": 101, "price": 120}
{"id": 4, "price": 120}

每一个记录就像一个事务,而user_id这一列可能包含了很多重复的值(也可能包含null),除此之外还有其他的列来描述这个事务。 现在我们的查询是基于两个聚合的union操作,两个聚合的不同仅仅在于过滤条件的不同。在第一个聚合中我们想要获取价格总和小于50的用户,第二个聚合中我们想要获取价格综合大于100的用户,而且在第二个聚合中我们只考虑user_id不为null的。这个例子只是复杂例子的简化版本,但是这种复杂的例子是实际存在的。 以下是使用PySpark DataFrame API去表达我们想要的查询:

df = spark.read.json(data_path)
df_small = (
df
.groupBy("user_id")
.agg(sum("price").alias("price"))
.filter(col("price") < 50)
)
df_big = (
df
.filter(col("user_id").isNotNull())
.groupBy("user_id")
.agg(sum("price").alias("price"))
.filter(col("price") > 100)  
)
result = df_small.union(df_big)

计划的解释翻译


对于优化查询性能的关键点在于能够去理解并解释翻译查询计划。计划的本身是能够通过Spark DataFrame explain函数展示出来的,或者如果计划已经是在运行了,我们可以通过Spark UI找到SQL这个tab,从而找到该计划。

image.png

这个SQL tab中有已经完成的和正在运行的查询列表,所以选中我们的查询就能看到物理计划的图形化展示(这里我们移除了指标信息,这样能够使图⌚️更加简单)

image.png

这个计划是树形结构,每个节点代表了一些操作,并且携带了一些执行信息。我们可以看到这个例子中我们有两个分支,和一个root分支在最底层,叶子在最顶层,也是执行开始的地方。scan json叶子节点代表从source中读取数据,然后这里有一对hashAggregate操作,代表着聚合。在这两个聚合操作之间有一个Exchange操作,代表着shuffle。filters操作携带着过滤条件信息。

这个计划是一个典型的union操作,每一个dataframe都有一个新的分支,而且因为我们的例子中DataFrame是基于同样的数据源,这就意味着该数据源被scan了两次。现在我们能明白这里是存在优化的空间的.让数据源只被scan一次是一个很好的优化,尤其是在IO代价非常大的情况下。

在这里我们想要实现的是重利用计算–scan数据和聚合的计算,因为在DataFrame上的操作是一样的,原则上计算一次就足够了。


Cache缓存


spark中一个典型的解决重新计算的方法是利用cache。在DataFrame中有一个cache函数:

df.cache()

这个是一个延迟转换,意味着只有在一些action触发后数据才会放到缓存层,在spark中Caching是一个很普通的操作,然而这是有限制的,特别是数据量很大和集群集资源非常紧张的情况下。而且我们必须意识到存储数据在缓冲层是需要额外的开销的,而且操作自身也是需要开销的。 在整个DataFrame df中调用cache操作并不能优化因为这个操作会缓存所有的列到存储中。一个更好的方法是只缓存选择被使用的字段。


重新使用Exchage


除了缓存,也还有另一种方法,这个方法不好用图形化描述,且基于重新利用Exchange。这个Exchange操作代表着用来集群之间移动数据的shuffle操作。shuffle操作一般在聚合,join,和一些转换操作中会用到。关于shuffle比较重要的事是spark总是会把shuffle 写的数据存储在磁盘,而且因为存储在磁盘,在必要的时候可以重新被使用。实际上spark在某个时机上会重新利用该数据。比如在spark发现从叶子节点到exchange节点的多个分支时重复的时候就会进行reuse操作[ReuseExchange规则],如果存在这种情况,说明我们这些重复的分支是有一样的计算,是可以重新被使用的。我们可以从计划中识别出来是否有这种场景,因为这些分支应该像以下这样:

image.png

在我们的例子中,spark并不会重新利用Exchange,但是可以利用一些技巧而从使它被重新利用。为什么在我们的例子中Exchange不能被重新利用的原因是右边的分支有着user_id不为null的条件。该过滤条件是union操作的两个分支的唯一不同点,如果我们能消除这个不同点,spark将会重新利用EXchange。


计划的改进

我们怎么样才能分支是一样的呢?假如说是这个filer操作导致的,那我们可以颠倒filter的顺序,在聚合之后再进行过滤操作,因为这个对结果没有影响。然而这有一个陷阱。假如我们如下这样修改:

df_big = (
 df.groupBy("user_id")
 .agg(sum("price").alias("price"))
 .filter(col("price") > 100)
 .filter(col("price").isNotNull())
)

再一次检查最终的查询计划,我们发现这个计划没有改变。解释很简单–这个filter操作被优化器移动了。

从概念上来讲,存在着两种计划 逻辑计划和物理计划,这个时很好理解的。并且逻辑计划在转换为物理计划前会经过一个优化阶段。当我们改变了一些转换以后,直接反应在逻辑计划中。优化器会应用一系列的优化规则,这些规则通常是基于推断的。在我们的例子中,这个规则是PushDownPredicate,该规则是确保filters操作尽量被移动到靠近数据源的位置。它来源于进行过滤操作再进行数据集的操作效率更高。这个规则在大部分场景是很有用的。 然而在这里却不适用我们的例子。

为了让filter在合适的位置,我们必须限制优化器。从spark 2.4以来我们可以通过配置项来让优化器排除某种规则:

spark.conf.set(
"spark.sql.optimizer.excludedRules",     "org.apache.spark.sql.catalyst.optimizer.PushDownPredicate")

设置了这个以后,再一次运行查询语句,我们能看到filters操作的位置就如我们想的一样。这两个分支是一样的了,spark将会重新利用Exchange,数据将会只会被扫描一次,聚合操作也只会计算一次。

在spark 3.0 情况有些不用,优化规则有不同的名字–PushDownPredicates,而且还有一个额外的规则用来下推filter-PushPredicateThroughNonJoin,所以实际上我们需要排除两个规则。


总结

我们看到通过这个,spark 开发者给了我们一种控制优化器的能力。但是也伴随着一种责任,我们列举了一下当使用这种技术的一些重点:

  • 当我们排除了PushDownPredicate,我们就得对这个查询中所有的filter负责,不仅仅是我们想要重新定位的filter。 这个还存在着另一种filter,这种filter很大概率出现的,例如分区filter,所以我们需要确保他们被放在合适的位置。
  • 限制了优化器,使用filter就是用户的工作了。在我们的例子中,加速查询是在IO比较昂贵的情况下,因为我们能实现数据只能被浏览一次,如果数据有很多列,这适用在文件格式不是列格式的青情况下,像json或者csv格式
  • 如果数据集很小,就不值得控制优化器了,反而cache能达到同样的效果。然而当数据集很大的时候,存储数据的额外开销就很明显了。从另一方面说,重新利用Exchange就没有额外的开销了,因为shuffle数据都存储在磁盘
  • 这个技术基于spark内部的行为,并没有官方文档,并且如果以后功能上有改动,很难去察觉。在我们的例子中,在spark 3.0中是有改动的,首先规则被重命名,并且加上了另一个规则


结论

我们知道如果要实现优化的前提是我们能够理解查询计划。spark的优化器通过一系列的推导规则能够很好的优化我们的查询。然而这里也有一些场景优化规则是不适用的。 有时候查询重写很好,有时候不好,因为重写查询将会实现不同的逻辑计划,并且我们不能直接控制被执行的物理计划。因为从spark 2.4以来,我们可以通过配置excludedRules来限制优化器,从未来定制了一些常规的物理计划。

在很多场景中,依赖于优化器我们可以得到固定的计划,并且有一个高效的执行。然而 这里有一些性能压力,这里我们可以检查最终的计划,并且查看是否可以通过限制优化器来进行优化。

相关文章
|
13天前
|
SQL 存储 人工智能
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
Vanna 是一个开源的 Python RAG(Retrieval-Augmented Generation)框架,能够基于大型语言模型(LLMs)为数据库生成精确的 SQL 查询。Vanna 支持多种 LLMs、向量数据库和 SQL 数据库,提供高准确性查询,同时确保数据库内容安全私密,不外泄。
77 7
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
|
20天前
|
SQL Java
使用java在未知表字段情况下通过sql查询信息
使用java在未知表字段情况下通过sql查询信息
34 8
|
18天前
|
SQL 存储 BI
gbase 8a 数据库 SQL合并类优化——不同数据统计周期合并为一条SQL语句
gbase 8a 数据库 SQL合并类优化——不同数据统计周期合并为一条SQL语句
|
26天前
|
SQL 安全 PHP
PHP开发中防止SQL注入的方法,包括使用参数化查询、对用户输入进行过滤和验证、使用安全的框架和库等,旨在帮助开发者有效应对SQL注入这一常见安全威胁,保障应用安全
本文深入探讨了PHP开发中防止SQL注入的方法,包括使用参数化查询、对用户输入进行过滤和验证、使用安全的框架和库等,旨在帮助开发者有效应对SQL注入这一常见安全威胁,保障应用安全。
46 4
|
29天前
|
SQL 监控 关系型数据库
SQL语句当前及历史信息查询-performance schema的使用
本文介绍了如何使用MySQL的Performance Schema来获取SQL语句的当前和历史执行信息。Performance Schema默认在MySQL 8.0中启用,可以通过查询相关表来获取详细的SQL执行信息,包括当前执行的SQL、历史执行记录和统计汇总信息,从而快速定位和解决性能瓶颈。
|
1月前
|
SQL 存储 缓存
如何优化SQL查询性能?
【10月更文挑战第28天】如何优化SQL查询性能?
107 10
|
1月前
|
SQL 关系型数据库 MySQL
|
2月前
|
SQL 数据库 开发者
功能发布-自定义SQL查询
本期主要为大家介绍ClkLog九月上线的新功能-自定义SQL查询。
|
2月前
|
SQL 移动开发 Oracle
SQL语句实现查询连续六天数据的方法与技巧
在数据库查询中,有时需要筛选出符合特定时间连续性条件的数据记录
|
1月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。