《Spark大数据分析实战》——3.1节SQL on Spark-阿里云开发者社区

开发者社区> 华章计算机> 正文

《Spark大数据分析实战》——3.1节SQL on Spark

简介:
+关注继续查看

本节书摘来自华章社区《Spark大数据分析实战》一书中的第3章,第3.1节SQL on Spark,作者高彦杰 倪亚宇,更多章节内容可以访问云栖社区“华章社区”公众号查看

3.1 SQL on Spark
AMPLab将大数据分析负载分为三大类型:批量数据处理、交互式查询、实时流处理。而其中很重要的一环便是交互式查询。大数据分析栈中需要满足用户ad-hoc、reporting、iterative等类型的查询需求,也需要提供SQL接口来兼容原有数据库用户的使用习惯,同时也需要SQL能够进行关系模式的重组。完成这些重要的SQL任务的便是Spark SQL和Shark这两个开源分布式大数据查询引擎,它们可以理解为轻量级Hive SQL在Spark上的实现,业界将该类技术统称为SQL on Hadoop。
在Spark峰会2014上,Databricks宣布不再支持Shark的开发,全力以赴开发Shark的下一代技术Spark SQL,同时Hive社区也启动了Hive on Spark项目,将Spark作为Hive(除MapReduce和Tez之外的)新执行引擎。根据伯克利的Big Data Benchmark测试对比数据,Shark的In Memory性能可以达到Hive的100倍,即使是On Disk也能达到10倍的性能提升,是Hive强有力的替代解决方案。而作为Shark的进化版本的Spark SQL,在AMPLab最新的测试中的性能已经超过Shark。图3-2展示了Spark SQL和Hive on Spark是新的发展方向。


3923bc6657b69f590ec96dd27f416f929a917f18

3.1.1 为什么使用Spark SQL
由于Shark底层依赖于Hive,这个架构的优势是对传统Hive用户可以将Shark无缝集成进现有系统运行查询负载。但是也看到一些问题:随着版本升级,查询优化器依赖于Hive,不方便添加新的优化策略,需要进行另一套系统的学习和二次开发,学习成本很高。另一方面,MapReduce是进程级并行,例如:Hive在不同的进程空间会使用一些静态变量,当在同一进程空间进行多线程并行执行,多线程同时写同名称的静态变量会产生一致性问题,所以Shark需要使用另外一套独立维护的Hive源码分支。而为了解决这个问题AMPLab和Databricks利用Catalyst开发了Spark SQL。
Spark的全栈解决方案为用户提供了多样的数据分析框架,机器学习、图计算、流计算如火如荼的发展和流行吸引了大批的学习者,为什么人们今天还是要重视在大数据环境下使用SQL呢?笔者认为主要有以下几点原因:
1)易用性与用户惯性。在过去的很多年中,有大批的程序员的工作是围绕着数据库+应用的架构来做的,因为SQL的易用性提升了应用的开发效率。程序员已经习惯了业务逻辑代码调用SQL的模式去写程序,惯性的力量是强大的,如果还能用原有的方式解决现有的大数据问题,何乐而不为呢?提供SQL和JDBC的支持会让传统用户像以前一样地书写程序,大大减少迁移成本。
2)生态系统的力量。很多系统软件性能好,但是未取得成功和没落,很大程度上因为生态系统问题。传统的SQL在JDBC、ODBC、SQL的各种标准下形成了一整套成熟的生态系统,很多应用组件和工具可以迁移使用,像一些可视化的工具、数据分析工具等,原有企业的IT工具可以无缝过渡。
3)数据解耦,Spark SQL正在扩展支持多种持久化层,用户可以使用原有的持久化层存储数据,但是也可以体验和迁移到Spark SQL提供的数据分析环境下进行Big Data的分析。
3.1.2 Spark SQL架构分析
Spark SQL与传统DBMS的查询优化器+执行器的架构较为类似,只不过其执行器是在分布式环境中实现,并采用的Spark作为执行引擎。Spark SQL的查询优化是Catalyst,其基于Scala语言开发,可以灵活利用Scala原生的语言特性很方便进行功能扩展,奠定了Spark SQL的发展空间。Catalyst将SQL语言翻译成最终的执行计划,并在这个过程中进行查询优化。这里和传统不太一样的地方就在于,SQL经过查询优化器最终转换为可执行的查询计划是一个查询树,传统DB就可以执行这个查询计划了。而Spark SQL最后执行还是会在Spark内将这棵执行计划树转换为Spark的有向无环图DAG再执行。


