点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(正在更新!)
章节内容
上节我们完成了如下的内容:
SparkSQL 语句 编码 测试 结果
输入输出
数据源包含如Parquet、JSON、CSV、Avro、ORC、Hive、JDBC、ODBC
TextFile
SparkSQL中的Join
数据分析中将两个数据集进行Join操作是很常见的场景。在Spark的物理计划阶段,Spark的Join Selection类会根据Join Hints 策略,Join表的大小、Join是等值Join还是不等值以及参与Join的Key是否可以排序等条件来选择最终的Join策略,最后Spark会利用选择好的Join策略执行最终的计算。
当前Spark一共支持五种Join策略:
Broadcast hash join (BHJ)
Shuffle hash join (SHJ)
Shuffle sort merge join(SMJ)
Shuffle-and-replicate nested loop join,又叫笛卡尔积(Cartesian product join)
Broadcast nested loop join(BNLJ)
其中 BHJ 和 SMJ 这两种 Join 策略是我们运行 Spark 任务最常见的。
JoinSelection 会先根据 Join 的 Key 为等值Join来选择 Broadcast hash join、Shuffle hash join、Shuffle sort merge join中的一个。
如果Join的Key为不等值Join或者没有指定Join条件,则会选择Broadcast nested loop join 或 Shuffle-and-replicate nested loop join。
不同的Join策略在执行效率上差别很大,了解每种Join策略的执行过程和适用条件是很有必要的。
Broadcast Hash Join
Broadcast Hash Join 的实现是将小表的数据广播到Spark所有的Executor端,这个广播过程和我们自己去广播数据没有什么区别:
利用 Collect 算子将小表的数据从Executor端拉到Driver端
在Driver端调用sparkContext.broadcast广播到所有Executor端
在Executor端使用广播的数据与大表进行Join操作(实际上执行Map操作)
这种Join策略避免了Shuffle操作,一般而言,Broadcast Hash Join会比其他Join策略执行的要快。
使用这种 Join 策略必须满足如下的条件:
小表的数据必须很小,可以通过 spark.sql.autoBroadcasetJoinThreshold 参数来配置,默认是10MB
如果内存比较大,可以将阈值适当加大
将 spark.sql.autoBroadcastJoinThreshold 参数设置为-1,可以关闭这种连接方式
只能用于等值Join,不要求参与Join的keys可排序
Shuffle Hash Join
当表中的数据比较大,又不适合使用广播,这个时候就可以考虑 Shuffle Hash Join。
Shuffle Hash Join 同样是在大表和小表进行Join的时候选择了一种策略。
它的计算思想是:把大表和小表按照相同的分区算法和分区数据进行分区(根据参与Join的Keys进行分区),这样保证了 Hash 值一样的数据都分发到同一个分区中,然后在同一个 Executor 中两张表 Hash 值一样的分区就可以在本地进行Hash Join了。在进行 Join 之前,还会对小表的分区构建 Hash Map,Shuffle Hash Join 利用了分治思想,把大问题拆解成小问题去解决。
要启动 Shuffle Hash Join 必须满足以下条件:
仅支持等值 Join,不要求参与Join的Keys可排序
spark.sql.join.perferSortMergeJoin 参数必须设置值为 false,参数从Spark2.0版本引入,默认值是true,也就是默认情况下是 Sort Merge Join
小表的大小(plan.stats.sizeInBytes)必须小于(spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions(默认200))
而且小表大小(stats.sizeInBytes)的三倍必须小于等于大表的大小(stats.sizeInBytes),也就是(a.stats.sizeInBytes * 3 < b.stats.sizeInBytes)
Shuffle Sort Merge Join
前面两种Join策略对表的大小都有条件的,如果参与Join的表都很大,这时候就得考虑用 Shuffle Sort Merge Join了。
Shuffle Sort Merge Join 的实现事项:
将两张表按照 Join Key进行Shuffle,保证 Join Key值相同的记录会被分在相应的分区
对每个分区内的数据进行排序
排序后再对相应的分区内的记录进行连接
无论分区多大,Sort Merge Join都不用把一侧的数据全部加载到内存中,而是即用即丢。
因为两个序列都有序,从头遍历,碰到Key相同的就输出,如果不同,左边小取左边,反之就取右边。
这样大大提高了大数据量下的SQL Join的稳定性。
要启用Shuffle Sort Merge Join必须满足以下条件:
仅支持等值 Join,并且要求参与 Join 的 Keys 可排序
Cartesian Product Join
如果Spark中两张参与Join的表没有指定连接条件,那么产生Cartesian Product Join,这个Join得到的结果其实就是两张表行数的乘积。
Broadcast Nested Loop Join
可以把 Broadcast Nested Loop Join的执行看做下面的计算:
for record_1 in relation_1: for record_2 in relation_2: # join condition is executed
可以看出 Broadcast Nested Loop Join 在某些情况会对某张表重复扫描多次,效率非常低。从名字可以看出,这种Join会根据相关条件对小表进行广播,以减少表的扫描次数。
Broadcast Nested Loop Join支持等值和不等值Join,支持所有的Join类型。
SQL解析过程
基本概念
SparkSQL 可以说Spark中的精华部分,原来基于RDD构建大数据计算任务,重新在向Dataset转移,原来基于 RDD 写的代码也在迁移。
使用 SparkSQL 编码的好处是非常大的,尤其是性能方面,有很大提升。SparkSQL 中各种内嵌的性能优化比写RDD遵循各种最佳实践更加靠谱。
尤其对于新手来说,比如先 Filter 再 Map,SparkSQL中会自动进行谓词下推,Spark SQL中会自动使用 Broadcast Join来广播小表,把 Shuffle Join转换为 Map Join等等。
SparkSQL对SQL语句的处理和关系型数据库类似,即词法/语法解析、绑定、优化、执行。SparkSQL会先将SQL语句解析成一棵树,然后使用规则(Rule)对Tree进行绑定、优化等处理过程。
SparkSQL由:Core、Catalyst、Hive、Hive-ThriftServer四部分构成:
Core:负责处理数据的输入和输出,如获取数据,查询结果输出成DataFrame等
Catalyst:负责处理整个查询过程,包括解析、绑定、优化等。
Hive:负责对Hive数据进行处理
Hive-ThriftServer:主要用于对Hive的访问
SparkSQL的代码复杂度是问题的本质复杂度带来说,SparkSQL中的Catalyst框架大部分逻辑是在一个Tree类型的数据结构上做各种折腾,基于Scala来实现还是很优雅的,Scala的偏函数和强大的Case正则匹配,让整个代码看起来非常优雅。
SparkSession是编写Spark应用代码的入口,启动一个spark-shell会提供给你创建spark-session,这个对象是整个Spark应用的起始点,以下是SparkSession的一些重要的变量和方法:
编写代码
package icu.wzk import org.apache.spark.sql.{DataFrame, SparkSession} object TestDemo01 { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("TestDemo01") .master("local[*]") .getOrCreate() spark.sparkContext.setLogLevel("warn") import spark.implicits._ Seq((0, "zhansan", 10), (1, "lisi", 11), (2, "wangwu", 12)).toDF("id", "name", "age") .createOrReplaceTempView("stu") Seq((0, "chinese", 80), (0, "math", 100), (0, "english", 98), (1, "chinese", 86), (1, "math", 97), (1, "english", 90), (2, "chinese", 90), (2, "math", 94), (2, "english", 88)).toDF("id", "subject", "score") .createOrReplaceTempView("score") val df: DataFrame = spark.sql( """ |SELECT SUM(v) AS total_score, name |FROM ( | SELECT stu.id, 100 + 10 + score.score AS v, name | FROM stu | JOIN score ON stu.id = score.id | WHERE stu.age >= 11 |) tmp |GROUP BY name |""".stripMargin) df.show() // 打印执行计划 println(df.queryExecution) println(df.queryExecution.optimizedPlan) spark.close() } }
运行输出
执行代码可见控制台输出如下数据(我就不往服务器发了):
控制台的内容如下图所示:
+-----------+------+ |total_score| name| +-----------+------+ | 602|wangwu| | 603| lisi| +-----------+------+ == Parsed Logical Plan == 'Aggregate ['name], ['SUM('v) AS total_score#27, 'name] +- 'SubqueryAlias `tmp` +- 'Project ['stu.id, ((100 + 10) + 'score.score) AS v#26, 'name] +- 'Filter ('stu.age >= 11) +- 'Join Inner, ('stu.id = 'score.id) :- 'UnresolvedRelation `stu` +- 'UnresolvedRelation `score` == Analyzed Logical Plan == total_score: bigint, name: string Aggregate [name#8], [sum(cast(v#26 as bigint)) AS total_score#27L, name#8] +- SubqueryAlias `tmp` +- Project [id#7, ((100 + 10) + score#22) AS v#26, name#8] +- Filter (age#9 >= 11) +- Join Inner, (id#7 = id#20) :- SubqueryAlias `stu` : +- Project [_1#3 AS id#7, _2#4 AS name#8, _3#5 AS age#9] : +- LocalRelation [_1#3, _2#4, _3#5] +- SubqueryAlias `score` +- Project [_1#16 AS id#20, _2#17 AS subject#21, _3#18 AS score#22] +- LocalRelation [_1#16, _2#17, _3#18] == Optimized Logical Plan == Aggregate [name#8], [sum(cast(v#26 as bigint)) AS total_score#27L, name#8] +- Project [(110 + score#22) AS v#26, name#8] +- Join Inner, (id#7 = id#20) :- LocalRelation [id#7, name#8] +- LocalRelation [id#20, score#22] == Physical Plan == *(2) HashAggregate(keys=[name#8], functions=[sum(cast(v#26 as bigint))], output=[total_score#27L, name#8]) +- Exchange hashpartitioning(name#8, 200) +- *(1) HashAggregate(keys=[name#8], functions=[partial_sum(cast(v#26 as bigint))], output=[name#8, sum#38L]) +- *(1) Project [(110 + score#22) AS v#26, name#8] +- *(1) BroadcastHashJoin [id#7], [id#20], Inner, BuildLeft :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) : +- LocalTableScan [id#7, name#8] +- LocalTableScan [id#20, score#22] Aggregate [name#8], [sum(cast(v#26 as bigint)) AS total_score#27L, name#8] +- Project [(110 + score#22) AS v#26, name#8] +- Join Inner, (id#7 = id#20) :- LocalRelation [id#7, name#8] +- LocalRelation [id#20, score#22]