大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节我们完成了如下的内容:


SparkSQL 核心操作

Action操作 详细解释+测试案例

Transformation操作 详细解释+测试案例

58472b92e0a9e413032d0b7d491462c4_e30887716879430ba35a907878bbfe42.png SQL 语句

总体而言:SparkSQL语HQL兼容;与HQL相比,SparkSQL更简洁。

SparkSQL是Apache Spark框架中的一个模块,专门用于处理结构化和半结构化数据。它提供了对数据进行查询、处理和分析的高级接口。


SparkSQL的核心特点包括:


DataFrame API:SparkSQL提供了DataFrame API,它是一种以行和列为结构的数据集,与关系数据库中的表非常相似。DataFrame支持多种数据源,如Hive、Parquet、JSON、JDBC等,可以轻松地将数据导入并进行操作。

SQL查询:SparkSQL允许用户通过标准的SQL语法查询DataFrame,这使得数据分析师和工程师可以使用他们熟悉的SQL语言来处理大数据。SparkSQL会自动将SQL查询转换为底层的RDD操作,从而在分布式环境中执行。

与Hive集成:SparkSQL可以与Hive无缝集成,使用Hive的元数据和查询引擎。它支持HiveQL(Hive Query Language)语法,并且能够直接访问Hive中的数据。

性能优化:SparkSQL采用了多种优化技术,如Catalyst查询优化器和Tungsten物理执行引擎。这些优化技术能够自动生成高效的执行计划,提高查询的执行速度。

数据样例

// 数据
1 1,2,3
2 2,3
3 1,2

// 需要实现如下的效果
1 1
1 2
1 3
2 2
2 3
3 1
3 2

编写代码

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.Encoders


case class Info(id: String, tags: String)

object SparkSql01 {

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession
      .builder()
      .appName("SparkSQLDemo")
      .master("local[*]")
      .getOrCreate()

    val sc = sparkSession.sparkContext
    sc.setLogLevel("WARN")

    val arr = Array("1 1,2,3", "2 2,3", "3 1,2")
    val rdd: RDD[Info] = sc
      .makeRDD(arr)
      .map{
        line => val fields: Array[String] = line.split("\\s+")
          Info(fields(0), fields(1))
      }

    import sparkSession.implicits._
    implicit val infoEncoder = Encoders.product[Info]

    val ds: Dataset[Info] = sparkSession.createDataset(rdd)
    ds.createOrReplaceTempView("t1")

    sparkSession.sql(
      """
        | select id, tag
        | from t1
        | lateral view explode(split(tags, ",")) t2 as tag
        |""".stripMargin
    ).show
    sparkSession.sql(
      """
        | select id, explode(split(tags, ","))
        | from t1
        |""".stripMargin
    ).show

    sparkSession.close()
  }

}

运行测试

控制台输出结果为:

+---+---+
| id|tag|
+---+---+
|  1|  1|
|  1|  2|
|  1|  3|
|  2|  2|
|  2|  3|
|  3|  1|
|  3|  2|
+---+---+

+---+---+
| id|col|
+---+---+
|  1|  1|
|  1|  2|
|  1|  3|
|  2|  2|
|  2|  3|
|  3|  1|
|  3|  2|
+---+---+

运行结果

运行结果如下图所示:

输入与输出

SparkSQL 内建支持的数据源包括:


Parquet (默认数据源)

JSON

CSV

Avro

Images

BinaryFiles(Spark 3.0)

简单介绍一下,Parquet 是一种列式存储格式,专门为大数据处理和分析而设计。


列式存储:Parquet 采用列式存储格式,这意味着同一列的数据存储在一起。这样可以极大地提高查询性能,尤其是当查询只涉及少量列时。

高效压缩:由于同一列的数据具有相似性,Parquet 能够更高效地进行压缩,节省存储空间。

支持复杂数据类型:Parquet 支持嵌套的数据结构,包括嵌套列表、映射和结构体,这使得它非常适合处理复杂的、半结构化的数据。

