文章目录
一、Spark SQL的进化之路
二、认识Spark SQL
2.1 什么是Spark SQL?
2.2 Spark SQL的作用
2.3 运行原理
2.4 特点
2.5 Spark SQL数据抽象
三、Spark SQL API
3.1 SparkSession
3.2 DataSet ( Spark1. 6 + )
1、创建 DataSet
2、使用case Class 创建 DataSet
3、使用DataSet完成Demo
3.3 DataFrame
1、创建 DataFrame
2、使用DataFrame完成Demo
四、Spark SQL 操作Hive表
4.1 文件配置
4.1 操作 Hive 表
五、Spark SQL 连 MySQL
六、Spark SQL 内置函数
七、Spark SQL 自定义函数
一、Spark SQL的进化之路
1.0以前: Shark
1.1:x开始:Spark SQL(只是测试性的) SQL
1.3.x:Spark SQL(正式版本)+Dataframe
1.5.x:Spark SQL 钨丝计划
b
1.6.x:Spark SQL+DataFrame+DataSet(测试版本)
x:Spark SQL+DataFrame+DataSet(正式版本)
Spark SQL:还有其他的优化 StructuredStreaming(DataSet)
二、认识Spark SQL
2.1 什么是Spark SQL?
spark SQL是spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。
2.2 Spark SQL的作用
提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎
DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD
2.3 运行原理
将 Spark SQL 转化为 RDD, 然后提交到集群执行
2.4 特点
(1)容易整合
(2)统一的数据访问方式
(3)兼容 Hive
(4)标准的数据连接
2.5 Spark SQL数据抽象
RDD (Spark1.0) -> DataFrame (Spark1.3) -> DataSet ( Spark1. 6)
- Spark SQL提供了DataFrame和DataSet的数据抽象。
- DataFrame就是RDD + Schema,可以认为是一-张二维表格。 他的劣势是在编译器不进行表格中的字段的类型检查。在运行期进行检查。
- DataSet是 Spark最新的数据抽象, Spark的发 展会逐步将DataSet作为主要的数据抽象,弱化RDD和DataFrame。 DataSet包含了DataFr ame所有的优化机制。除此之外提供了以样例类为Schema模型的强类型。
- DataFrame = DataSet [Row]
- DataFrame和DataSet都有可控的内存管理机制,所有数据都保存在非堆上,都使用了catalyst进行SQL的优化。
三、Spark SQL API
3.1 SparkSession
- SQLContext Spark SQL的编程入口
- HiveContext的子集,包含更多的功能
- SparkSession (Spark 2.x 推荐)
- SparkSession:合并了SQLContext与HiveContext
- 提供与Spark功能交互单一入口点,并允许使用DataFrame和DataSet API对Spark进行编程
SparkSession 是 Spark 2.0 引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。 在spark的早期版本中,SparkContext 是 spark 的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用 StreamingContext ;对于sql,使用 sqlContext ;对于Hive,使用 hiveContext 。但是随着 DataSet 和 DataFrame 的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API 的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。
SparkSession 实质上是 SQLContext 和 HiveContext 的组合(未来可能还会加上 StreamingContext),所以在 SQLContext 和 HiveContext 上可用的 API 在 SparkSession 上同样是可以使用的。SparkSession 内部封装了 SparkContext,所以计算实际上是由 SparkContext 完成的。
特点:
- 为用户提供一个统一的切入点使用Spark 各项功能
- 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序
- 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互
- 与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中
3.2 DataSet ( Spark1. 6 + )
1、创建 DataSet
createDataset()的参数可以是:Seq、Array、RDD scala> spark.createDataset(1 to 3).show +-----+ |value| +-----+ | 1| | 2| | 3| +-----+ scala> spark.createDataset(List(("a",1))).show +---+---+ | _1| _2| +---+---+ | a| 1| +---+---+
2、使用case Class 创建 DataSet
Scala中在class 关键字前面加上case关键字,这个类就可以成为样例类
scala> case class Userinfos(userid:Int,username:String) defined class Userinfos scala> case class Scores(sid:Int,userid:Int,score:Int) defined class Scores scala> val users = Seq(Userinfos(1,"zhangsan"),Userinfos(2,"lisi"),Userinfos(3,"Wangwu")).toDS users: org.apache.spark.sql.Dataset[Userinfos] = [userid: int, username: string] scala> val scs = Seq(Scores(1,1,100),Scores(2,1,30),Scores(3,2,60),Scores(4,3,35)).toDS scs: org.apache.spark.sql.Dataset[Scores] = [sid: int, userid: int ... 1 more field] scala> users.join(scs,users("userid")===scs("userid")).show +------+--------+---+------+-----+ |userid|username|sid|userid|score| +------+--------+---+------+-----+ | 1|zhangsan| 2| 1| 30| | 1|zhangsan| 1| 1| 100| | 2| lisi| 3| 2| 60| | 3| Wangwu| 4| 3| 35| +------+--------+---+------+-----+
case class Customers(custid:String,lname:String,fname:String,cardno:String,addr:String,area:String,city:String,language:String,score:String) case class Orders(orderid:String,orddate:String,custid:String,status:String) case class OrderItem(oiid:String,ordid:String,proid:String,buynum:String,sp:String,cp:String) case class Products(proid:String,protype:String,proname:String,price:String,photo:String) // 效率高 方法一 val custs = sc.textFile("hdfs://192.168.56.137:9000/20200106/customers.csv").map(str=>{ var e = str.replaceAll("\"","").split(",") Customers(e(0),e(1),e(2),e(3),e(4),e(5),e(6),e(7),e(8)) }) scala> custs.toDS.show(1) +------+-------+---------+---------+---------+------------------+-----------+--------+-----+ |custid| lname| fname| cardno| addr| area| city|language|score| +------+-------+---------+---------+---------+------------------+-----------+--------+-----+ | 1|Richard|Hernandez|XXXXXXXXX|XXXXXXXXX|6303 Heather Plaza|Brownsville| TX|78521| +------+-------+---------+---------+---------+------------------+-----------+--------+-----+ // 方法二 val custs = sc.textFile("hdfs://192.168.56.137:9000/20200106/customers.csv").map(_.replaceAll("\"","").split(",")).map(x=>Customers(x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8))).toDS.show(1) val ords = sc.textFile("hdfs://192.168.56.137:9000/20200106/orders.csv").map(_.replaceAll("\"","").split(",")).map(x=>Orders(x(0),x(1),x(2),x(3))).toDS val items = sc.textFile("hdfs://192.168.56.137:9000/20200106/order_items.csv").map(_.replaceAll("\"","").split(",")).map(x=>OrderItem(x(0),x(1),x(2),x(3),x(4),x(5))).toDS
3、使用DataSet完成Demo
- 如上我们已经使用RDD装载业务数据
- 定义样例类
- 将RDD转换为DataSet
// 连接查询 谁的消费额最高 三表查询 val tabitem = items.groupBy("ordid").agg(sum($"cp").as("countPrice")) val tabord = tabitem.join(ords,tabitem("ordid")===ords("orderid")) tabord.show(2) +-----+----------+-------+-------------------+------+---------------+ |ordid|countPrice|orderid| orddate|custid| status| +-----+----------+-------+-------------------+------+---------------+ |10096| 369.97| 10096|2013-09-25 00:00:00| 10503|PENDING_PAYMENT| |10351| 299.98| 10351|2013-09-27 00:00:00| 723| ON_HOLD| +-----+----------+-------+-------------------+------+---------------+ tabord.where("ordid=1").show() +-----+----------+-------------------+------+------+ |ordid|countPrice| orddate|custid|status| +-----+----------+-------------------+------+------+ | 1| 299.98|2013-07-25 00:00:00| 11599|CLOSED| +-----+----------+-------------------+------+------+ tabord.orderBy(desc("countPrice")).show(3) +-----+------------------+-------------------+------+--------+ |ordid| countPrice| orddate|custid| status| +-----+------------------+-------------------+------+--------+ |68703|3089.9500000000003|2013-08-16 00:00:00| 9515|COMPLETE| |68724| 2609.93|2013-09-26 00:00:00| 1148|COMPLETE| |68809| 2579.94|2014-03-12 00:00:00| 5946| ON_HOLD| +-----+------------------+-------------------+------+--------+ tabord.orderBy(desc("countPrice")).limit(1).show +-----+------------------+-------------------+------+--------+ |ordid| countPrice| orddate|custid| status| +-----+------------------+-------------------+------+--------+ |68703|3089.9500000000003|2013-08-16 00:00:00| 9515|COMPLETE| +-----+------------------+-------------------+------+--------+ val tabod=tabord.orderBy(desc("countPrice")).limit(1) tabod.join(custs,tabod("custid")===custs("custid")) .select(col("lname"),col("fname"),col("countPrice")).show +--------+-----+------------------+ | lname|fname| countPrice| +--------+-----+------------------+ |Victoria|Smith|3089.9500000000003| +--------+-----+------------------+ // 哪个产品销量最高 val procounts =items.groupBy("proid") .agg(sum($"buynum").as("countnum")) .orderBy(desc("countnum")).limit(1) procounts .join(pros,procounts("proid")===pros("proid")) .drop(pros("proid")) .select(col("proid"),col("protype"),col("proname"),col("countnum")).show +-----+-------+--------------------+--------+ |proid|protype| proname|countnum| +-----+-------+--------------------+--------+ | 365| 17|Perfect Fitness P...| 73698.0| +-----+-------+--------------------+--------+
3.3 DataFrame
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
1、创建 DataFrame
scala> val cus = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.137:9000/20200102/events.csv") // 使用printSchema方法输出DataFrame的Schema信息 scala> cus.printSchema // 表头有名字 直接注册一个临时表 scala> cus.registerTempTable("users") // sql()方法执行SQL查询操作 scala> spark.sql("select * from users").show(2) scala> spark.sql("select event_id,user_id from users").show(2)
// 表头没名字 scala> val cs = spark.read.format("csv").load("hdfs://192.168.56.137:9000/20200106/customers.csv") scala> cs.printSchema root |-- _c0: string (nullable = true) |-- _c1: string (nullable = true) |-- _c2: string (nullable = true) |-- _c3: string (nullable = true) |-- _c4: string (nullable = true) |-- _c5: string (nullable = true) |-- _c6: string (nullable = true) |-- _c7: string (nullable = true) |-- _c8: string (nullable = true) // 单个命名 scala> cs.withColumnRenamed("_c0","id") // 多个命名 scala> val lookup = Map("_c0"->"id","_c1"->"lname") scala> cs.select(cs.columns.map(c=>col(c).as(lookup.getOrElse(c,c))):_*) scala> cs.select(cs.columns.map(c=>col(c).as(lookup.getOrElse(c,c))):_*).registerTempTable("abc") scala> spark.sql("select id,lname from abc").show(2) +---+-------+ | id| lname| +---+-------+ | 1|Richard| | 2| Mary| +---+-------+
2、使用DataFrame完成Demo
val custs = spark.read.format("csv").load("hdfs://192.168.56.137:9000/20200106/customers.csv") val ords = spark.read.format("csv").load("hdfs://192.168.56.137:9000/20200106/orders.csv") val items = spark.read.format("csv").load("hdfs://192.168.56.137:9000/20200106/order_items.csv") val pros = spark.read.format("csv").load("hdfs://192.168.56.137:9000/20200106/products.csv") val lookup = Map("_c0"->"ordid","_c1"->"orddate","_c2"->"custid","_c3"->"status") ords.select(ords.columns.map(c=>col(c).as(lookup.getOrElse(c,c))):_*).registerTempTable("ordstab") spark.sql("select dayofweek(orddate) as week,count(ordid) as countnum from ordstab group by week") +----+--------+ |week|countnum| +----+--------+ | 1| 9735| | 6| 10288| | 3| 9964| | 5| 9862| | 4| 9758| | 7| 9984| | 2| 9292| +----+--------+ val lookup = Map("_c0"->"custid","_c1"->"lname","_c2"->"fname","_c3"->"cardno","_c4"->"addr","_c5"->"area","_c6"->"city","_c7"->"language","_c8"->"score") custs.select(custs.columns.map(c=>col(c).as(lookup.getOrElse(c,c))):_*).registerTempTable("custstab") val lookup = Map("_c0"->"oiid","_c1"->"ordid","_c2"->"proid","_c3"->"buynum","_c4"->"sp","_c5"->"cp") items.select(items.columns.map(c=>col(c).as(lookup.getOrElse(c,c))):_*).registerTempTable("itemstab") spark.sql("select c.city ,sum(t.sp) as total_purchasses from itemstab as t inner join ordstab as o on o.ordid=t.ordid inner join custstab as c on c.custid=o.custid group by c.city order by total_purchasses desc")