大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节我们完成了如下的内容:


SparkSQL 语句 编码 测试 结果

输入输出

数据源包含如Parquet、JSON、CSV、Avro、ORC、Hive、JDBC、ODBC

TextFile

978d49607804daac9defe320cc3c3326_43c0253ba04140ff8e645a5b62940261.png

SparkSQL中的Join

数据分析中将两个数据集进行Join操作是很常见的场景。在Spark的物理计划阶段,Spark的Join Selection类会根据Join Hints 策略,Join表的大小、Join是等值Join还是不等值以及参与Join的Key是否可以排序等条件来选择最终的Join策略,最后Spark会利用选择好的Join策略执行最终的计算。


当前Spark一共支持五种Join策略:


Broadcast hash join (BHJ)

Shuffle hash join (SHJ)

Shuffle sort merge join(SMJ)

Shuffle-and-replicate nested loop join,又叫笛卡尔积(Cartesian product join)

Broadcast nested loop join(BNLJ)

其中 BHJ 和 SMJ 这两种 Join 策略是我们运行 Spark 任务最常见的。

JoinSelection 会先根据 Join 的 Key 为等值Join来选择 Broadcast hash join、Shuffle hash join、Shuffle sort merge join中的一个。

如果Join的Key为不等值Join或者没有指定Join条件,则会选择Broadcast nested loop join 或 Shuffle-and-replicate nested loop join。

不同的Join策略在执行效率上差别很大,了解每种Join策略的执行过程和适用条件是很有必要的。


Broadcast Hash Join

Broadcast Hash Join 的实现是将小表的数据广播到Spark所有的Executor端,这个广播过程和我们自己去广播数据没有什么区别:


利用 Collect 算子将小表的数据从Executor端拉到Driver端

在Driver端调用sparkContext.broadcast广播到所有Executor端

在Executor端使用广播的数据与大表进行Join操作(实际上执行Map操作)

这种Join策略避免了Shuffle操作,一般而言,Broadcast Hash Join会比其他Join策略执行的要快。

02da68259927c633ffb15241499b113c_6e156d7a15b740519b9c102b99b86f2f.png 使用这种 Join 策略必须满足如下的条件:


小表的数据必须很小,可以通过 spark.sql.autoBroadcasetJoinThreshold 参数来配置,默认是10MB

如果内存比较大,可以将阈值适当加大

将 spark.sql.autoBroadcastJoinThreshold 参数设置为-1,可以关闭这种连接方式

只能用于等值Join,不要求参与Join的keys可排序

Shuffle Hash Join

当表中的数据比较大,又不适合使用广播,这个时候就可以考虑 Shuffle Hash Join。

Shuffle Hash Join 同样是在大表和小表进行Join的时候选择了一种策略。

它的计算思想是:把大表和小表按照相同的分区算法和分区数据进行分区(根据参与Join的Keys进行分区),这样保证了 Hash 值一样的数据都分发到同一个分区中,然后在同一个 Executor 中两张表 Hash 值一样的分区就可以在本地进行Hash Join了。在进行 Join 之前,还会对小表的分区构建 Hash Map,Shuffle Hash Join 利用了分治思想,把大问题拆解成小问题去解决。

79ee99e3f01c36be44f5b1940954a20f_44cd0864bf94472d9fe57382ab0df871.png 要启动 Shuffle Hash Join 必须满足以下条件:


仅支持等值 Join,不要求参与Join的Keys可排序

spark.sql.join.perferSortMergeJoin 参数必须设置值为 false,参数从Spark2.0版本引入,默认值是true,也就是默认情况下是 Sort Merge Join

小表的大小(plan.stats.sizeInBytes)必须小于(spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions(默认200))

而且小表大小(stats.sizeInBytes)的三倍必须小于等于大表的大小(stats.sizeInBytes),也就是(a.stats.sizeInBytes * 3 < b.stats.sizeInBytes)

Shuffle Sort Merge Join

前面两种Join策略对表的大小都有条件的,如果参与Join的表都很大,这时候就得考虑用 Shuffle Sort Merge Join了。

Shuffle Sort Merge Join 的实现事项:


将两张表按照 Join Key进行Shuffle,保证 Join Key值相同的记录会被分在相应的分区

对每个分区内的数据进行排序

排序后再对相应的分区内的记录进行连接

无论分区多大,Sort Merge Join都不用把一侧的数据全部加载到内存中,而是即用即丢。

因为两个序列都有序,从头遍历,碰到Key相同的就输出,如果不同,左边小取左边,反之就取右边。

这样大大提高了大数据量下的SQL Join的稳定性。

125171248c47af75b36b26074e00b4f5_b5998661fbe04aeea22b0b4187e02bff.png 要启用Shuffle Sort Merge Join必须满足以下条件:


仅支持等值 Join,并且要求参与 Join 的 Keys 可排序

Cartesian Product Join

如果Spark中两张参与Join的表没有指定连接条件,那么产生Cartesian Product Join,这个Join得到的结果其实就是两张表行数的乘积。


Broadcast Nested Loop Join

可以把 Broadcast Nested Loop Join的执行看做下面的计算:

