详细解读Spark的数据分析引擎:Spark SQL

简介: 详细解读Spark的数据分析引擎:Spark SQL

一、spark SQL:类似于Hive,是一种数据分析引擎


什么是spark SQL?


spark SQL只能处理结构化数据


底层依赖RDD,把sql语句转换成一个个RDD,运行在不同的worker上


特点:


1、容易集成:SQL语句


2、对不同的数据源提供统一的访问方式:DataFrame 用DataFrame屏蔽数据源的差别


3、兼容Hive


大纲:


核心概念:DataFrame(看作表):就是表,是Spark SQL对结构化数据的抽象集合


表现形式:RDD


表=表结构+数据


DataFrame=schema+RDD


DataSet(新API接口 看作表)


如何创建DataFrame?


1、方式一:通过case class创建DataFrame


创建表结构


case class EMP(empno:Int,ename:String,job:String,mgr:String,hiredata:String,sal:Int,comm:String,deptno:Int)


导入emp.csv文件并指定分隔符


val lines = sc.textFile("/root/temp/emp.csv").map(_.split(","))
lines.collect


将表结构和数据关联起来


val allEmp = lines.map(x=>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))


创建DataFrame:


val empDF = allEmp.toDF


操作DataFrame:


empDF.show:展示DataFrame


empDF.printSchema:打印DataFrame的表结构


2、方式二:通过SparkSession.createDataFrame()创建DataFrame


什么是spark session?


从spark2.0以后提供了统一访问spark各个模块的对象:spark session


创建表结构:用StructType类


import org.apache.spark.sql
import org.apache.spark.sql.types._
val myschema = StructType(List(empno:Int,ename:String,job:String,mgr:String,hiredata:String,sal:Int,comm:String,deptno:Int))


导入emp.csv文件并指定分隔符


val lines = sc.textFile("/root/temp/emp.csv").map(_.split(","))


将表结构和数据关联起来,把读入的数据emp.csv映射成一行,这里没有带表结构


import.org.apache.spark.sql._
val rowRDD = lines.map(x=>Row(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))


通过SparkSession.createDataFrame()创建表


val df = spark.createDataFrame(rowRDD,myschema)


3、方式三:直接读取一个具有格式的数据文件作为DataFrame(json文件)


val peopleDF = spark.read.json("/root/training/")


4、操作DataFrame:DSL语句和SQL语句

DSL语句:empDF.show


empDF.printSchema


查询所有员工的信息:df.show


查询所有员工的姓名:df.select("ename").show


或者df.select($"ename").show


查询员工信息:姓名 薪水 薪水+100


df.select($"ename",$"sal",$"sal"+100).show


查询工资大于2000的员工


df.filter("sal">2000).show


分组:


df.groupBy("deptno").count.show


SQL语句:需要将DataFrame注册成一张临时视图


df.createOrReplaceTempView("emp")
spark.sql("select * from emp").show
spark.sql("select * from emp where deptno=10").show


5、临时视图:2种


1、只在当前会话中有效:临时视图 df.createOrReplaceTempView("emp")


2、在全局范围内都有效:全局临时视图 df.createGlobalTempView("empG")


例:在当前会话中


spark.sql("select * from emp").show
spark.sql("select * from global_temp.empG").show


例:在新的会话中


spark.newSession.sal("select * from emp").show
spark.newSession.sal("select * from global_temp.empG").show


二、使用数据源:


1、load函数加载数据源和save函数保存数据源


load函数默认的数据源是parquet文件


json函数默认的数据源是json文件


val usersDF = spark.read.load("/root/training/spakr-2.1.0-bin-hadoop2.7/examples/")
usersDF.select("name","favorite_color").show
usersDF.select("name","favorite_color").write.save("/root/temp/result")


2、Parquet文件:是sparkSQL load函数默认加载的数据源,按列存储的文件


如何把其他文件格式转换成parquet文件?


例:json文件---->parquet文件


val empJSON = spark.read.json("/root/temp/emp.json") 
#直接读取一个具有格式的数据文件作为DataFrame
empJSON.write.parquet("/root/temp/empparquet") 
#/empparquet目录不能事先存在
或者empJSON.wirte.mode("overwrite").parquet("/root/temp/result") #/result目录可以事先存在


功能:支持Schema的合并


第一个文件:val df1 = sc.makeRDD(1 to 5).map(i=>(i,i*2)).toDF("single","double")


df1.write.parquet("/root/temp/test_table/key=1")


第二个文件:val df2 = sc.makeRD(6 to 10).map(i=>(i,i*3)).toDF("single","triple")


df2.write.parquet("/root/temp/test_table/key=2")


