详细解读Spark的数据分析引擎:Spark SQL

简介: 在这里你可以学到夜白(作者笔名)精心整理的笔记,让我们每天进步一点点,让优秀成为一种习惯!

详细解读Spark的数据分析引擎:Spark SQL


一、spark SQL:类似于Hive,是一种数据分析引擎


什么是spark SQL?


spark SQL只能处理结构化数据


底层依赖RDD,把sql语句转换成一个个RDD,运行在不同的worker上

特点:


1、容易集成:SQL语句


2、对不同的数据源提供统一的访问方式:DataFrame 用DataFrame屏蔽数据源的差别


3、兼容Hive


大纲:


详细解读Spark的数据分析引擎:Spark SQL


核心概念:DataFrame(看作表):就是表,是Spark SQL对结构化数据的抽象集合


表现形式:RDD


表=表结构+数据


DataFrame=schema+RDD


DataSet(新API接口 看作表)


如何创建DataFrame?


1、方式一:通过case class创建DataFrame


创建表结构


case class EMP(empno:Int,ename:String,job:String,mgr:String,hiredata:String,sal:Int,comm:String,deptno:Int)


导入emp.csv文件并指定分隔符


val lines = sc.textFile(“/root/temp/emp.csv”).map(_.split(“,”))
lines.collect


将表结构和数据关联起来


val allEmp = lines.map(x=>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))


创建DataFrame:


val empDF = allEmp.toDF


操作DataFrame:


empDF.show:展示DataFrame


empDF.printSchema:打印DataFrame的表结构


2、方式二:通过SparkSession.createDataFrame()创建DataFrame


什么是spark session?


从spark2.0以后提供了统一访问spark各个模块的对象:spark session


创建表结构:用StructType类


import org.apache.spark.sql
import org.apache.spark.sql.types._
val myschema = StructType(List(empno:Int,ename:String,job:String,mgr:String,hiredata:String,sal:Int,comm:String,deptno:Int))


导入emp.csv文件并指定分隔符


val lines = sc.textFile(“/root/temp/emp.csv”).map(_.split(“,”))


将表结构和数据关联起来,把读入的数据emp.csv映射成一行,这里没有带表结构


import.org.apache.spark.sql._
val rowRDD = lines.map(x=>Row(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))


通过SparkSession.createDataFrame()创建表


val df = spark.createDataFrame(rowRDD,myschema)


3、方式三:直接读取一个具有格式的数据文件作为DataFrame(json文件)


val peopleDF = spark.read.json(“/root/training/”)


4、操作DataFrame:DSL语句和SQL语句


DSL语句:


empDF.show
empDF.printSchema


查询所有员工的信息:df.show


查询所有员工的姓名:df.select(“ename”).show


或者df.select($”ename”).show


查询员工信息:姓名 薪水 薪水+100


df.select("ename","ename",”sal”,$”sal”+100).show


查询工资大于2000的员工


df.filter(“sal”>2000).show


分组:


df.groupBy(“deptno”).count.show


SQL语句:需要将DataFrame注册成一张临时视图


df.createOrReplaceTempView(“emp”)
spark.sql(“select * from emp”).show
spark.sql(“select * from emp where deptno=10”).show


5、临时视图:2种


1、只在当前会话中有效:临时视图


df.createOrReplaceTempView(“emp”)


2、在全局范围内都有效:全局临时视图 df.createGlobalTempView(“empG”)


例:在当前会话中


spark.sql(“select * from emp”).show
spark.sql(“select * from global_temp.empG”).show


例:在新的会话中


spark.newSession.sal(“select * from emp”).show
spark.newSession.sal(“select * from global_temp.empG”).show


详细解读Spark的数据分析引擎:Spark SQL


二、使用数据源:


1、load函数加载数据源和save函数保存数据源


load函数默认的数据源是parquet文件


json函数默认的数据源是json文件


val usersDF = spark.read.load(“/root/training/spakr-2.1.0-bin-hadoop2.7/examples/”)
usersDF.select(“name”,”favorite_color”).show
usersDF.select(“name”,”favorite_color”).write.save(“/root/temp/result”)


2、Parquet文件:是sparkSQL load函数默认加载的数据源,按列存储的文件


如何把其他文件格式转换成parquet文件?


例:json文件—->parquet文件


val empJSON = spark.read.json(“/root/temp/emp.json”) #直接读取一个具有格式的数据文件作为DataFrame


empJSON.write.parquet(“/root/temp/empparquet”) #/empparquet目录不能事先存在
或者empJSON.wirte.mode(“overwrite”).parquet(“/root/temp/result”) #/result目录可以事先存在


功能:支持Schema的合并


第一个文件:


val df1 = sc.makeRDD(1 to 5).map(i=>(i,i*2)).toDF(“single”,”double”)
df1.write.parquet(“/root/temp/test_table/key=1”)


第二个文件:


val df2 = sc.makeRD(6 to 10).map(i=>(i,i*3)).toDF(“single”,”triple”)
df2.write.parquet(“/root/temp/test_table/key=2”)


合并两个文件:


val df3 = spark.read.option(“mergeSchema”,”true”).parquet(“/root/temp/test_table”)


3、json文件:


spark.read.json(“/root/training/spark-2.1.0-bin-hadoop-2.7/examples/src/main/resources/people.json”)
spark.read.format(“json”).load(“/root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json”)


4、RDBMS:需要把RDBMS的驱动加入到spark shell中


spark.read.format(“jdbc”).option(“url”,”jdbc:oracle:thin:@192.168.182.11:1521/orcl.example.com”).option(“dbtable”,”scott.emp”).option(“user”,”scott”).option(“password”,”tiger”).load


或使用Properties类


import java.util.Properties
val prop = new Properties()
prop.setProperty(“user”,”scott”)
prop.setProperty(“password”,”tiger”)
val oracleDF1 = spark.read.jdbc(“jdbc:oracle:thin:@192.168.182.11:1521/orcl”)


相关文章
|
15天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
17天前
|
SQL 数据挖掘 Python
数据分析编程:SQL,Python or SPL?
数据分析编程用什么,SQL、python or SPL?话不多说,直接上代码,对比明显,明眼人一看就明了:本案例涵盖五个数据分析任务:1) 计算用户会话次数;2) 球员连续得分分析;3) 连续三天活跃用户数统计;4) 新用户次日留存率计算;5) 股价涨跌幅分析。每个任务基于相应数据表进行处理和计算。
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
47 0
|
1月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
81 0
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
37 0
|
1月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
57 0
|
1月前
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
41 0
|
3月前
|
前端开发 Java JSON
Struts 2携手AngularJS与React:探索企业级后端与现代前端框架的完美融合之道
【8月更文挑战第31天】随着Web应用复杂性的提升,前端技术日新月异。AngularJS和React作为主流前端框架,凭借强大的数据绑定和组件化能力,显著提升了开发动态及交互式Web应用的效率。同时,Struts 2 以其出色的性能和丰富的功能,成为众多Java开发者构建企业级应用的首选后端框架。本文探讨了如何将 Struts 2 与 AngularJS 和 React 整合,以充分发挥前后端各自优势,构建更强大、灵活的 Web 应用。
59 0
|
2月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
4月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
115 13
下一篇
无影云桌面