for record_1 in relation_1:
  for record_2 in relation_2:
    # join condition is executed

可以看出 Broadcast Nested Loop Join 在某些情况会对某张表重复扫描多次,效率非常低。从名字可以看出,这种Join会根据相关条件对小表进行广播,以减少表的扫描次数。

Broadcast Nested Loop Join支持等值和不等值Join,支持所有的Join类型。


SQL解析过程

基本概念

SparkSQL 可以说Spark中的精华部分,原来基于RDD构建大数据计算任务,重新在向Dataset转移,原来基于 RDD 写的代码也在迁移。

使用 SparkSQL 编码的好处是非常大的,尤其是性能方面,有很大提升。SparkSQL 中各种内嵌的性能优化比写RDD遵循各种最佳实践更加靠谱。

尤其对于新手来说,比如先 Filter 再 Map,SparkSQL中会自动进行谓词下推,Spark SQL中会自动使用 Broadcast Join来广播小表,把 Shuffle Join转换为 Map Join等等。


SparkSQL对SQL语句的处理和关系型数据库类似,即词法/语法解析、绑定、优化、执行。SparkSQL会先将SQL语句解析成一棵树,然后使用规则(Rule)对Tree进行绑定、优化等处理过程。

SparkSQL由:Core、Catalyst、Hive、Hive-ThriftServer四部分构成:


Core:负责处理数据的输入和输出,如获取数据,查询结果输出成DataFrame等

Catalyst:负责处理整个查询过程,包括解析、绑定、优化等。

Hive:负责对Hive数据进行处理

Hive-ThriftServer:主要用于对Hive的访问

e862dc4f03d261cb0084154b3406ebf0_7306124de3fa46f5820bc1fd7a69912b.png

SparkSQL的代码复杂度是问题的本质复杂度带来说,SparkSQL中的Catalyst框架大部分逻辑是在一个Tree类型的数据结构上做各种折腾,基于Scala来实现还是很优雅的,Scala的偏函数和强大的Case正则匹配,让整个代码看起来非常优雅。


SparkSession是编写Spark应用代码的入口,启动一个spark-shell会提供给你创建spark-session,这个对象是整个Spark应用的起始点,以下是SparkSession的一些重要的变量和方法:

65f1a71e706b4a4582ffc9f87e75e963_50565cc8c4dd4670be3f2ff7d09a7792.png

编写代码

package icu.wzk
import org.apache.spark.sql.{DataFrame, SparkSession}



object TestDemo01 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("TestDemo01")
      .master("local[*]")
      .getOrCreate()

    spark.sparkContext.setLogLevel("warn")
    import spark.implicits._

    Seq((0, "zhansan", 10),
      (1, "lisi", 11),
      (2, "wangwu", 12)).toDF("id", "name", "age")
      .createOrReplaceTempView("stu")

    Seq((0, "chinese", 80),
      (0, "math", 100),
      (0, "english", 98),
      (1, "chinese", 86),
      (1, "math", 97),
      (1, "english", 90),
      (2, "chinese", 90),
      (2, "math", 94),
      (2, "english", 88)).toDF("id", "subject", "score")
      .createOrReplaceTempView("score")

    val df: DataFrame = spark.sql(
      """
        |SELECT SUM(v) AS total_score, name
        |FROM (
        |  SELECT stu.id, 100 + 10 + score.score AS v, name
        |  FROM stu
        |  JOIN score ON stu.id = score.id
        |  WHERE stu.age >= 11
        |) tmp
        |GROUP BY name
        |""".stripMargin)

    df.show()

    // 打印执行计划
    println(df.queryExecution)
    println(df.queryExecution.optimizedPlan)

    spark.close()
  }

}

运行输出

执行代码可见控制台输出如下数据(我就不往服务器发了):

控制台的内容如下图所示:

+-----------+------+
|total_score|  name|
+-----------+------+
|        602|wangwu|
|        603|  lisi|
+-----------+------+

== Parsed Logical Plan ==
'Aggregate ['name], ['SUM('v) AS total_score#27, 'name]
+- 'SubqueryAlias `tmp`
   +- 'Project ['stu.id, ((100 + 10) + 'score.score) AS v#26, 'name]
      +- 'Filter ('stu.age >= 11)
         +- 'Join Inner, ('stu.id = 'score.id)
            :- 'UnresolvedRelation `stu`
            +- 'UnresolvedRelation `score`