合并两个文件:val df3 = spark.read.option("mergeSchema","true").parquet("/root/temp/test_table")


3、json文件:


spark.read.json("/root/training/spark-2.1.0-bin-hadoop-2.7/examples/src/main/resources/people.json")
spark.read.format("json").load("/root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")


4、RDBMS:需要把RDBMS的驱动加入到spark shell中


spark.read.format("jdbc").option("url","jdbc:oracle:thin:@192.168.182.11:1521/orcl.example.com").option("dbtable","scott.emp").option("user","scott").option("password","tiger").load


或使用Properties类


import java.util.Properties
val prop = new Properties()
prop.setProperty("user","scott")
prop.setProperty("password","tiger")
val oracleDF1 = spark.read.jdbc("jdbc:oracle:thin:@192.168.182.11:1521/orcl")


作者:李金泽AllenLi,清华大学硕士研究生,研究方向:大数据和人工智能


相关文章
|
2月前
|
关系型数据库 MySQL 数据库
阿里云数据库RDS费用价格:MySQL、SQL Server、PostgreSQL和MariaDB引擎收费标准
阿里云RDS数据库支持MySQL、SQL Server、PostgreSQL、MariaDB,多种引擎优惠上线!MySQL倚天版88元/年,SQL Server 2核4G仅299元/年,PostgreSQL 227元/年起。高可用、可弹性伸缩,安全稳定。详情见官网活动页。
|
2月前
|
关系型数据库 分布式数据库 数据库
阿里云数据库收费价格:MySQL、PostgreSQL、SQL Server和MariaDB引擎费用整理
阿里云数据库提供多种类型,包括关系型与NoSQL,主流如PolarDB、RDS MySQL/PostgreSQL、Redis等。价格低至21元/月起,支持按需付费与优惠套餐,适用于各类应用场景。
|
2月前
|
关系型数据库 MySQL 数据库
阿里云数据库RDS支持MySQL、SQL Server、PostgreSQL和MariaDB引擎
阿里云数据库RDS支持MySQL、SQL Server、PostgreSQL和MariaDB引擎,提供高性价比、稳定安全的云数据库服务,适用于多种行业与业务场景。
|
6月前
|
SQL 自然语言处理 数据可视化
狂揽20.2k星!还在傻傻的写SQL吗,那你就完了!这款开源项目,让数据分析像聊天一样简单?再见吧SQL
PandasAI是由Sinaptik AI团队打造的开源项目,旨在通过自然语言处理技术简化数据分析流程。用户只需用自然语言提问,即可快速生成可视化图表和分析结果,大幅降低数据分析门槛。该项目支持多种数据源连接、智能图表生成、企业级安全防护等功能,适用于市场分析、财务管理、产品决策等多个场景。上线两年已获20.2k GitHub星标,采用MIT开源协议,项目地址为https://github.com/sinaptik-ai/pandas-ai。
348 5
|
11月前
|
SQL 存储 缓存
日志服务 SQL 引擎全新升级
SQL 作为 SLS 基础功能,每天承载了用户大量日志数据的分析请求,既有小数据量的快速查询(如告警、即席查询等);也有上万亿数据规模的报表级分析。SLS 作为 Serverless 服务,除了要满足不同用户的各类需求,还要兼顾性能、隔离性、稳定性等要求。过去一年多的时间,SLS SQL 团队做了大量的工作,对 SQL 引擎进行了全新升级,SQL 的执行性能、隔离性等方面都有了大幅的提升。
444 97
|
9月前
|
SQL 关系型数据库 分布式数据库
利用 PolarDB PG 版向量化引擎,加速复杂 SQL 查询!完成任务领发财新年抱枕!
利用 PolarDB PG 版向量化引擎,加速复杂 SQL 查询!完成任务领发财新年抱枕!
285 14
|
8月前
|
SQL 存储 自然语言处理
YashanDB SQL 引擎
YashanDB SQL 引擎
|
9月前
|
SQL 数据可视化 IDE
SQL做数据分析的困境,查询语言无法回答的真相
SQL 在简单数据分析任务中表现良好,但面对复杂需求时显得力不从心。例如,统计新用户第二天的留存率或连续活跃用户的计算,SQL 需要嵌套子查询和复杂关联,代码冗长难懂。Python 虽更灵活,但仍需变通思路,复杂度较高。相比之下,SPL(Structured Process Language)语法简洁、支持有序计算和分组子集保留,具备强大的交互性和调试功能,适合处理复杂的深度数据分析任务。SPL 已开源免费,是数据分析师的更好选择。
|
10月前
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
1348 0