Apache Spark,Parquet和麻烦的Null-阿里云开发者社区

开发者社区> dasein58> 正文

Apache Spark,Parquet和麻烦的Null

简介:   关于类型安全性的经验教训,并承担过多   介绍   在将SQL分析ETL管道迁移到客户端的新Apache Spark批处理ETL基础结构时,我注意到了一些奇特的东西。 开发的基础结构具有可为空的DataFrame列架构的概念。 乍看起来似乎并不奇怪。 大多数(如果不是全部)SQL数据库都允许列为可空或不可空,对吗? 让我们研究一下在创建Spark DataFrame时,这种看似明智的概念为什么会带来问题。   from pyspark.sql import types   schema=types.StructType([
+关注继续查看

  关于类型安全性的经验教训,并承担过多

  介绍

  在将SQL分析ETL管道迁移到客户端的新Apache Spark批处理ETL基础结构时,我注意到了一些奇特的东西。 开发的基础结构具有可为空的DataFrame列架构的概念。 乍看起来似乎并不奇怪。 大多数(如果不是全部)SQL数据库都允许列为可空或不可空,对吗? 让我们研究一下在创建Spark DataFrame时,这种看似明智的概念为什么会带来问题。

  from pyspark.sql import types

  schema=types.StructType([

  types.StructField("index", types.LongType(), False),

  types.StructField("long", types.LongType(), True),

  ])

  df=sqlContext.createDataFrame(sc.emptyRDD(), schema)

  df.printSchema()

  此代码块在将为空的DataFrame df上强制实施模式。 df.printSchema()将为我们提供以下内容:

  root

  |-- index: long (nullable=false)

  |-- long: long (nullable=true)

  可以看出,内存中的DataFrame继承了已定义模式的可空性。 但是,这有点误导。 Spark中的列可空性是一个优化语句; 不是对象类型的强制。

  在本文中,我们将主要介绍通过Parquet创建和保存DataFrame的行为。 实木复合地板的文件格式和设计将不作深入介绍。

  Nullable对DataFrame列意味着什么?

  …当您定义一个架构,在该架构中所有列均声明为不具有空值时– Spark不会强制执行该操作,并且会很乐意让空值进入该列。 可为空的信号只是为了帮助Spark SQL优化处理该列。 如果列中的空值不应包含空值,则可能会得到错误的结果,或者会看到难以调试的奇怪异常。 —《 Apache Spark数据工程师指南》; 第74页

  当一列被声明为不具有空值时,Spark不会强制执行此声明。 无论用户定义的调用代码是否声明为可空,Spark都不会执行空检查。 列的可为空特性是与Catalyst Optimizer签订的一项合同,该协议不会产生空数据。 如有任何疑问,健康的做法是始终将其设置为true。 在像JSON / CSV这样的实例中将默认值默认为null以支持更宽松类型的数据源是有意义的。 更重要的是,忽略可空性是Spark的保守选择。 Apache Spark无法控制要查询的数据及其存储,因此默认为代码安全行为。 例如,始终可以以特殊方式将文件添加到DFS(分布式文件服务器),这将违反任何已定义的数据完整性约束。

  从Parquet创建DataFrame

  从Parquet文件路径创建DataFrame对用户来说很容易。 可以通过调用实例化DataFrameReader的SparkSession.read.parquet()或SparkSession.read.load('path / to / data.parquet')来完成。1在将外部数据转换为DataFrame的过程中,数据 由Spark推断模式,并为摄取Parquet零件文件的Spark作业设计查询计划。

  调用模式推断时,将设置一个标志来回答问题:"是否应合并所有Parquet零件文件中的模式?"当多个Parquet文件具有不同的架构时,可以将它们合并。二手QQ买卖平台默认行为是不合并架构。2然后区分出解析架构所需的文件。如果不需要合并,Spark总是首先尝试摘要文件。在这种情况下,_common_metadata比_metadata更可取,因为它不包含行组信息,并且对于具有许多行组的大型Parquet文件而言,它可能要小得多。如果摘要文件不可用,则行为是回退到随机的零件文件。3在默认情况下(未将架构合并标记为必要),Spark将首先尝试任意_common_metadata文件,然后回退至任意_metadata,最后是任意部分文件,并假定(正确或不正确)方案是一致的。设置了要合并的文件后,该操作将由分布式Spark作业完成。?请务必注意,数据架构始终断言为可空值。简而言之,这是因为QueryPlan()重新创建了保存架构的StructType,但强制所有包含的字段为空。

  Parquet的书写方式

  "…编写Parquet文件时,出于兼容性原因,所有列都将自动转换为可为空。" -Spark Docs

  因此,说您找到了在Spark作业的列级强制执行null的一种方法。 不幸的是,一旦您写信给Parquet,该执行就失效了。 为了从更高层次描述

  SparkSession.write.parquet(),它将从给定的DataFrame中创建一个DataSource,实施为Parquet提供的默认压缩,构建优化的查询,并使用可为空的模式复制数据。 可以将其大致描述为DataFrame创建的逆过程。

  一些实验

  在最后一部分中,我将提供一些有关默认行为的预期示例。

  在调查对Parquet的写入时,有两种选择:

  · 在建立DataFrame上使用手动定义的架构

  schema=types.StructType([

  types.StructField("index", types.LongType(), False),

  types.StructField("long", types.LongType(), True),

  ])

  data=[

  (1, 6),

  (2, 7),

  (3, None),

  (4, 8),

  (5, 9)

  ]

  df_w_schema=sqlContext.createDataFrame(data, schema)

  df_w_schema.collect()

  df_w_schema.write.parquet('nullable_check_w_schema')

  df_parquet_w_schema=sqlContext.read.schema(schema).parquet('nullable_check_w_schema')

  df_parquet_w_schema.printSchema()

  此处完成的工作是定义模式和数据集。 在写入之前,该模式的可空性得到了加强。 但是,一旦将DataFrame写入Parquet,就可以看到所有列的空性都从窗口中消失了,就像从传入的DataFrame中获得printSchema()的输出一样。

  root

  |-- index: long (nullable=true)

  |-- long: long (nullable=true)

  2.未定义架构

  df_wo_schema=sqlContext.createDataFrame(data)

  df_wo_schema.collect()

  df_wo_schema.write.mode('overwrite').parquet('nullable_check_wo_schema')

  df_parquet_wo_schema=sqlContext.read.parquet('nullable_check_wo_schema')

  df_parquet_wo_schema.printSchema()

  与1一样,我们定义了相同的数据集,但是缺少"强制"模式。 结果可以看作是

  root

  |-- _1: long (nullable=true)

  |-- _2: long (nullable=true)

  无论是否声明架构,都不会强制实现可空性。

  脚注

  [1] DataFrameReader是DataFrame与外部存储之间的接口。

  [2]

  PARQUET_SCHEMA_MERGING_ENABLED:为true时,Parquet数据源合并从所有数据文件收集的模式,否则从摘要文件或随机数据文件中选择该模式(如果没有可用的摘要文件)。

  [3]摘要文件中存储的元数据将从所有零件文件中合并。 但是,对于用户定义的键值元数据(我们在其中存储Spark SQL模式),如果键与单独的零件文件中的不同值相关联,Parquet不知道如何正确合并它们。 发生这种情况时,Parquet停止生成摘要文件,这意味着存在摘要文件时,则:

  一种。 所有部分文件都具有完全相同的Spark SQL模式或orb。 有些部分文件根本不在键值元数据中包含Spark SQL模式(因此它们的模式可能彼此不同)。

  Spark扮演悲观主义者,并考虑了第二种情况。 这意味着如果用户需要合并的架构,并且必须分析所有零件文件以进行合并,则摘要文件将不受信任。

  [4]不考虑地点。 此优化对于S3记录系统主要有用。 由于S3节点的计算限制,S3文件元数据操作可能很慢,并且本地性不可用。

  并行性受合并文件的数量限制。 因此,并行度为2的SparkSession只有一个合并文件,它将使用一个执行程序启动一个Spark作业。

  weshoffman/apache-spark-parquet-and-troublesome-nulls-28712b06f836)

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
使用NAT网关轻松为单台云服务器设置多个公网IP
在应用中,有时会遇到用户询问如何使单台云服务器具备多个公网IP的问题。 具体如何操作呢,有了NAT网关这个也不是难题。
26746 0
使用SSH远程登录阿里云ECS服务器
远程连接服务器以及配置环境
2465 0
使用OpenApi弹性释放和设置云服务器ECS释放
云服务器ECS的一个重要特性就是按需创建资源。您可以在业务高峰期按需弹性的自定义规则进行资源创建,在完成业务计算的时候释放资源。本篇将提供几个Tips帮助您更加容易和自动化的完成云服务器的释放和弹性设置。
12039 0
阿里云服务器安全组设置内网互通的方法
虽然0.0.0.0/0使用非常方便,但是发现很多同学使用它来做内网互通,这是有安全风险的,实例有可能会在经典网络被内网IP访问到。下面介绍一下四种安全的内网互联设置方法。 购买前请先:领取阿里云幸运券,有很多优惠,可到下文中领取。
11789 0
windows server 2008阿里云ECS服务器安全设置
最近我们Sinesafe安全公司在为客户使用阿里云ecs服务器做安全的过程中,发现服务器基础安全性都没有做。为了为站长们提供更加有效的安全基础解决方案,我们Sinesafe将对阿里云服务器win2008 系统进行基础安全部署实战过程! 比较重要的几部分 1.
9064 0
腾讯云服务器 设置ngxin + fastdfs +tomcat 开机自启动
在tomcat中新建一个可以启动的 .sh 脚本文件 /usr/local/tomcat7/bin/ export JAVA_HOME=/usr/local/java/jdk7 export PATH=$JAVA_HOME/bin/:$PATH export CLASSPATH=.
4623 0
阿里云ECS云服务器初始化设置教程方法
阿里云ECS云服务器初始化是指将云服务器系统恢复到最初状态的过程,阿里云的服务器初始化是通过更换系统盘来实现的,是免费的,阿里云百科网分享服务器初始化教程: 服务器初始化教程方法 本文的服务器初始化是指将ECS云服务器系统恢复到最初状态,服务器中的数据也会被清空,所以初始化之前一定要先备份好。
6934 0
+关注
765
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载