Spark SQL实战(07)-Data Sources

本文涉及的产品
RDS Agent(兼容OpenClaw),2核4GB
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: Spark SQL通过DataFrame接口支持对多种数据源进行操作。DataFrame可使用关系型变换进行操作,也可用于创建临时视图。将DataFrame注册为临时视图可以让你对其数据运行SQL查询。

1 概述


Spark SQL通过DataFrame接口支持对多种数据源进行操作。


DataFrame可使用关系型变换进行操作,也可用于创建临时视图。将DataFrame注册为临时视图可以让你对其数据运行SQL查询。


本节介绍使用Spark数据源加载和保存数据的一般方法,并进一步介绍可用于内置数据源的特定选项。


数据源关键操作:


load

save


2 大数据作业基本流程


input 业务逻辑 output

不管是使用MR/Hive/Spark/Flink/Storm。


Spark能处理多种数据源的数据,而且这些数据源可以是在不同地方:


file/HDFS/S3/OSS/COS/RDBMS

json/ORC/Parquet/JDBC

object DataSourceApp {

 def main(args: Array[String]): Unit = {

   val spark: SparkSession = SparkSession.builder()

     .master("local").getOrCreate()

 

   text(spark)

   // json(spark)

   // common(spark)

   // parquet(spark)

   // convert(spark)

   // jdbc(spark)

   jdbc2(spark)

   spark.stop()

 }

}




3 text数据源读写


读取文本文件的 API,SparkSession.read.text()


参数:


path:读取文本文件的路径。可以是单个文件、文件夹或者包含通配符的文件路径。

wholetext:如果为 True,则将整个文件读取为一条记录;否则将每行读取为一条记录。

lineSep:如果指定,则使用指定的字符串作为行分隔符。

pathGlobFilter:用于筛选文件的通配符模式。

recursiveFileLookup:是否递归查找子目录中的文件。

allowNonExistingFiles:是否允许读取不存在的文件。

allowEmptyFiles:是否允许读取空文件。

返回一个 DataFrame 对象,其中每行是文本文件中的一条记录。


