关于类型安全性的经验教训,并承担过多
介绍
在将SQL分析ETL管道迁移到客户端的新Apache Spark批处理ETL基础结构时,我注意到了一些奇特的东西。 开发的基础结构具有可为空的DataFrame列架构的概念。 乍看起来似乎并不奇怪。 大多数(如果不是全部)SQL数据库都允许列为可空或不可空,对吗? 让我们研究一下在创建Spark DataFrame时,这种看似明智的概念为什么会带来问题。
from pyspark.sql import types
schema=types.StructType([
types.StructField("index", types.LongType(), False),
types.StructField("long", types.LongType(), True),
])
df=sqlContext.createDataFrame(sc.emptyRDD(), schema)
df.printSchema()
此代码块在将为空的DataFrame df上强制实施模式。 df.printSchema()将为我们提供以下内容:
root
|-- index: long (nullable=false)
|-- long: long (nullable=true)
可以看出,内存中的DataFrame继承了已定义模式的可空性。 但是,这有点误导。 Spark中的列可空性是一个优化语句; 不是对象类型的强制。
在本文中,我们将主要介绍通过Parquet创建和保存DataFrame的行为。 实木复合地板的文件格式和设计将不作深入介绍。
Nullable对DataFrame列意味着什么?
…当您定义一个架构,在该架构中所有列均声明为不具有空值时– Spark不会强制执行该操作,并且会很乐意让空值进入该列。 可为空的信号只是为了帮助Spark SQL优化处理该列。 如果列中的空值不应包含空值,则可能会得到错误的结果,或者会看到难以调试的奇怪异常。 —《 Apache Spark数据工程师指南》; 第74页
当一列被声明为不具有空值时,Spark不会强制执行此声明。 无论用户定义的调用代码是否声明为可空,Spark都不会执行空检查。 列的可为空特性是与Catalyst Optimizer签订的一项合同,该协议不会产生空数据。 如有任何疑问,健康的做法是始终将其设置为true。 在像JSON / CSV这样的实例中将默认值默认为null以支持更宽松类型的数据源是有意义的。 更重要的是,忽略可空性是Spark的保守选择。 Apache Spark无法控制要查询的数据及其存储,因此默认为代码安全行为。 例如,始终可以以特殊方式将文件添加到DFS(分布式文件服务器),这将违反任何已定义的数据完整性约束。
从Parquet创建DataFrame
从Parquet文件路径创建DataFrame对用户来说很容易。 可以通过调用实例化DataFrameReader的SparkSession.read.parquet()或SparkSession.read.load('path / to / data.parquet')来完成。1在将外部数据转换为DataFrame的过程中,数据 由Spark推断模式,并为摄取Parquet零件文件的Spark作业设计查询计划。
调用模式推断时,将设置一个标志来回答问题:"是否应合并所有Parquet零件文件中的模式?"当多个Parquet文件具有不同的架构时,可以将它们合并。二手QQ买卖平台默认行为是不合并架构。2然后区分出解析架构所需的文件。如果不需要合并,Spark总是首先尝试摘要文件。在这种情况下,_common_metadata比_metadata更可取,因为它不包含行组信息,并且对于具有许多行组的大型Parquet文件而言,它可能要小得多。如果摘要文件不可用,则行为是回退到随机的零件文件。3在默认情况下(未将架构合并标记为必要),Spark将首先尝试任意_common_metadata文件,然后回退至任意_metadata,最后是任意部分文件,并假定(正确或不正确)方案是一致的。设置了要合并的文件后,该操作将由分布式Spark作业完成。?请务必注意,数据架构始终断言为可空值。简而言之,这是因为QueryPlan()重新创建了保存架构的StructType,但强制所有包含的字段为空。
Parquet的书写方式
"…编写Parquet文件时,出于兼容性原因,所有列都将自动转换为可为空。" -Spark Docs
因此,说您找到了在Spark作业的列级强制执行null的一种方法。 不幸的是,一旦您写信给Parquet,该执行就失效了。 为了从更高层次描述
SparkSession.write.parquet(),它将从给定的DataFrame中创建一个DataSource,实施为Parquet提供的默认压缩,构建优化的查询,并使用可为空的模式复制数据。 可以将其大致描述为DataFrame创建的逆过程。
一些实验
在最后一部分中,我将提供一些有关默认行为的预期示例。
在调查对Parquet的写入时,有两种选择:
· 在建立DataFrame上使用手动定义的架构
schema=types.StructType([
types.StructField("index", types.LongType(), False),
types.StructField("long", types.LongType(), True),
])
data=[
(1, 6),
(2, 7),
(3, None),
(4, 8),
(5, 9)
]
df_w_schema=sqlContext.createDataFrame(data, schema)
df_w_schema.collect()
df_w_schema.write.parquet('nullable_check_w_schema')
df_parquet_w_schema=sqlContext.read.schema(schema).parquet('nullable_check_w_schema')
df_parquet_w_schema.printSchema()
此处完成的工作是定义模式和数据集。 在写入之前,该模式的可空性得到了加强。 但是,一旦将DataFrame写入Parquet,就可以看到所有列的空性都从窗口中消失了,就像从传入的DataFrame中获得printSchema()的输出一样。
root
|-- index: long (nullable=true)
|-- long: long (nullable=true)
2.未定义架构
df_wo_schema=sqlContext.createDataFrame(data)
df_wo_schema.collect()
df_wo_schema.write.mode('overwrite').parquet('nullable_check_wo_schema')
df_parquet_wo_schema=sqlContext.read.parquet('nullable_check_wo_schema')
df_parquet_wo_schema.printSchema()
与1一样,我们定义了相同的数据集,但是缺少"强制"模式。 结果可以看作是
root
|-- _1: long (nullable=true)
|-- _2: long (nullable=true)
无论是否声明架构,都不会强制实现可空性。
脚注
[1] DataFrameReader是DataFrame与外部存储之间的接口。
[2]
PARQUET_SCHEMA_MERGING_ENABLED:为true时,Parquet数据源合并从所有数据文件收集的模式,否则从摘要文件或随机数据文件中选择该模式(如果没有可用的摘要文件)。
[3]摘要文件中存储的元数据将从所有零件文件中合并。 但是,对于用户定义的键值元数据(我们在其中存储Spark SQL模式),如果键与单独的零件文件中的不同值相关联,Parquet不知道如何正确合并它们。 发生这种情况时,Parquet停止生成摘要文件,这意味着存在摘要文件时,则:
一种。 所有部分文件都具有完全相同的Spark SQL模式或orb。 有些部分文件根本不在键值元数据中包含Spark SQL模式(因此它们的模式可能彼此不同)。
Spark扮演悲观主义者,并考虑了第二种情况。 这意味着如果用户需要合并的架构,并且必须分析所有零件文件以进行合并,则摘要文件将不受信任。
[4]不考虑地点。 此优化对于S3记录系统主要有用。 由于S3节点的计算限制,S3文件元数据操作可能很慢,并且本地性不可用。
并行性受合并文件的数量限制。 因此,并行度为2的SparkSession只有一个合并文件,它将使用一个执行程序启动一个Spark作业。
weshoffman/apache-spark-parquet-and-troublesome-nulls-28712b06f836)