Spark SQL中基于parquet数据的加载方式、数据源的自动分区推断以及数据源的元数据合并

简介: 笔记

数据源:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
{"name":"Justin", "age":19}
{"name":"Justin", "age":32}

示例代码:

package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommSparkSessionScala}
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/7
 * @time : 7:18 下午
 */
object ParquetScala {
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        basicLoad(spark)
        partition(spark)
        mege1(spark)
        mege2(spark)
    }
    // 数据的加载方式
    def basicLoad(spark : SparkSession): Unit ={
        import spark.implicits._
        val jsonPath = Comm.fileDirPath + "people.json"
        val parquetPath = Comm.fileDirPath + "people.parquet"
        val peopleDF = spark.read.json(jsonPath)
        // 将json文件转换成parquet文件
        // peopleDF.write.parquet(Comm.fileDirPath + "people.parquet")
        val parquetDF = spark.read.parquet(parquetPath).createOrReplaceTempView("people")
        spark.sql("select * from people").map(row => "mame : " + row.getAs("name")).show()
    }
    // 数据源的自动分区
    def partition(spark : SparkSession): Unit ={
        import spark.implicits._
        val partitionPath = Comm.fileDirPath + "people"
        spark.read.parquet(partitionPath).show()
        /**
         * +----+-------+------+-------+
         * | age|   name|gender|country|
         * +----+-------+------+-------+
         * |null|Michael|  male|     US|
         * |  30|   Andy|  male|     US|
         * |  19| Justin|  male|     US|
         * |  19| Justin|  male|     US|
         * |  32| Justin|  male|     US|
         * +----+-------+------+-------+
         */
    }
    // 数据源的元数据合并一
    def mege1(spark : SparkSession): Unit ={
        import spark.implicits._
        val squareDF = spark.sparkContext.makeRDD(1 to 5).map(x => (x , x * x)).toDF("value","square")
        squareDF.write.parquet(Comm.fileDirPath + "test_table/key=1")
        val cubeDF = spark.sparkContext.makeRDD(6 to 10).map(x => (x , x * x * x)).toDF("value","cube")
        cubeDF.write.parquet(Comm.fileDirPath + "test_table/key=2")
        val megeDF = spark.read.option("mergeSchema",true).parquet(Comm.fileDirPath + "test_table").printSchema()
        /**
         * root
             |-- value: integer (nullable = true)
             |-- square: integer (nullable = true)
             |-- cube: integer (nullable = true)
             |-- key: integer (nullable = true)
         */
    }
    // 数据源的元数据合并二
    def mege2(spark : SparkSession): Unit ={
        import spark.implicits._
        val array1 = Array(("alex",22),("cherry",23))
        val arrayDF1 = spark.sparkContext.makeRDD(array1.toSeq).toDF("name","age").write.format("parquet").mode(SaveMode.Append).save(Comm.fileDirPath + "test_table2")
        val array2 = Array(("alex","dept-1"),("cherry","dept-2"))
        val arrayDF2 = spark.sparkContext.makeRDD(array2.toSeq).toDF("name","dept").write.format("parquet").mode(SaveMode.Append).save(Comm.fileDirPath + "test_table2")
        val megeDF = spark.read.option("mergeSchema",true).parquet(Comm.fileDirPath + "test_table2").printSchema()
        /**
         * +------+------+----+
         * |  name|  dept| age|
         * +------+------+----+
         * |  alex|dept-1|null|
         * |cherry|dept-2|null|
         * |  alex|  null|  22|
         * |cherry|  null|  23|
         * +------+------+----+
         *
         * root
             |-- name: string (nullable = true)
             |-- dept: string (nullable = true)
             |-- age: integer (nullable = true)
         */
    }
}


相关文章
|
3月前
|
SQL JSON 分布式计算
Spark SQL架构及高级用法
Spark SQL基于Catalyst优化器与Tungsten引擎,提供高效的数据处理能力。其架构涵盖SQL解析、逻辑计划优化、物理计划生成及分布式执行,支持复杂数据类型、窗口函数与多样化聚合操作,结合自适应查询与代码生成技术,实现高性能大数据分析。
|
12月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
120 3
|
7月前
|
SQL 分布式计算 资源调度
Dataphin功能Tips系列(48)-如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
208 4
|
9月前
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
954 0
|
11月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
230 0
|
12月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
250 0
|
12月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
197 0
|
12月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
223 0
|
12月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
274 0
|
12月前
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
175 0

热门文章

最新文章