def text(spark: SparkSession): Unit = {

 import spark.implicits._

 val textDF: DataFrame = spark.read.text(

   "/Users/javaedge/Downloads/sparksql-train/data/people.txt")

 val result: Dataset[(String, String)] = textDF.map(x => {

   val splits: Array[String] = x.getString(0).split(",")

   (splits(0).trim, splits(1).trim)

 })


编译无问题,运行时报错:


Exception in thread "main" org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 2 columns.;


思考下,如何使用text方式,输出多列的值?


修正后

val result: Dataset[String] = textDF.map(x => {

 val splits: Array[String] = x.getString(0).split(",")

 splits(0).trim

})

result.write.text("out")


继续报错:


Exception in thread "main" org.apache.spark.sql.AnalysisException: path file:/Users/javaedge/Downloads/sparksql-train/out already exists.;


回想Hadoop中MapReduce的输出:


第一次0K

第二次也会报错输出目录已存在

这关系到 Spark 中的 mode


SaveMode

Spark SQL中,使用DataFrame或Dataset的write方法将数据写入外部存储系统时,使用“SaveMode”参数指定如何处理已存在的数据。


SaveMode有四种取值:


SaveMode.ErrorIfExists:如果目标路径已经存在,则会引发异常

SaveMode.Append:将数据追加到现有数据

SaveMode.Overwrite:覆盖现有数据

SaveMode.Ignore:若目标路径已经存在,则不执行任何操作

所以,修正如下:


result.write.mode(SaveMode.overwrite).text("out")


4 JSON 数据源


// JSON

def json(spark: SparkSession): Unit = {

 import spark.implicits._

 val jsonDF: DataFrame = spark.read.json(

   "/Users/javaedge/Downloads/sparksql-train/data/people.json")

 jsonDF.show()


 // 只要age>20的数据

 jsonDF.filter("age > 20")

   .select("name")

   .write.mode(SaveMode.Overwrite).json("out")

 

output:

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+


嵌套 JSON


// 嵌套 JSON

val jsonDF2: DataFrame = spark.read.json(

 "/Users/javaedge/Downloads/sparksql-train/data/people2.json")

jsonDF2.show()

jsonDF2.select($"name",

 $"age",

 $"info.work".as("work"),

 $"info.home".as("home"))

 .write.mode("overwrite")

 .json("out")


output:

+---+-------------------+----+

|age|               info|name|

+---+-------------------+----+

| 30|[shenzhen, beijing]|  PK|

+---+-------------------+----+



5 标准写法


// 标准API写法

private def common(spark: SparkSession): Unit = {

 import spark.implicits._

 val textDF: DataFrame = spark.read.format("text").load(

   "/Users/javaedge/Downloads/sparksql-train/data/people.txt")

 val jsonDF: DataFrame = spark.read.format("json").load(

   "/Users/javaedge/Downloads/sparksql-train/data/people.json")

 textDF.show()

 println("~~~~~~~~")

 jsonDF.show()

 jsonDF.write.format("json").mode("overwrite").save("out")

}

output:

+-----------+

|      value|

+-----------+

|Michael, 29|

|   Andy, 30|

| Justin, 19|

+-----------+

~~~~~~~~

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+



6 Parquet数据源


6.1 简介

一种列式存储格式,在大数据环境中高效地存储和处理数据。由Hadoop生态系统中的Apache Parquet项目开发的。


6.2 设计目标

支持高效的列式存储和压缩,并提供高性能的读/写能力,以便处理大规模结构化数据。


Parquet可以与许多不同的计算框架一起使用,如Apache Hadoop、Apache Spark、Apache Hive等,因此广泛用于各种大数据应用程序中。


6.3 优点

高性能、节省存储空间、支持多种编程语言和数据类型、易于集成和扩展等。


private def parquet(spark: SparkSession): Unit = {

 import spark.implicits._

 val parquetDF: DataFrame = spark.read.parquet(

   "/Users/javaedge/Downloads/sparksql-train/data/users.parquet")

 parquetDF.printSchema()

 parquetDF.show()

 parquetDF.select("name", "favorite_numbers")

   .write.mode("overwrite")

   .option("compression", "none")

   .parquet("out")

 

output:

root

|-- name: string (nullable = true)

|-- favorite_color: string (nullable = true)

|-- favorite_numbers: array (nullable = true)

|    |-- element: integer (containsNull = true)

+------+--------------+----------------+

|  name|favorite_color|favorite_numbers|

+------+--------------+----------------+

|Alyssa|          null|  [3, 9, 15, 20]|

|   Ben|           red|              []|

+------+--------------+----------------+



7convert

方便从一种数据源写到另一种数据源。


存储类型转换:JSON==>Parquet


def convert(spark: SparkSession): Unit = {

 import spark.implicits._

 val jsonDF: DataFrame = spark.read.format("json")

   .load("/Users/javaedge/Downloads/sparksql-train/data/people.json")

 jsonDF.show()

 jsonDF.filter("age>20")

   .write.format("parquet").mode(SaveMode.Overwrite).save("out")

9.png



8 JDBC


有些数据是在MySQL,使用Spark处理,肯定要通过Spark读出MySQL的数据。

数据源是text/json,通过Spark处理完后,要将统计结果写入MySQL。


查 DB

写法一

def jdbc(spark: SparkSession): Unit = {

 import spark.implicits._

 val jdbcDF = spark.read

   .format("jdbc")

   .option("url", "jdbc:mysql://localhost:3306")

   .option("dbtable", "smartrm_monolith.order")

   .option("user", "root")

   .option("password", "root")

   .load()

 jdbcDF.filter($"order_id" > 150).show(100)

}


8.png


写法二

val connectionProperties = new Properties()

connectionProperties.put("user", "root")

connectionProperties.put("password", "root")

val jdbcDF2: DataFrame = spark.read

 .jdbc(url, srcTable, connectionProperties)

jdbcDF2.filter($"order_id" > 100)


写 DB

val connProps = new Properties()

connProps.put("user", "root")

connProps.put("password", "root")

val jdbcDF: DataFrame = spark.read.jdbc(url, srcTable, connProps)

jdbcDF.filter($"order_id" > 100)

 .write.jdbc(url, "smartrm_monolith.order_bak", connProps)


若 目标表不存在,会自动帮你创建:

7.png



统一配置管理

如何将那么多数据源配置参数统一管理呢?


先引入依赖:


<dependency>

   <groupId>com.typesafe</groupId>

   <artifactId>config</artifactId>

   <version>1.3.3</version>

</dependency>


配置文件:


6.png


读配置的程序:


package com.javaedge.bigdata.chapter05

import com.typesafe.config.{Config, ConfigFactory}

object ParamsApp {

 def main(args: Array[String]): Unit = {

   val config: Config = ConfigFactory.load()

   val url: String = config.getString("db.default.url")

   println(url)

 }

}

private def jdbcConfig(spark: SparkSession): Unit = {

 import spark.implicits._

 val config = ConfigFactory.load()

 val url = config.getString("db.default.url")

 val user = config.getString("db.default.user")

 val password = config.getString("db.default.password")

 val driver = config.getString("db.default.driver")

 val database = config.getString("db.default.database")

 val table = config.getString("db.default.table")

 val sinkTable = config.getString("db.default.sink.table")

 val connectionProperties = new Properties()

 connectionProperties.put("user", user)

 connectionProperties.put("password", password)

 val jdbcDF: DataFrame = spark.read.jdbc(url, s"$database.$table", connectionProperties)

 jdbcDF.filter($"order_id" > 100).show()



写到新表:


jdbcDF.filter($"order_id" > 158)

.write.jdbc(url, s"$database.$sinkTable", connectionProperties)


5.png

5.png

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
11月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
631 0
|
SQL 运维 监控
SQL查询太慢?实战讲解YashanDB SQL调优思路
本文是Meetup第十期“调优实战专场”的第二篇技术文章,上一篇《高效查询秘诀,解码YashanDB优化器分组查询优化手段》中,我们揭秘了YashanDB分组查询优化秘诀,本文将通过一个案例,助你快速上手YashanDB慢日志功能,精准定位“慢SQL”后进行优化。
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
1205 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
10月前
|
SQL JSON 分布式计算
Spark SQL架构及高级用法
Spark SQL基于Catalyst优化器与Tungsten引擎,提供高效的数据处理能力。其架构涵盖SQL解析、逻辑计划优化、物理计划生成及分布式执行,支持复杂数据类型、窗口函数与多样化聚合操作,结合自适应查询与代码生成技术,实现高性能大数据分析。
744 2
|
SQL 分布式计算 资源调度
Dataphin功能Tips系列(48)-如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
494 4
|
SQL 数据库 UED
SQL性能提升秘籍:5步优化法与10个实战案例
在数据库管理和应用开发中,SQL查询的性能优化至关重要。高效的SQL查询不仅可以提高应用的响应速度,还能降低服务器负载,提升用户体验。本文将分享SQL优化的五大步骤和十个实战案例,帮助构建高效、稳定的数据库应用。
1620 3
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
2150 0
|
SQL 缓存 监控
SQL性能提升指南:五大优化策略与十个实战案例
在数据库性能优化的世界里,SQL优化是提升查询效率的关键。一个高效的SQL查询可以显著减少数据库的负载,提高应用响应速度,甚至影响整个系统的稳定性和扩展性。本文将介绍SQL优化的五大步骤,并结合十个实战案例,为你提供一份详尽的性能提升指南。
1781 0
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
461 0
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
612 0