一、SparkSQL简介
1.前身:Hive中SQL
Hive是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性
Hive中SQL查询转化为MapReduce作业的过程
由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快。
2.架构
Hive是基于Hadoop的数据仓库工具。
Spark通过Shark把SQL转化为Spark作业。
Shark导致两个问题:
1.执行计划优化完全依赖于Hive,不方便添加新的优化策略;
2.Spark兼容Hive,存在线程安全问题。
3.数据抽象
Spark Core(RDD,弹性分布式数据)
Spark SQL(DataFrame,带有Schema信息的RDD)
4.优点
融合关系数据库结构化数据管理能力、机器学习算法的数据处理能力。
能够融合非结构化数据分析。
非结构化数据通过Spark SQL构建DataFrame;
半结构化文本类型数据,进行解析,构建关系型表格DataFrame。
二、DataFrame概述
1.简介
带有Scheme信息的RDD。
DataFrame与RDD的区别
杂乱 -> 有序
DataFrame里面存放的结构化数据的描述信息,DataFrame要有表头(表的描述信息),描述了有多少列,每一列数叫什么名字、什么类型、能不能为空?
DataFrame是特殊的RDD(RDD+Schema信息就变成了DataFrame)
2.创建
构建SparkSession类型对象
scala> import org.apache.spark.sql.SparkSession
生成SparkSession的对象名称为Spark
scala> val spark = SparkSession.builder().getOrCreate()
导入隐式转换的包
import spark.implicits.
3.保存
从示例文件people.json中创建DataFrame,保存成csv格式文件。
4.操作
//打印DataFrame的Schema信息 personDF.printSchema //查询所有的name和age,并将age+1 personDF.select(col("id"), col("name"), col("age") + 1).show //过滤age大于等于18的 personDF.filter(col("age") >= 18).show
//按年龄进行分组并统计相同年龄的人数 personDF.groupBy("age").count().show() //排序 df.sort(df("age").desc).show()
//多列不同排序规则 df.sort(df("age").desc, df("name").asc).show() //重命名 df.select(df("name").as("username"),df("age")).show()
三、RDD转换到DataFrame
1.利用反射机制推断RDD模式
1.创建SparkContext
2.创建SQLContext
3.创建RDD
4.创建一个类,并定义类的成员变量
5.整理数据并关联class
6.将RDD转换成DataFrame(导入隐式转换)
7.将DataFrame注册成临时表
8.书写SQL(Transformation)
9.执行Action
//创建SQLContext val sqlContext = new SQLContext(sc) //从指定的地址创建RDD val lineRDD = sc.textFile(args(0)).map(_.split(" "))
把RDD转换到DataFrame
//创建case class case class Person(id: Int, name: String, age: Int) //将RDD和case class关联 val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)).toDF()
进行查询
//注册表 personDF.createOrReplaceTempView("t_person") //传入SQL val df = sqlContext.sql("select * from t_person order by age desc limit 2")
打印
2.编程定义RDD模式
加载进来DataFrame进行SQL查询
1.创建SparkContext
2.创建SQLContext
3.创建RDD
4.创建StructType(schema)
5.整理数据将数据跟Row关联
6.通过rowRDD和schema创建DataFrame
7.将DataFrame注册成临时表
8.书写SQL(Transformation)
9.执行Action
//通过StructType直接指定每个字段的schema val schema = StructType( List( StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("age", IntegerType, true) ) )
//将RDD映射到rowRDD val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
拼装起来
//将schema信息应用到rowRDD上 val personDataFrame = sqlContext.createDataFrame(rowRDD, schema) //注册表 personDF.createOrReplaceTempView("t_person") //执行SQL val df = sqlContext.sql("select * from t_person order by age desc limit 4")