一、Spark SQL简介
(一)从Shark说起
Hive是一个基于Hadoop 的数据仓库工具,提供了类似于关系数据库SQL的查询语言HiveQL,用户可以通过HiveQL语句快速实现简单的MapReduce统计,Hive 自身可以自动将HiveQL语句快速转换成MapReduce 任务进行运行。当用户向Hive输入一段命令或查询(即HiveQL 语句)时, Hive需要与Hadoop交互来完成该操作。该命令或查询首先进入到驱动模块,由驱动模块中的编译器进行解析编译,并由优化器对该操作进行优化计算,然后交给执行器去执行,执行器通常的任务是启动一个或多个MapReduce任务。如图所示描述了用户提交一段SQL查询后,Hive把sQL 语句转化成MapReduce任务进行执行的详细过程。
Shark即Hive on Spark,为了实现与Hive兼容,Shark在HiveQL方面重用了Hive中HiveQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MapReduce作业替换成了Spark作业,通过Hive的HiveQL解析,把HiveQL翻译成Spark上的RDD操作。
Shark的出现,使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高。
Shark的设计导致了两个问题: 一是执行计划优化完全依赖于Hive,不方便添加新的优化策略
二是因为Spark是线程级并行,而MapReduce是进程级并行,因此,Spark在兼容Hive的实现上存在线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支。
2014年6月1日Shark项目和Spark SQL项目的主持人Reynold Xin宣布:停止对Shark的开发,团队将所有资源放在Spark SQL项目上,至此,Shark的发展画上了句号,但也因此发展出两个分支:Spark SQL和Hive on Spark。Spark SQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive Hive on Spark是一个Hive的发展计划,该计划将Spark作为Hive的底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎。
(二)Spark SQL架构
Spark SQL架构如图所示,Spark SQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据,也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责。
Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据 Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范。
(三)为什么推出Spark SQL
关系数据库已经很流行 关系数据库在大数据时代已经不能满足要求 首先,用户需要从不同数据源执行各种操作,包括结构化、半结构化和非结构化数据 其次,用户需要执行高级分析,比如机器学习和图像处理 在实际大数据应用中,经常需要融合关系查询和复杂分析算法(比如机器学习或图像处理),但是,缺少这样的系统。
Spark SQL填补了这个鸿沟: 首先,可以提供DataFrame API,可以对内部和外部各种数据源执行各种关系型操作 其次,可以支持大数据中的大量数据源和数据分析算法 Spark SQL可以融合:传统关系数据库的结构化数据管理能力和机器学习算法的数据处理能力。
二、DataFrame概述
Spark SQL所使用的数据抽象并非RDD,而是DataFrame。DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能 Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询。
RDD是分布式的 Java对象的集合,但是,对象内部结构对于RDD而言却是不可知的 DataFrame是一种以RDD为基础的分布式数据集,提供了详细的结构信息。如图所示为RDD和DataFrame的区别。三、DataFrame的创建
从Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能。
SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession亦提供了HiveQL以及其他依赖于Hive的功能的支持。
可以通过如下语句创建一个SparkSession对象:
>>> from pyspark import SparkContext,SparkConf >>> from pyspark.sql import SparkSession >>> spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
实际上,在启动进入pyspark以后,pyspark就默认提供了一个SparkContext对象(名称为sc)和一个SparkSession对象(名称为spark)。
在创建DataFrame时,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame。例如:
spark.read.text("people.txt"):读取文本文件people.txt创建DataFrame;在读取本地文件或HDFS文件时,要注意给出正确的文件路径。
spark.read.json("people.json"):读取people.json文件创建DataFrame。
spark.read.parquet(“people.parquet”):读取people.parquet文件创建DataFrame。
或者也可以使用如下格式的语句:
spark.read.format("text").load("people.txt"):读取文本文件people.json创建DataFrame。 spark.read.format("json").load("people.json"):读取JSON文件people.json创建DataFrame。 spark.read.format("parquet").load("people.parquet"):读取Parquet文件people.parquet创建DataFrame。
一个实例:
在“/usr/local/spark/examples/src/main/resources/”这个目录下,这个目录下有两个样例数据people.json和people.txt。
people.json文件的内容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
people.txt文件的内容如下:
Michael, 29
Andy, 30Justin, 19
>>> df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json") >>> df.show() +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+
四、DataFrame的保存
可以使用spark.write操作,把一个DataFrame保存成不同格式的文件,例如,把一个名称为df的DataFrame保存到不同格式文件中,方法如下:
DataFrame保存到不同格式文件中,方法如下:
df.write.text("people.txt")
df.write.json("people.json")
df.write.parquet("people.parquet")
或者也可以使用如下格式的语句:
df.write.format("text").save("people.txt")
df.write.format("json").save("people.json")
df.write.format ("parquet").save("people.parquet")
下面从示例文件people.json中创建一个DataFrame,名称为peopleDF,把peopleDF保存到另外一个JSON文件中,然后,再从peopleDF中选取一个列(即name列),把该列数据保存到一个文本文件中。
>>> peopleDF = spark.read.format("json").\ ... load("file:///usr/local/spark/examples/src/main/resources/people.json") >>> peopleDF.select("name", "age").write.format("json").\ ... save("file:///home/zhc/mycode/sparksql/newpeople.json") >>> peopleDF.select("name").write.format("text").\ ... save("file:///home/zhc/mycode/sparksql/newpeople.txt")
然后到“/home/zhc/mycode/sparksql/”路径下可以看到生成一个名称为newpeople.json的目录(不是文件)和一个名称为newpeople.txt的目录(不是文件)。
五、DataFrame的常用操作
可以执行一些常用的DataFrame操作,先创建一个DataFrame:
>>> df=spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
1、printSchema()
>>> df.printSchema()
2、select()
>>> df.select(df["name"],df["age"]+1).show()
3、filter()
>>> df.filter(df["age"]>20).show()
4、groupBy()
>>> df.groupBy("age").count().show()
5、sort()
>>> df.sort(df["age"].desc()).show()
六、从RDD转换得到DataFrame
(一)利用反射机制推断RDD模式
利用反射机制来推断包含特定类型对象的RDD的模式(Schema),适用于数据结构已知时的RDD转换。
在“/usr/local/spark/examples/src/main/resources/”目录下,有个Spark安装时自带的样例数据people.txt,其内容如下:
Michael, 29
Andy, 30
Justin, 19
现在要把people.txt加载到内存中生成一个DataFrame,并查询其中的数据。执行代码如下:
>>> from pyspark.sql import Row >>> people = spark.sparkContext.\ ... textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").\ ... map(lambda line: line.split(",")).\ ... map(lambda p: Row(name=p[0], age=int(p[1]))) >>> schemaPeople = spark.createDataFrame(people) #必须注册为临时表才能供下面的查询使用 >>> schemaPeople.createOrReplaceTempView("people") >>> personsDF = spark.sql("select name,age from people where age > 20") #DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用p.name和p.age来获取值 >>> personsRDD=personsDF.rdd.map(lambda p:"Name: "+p.name+ ","+"Age: "+str(p.age)) >>> personsRDD.foreach(print) Name: Michael,Age: 29 Name: Andy,Age: 30
(二)使用编程方式定义RDD模式
使用编程接口构造一个模式(Schema),并将其应用在已知的RDD上,适用于数据结构未知的RDD转换。
当无法提前获知数据结构时,就需要采用编程方式定义RDD模式。比如,现在需要通过编程方式把“/usr/local/spark/examples/src/main/resources/people.txt”加载进来生成DataFrame,并完成SQL查询。步骤如下:
下面是利用Spark SQL查询people.txt的完整代码:
>>> from pyspark.sql.types import * >>> from pyspark.sql import Row #下面生成“表头” >>> schemaString = "name age" >>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(" ")] >>> schema = StructType(fields) #下面生成“表中的记录” >>> lines = spark.sparkContext.\ ... textFile("file:///usr/local/spark/examples/src/main/resources/people.txt") >>> parts = lines.map(lambda x: x.split(",")) >>> people = parts.map(lambda p: Row(p[0], p[1].strip())) #下面把“表头”和“表中的记录”拼装在一起 >>> schemaPeople = spark.createDataFrame(people, schema) #注册一个临时表供下面查询使用 >>> schemaPeople.createOrReplaceTempView("people") >>> results = spark.sql("SELECT name,age FROM people") >>> results.show() +-------+---+ | name|age| +-------+---+ |Michael| 29| | Andy| 30| | Justin| 19| +-------+---+
七、使用Spark SQL读写数据库
Spark SQL可以支持Parquet、JSON、Hive等数据源,并且可以通过JDBC连接外部数据源。
(一)准备工作
在Linux系统中安装MySQL数据库的方法,可以参照我上一篇博客。
在Linux系统中安装MySQL数据库-CSDN博客
https://blog.csdn.net/Morse_Chen/article/details/135154114
安装成功后,在Linux终端启动MySQL数据库,命令如下:
[root@bigdata zhc]# systemctl start mysqld.service [root@bigdata zhc]# mysql -u root -p #屏幕会提示你输入密码
输入下面SQL语句完成数据库和表的创建:
mysql> create database spark; mysql> use spark; mysql> create table student (id int(4), name char(20), gender char(4), age int(4)); mysql> insert into student values(1,'Xueqian','F',23); mysql> insert into student values(2,'Weiliang','M',24); mysql> select * from student;
为了让Spark能够顺利连接MySQL数据库,还需要MySQL数据库驱动程序。可以上网查找下载MySQL的JDBC驱动程序。下载MySQL的JDBC驱动程序,比如mysql-connector-java-5.1.40.tar.gz 。把该驱动程序解压出mysql-connector-java-5.1.40-bin.jar文件,并将其拷贝到spark的安装目录“/usr/local/spark/jars”下。完成以上操作后,再启动进入pyspark。
(二)读取MySQL数据库中的数据
启动进入pyspark后,执行以下命令连接数据库,读取数据,并显示:
>>> jdbcDF = spark.read.format("jdbc") \ .option("driver","com.mysql.jdbc.Driver") \ .option("url", "jdbc:mysql://localhost:3306/spark") \ .option("dbtable", "student") \ .option("user", "root") \ .option("password", "123456") \ .load() >>> jdbcDF.show() +---+--------+------+---+ | id| name|gender|age| +---+--------+------+---+ | 1| Xueqian| F| 23| | 2|Weiliang| M| 24| +---+--------+------+---+
(三)向MySQL数据库写入数据
在MySQL数据库中已经创建了一个名称为spark的数据库,并创建了一个名称为student的表 创建后,查看一下数据库内容:
现在开始编写程序,创建一个“/home/zhc/mycode/sparksql/InsertStudent.py”,往spark.student表中插入两条记录。
#/home/zhc/mycode/sparksql/InsertStudent.py from pyspark.sql import Row from pyspark.sql.types import * from pyspark import SparkContext,SparkConf from pyspark.sql import SparkSession spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate() #下面设置模式信息 schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age", IntegerType(), True)]) #下面设置两条数据,表示两个学生的信息 studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda x:x.split(" ")) #下面创建Row对象,每个Row对象都是rowRDD中的一行 rowRDD = studentRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip()))) #建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来 studentDF = spark.createDataFrame(rowRDD, schema) #写入数据库 prop = {} prop['user'] = 'root' prop['password'] = 'MYsql123!' prop['driver'] = "com.mysql.jdbc.Driver" studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark?useSSL=false",'student','append', prop)
通过spark-submit提交上述代码文件:
[root@bigdata sparksql]# spark-submit InsertStudent.py
执行上述代码后,可以看一下效果,在MySQL Shell环境中使用SQL查询spark.student表发生了什么变化。
另外,解决一下在运行上述代码时,可能出现的问题:
很显然,上图中运行代码时抛出了异常。
这是因为与MySQL数据库的SSL连接失败了,我们只需要将数据源的URL后面添加**?useSSL=false**就可以解决,也就是禁用SSL:
再次运行代码,就OK了。