跨平台:Parquet 是一种开放标准,支持多种编程语言和数据处理引擎,包括 Apache Spark、Hadoop、Impala 等。

c35a5a5ff302f5a488248f55253e7a76_a3f72d398ed14b438d88be4c526f82ce.png Parquet

特点:Parquet是一种列式存储格式,特别适合大规模数据的存储和处理。它支持压缩和嵌套数据结构,因此在存储效率和读取性能方面表现优异。


使用方式:spark.read.parquet(“path/to/data”) 读取Parquet文件;df.write.parquet(“path/to/output”) 将DataFrame保存为Parquet格式。


JSON

特点:JSON是一种轻量级的数据交换格式,广泛用于Web应用程序和NoSQL数据库中。SparkSQL能够解析和生成JSON格式的数据,并支持嵌套结构。


使用方式:spark.read.json(“path/to/data”) 读取JSON文件;df.write.json(“path/to/output”) 将DataFrame保存为JSON格式。


CSV

特点:CSV(逗号分隔值)是最常见的平面文本格式之一,简单易用,但不支持嵌套结构。SparkSQL支持读取和写入CSV文件,并提供了处理缺失值、指定分隔符等功能。


使用方式:spark.read.csv(“path/to/data”) 读取CSV文件;df.write.csv(“path/to/output”) 将DataFrame保存为CSV格式。


Avro

特点:Avro是一种行式存储格式,适合大规模数据的序列化。它支持丰富的数据结构和模式演化,通常用于Hadoop生态系统中的数据存储和传输。


使用方式:spark.read.format(“avro”).load(“path/to/data”) 读取Avro文件;df.write.format(“avro”).save(“path/to/output”) 将DataFrame保存为Avro格式。


ORC

特点:ORC(Optimized Row Columnar)是一种高效的列式存储格式,专为大数据处理而设计,支持高压缩率和快速读取性能。它在存储空间和I/O性能方面表现优越。


使用方式:spark.read.orc(“path/to/data”) 读取ORC文件;df.write.orc(“path/to/output”) 将DataFrame保存为ORC格式。


Hive Tables

特点:SparkSQL能够无缝集成Hive,直接访问Hive元数据,并对Hive表进行查询。它支持HiveQL语法,并能够利用Hive的存储格式和结构。


使用方式:通过spark.sql(“SELECT * FROM hive_table”)查询Hive表;也可以使用saveAsTable将DataFrame写入Hive表。


JDBC/ODBC

特点:SparkSQL支持通过JDBC/ODBC接口连接关系型数据库,如MySQL、PostgreSQL、Oracle等。它允许从数据库读取数据并将结果写回数据库。


