spark sql编程之实现合并Parquet格式的DataFrame的schema

简介: spark sql编程之实现合并Parquet格式的DataFrame的schema

首先说下什么是schema,其实这跟通俗来讲,与我们传统数据表字段的名称是一个意思。明白了这个,我们在继续往下看。


合并schema


首先创建RDD,并转换为含有两个字段"value", "square"的DataFrame

val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")

21de21353720b1c3423d21dca2e877c9.jpg然后以parquet格式保存

squaresDF.write.parquet("data/test_table/key=1")


然后在创建RDD,并转换为含有两个字段"value", "cube"的DataFrame

val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")

835c3d904063ad8648bab772b6c0edc0.jpg然后以parquet格式保存


cubesDF.write.parquet("data/test_table/key=2")

最后合并schema

val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")

2a5e9913d7fb6c9a69968360014b9b75.jpg

我们打印schema

mergedDF.printSchema()

e8b753e78dc83ecbd879dfa7b47b0c47.jpg

接着我们现实数据

mergedDF.show

4be30bdc539681241467fa36cc46050e.jpg

如果想合并schema需要设置mergeSchema 为true,当然还有另外一种方式是设置spark.sql.parquet.mergeSchema为true。


相关补充说明:


Hive metastore Parquet表格式转换


当读取hive的 Parquet 表时,Spark SQL为了提高性能,会使用自己的支持的Parquet,由配置 spark.sql.hive.convertMetastoreParquet控制,默认是开启的。

上面除了Parquet格式支持外,还有ProtocolBuffer, Avro, 和Thrift支持合并。


如何修改配置项:


可以通过SparkSession 的setConf 或则使用SQL命令

SET key=value

更多配置项如下:

5cc947ef714ffbb182104eef59a47c43.jpg

目录
相关文章
|
8月前
|
SQL 自然语言处理 数据库
【Azure Developer】分享两段Python代码处理表格(CSV格式)数据 : 根据每列的内容生成SQL语句
本文介绍了使用Python Pandas处理数据收集任务中格式不统一的问题。针对两种情况:服务名对应多人拥有状态(1/0表示),以及服务名与人名重复列的情况,分别采用双层for循环和字典数据结构实现数据转换,最终生成Name对应的Services列表(逗号分隔)。此方法高效解决大量数据的人工处理难题,减少错误并提升效率。文中附带代码示例及执行结果截图,便于理解和实践。
224 4
|
4月前
|
SQL JSON 分布式计算
Spark SQL架构及高级用法
Spark SQL基于Catalyst优化器与Tungsten引擎,提供高效的数据处理能力。其架构涵盖SQL解析、逻辑计划优化、物理计划生成及分布式执行,支持复杂数据类型、窗口函数与多样化聚合操作,结合自适应查询与代码生成技术,实现高性能大数据分析。
|
8月前
|
SQL 分布式计算 资源调度
Dataphin功能Tips系列(48)-如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
319 4
|
10月前
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
1303 0
|
SQL 监控 关系型数据库
SQL语句当前及历史信息查询-performance schema的使用
本文介绍了如何使用MySQL的Performance Schema来获取SQL语句的当前和历史执行信息。Performance Schema默认在MySQL 8.0中启用,可以通过查询相关表来获取详细的SQL执行信息,包括当前执行的SQL、历史执行记录和统计汇总信息,从而快速定位和解决性能瓶颈。
640 1
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
278 0
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
276 0
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
225 0
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
268 0
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
345 0