4069c97f2bf77ecedc83f470cafa16804c6c0d17

从图3-3中可以看到整个Catalyst是Spark SQL的调度核心,遵循传统数据库的查询解析步骤,对SQL进行解析,转换为逻辑查询计划、物理查询计划,最终转换为Spark的DAG后再执行。图3-4为Catalyst的执行流程。


292606720b935870ce70514e5d34cea4698b763f

SqlParser将SQL语句转换为逻辑查询计划,Analyzer对逻辑查询计划进行属性和关系关联检验,之后Optimizer通过逻辑查询优化将逻辑查询计划转换为优化的逻辑查询计划,QueryPlanner将优化的逻辑查询计划转换为物理查询计划,prepareForExecution调整数据分布,最后将物理查询计划转换为执行计划进入Spark执行任务。
2.?Spark SQL优化策略
查询优化是传统数据库中最为重要的一环,这项技术在传统数据库中已经很成熟。除了查询优化,Spark SQL在存储上也进行了优化,从以下几点查看Spark SQL的一些优化策略。
(1)内存列式存储与内存缓存表
Spark SQL可以通过cacheTable将数据存储转换为列式存储,同时将数据加载到内存进行缓存。cacheTable相当于在分布式集群的内存物化视图,将数据进行缓存,这样迭代的或者交互式的查询不用再从HDFS读数据,直接从内存读取数据大大减少了I/O开销。列式存储的优势在于Spark SQL只需要读出用户需要的列,而不需要像行存储那样需要每次将所有列读出,从而大大减少内存缓存数据量,更高效地利用内存数据缓存,同时减少网络传输和I/O开销。数据按照列式存储,由于是数据类型相同的数据连续存储,能够利用序列化和压缩减少内存空间的占用。
(2)列存储压缩
为了减少内存和硬盘空间占用,Spark SQL采用了一些压缩策略对内存列存储数据进行压缩。Spark SQL的压缩方式要比Shark丰富很多,例如它支持PassThrough, RunLengthEncoding, DictionaryEncoding, BooleanBitSet, IntDelta, LongDelta等多种压缩方式。这样能够大幅度减少内存空间占用和网络传输开销和I/O开销。
(3)逻辑查询优化
Spark SQL在逻辑查询优化(如图3-5所示)上支持列剪枝、谓词下压、属性合并等逻辑查询优化方法。列剪枝为了减少读取不必要的属性列,减少数据传输和计算开销,在查询优化器进行转换的过程中会进行列剪枝的优化。


d2b18073df22724cb1ec3601800fcde963b50660

下面介绍一个逻辑优化例子:

SELECT Class FROM (SELECT ID,Name,Class  FROM STUDENT ) S WHERE S.ID=1

