Spark SQL实战(07)-Data Sources

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Spark SQL通过DataFrame接口支持对多种数据源进行操作。DataFrame可使用关系型变换进行操作,也可用于创建临时视图。将DataFrame注册为临时视图可以让你对其数据运行SQL查询。

1 概述


Spark SQL通过DataFrame接口支持对多种数据源进行操作。


DataFrame可使用关系型变换进行操作,也可用于创建临时视图。将DataFrame注册为临时视图可以让你对其数据运行SQL查询。


本节介绍使用Spark数据源加载和保存数据的一般方法,并进一步介绍可用于内置数据源的特定选项。


数据源关键操作:


load

save


2 大数据作业基本流程


input 业务逻辑 output

不管是使用MR/Hive/Spark/Flink/Storm。


Spark能处理多种数据源的数据,而且这些数据源可以是在不同地方:


file/HDFS/S3/OSS/COS/RDBMS

json/ORC/Parquet/JDBC

object DataSourceApp {

 def main(args: Array[String]): Unit = {

   val spark: SparkSession = SparkSession.builder()

     .master("local").getOrCreate()

 

   text(spark)

   // json(spark)

   // common(spark)

   // parquet(spark)

   // convert(spark)

   // jdbc(spark)

   jdbc2(spark)

   spark.stop()

 }

}




3 text数据源读写


读取文本文件的 API,SparkSession.read.text()


参数:


path:读取文本文件的路径。可以是单个文件、文件夹或者包含通配符的文件路径。

wholetext:如果为 True,则将整个文件读取为一条记录;否则将每行读取为一条记录。

lineSep:如果指定,则使用指定的字符串作为行分隔符。

pathGlobFilter:用于筛选文件的通配符模式。

recursiveFileLookup:是否递归查找子目录中的文件。

allowNonExistingFiles:是否允许读取不存在的文件。

allowEmptyFiles:是否允许读取空文件。

返回一个 DataFrame 对象,其中每行是文本文件中的一条记录。


