Spark5:SparkSQL

简介: Spark5:SparkSQL

一、SparkSQL简介

1.前身:Hive中SQL

Hive是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性

Hive中SQL查询转化为MapReduce作业的过程

由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快。

2.架构

Hive是基于Hadoop的数据仓库工具。

Spark通过Shark把SQL转化为Spark作业。

Shark导致两个问题:

1.执行计划优化完全依赖于Hive,不方便添加新的优化策略;

2.Spark兼容Hive,存在线程安全问题。

3.数据抽象

Spark Core(RDD,弹性分布式数据)

Spark SQL(DataFrame,带有Schema信息的RDD)

4.优点

融合关系数据库结构化数据管理能力、机器学习算法的数据处理能力。

能够融合非结构化数据分析。

非结构化数据通过Spark SQL构建DataFrame;

半结构化文本类型数据,进行解析,构建关系型表格DataFrame。

二、DataFrame概述

1.简介

带有Scheme信息的RDD。

DataFrame与RDD的区别

杂乱 -> 有序

DataFrame里面存放的结构化数据的描述信息,DataFrame要有表头(表的描述信息),描述了有多少列,每一列数叫什么名字、什么类型、能不能为空?

DataFrame是特殊的RDD(RDD+Schema信息就变成了DataFrame)

2.创建

构建SparkSession类型对象

scala> import org.apache.spark.sql.SparkSession

生成SparkSession的对象名称为Spark

scala> val spark = SparkSession.builder().getOrCreate()

导入隐式转换的包

import spark.implicits.

3.保存

从示例文件people.json中创建DataFrame,保存成csv格式文件。

4.操作

//打印DataFrame的Schema信息
personDF.printSchema
//查询所有的name和age,并将age+1
personDF.select(col("id"), col("name"), col("age") + 1).show
//过滤age大于等于18的
personDF.filter(col("age") >= 18).show

//按年龄进行分组并统计相同年龄的人数
personDF.groupBy("age").count().show()
//排序
df.sort(df("age").desc).show()

//多列不同排序规则
df.sort(df("age").desc, df("name").asc).show()
//重命名
df.select(df("name").as("username"),df("age")).show()

三、RDD转换到DataFrame

1.利用反射机制推断RDD模式

1.创建SparkContext

2.创建SQLContext

3.创建RDD

4.创建一个类,并定义类的成员变量

5.整理数据并关联class

6.将RDD转换成DataFrame(导入隐式转换)

7.将DataFrame注册成临时表

8.书写SQL(Transformation)

9.执行Action

//创建SQLContext
val sqlContext = new SQLContext(sc)
//从指定的地址创建RDD
val lineRDD = sc.textFile(args(0)).map(_.split(" "))

把RDD转换到DataFrame

//创建case class
case class Person(id: Int, name: String, age: Int)
//将RDD和case class关联
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)).toDF()

进行查询

//注册表
personDF.createOrReplaceTempView("t_person")
//传入SQL
val df = sqlContext.sql("select * from t_person order by age desc limit 2")

打印

2.编程定义RDD模式

加载进来DataFrame进行SQL查询

1.创建SparkContext

2.创建SQLContext

3.创建RDD

4.创建StructType(schema)

5.整理数据将数据跟Row关联

6.通过rowRDD和schema创建DataFrame

7.将DataFrame注册成临时表

8.书写SQL(Transformation)

9.执行Action

//通过StructType直接指定每个字段的schema
val schema = StructType(
  List(
    StructField("id", IntegerType, true),
    StructField("name", StringType, true),
    StructField("age", IntegerType, true)
  )
)

//将RDD映射到rowRDD
val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))

拼装起来

//将schema信息应用到rowRDD上
val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
//注册表
personDF.createOrReplaceTempView("t_person")
//执行SQL
val df = sqlContext.sql("select * from t_person order by age desc limit 4")

目录
相关文章
|
4月前
|
设计模式 SQL 分布式计算
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
43 0
|
2月前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
57 2
|
1月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
106 1
|
4月前
【spark2.x】如何通过SparkSQL读取csv文件
【spark2.x】如何通过SparkSQL读取csv文件
90 0
|
8月前
|
SQL 存储 JSON
Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(二)
Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(二)
|
8月前
|
SQL 缓存 分布式计算
Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(一)
Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(一)
|
SQL JSON 分布式计算
Spark 操作 kudu--dataFrame , sparkSQL 操作 | 学习笔记
快速学习 Spark 操作 kudu--dataFrame , sparkSQL 操作
302 0
Spark 操作 kudu--dataFrame , sparkSQL 操作 | 学习笔记
|
SQL 分布式计算 NoSQL
Spark从入门到入土(五):SparkSQL原理与实战
Spark从入门到入土(五):SparkSQL原理与实战
Spark从入门到入土(五):SparkSQL原理与实战
|
SQL 分布式计算 Spark
【Spark】(task3)SparkSQL基础
一、使用Spark SQL完成任务1里面的数据筛选 先是创建dataframe数据:
151 0
|
分布式计算 资源调度 DataWorks
MaxComputeSpark Spark 与 SparkSQL 对比分析及使用注意事项 | 学习笔记
快速学习 MaxComputeSpark Spark 与 SparkSQL 对比分析及使用注意事项
229 0