Catalyst将原有查询通过谓词下压,将选择操作ID=1优先执行,这样过滤大部分数据,通过属性合并将最后的投影只做一次最终保留Class属性列。
(4)Join优化
Spark SQL深度借鉴传统数据库查询优化技术的精髓,同时也在分布式环境下进行特定的优化策略调整和创新。Spark SQL对Join进行了优化支持多种连接算法,现在的连接算法已经比Shark丰富,而且很多原来Shark的元素也逐步迁移过来。例如:BroadcastHashJoin、BroadcastNestedLoopJoin、HashJoin、LeftSemiJoin,等等。
下面介绍一个其中的BroadcastHashJoin算法思想。
BroadcastHashJoin将小表转化为广播变量进行广播,这样避免Shuff?le开销,最后在分区内做Hash连接。这里用的就是Hive中Map Side Join的思想。同时用了DBMS中的Hash连接算法做连接。
随着Spark SQL的发展,未来会有更多的查询优化策略加入进来。同时后续Spark SQL会支持像Shark Server一样的服务端、JDBC接口,兼容更多的持久化层例如NoSQL,传统的DBMS等。一个强有力的结构化大数据查询引擎正在崛起。
3.?如何使用Spark SQL

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 在这里引入sqlContext下所有的方法就可以直接用sql方法进行查询
import sqlContext._
case class Person(name: String, age: Int)
// 下面的people是含有case类型数据的RDD,会默认由Scala的implicit机制将RDD转换为
  SchemaRDD,SchemaRDD是SparkSQL中的核心RDD
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
// 在内存的元数据中注册表信息,这样一个Spark SQL表就创建完成了
people.registerAsTable("people")
// sql语句就会触发上面分析的Spark SQL的执行过程,读者可以参考上面的图示
val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// 最后生成teenagers也是一个RDD
teenagers.map(t =>"Name: " + t(0)).collect().foreach(println)

通过之前的介绍,读者对支撑结构化数据分析任务的Spark SQL的原理与使用有了一定的了解。在生产环境中,有一类数据分析任务对响应延迟要求高,需要实时处理流数据,在BDAS中,Spark Streaming用于支撑大规模流式处理分析任务。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
基于Numpy的统计分析实战
标题中的英文首字母大写比较规范,但在python实际使用中均为小写。 2018年7月27日笔记 学习内容: 1.从文件中读取数据 2.将数据写入文件 3.利用数学和统计分析函数完成实际统计分析应用 4.掌握数组相关的常用函数 1.文本文件读写 1.1使用numpy.savetxt方法写入文本文件 numpy.savetxt方法需要2个参数:第1个参数是文件名,数据类型为字符串str; 第2个参数是被写入文件的nda数据,数据类型为ndarray对象。
1020 0
史上最快! 10小时大数据入门实战(九)- 前沿技术拓展Spark,Flink,Beam
spark Spark 开发语言及运行模式介绍 Scala安装 下载 Scala ...
1430 0
OLAP on TableStore:基于Data Lake Analytics的Serverless SQL大数据分析
TableStore(简称OTS)是阿里云的一款分布式表格系统,为用户提供schema-free的分布式表格服务。随着越来越多用户对OLAP有强烈的需求,我们提供在表格存储上接入Data Lake Analytics(简称DLA)服务的方式,提供一种快速的OLAP解决方案。
6489 0
MaxCompute Studio使用心得系列3——可视化分析作业运行
我们很熟悉的是通过Logview 去分析作业的执行情况,logview上有很详细的执行日志,而Studio不仅仅提供可视化的信息,还会明确给出一些分析结论如job是有否长尾或数据倾斜情况。
3426 0
Python系列直播——深入Python与日志服务,玩转大规模数据分析处理实战
Python系列直播——深入Python与日志服务,玩转大规模数据分析处理实战
4839 0
基于Spark的机器学习实践 (三) - 实战环境搭建
0 相关源码 1 Spark环境安装 ◆ Spark 由scala语言编写,提供多种语言接口,需要JVM ◆ 官方为我们提供了Spark 编译好的版本,可以不必进行手动编译 ◆ Spark安装不难,配置需要注意,并且不一定需要Hadoop环境 下载 解压 tar zxvf spark-2.
1048 0
Okhttp3源码解析(1)-OkHttpClient分析
前言 上篇文章我们讲了Okhttp的基本用法,今天根据上节讲到请求流程来分析源码,那么第一步就是实例化OkHttpClient对象,所以我们今天主要分析下OkHttpClient源码! 初始化-构造方式 创建 OkHttpClient实例的两种方式 1.
2594 0
10059
文章
0
问答
来源圈子
更多
+ 订阅
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载