使用方式:spark.read.format(“jdbc”).option(“url”, “jdbc:mysql://host/db”).option(“dbtable”, “table”).option(“user”, “username”).option(“password”, “password”).load() 读取数据库表;df.write.format(“jdbc”).option(“url”, “jdbc:mysql://host/db”).option(“dbtable”, “table”).option(“user”, “username”).option(“password”, “password”).save() 将DataFrame写入数据库。


Text Files

特点:SparkSQL可以处理简单的文本文件,每一行被读取为一个字符串。适合用于处理纯文本数据。


使用方式:spark.read.text(“path/to/data”) 读取文本文件;df.write.text(“path/to/output”) 将DataFrame保存为文本格式。


Delta Lake (外部插件)

特点:Delta Lake是一种开源存储层,构建在Parquet格式之上,支持ACID事务、可扩展元数据处理和流批一体的实时数据处理。尽管不是内建的数据源,但它在Spark生态系统中得到了广泛支持。


使用方式:spark.read.format(“delta”).load(“path/to/delta-table”) 读取Delta表;df.write.format(“delta”).save(“path/to/delta-table”) 将DataFrame保存为Delta格式。


测试案例

val df1 =
spark.read.format("parquet").load("data/users.parquet")
// Use Parquet; you can omit format("parquet") if you wish as
it's the default
val df2 = spark.read.load("data/users.parquet")

// Use CSV
val df3 = spark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load("data/people1.csv")

// Use JSON
val df4 = spark.read.format("json")
.load("data/emp.json")

此外还支持 JDBC 的方式:

val jdbcDF = sparkSession
  .read
  .format("jdbc")
  .option("url", "jdbc:mysql://h122.wzk.icu/spark_test?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "hive")
  .option("password", "hive@wzk.icu")
  .load()
jdbcDF.show()

32eb66984e9cf5d1817ce0e3cdc3cd54_db91cef3efa345adaa2ebac0e7ed964c.png

导入依赖

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

hive-site

需要在项目的 Resource 目录下,新增一个 hive-site.xml

备注:最好使用 metastore service连接Hive,使用直接metastore的方式时,SparkSQL程序会修改Hive的版本信息

<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://h122.wzk.icu:9083</value>
    </property>
</configuration>

编写代码

object AccessHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Demo1")
      .master("local[*]")
      .enableHiveSupport()
      // 设为true时,Spark使用与Hive相同的约定来编写Parquet数据
      .config("spark.sql.parquet.writeLegacyFormat", true)
      .getOrCreate()

    val sc = spark.sparkContext
    sc.setLogLevel("warn")

    spark.sql("show databases").show
    spark.sql("select * from ods.ods_trade_product_info").show

    val df: DataFrame = spark.table("ods.ods_trade_product_info")
    df.show()

    df.write.mode(SaveMode.Append).saveAsTable("ods.ods_trade_product_info_back")
    spark.table("ods.ods_trade_product_info_back").show

    spark.close()
  }
}

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
49 3
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
82 0
|
15天前
|
SQL Java 数据库连接
[SQL]SQL注入与SQL执行过程(基于JDBC)
本文介绍了SQL注入的概念及其危害,通过示例说明了恶意输入如何导致SQL语句异常执行。同时,详细解释了SQL语句的执行过程,并提出了使用PreparedStatement来防止SQL注入的方法。
37 1
|
1月前
|
缓存 分布式计算 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
45 0
|
1月前
|
分布式计算 算法 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
50 0
|
26天前
|
数据采集 JSON 数据处理
抓取和分析JSON数据:使用Python构建数据处理管道
在大数据时代,电商网站如亚马逊、京东等成为数据采集的重要来源。本文介绍如何使用Python结合代理IP、多线程等技术,高效、隐秘地抓取并处理电商网站的JSON数据。通过爬虫代理服务,模拟真实用户行为,提升抓取效率和稳定性。示例代码展示了如何抓取亚马逊商品信息并进行解析。
抓取和分析JSON数据:使用Python构建数据处理管道
|
11天前
|
JSON 数据格式 索引
Python中序列化/反序列化JSON格式的数据
【11月更文挑战第4天】本文介绍了 Python 中使用 `json` 模块进行序列化和反序列化的操作。序列化是指将 Python 对象(如字典、列表)转换为 JSON 字符串,主要使用 `json.dumps` 方法。示例包括基本的字典和列表序列化,以及自定义类的序列化。反序列化则是将 JSON 字符串转换回 Python 对象,使用 `json.loads` 方法。文中还提供了具体的代码示例,展示了如何处理不同类型的 Python 对象。
|
15天前
|
JSON 缓存 前端开发
PHP如何高效地处理JSON数据:从编码到解码
在现代Web开发中,JSON已成为数据交换的标准格式。本文探讨了PHP如何高效处理JSON数据,包括编码和解码的过程。通过简化数据结构、使用优化选项、缓存机制及合理设置解码参数等方法,可以显著提升JSON处理的性能,确保系统快速稳定运行。
|
8天前
|
JSON API 数据安全/隐私保护
拍立淘按图搜索API接口返回数据的JSON格式示例
拍立淘按图搜索API接口允许用户通过上传图片来搜索相似的商品,该接口返回的通常是一个JSON格式的响应,其中包含了与上传图片相似的商品信息。以下是一个基于淘宝平台的拍立淘按图搜索API接口返回数据的JSON格式示例,同时提供对其关键字段的解释