Spark SQL中基于parquet数据的加载方式、数据源的自动分区推断以及数据源的元数据合并

简介: 笔记

数据源:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
{"name":"Justin", "age":19}
{"name":"Justin", "age":32}

示例代码:

package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommSparkSessionScala}
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/7
 * @time : 7:18 下午
 */
object ParquetScala {
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        basicLoad(spark)
        partition(spark)
        mege1(spark)
        mege2(spark)
    }
    // 数据的加载方式
    def basicLoad(spark : SparkSession): Unit ={
        import spark.implicits._
        val jsonPath = Comm.fileDirPath + "people.json"
        val parquetPath = Comm.fileDirPath + "people.parquet"
        val peopleDF = spark.read.json(jsonPath)
        // 将json文件转换成parquet文件
        // peopleDF.write.parquet(Comm.fileDirPath + "people.parquet")
        val parquetDF = spark.read.parquet(parquetPath).createOrReplaceTempView("people")
        spark.sql("select * from people").map(row => "mame : " + row.getAs("name")).show()
    }
    // 数据源的自动分区
    def partition(spark : SparkSession): Unit ={
        import spark.implicits._
        val partitionPath = Comm.fileDirPath + "people"
        spark.read.parquet(partitionPath).show()
        /**
         * +----+-------+------+-------+
         * | age|   name|gender|country|
         * +----+-------+------+-------+
         * |null|Michael|  male|     US|
         * |  30|   Andy|  male|     US|
         * |  19| Justin|  male|     US|
         * |  19| Justin|  male|     US|
         * |  32| Justin|  male|     US|
         * +----+-------+------+-------+
         */
    }
    // 数据源的元数据合并一
    def mege1(spark : SparkSession): Unit ={
        import spark.implicits._
        val squareDF = spark.sparkContext.makeRDD(1 to 5).map(x => (x , x * x)).toDF("value","square")
        squareDF.write.parquet(Comm.fileDirPath + "test_table/key=1")
        val cubeDF = spark.sparkContext.makeRDD(6 to 10).map(x => (x , x * x * x)).toDF("value","cube")
        cubeDF.write.parquet(Comm.fileDirPath + "test_table/key=2")
        val megeDF = spark.read.option("mergeSchema",true).parquet(Comm.fileDirPath + "test_table").printSchema()
        /**
         * root
             |-- value: integer (nullable = true)
             |-- square: integer (nullable = true)
             |-- cube: integer (nullable = true)
             |-- key: integer (nullable = true)
         */
    }
    // 数据源的元数据合并二
    def mege2(spark : SparkSession): Unit ={
        import spark.implicits._
        val array1 = Array(("alex",22),("cherry",23))
        val arrayDF1 = spark.sparkContext.makeRDD(array1.toSeq).toDF("name","age").write.format("parquet").mode(SaveMode.Append).save(Comm.fileDirPath + "test_table2")
        val array2 = Array(("alex","dept-1"),("cherry","dept-2"))
        val arrayDF2 = spark.sparkContext.makeRDD(array2.toSeq).toDF("name","dept").write.format("parquet").mode(SaveMode.Append).save(Comm.fileDirPath + "test_table2")
        val megeDF = spark.read.option("mergeSchema",true).parquet(Comm.fileDirPath + "test_table2").printSchema()
        /**
         * +------+------+----+
         * |  name|  dept| age|
         * +------+------+----+
         * |  alex|dept-1|null|
         * |cherry|dept-2|null|
         * |  alex|  null|  22|
         * |cherry|  null|  23|
         * +------+------+----+
         *
         * root
             |-- name: string (nullable = true)
             |-- dept: string (nullable = true)
             |-- age: integer (nullable = true)
         */
    }
}


相关文章
|
10月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
1222 43
|
10月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
594 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
11月前
|
SQL
SQL如何只让特定列中只显示一行数据
SQL如何只让特定列中只显示一行数据
|
SQL 数据挖掘 关系型数据库
【SQL 周周练】一千条数据需要做一天,怎么用 SQL 处理电表数据(如何动态构造自然月)
题目来自于某位发帖人在某 Excel 论坛的求助,他需要将电表缴费数据按照缴费区间拆开后再按月份汇总。当时用手工处理数据,自称一千条数据就需要处理一天。我将这个问题转化为 SQL 题目。
477 12
|
11月前
|
SQL
SQL中如何删除指定查询出来的数据
SQL中如何删除指定查询出来的数据
|
11月前
|
SQL 关系型数据库 MySQL
SQL如何对不同表的数据进行更新
本文介绍了如何将表A的Col1数据更新到表B的Col1中,分别提供了Microsoft SQL和MySQL的实现方法,并探讨了多表合并后更新的优化方式,如使用MERGE语句提升效率。适用于数据库数据同步与批量更新场景。
|
12月前
|
SQL DataWorks 数据管理
SQL血缘分析实战!数据人必会的3大救命场景
1. 开源工具:Apache Atlas(元数据管理)、Spline(血缘追踪) 2. 企业级方案:阿里DataWorks血缘分析、腾讯云CDW血缘引擎 3. 自研技巧:在ETL脚本中植入版本水印,用注释记录业务逻辑变更 📌 重点总结:
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
945 13
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
640 9