Apache Spark,Parquet和麻烦的Null

简介:   关于类型安全性的经验教训,并承担过多  介绍  在将SQL分析ETL管道迁移到客户端的新Apache Spark批处理ETL基础结构时,我注意到了一些奇特的东西。 开发的基础结构具有可为空的DataFrame列架构的概念。 乍看起来似乎并不奇怪。 大多数(如果不是全部)SQL数据库都允许列为可空或不可空,对吗? 让我们研究一下在创建Spark DataFrame时,这种看似明智的概念为什么会带来问题。  from pyspark.sql import types  schema=types.StructType([

  关于类型安全性的经验教训,并承担过多

  介绍

  在将SQL分析ETL管道迁移到客户端的新Apache Spark批处理ETL基础结构时,我注意到了一些奇特的东西。 开发的基础结构具有可为空的DataFrame列架构的概念。 乍看起来似乎并不奇怪。 大多数(如果不是全部)SQL数据库都允许列为可空或不可空,对吗? 让我们研究一下在创建Spark DataFrame时,这种看似明智的概念为什么会带来问题。

  from pyspark.sql import types

  schema=types.StructType([

  types.StructField("index", types.LongType(), False),

  types.StructField("long", types.LongType(), True),

  ])

  df=sqlContext.createDataFrame(sc.emptyRDD(), schema)

  df.printSchema()

  此代码块在将为空的DataFrame df上强制实施模式。 df.printSchema()将为我们提供以下内容:

  root

  |-- index: long (nullable=false)

  |-- long: long (nullable=true)

  可以看出,内存中的DataFrame继承了已定义模式的可空性。 但是,这有点误导。 Spark中的列可空性是一个优化语句; 不是对象类型的强制。

  在本文中,我们将主要介绍通过Parquet创建和保存DataFrame的行为。 实木复合地板的文件格式和设计将不作深入介绍。

  Nullable对DataFrame列意味着什么?

  …当您定义一个架构,在该架构中所有列均声明为不具有空值时– Spark不会强制执行该操作,并且会很乐意让空值进入该列。 可为空的信号只是为了帮助Spark SQL优化处理该列。 如果列中的空值不应包含空值,则可能会得到错误的结果,或者会看到难以调试的奇怪异常。 —《 Apache Spark数据工程师指南》; 第74页

  当一列被声明为不具有空值时,Spark不会强制执行此声明。 无论用户定义的调用代码是否声明为可空,Spark都不会执行空检查。 列的可为空特性是与Catalyst Optimizer签订的一项合同,该协议不会产生空数据。 如有任何疑问,健康的做法是始终将其设置为true。 在像JSON / CSV这样的实例中将默认值默认为null以支持更宽松类型的数据源是有意义的。 更重要的是,忽略可空性是Spark的保守选择。 Apache Spark无法控制要查询的数据及其存储,因此默认为代码安全行为。 例如,始终可以以特殊方式将文件添加到DFS(分布式文件服务器),这将违反任何已定义的数据完整性约束。

  从Parquet创建DataFrame

  从Parquet文件路径创建DataFrame对用户来说很容易。 可以通过调用实例化DataFrameReader的SparkSession.read.parquet()或SparkSession.read.load('path / to / data.parquet')来完成。1在将外部数据转换为DataFrame的过程中,数据 由Spark推断模式,并为摄取Parquet零件文件的Spark作业设计查询计划。

  调用模式推断时,将设置一个标志来回答问题:"是否应合并所有Parquet零件文件中的模式?"当多个Parquet文件具有不同的架构时,可以将它们合并。二手QQ买卖平台默认行为是不合并架构。2然后区分出解析架构所需的文件。如果不需要合并,Spark总是首先尝试摘要文件。在这种情况下,_common_metadata比_metadata更可取,因为它不包含行组信息,并且对于具有许多行组的大型Parquet文件而言,它可能要小得多。如果摘要文件不可用,则行为是回退到随机的零件文件。3在默认情况下(未将架构合并标记为必要),Spark将首先尝试任意_common_metadata文件,然后回退至任意_metadata,最后是任意部分文件,并假定(正确或不正确)方案是一致的。设置了要合并的文件后,该操作将由分布式Spark作业完成。?请务必注意,数据架构始终断言为可空值。简而言之,这是因为QueryPlan()重新创建了保存架构的StructType,但强制所有包含的字段为空。

  Parquet的书写方式

  "…编写Parquet文件时,出于兼容性原因,所有列都将自动转换为可为空。" -Spark Docs

  因此,说您找到了在Spark作业的列级强制执行null的一种方法。 不幸的是,一旦您写信给Parquet,该执行就失效了。 为了从更高层次描述

  SparkSession.write.parquet(),它将从给定的DataFrame中创建一个DataSource,实施为Parquet提供的默认压缩,构建优化的查询,并使用可为空的模式复制数据。 可以将其大致描述为DataFrame创建的逆过程。

  一些实验

  在最后一部分中,我将提供一些有关默认行为的预期示例。

  在调查对Parquet的写入时,有两种选择:

  · 在建立DataFrame上使用手动定义的架构

  schema=types.StructType([

  types.StructField("index", types.LongType(), False),

  types.StructField("long", types.LongType(), True),

  ])

  data=[

  (1, 6),

  (2, 7),

  (3, None),

  (4, 8),

  (5, 9)

  ]

  df_w_schema=sqlContext.createDataFrame(data, schema)

  df_w_schema.collect()

  df_w_schema.write.parquet('nullable_check_w_schema')

  df_parquet_w_schema=sqlContext.read.schema(schema).parquet('nullable_check_w_schema')

  df_parquet_w_schema.printSchema()

  此处完成的工作是定义模式和数据集。 在写入之前,该模式的可空性得到了加强。 但是,一旦将DataFrame写入Parquet,就可以看到所有列的空性都从窗口中消失了,就像从传入的DataFrame中获得printSchema()的输出一样。

  root

  |-- index: long (nullable=true)

  |-- long: long (nullable=true)

  2.未定义架构

  df_wo_schema=sqlContext.createDataFrame(data)

  df_wo_schema.collect()

  df_wo_schema.write.mode('overwrite').parquet('nullable_check_wo_schema')

  df_parquet_wo_schema=sqlContext.read.parquet('nullable_check_wo_schema')

  df_parquet_wo_schema.printSchema()

  与1一样,我们定义了相同的数据集,但是缺少"强制"模式。 结果可以看作是

  root

  |-- _1: long (nullable=true)

  |-- _2: long (nullable=true)

  无论是否声明架构,都不会强制实现可空性。

  脚注

  [1] DataFrameReader是DataFrame与外部存储之间的接口。

  [2]

  PARQUET_SCHEMA_MERGING_ENABLED:为true时,Parquet数据源合并从所有数据文件收集的模式,否则从摘要文件或随机数据文件中选择该模式(如果没有可用的摘要文件)。

  [3]摘要文件中存储的元数据将从所有零件文件中合并。 但是,对于用户定义的键值元数据(我们在其中存储Spark SQL模式),如果键与单独的零件文件中的不同值相关联,Parquet不知道如何正确合并它们。 发生这种情况时,Parquet停止生成摘要文件,这意味着存在摘要文件时,则:

  一种。 所有部分文件都具有完全相同的Spark SQL模式或orb。 有些部分文件根本不在键值元数据中包含Spark SQL模式(因此它们的模式可能彼此不同)。

  Spark扮演悲观主义者,并考虑了第二种情况。 这意味着如果用户需要合并的架构,并且必须分析所有零件文件以进行合并,则摘要文件将不受信任。

  [4]不考虑地点。 此优化对于S3记录系统主要有用。 由于S3节点的计算限制,S3文件元数据操作可能很慢,并且本地性不可用。

  并行性受合并文件的数量限制。 因此,并行度为2的SparkSession只有一个合并文件,它将使用一个执行程序启动一个Spark作业。

  weshoffman/apache-spark-parquet-and-troublesome-nulls-28712b06f836)

目录
相关文章
|
4月前
|
分布式计算 大数据 数据处理
Apache Spark:提升大规模数据处理效率的秘籍
【4月更文挑战第7天】本文介绍了Apache Spark的大数据处理优势和核心特性,包括内存计算、RDD、一站式解决方案。分享了Spark实战技巧,如选择部署模式、优化作业执行流程、管理内存与磁盘、Spark SQL优化及监控调优工具的使用。通过这些秘籍,可以提升大规模数据处理效率,发挥Spark在实际项目中的潜力。
209 0
|
4月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
233 0
|
23天前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
32 0
|
2月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
91 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
27天前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
139 0
|
2月前
|
分布式计算 Apache Spark
|
3月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
83 6
|
3月前
|
分布式计算 Spark 大数据
深入探究Apache Spark在大数据处理中的实践应用
【6月更文挑战第2天】Apache Spark是流行的开源大数据处理框架,以其内存计算速度和低延迟脱颖而出。本文涵盖Spark概述、核心组件(包括Spark Core、SQL、Streaming和MLlib)及其在数据预处理、批处理分析、交互式查询、实时处理和机器学习中的应用。通过理解Spark内部机制和实践应用,可提升大数据处理效率,发挥其在各行业的潜力。
|
3月前
|
消息中间件 分布式计算 关系型数据库
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
|
4月前
|
消息中间件 分布式计算 Serverless
CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark
CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark
100 2

推荐镜像

更多