人人都懂Spark-SQL基础操作(Scala版)

简介: 人人都懂Spark-SQL基础操作(Scala版)

Spark SQL

简单的说Spark SQL是spark用来操作结构化和半结构化数据的接口。本文来讲述一下它的一些基本操作。

Spark SQL的特性

  1. 无缝地将SQL查询和spark程序混合,与常规的Python/Java/scala代码高度整合,包含了连接RDD与SQL表、公开的自定义SQL函数接口等。
  2. 可以从各种结构化数据源中读取数据,如(JSON、HIVE等)
  3. 可以通过JDBC或者ODBC连接,Spark SQL包括有行业标准的JDBC和ODBC连接的服务器模式。


为了实现这些功能Spark SQL提供了一种特殊的RDD叫做SchemaRDD,这是一个存放row对象的RDD,每个RDD代表一行记录。


Spark SQL编译时可以包含hive支持(Apache Hive 是 Hadoop上的SQL引擎),也可以不包含,包含Hive支持的Spark SQL可以支持hive表访问、UDF(用户自定义函数)、SerDe(序列化格式和反序列化格式)、以及Hive查询语言(HiveQL/HQL)。


下面来说一下Spark SQL的操作。


初始化Spark SQL

Scala中SQL的import声明:

//导入 Spark SQL
import org.apache.spark.sql.hive.HiveContext
//如果不能使用hive依赖的话
import org.apahe.spark.sql.SQLContext

需要导入的隐式转换支持

//创建Spark SQL的HiveContext
val hiveCtx = ...
//如果不能使用hive依赖的话
import hiveCtx._

添加好import声明之后,需要创建出一个HiveContext对象,如果无法一如Hive依赖,就创建出一个SQLContext对象作为SQL的上下文环境,这两个类都要传入一个SparkContext对象作为运行的基础

创建SQL上下文环境

val sc = new SparkContext(...)
val hiveCtx = new HiveContext(sc)

有了HiveContext或SQLContext后我们就可以准备读取数据并进行查询了。


基本查询示例

读取并查询推文

想要在一张数据表上进行查询,需要调用HiveContext或者SQLContext中的sql()方法,要做的第一件事就是告诉Spark SQL要查询的数据是什么。

val input = hiveCtx.jsonFile(inputFile)
//注册输入的SchemaRDD
input.registerTempTable("tweets")
//依据retweetCount(转发计数)选出推文
val topTweets = hiveCtx.sql("SELECT text,retweetCount FROM
  tweet ORDER BY retweetCount LIMIT 10")

SchemaRDD

我们在读取和执行查询的时候都会返回SchemaRDD,SchemaRDD和传统的数据库中的表的概念类似。SchemaRDD是一个由ROW对象组成的RDD,附带包含每列数据类型的结构信息。


row对象

row对象表示SchemaRDD中的记录,其本质就是一个定长的字段数组,在Scala中Row对象有一系列Getter方法,可以通过下标来获取每个字段的值。


例:在Scala中访问topTweet这个SchemaRDD中的text列(第一列)

val topTweetText = topTweets.map(row => row.getString(0))

读取和存储数据

Spark SQL支持很多种结构化数据源,可以让你跳过复杂的读取过程,轻松从各种数据源中读取到Row对象,这些数据源包括Hive表、JSON和Parquet文件。


Hive


从hive中读取数据时,Spark SQL支持任何Hive支持的存储格式(SerDe),包括文本文件,RCFiles,ORC,Parquet,Avro以及Protocol Buffer。


例:如何查询一张hive表。

import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT key,value Frow mytable")
val keys = row.map(row => row.getInt(0))

Parquet

Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录,Parquet格式经常在Hadoop生态圈中被使用,他也支持Spark SQL的全部数据类型。


JSON

要读取JSON数据,只要调用hiveCtx中的jsonFile()方法即可,如果想获得从数据中推断出来的结构信息,可以在生成的SchemaRDD上调用printSchema方法。


在Scala中使用Spark SQL读取json数据。

val input = hiveCtx.jsonFile(inputFile)

基于RDD创建SchemaRDD

case class Happyperson(handle:String,favouriteBeverage:String)
...
//创建了一个人的对象,并且把它转换成SchemaRDD
val HappypersonRDD = sc.parallelize(List(Happyperson("holden","coffee")))
happyPeopleRDD.registerTempTable("happy_people")

小结

本文讲解了Spark利用Spark SQL进行结构化和半结构化数据处理的方式,除了上述的一些方法,操作RDD的方法也同样适用于SchemaRDD。

相关文章
|
1月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
50 5
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
50 3
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
89 0
|
1月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
25 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
37 0
|
1月前
|
缓存 分布式计算 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
47 0
|
1月前
|
分布式计算 算法 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
50 0
|
5月前
|
SQL Java 数据库连接
2万字实操案例之在Springboot框架下基于注解用Mybatis开发实现基础操作MySQL之预编译SQL主键返回增删改查
2万字实操案例之在Springboot框架下基于注解用Mybatis开发实现基础操作MySQL之预编译SQL主键返回增删改查
77 2
|
5月前
|
分布式计算 资源调度 Java
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
59 0