def text(spark: SparkSession): Unit = {

 import spark.implicits._

 val textDF: DataFrame = spark.read.text(

   "/Users/javaedge/Downloads/sparksql-train/data/people.txt")

 val result: Dataset[(String, String)] = textDF.map(x => {

   val splits: Array[String] = x.getString(0).split(",")

   (splits(0).trim, splits(1).trim)

 })


编译无问题,运行时报错:


Exception in thread "main" org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 2 columns.;


思考下,如何使用text方式,输出多列的值?


修正后

val result: Dataset[String] = textDF.map(x => {

 val splits: Array[String] = x.getString(0).split(",")

 splits(0).trim

})

result.write.text("out")


继续报错:


Exception in thread "main" org.apache.spark.sql.AnalysisException: path file:/Users/javaedge/Downloads/sparksql-train/out already exists.;


回想Hadoop中MapReduce的输出:


第一次0K

第二次也会报错输出目录已存在

这关系到 Spark 中的 mode


SaveMode

Spark SQL中,使用DataFrame或Dataset的write方法将数据写入外部存储系统时,使用“SaveMode”参数指定如何处理已存在的数据。


SaveMode有四种取值:


SaveMode.ErrorIfExists:如果目标路径已经存在,则会引发异常

SaveMode.Append:将数据追加到现有数据

SaveMode.Overwrite:覆盖现有数据

SaveMode.Ignore:若目标路径已经存在,则不执行任何操作

所以,修正如下:


result.write.mode(SaveMode.overwrite).text("out")


4 JSON 数据源


// JSON

def json(spark: SparkSession): Unit = {

 import spark.implicits._

 val jsonDF: DataFrame = spark.read.json(

   "/Users/javaedge/Downloads/sparksql-train/data/people.json")

 jsonDF.show()


 // 只要age>20的数据

 jsonDF.filter("age > 20")

   .select("name")

   .write.mode(SaveMode.Overwrite).json("out")

 

output:

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+


嵌套 JSON


// 嵌套 JSON

val jsonDF2: DataFrame = spark.read.json(

 "/Users/javaedge/Downloads/sparksql-train/data/people2.json")

jsonDF2.show()

jsonDF2.select($"name",

 $"age",

 $"info.work".as("work"),

 $"info.home".as("home"))

 .write.mode("overwrite")

 .json("out")


output:

+---+-------------------+----+

|age|               info|name|

+---+-------------------+----+

| 30|[shenzhen, beijing]|  PK|

+---+-------------------+----+



5 标准写法


// 标准API写法

private def common(spark: SparkSession): Unit = {

 import spark.implicits._

 val textDF: DataFrame = spark.read.format("text").load(

   "/Users/javaedge/Downloads/sparksql-train/data/people.txt")

 val jsonDF: DataFrame = spark.read.format("json").load(

   "/Users/javaedge/Downloads/sparksql-train/data/people.json")

 textDF.show()

 println("~~~~~~~~")

 jsonDF.show()

 jsonDF.write.format("json").mode("overwrite").save("out")

}

output:

+-----------+

|      value|

+-----------+

|Michael, 29|

|   Andy, 30|

| Justin, 19|

+-----------+

~~~~~~~~

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+



6 Parquet数据源


6.1 简介

一种列式存储格式,在大数据环境中高效地存储和处理数据。由Hadoop生态系统中的Apache Parquet项目开发的。


6.2 设计目标

支持高效的列式存储和压缩,并提供高性能的读/写能力,以便处理大规模结构化数据。


Parquet可以与许多不同的计算框架一起使用,如Apache Hadoop、Apache Spark、Apache Hive等,因此广泛用于各种大数据应用程序中。


6.3 优点

高性能、节省存储空间、支持多种编程语言和数据类型、易于集成和扩展等。


private def parquet(spark: SparkSession): Unit = {

 import spark.implicits._

 val parquetDF: DataFrame = spark.read.parquet(

   "/Users/javaedge/Downloads/sparksql-train/data/users.parquet")

 parquetDF.printSchema()

 parquetDF.show()

 parquetDF.select("name", "favorite_numbers")

   .write.mode("overwrite")

   .option("compression", "none")

   .parquet("out")

 

output:

root

|-- name: string (nullable = true)

|-- favorite_color: string (nullable = true)

|-- favorite_numbers: array (nullable = true)

|    |-- element: integer (containsNull = true)

+------+--------------+----------------+

|  name|favorite_color|favorite_numbers|

+------+--------------+----------------+

|Alyssa|          null|  [3, 9, 15, 20]|

|   Ben|           red|              []|

+------+--------------+----------------+



7convert

方便从一种数据源写到另一种数据源。


存储类型转换:JSON==>Parquet


def convert(spark: SparkSession): Unit = {

 import spark.implicits._

 val jsonDF: DataFrame = spark.read.format("json")

   .load("/Users/javaedge/Downloads/sparksql-train/data/people.json")

 jsonDF.show()

 jsonDF.filter("age>20")

   .write.format("parquet").mode(SaveMode.Overwrite).save("out")

9.png



8 JDBC


有些数据是在MySQL,使用Spark处理,肯定要通过Spark读出MySQL的数据。

数据源是text/json,通过Spark处理完后,要将统计结果写入MySQL。


查 DB

写法一

def jdbc(spark: SparkSession): Unit = {

 import spark.implicits._

 val jdbcDF = spark.read

   .format("jdbc")

   .option("url", "jdbc:mysql://localhost:3306")

   .option("dbtable", "smartrm_monolith.order")

   .option("user", "root")

   .option("password", "root")

   .load()

 jdbcDF.filter($"order_id" > 150).show(100)

}


8.png


写法二

val connectionProperties = new Properties()

connectionProperties.put("user", "root")

connectionProperties.put("password", "root")

val jdbcDF2: DataFrame = spark.read

 .jdbc(url, srcTable, connectionProperties)

jdbcDF2.filter($"order_id" > 100)


写 DB

val connProps = new Properties()

connProps.put("user", "root")

connProps.put("password", "root")

val jdbcDF: DataFrame = spark.read.jdbc(url, srcTable, connProps)

jdbcDF.filter($"order_id" > 100)

 .write.jdbc(url, "smartrm_monolith.order_bak", connProps)


若 目标表不存在,会自动帮你创建:

7.png



统一配置管理

如何将那么多数据源配置参数统一管理呢?


先引入依赖:


<dependency>

   <groupId>com.typesafe</groupId>

   <artifactId>config</artifactId>

   <version>1.3.3</version>

</dependency>


配置文件:


6.png


读配置的程序:


package com.javaedge.bigdata.chapter05

import com.typesafe.config.{Config, ConfigFactory}

object ParamsApp {

 def main(args: Array[String]): Unit = {

   val config: Config = ConfigFactory.load()

   val url: String = config.getString("db.default.url")

   println(url)

 }

}

private def jdbcConfig(spark: SparkSession): Unit = {

 import spark.implicits._

 val config = ConfigFactory.load()

 val url = config.getString("db.default.url")

 val user = config.getString("db.default.user")

 val password = config.getString("db.default.password")

 val driver = config.getString("db.default.driver")

 val database = config.getString("db.default.database")

 val table = config.getString("db.default.table")

 val sinkTable = config.getString("db.default.sink.table")

 val connectionProperties = new Properties()

 connectionProperties.put("user", user)

 connectionProperties.put("password", password)

 val jdbcDF: DataFrame = spark.read.jdbc(url, s"$database.$table", connectionProperties)

 jdbcDF.filter($"order_id" > 100).show()



写到新表:


jdbcDF.filter($"order_id" > 158)

.write.jdbc(url, s"$database.$sinkTable", connectionProperties)


5.png

5.png

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
SQL 数据库 开发者
MSSQL性能调优实战技巧:索引优化、SQL语句微调与并发控制策略
在Microsoft SQL Server(MSSQL)的管理与优化中,性能调优是一项复杂但至关重要的任务
|
2月前
|
SQL 监控 数据库
MSSQL性能调优实战技巧:索引优化策略、SQL查询重构与并发控制详解
在Microsoft SQL Server(MSSQL)的管理与优化过程中,性能调优是确保数据库高效运行的关键环节
|
11天前
|
SQL 存储 数据处理
"SQL触发器实战大揭秘:一键解锁数据自动化校验与更新魔法,让数据库管理从此告别繁琐,精准高效不再是梦!"
【8月更文挑战第31天】在数据库管理中,确保数据准确性和一致性至关重要。SQL触发器能自动执行数据校验与更新,显著提升工作效率。本文通过一个员工信息表的例子,详细介绍了如何利用触发器自动设定和校验薪资,确保其符合业务规则。提供的示例代码展示了在插入新记录时如何自动检查并调整薪资,以满足最低标准。这不仅减轻了数据库管理员的负担,还提高了数据处理的准确性和效率。触发器虽强大,但也需谨慎使用,以避免复杂性和性能问题。
22 1
|
2月前
|
SQL 安全 数据库
Python Web开发者必学:SQL注入、XSS、CSRF攻击与防御实战演练!
【7月更文挑战第26天】在 Python Web 开发中, 安全性至关重要。本文聚焦 SQL 注入、XSS 和 CSRF 这三大安全威胁,提供实战防御策略。SQL 注入可通过参数化查询和 ORM 框架来防范;XSS 则需 HTML 转义用户输入与实施 CSP;CSRF 防御依赖 CSRF 令牌和双重提交 Cookie。掌握这些技巧,能有效加固 Web 应用的安全防线。安全是持续的过程,需贯穿开发始终。
64 1
Python Web开发者必学:SQL注入、XSS、CSRF攻击与防御实战演练!
|
29天前
|
SQL 存储 分布式计算
|
8天前
|
SQL 安全 数据库
基于SQL Server事务日志的数据库恢复技术及实战代码详解
基于事务日志的数据库恢复技术是SQL Server中一个非常强大的功能,它能够帮助数据库管理员在数据丢失或损坏的情况下,有效地恢复数据。通过定期备份数据库和事务日志,并在需要时按照正确的步骤恢复,可以最大限度地减少数据丢失的风险。需要注意的是,恢复数据是一个需要谨慎操作的过程,建议在执行恢复操作之前,详细了解相关的操作步骤和注意事项,以确保数据的安全和完整。
18 0
|
11天前
|
测试技术 Java
全面保障Struts 2应用质量:掌握单元测试与集成测试的关键策略
【8月更文挑战第31天】Struts 2 的测试策略结合了单元测试与集成测试。单元测试聚焦于单个组件(如 Action 类)的功能验证,常用 Mockito 模拟依赖项;集成测试则关注组件间的交互,利用 Cactus 等框架确保框架拦截器和 Action 映射等按预期工作。通过确保高测试覆盖率并定期更新测试用例,可以提升应用的整体稳定性和质量。
22 0
|
11天前
|
数据库 Java 监控
Struts 2 日志管理化身神秘魔法师,洞察应用运行乾坤,演绎奇幻篇章!
【8月更文挑战第31天】在软件开发中,了解应用运行状况至关重要。日志管理作为 Struts 2 应用的关键组件,记录着每个动作和决策,如同监控摄像头,帮助我们迅速定位问题、分析性能和使用情况,为优化提供依据。Struts 2 支持多种日志框架(如 Log4j、Logback),便于配置日志级别、格式和输出位置。通过在 Action 类中添加日志记录,我们能在开发过程中获取详细信息,及时发现并解决问题。合理配置日志不仅有助于调试,还能分析用户行为,提升应用性能和稳定性。
29 0
|
11天前
|
前端开发 Java JSON
Struts 2携手AngularJS与React:探索企业级后端与现代前端框架的完美融合之道
【8月更文挑战第31天】随着Web应用复杂性的提升,前端技术日新月异。AngularJS和React作为主流前端框架,凭借强大的数据绑定和组件化能力,显著提升了开发动态及交互式Web应用的效率。同时,Struts 2 以其出色的性能和丰富的功能,成为众多Java开发者构建企业级应用的首选后端框架。本文探讨了如何将 Struts 2 与 AngularJS 和 React 整合,以充分发挥前后端各自优势,构建更强大、灵活的 Web 应用。
23 0
|
11天前
|
SQL 数据采集 算法
【电商数据分析利器】SQL实战项目大揭秘:手把手教你构建用户行为分析系统,从数据建模到精准营销的全方位指南!
【8月更文挑战第31天】随着电商行业的快速发展,用户行为分析的重要性日益凸显。本实战项目将指导你使用 SQL 构建电商平台用户行为分析系统,涵盖数据建模、采集、处理与分析等环节。文章详细介绍了数据库设计、测试数据插入及多种行为分析方法,如购买频次统计、商品销售排名、用户活跃时间段分析和留存率计算,帮助电商企业深入了解用户行为并优化业务策略。通过这些步骤,你将掌握利用 SQL 进行大数据分析的关键技术。
25 0