== Analyzed Logical Plan ==
total_score: bigint, name: string
Aggregate [name#8], [sum(cast(v#26 as bigint)) AS total_score#27L, name#8]
+- SubqueryAlias `tmp`
   +- Project [id#7, ((100 + 10) + score#22) AS v#26, name#8]
      +- Filter (age#9 >= 11)
         +- Join Inner, (id#7 = id#20)
            :- SubqueryAlias `stu`
            :  +- Project [_1#3 AS id#7, _2#4 AS name#8, _3#5 AS age#9]
            :     +- LocalRelation [_1#3, _2#4, _3#5]
            +- SubqueryAlias `score`
               +- Project [_1#16 AS id#20, _2#17 AS subject#21, _3#18 AS score#22]
                  +- LocalRelation [_1#16, _2#17, _3#18]

== Optimized Logical Plan ==
Aggregate [name#8], [sum(cast(v#26 as bigint)) AS total_score#27L, name#8]
+- Project [(110 + score#22) AS v#26, name#8]
   +- Join Inner, (id#7 = id#20)
      :- LocalRelation [id#7, name#8]
      +- LocalRelation [id#20, score#22]

== Physical Plan ==
*(2) HashAggregate(keys=[name#8], functions=[sum(cast(v#26 as bigint))], output=[total_score#27L, name#8])
+- Exchange hashpartitioning(name#8, 200)
   +- *(1) HashAggregate(keys=[name#8], functions=[partial_sum(cast(v#26 as bigint))], output=[name#8, sum#38L])
      +- *(1) Project [(110 + score#22) AS v#26, name#8]
         +- *(1) BroadcastHashJoin [id#7], [id#20], Inner, BuildLeft
            :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
            :  +- LocalTableScan [id#7, name#8]
            +- LocalTableScan [id#20, score#22]
Aggregate [name#8], [sum(cast(v#26 as bigint)) AS total_score#27L, name#8]
+- Project [(110 + score#22) AS v#26, name#8]
   +- Join Inner, (id#7 = id#20)
      :- LocalRelation [id#7, name#8]
      +- LocalRelation [id#20, score#22]

接下篇:https://developer.aliyun.com/article/1622630

目录
相关文章
|
11天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
zdl
|
2月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
170 56
|
1月前
|
SQL 存储 关系型数据库
MySQL进阶突击系列(01)一条简单SQL搞懂MySQL架构原理 | 含实用命令参数集
本文从MySQL的架构原理出发,详细介绍其SQL查询的全过程,涵盖客户端发起SQL查询、服务端SQL接口、解析器、优化器、存储引擎及日志数据等内容。同时提供了MySQL常用的管理命令参数集,帮助读者深入了解MySQL的技术细节和优化方法。
|
2月前
|
SQL 算法 大数据
为什么大数据平台会回归SQL
在大数据领域,尽管非结构化数据占据了大数据平台80%以上的存储空间,结构化数据分析依然是核心任务。SQL因其广泛的应用基础和易于上手的特点成为大数据处理的主要语言,各大厂商纷纷支持SQL以提高市场竞争力。然而,SQL在处理复杂计算时表现出的性能和开发效率低下问题日益凸显,如难以充分利用现代硬件能力、复杂SQL优化困难等。为了解决这些问题,出现了像SPL这样的开源计算引擎,它通过提供更高效的开发体验和计算性能,以及对多种数据源的支持,为大数据处理带来了新的解决方案。
|
15天前
|
存储 负载均衡 监控
揭秘 Elasticsearch 集群架构,解锁大数据处理神器
Elasticsearch 是一个强大的分布式搜索和分析引擎,广泛应用于大数据处理、实时搜索和分析。本文深入探讨了 Elasticsearch 集群的架构和特性,包括高可用性和负载均衡,以及主节点、数据节点、协调节点和 Ingest 节点的角色和功能。
37 0
|
2月前
|
SQL 存储 大数据
单机顶集群的大数据技术来了
大数据时代,分布式数仓如MPP成为热门技术,但其高昂的成本让人望而却步。对于多数任务,数据量并未达到PB级,单体数据库即可胜任。然而,由于SQL语法的局限性和计算任务的复杂性,分布式解决方案显得更为必要。esProc SPL作为一种开源轻量级计算引擎,通过高效的算法和存储机制,实现了单机性能超越集群的效果,为低成本、高效能的数据处理提供了新选择。
|
2月前
|
SQL 存储 算法
比 SQL 快出数量级的大数据计算技术
SQL 是大数据计算中最常用的工具,但在实际应用中,SQL 经常跑得很慢,浪费大量硬件资源。例如,某银行的反洗钱计算在 11 节点的 Vertica 集群上跑了 1.5 小时,而用 SPL 重写后,单机只需 26 秒。类似地,电商漏斗运算和时空碰撞任务在使用 SPL 后,性能也大幅提升。这是因为 SQL 无法写出低复杂度的算法,而 SPL 提供了更强大的数据类型和基础运算,能够实现高效计算。
|
3月前
|
存储 分布式计算 druid
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(一)
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(一)
51 1
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(一)
|
3月前
|
存储 缓存 监控
深入解析:Elasticsearch集群性能调优策略与最佳实践
【10月更文挑战第8天】Elasticsearch 是一个分布式的、基于 RESTful 风格的搜索和数据分析引擎,它能够快速地存储、搜索和分析大量数据。随着企业对实时数据处理需求的增长,Elasticsearch 被广泛应用于日志分析、全文搜索、安全信息和事件管理(SIEM)等领域。然而,为了确保 Elasticsearch 集群能够高效运行并满足业务需求,需要进行一系列的性能调优工作。
190 3
|
3月前
|
分布式计算 大数据 分布式数据库
大数据-158 Apache Kylin 安装配置详解 集群模式启动(一)
大数据-158 Apache Kylin 安装配置详解 集群模式启动(一)
63 5

推荐镜像

更多