Spark-TFRecord: Spark将全面支持TFRecord

简介: 本文中,我们将介绍 Spark 的一个新的数据源,Spark-TFRecord。Spark-TFRecord 的目的是提供在Spark中对原生的 TensorFlow 格式进行完全支持。本项目的目的是将 TFRecord 作为Spark数据源社区中的第一等公民,类似于 Avro,JSON,Parquet等。Spark-TFRecord 不仅仅提供简单的功能支持,比如 Data Frame的读取、写入,还支持一些高阶功能,比如ParititonBy。使用 Spark-TFRecord 将会使数据处理流程与训练工程完美结合。

编译:江宇,阿里云EMR技术专家。从事Hadoop内核开发,目前专注于机器学习、深度学习大数据平台的建设。


简介:

在机器学习领域,Apache Spark 由于其支持 SQL 类型的操作以及高效的数据处理,被广泛的用于数据预处理流程,同时 TensorFlow 作为广受欢迎的深度学习框架被广泛的用于模型训练。尽管两个框架有一些共同支持的数据格式,但是,作为 TFRecord—TensorFlow 的原生格式,并没有被 Spark 完全支持。尽管之前有过一些尝试,试图解决两个系统之间的差异(比如 Spark-TensorFlow-Connector),但是现有的实现都缺少很多 Spark 支持的重要特性。

本文中,我们将介绍 Spark 的一个新的数据源,Spark-TFRecord。Spark-TFRecord 的目的是提供在Spark中对原生的 TensorFlow 格式进行完全支持。本项目的目的是将TFRecord 作为Spark数据源社区中的第一类公民,类似于 Avro,JSON,Parquet等。Spark-TFRecord 不仅仅提供简单的功能支持,比如 Data Frame的读取、写入,还支持一些高阶功能,比如ParititonBy。使用 Spark-TFRecord 将会使数据处理流程与训练工程完美结合。

LinkedIn 内部 Spark 和 TensorFlow 都被广泛的使用。Spark 被用于数据处理、训练数据预处理流程中。Spark 同时也是数据分析的领先工具。随着原来越多的商业部门使用深度学习模型,TensorFlow 成为了模型训练和模型服务的主流工具。开源的TensorFlow 模型使用 TFRecord 作为数据格式,而LinkedIn 内部大部分使用 Avro 格式。为了模型训练,我们或者修改代码使模型训练能够读取avro格式,或者将avro格式的datasets转化为TFRecord。Spark-TFRecod主要是解决后者,即将不同格式转化为TFRecord。

现有的项目和之前的尝试

在 Spark-TFRecord 项目之前,社区提供 Spark-TensorFlow-Connector , 在 Spark 中读写 TFRecord 。Spark-TensorFlow-Connector 是 TensorFlow 生态圈的一部分,并且是由 DataBricks,spark 的创始公司提供。尽管 Spark-TensorFlow-Connector 提供基本的读写功能,但是我们在LinkedIn的使用中发现了两个问题。首先,它基于 RelationProvider 接口。这个接口主要用于Spark 与数据库连接,磁盘读写操作都是由数据库来支持。然而 Spark-TensorFlow-Connector 的使用场景是磁盘IO,而不是连接数据库,这块接口需要开发者自己实现 RelationProvider 来支持IO操作。这就是为什么Spark-TensorFlow-Connector 大量代码是用于不同的磁盘读写场景。

此外,Spark-TensorFlow-Connector 缺少一些 Spark支持的重要功能,比如 PartitionBy 用于将dataset 根据不同列进行分片。我们发现这个功能在LinkedIn 中对于模型训练非常重要,提供训练过程中根据实体IDs进行切分进行分布式训练。这个功能在TensorFlow 社区中也是高需求。

Spark-TFRrecord 为了解决上述问题,实现了FileFormat 接口,其他的原生格式比如 Avro,Parquet 等也实现了该接口。使用该接口后,TFRecord 就获取了所有的 DataFrame 和 DataSet 的I/O API,包括之前说的 PartitionBy 功能。此外,之后的 Spark I/O 接口的功能增强也能够自动获取到。

设计

我们起初考虑对 Spark-TensorFlow-Connector 打补丁的方式去获取 PartitionBy 功能。检查过源码后,我们发现 Spark-TensorFlow-Connector 使用的RelationProvider接口,是用于连接 Spark 与 SQL 数据库的,不适用于 TensorFlow 场景。然后并没有一个简单解决方式去解决 RelationProvider 并不提供磁盘I/O操作这一问题。于是,我们决定采取了不同的方式,我们实现了FileFormat,FileFormat是用来实现底层的基于文件的I/O操作。实现这一功能对LinkedIn的场景是非常有用的,我们的datasets基本上都是直接读写磁盘。

下图展示了各个模块
image.png

每个模块作用如下:

Schema Inferencer: 用于将Spark的数据类型推测为TFRecord的数据类型,我们复用了很多Spark-Tensorflow-Connector功能。

TFRecord Reader: 读取磁盘中TFRecord文件并使用反序列化器将TFRecord转换为Spark的InternalRow数据结构。

TFRecord Writer:将Spark的InternalRow数据结构通过序列化器转化为TFRecord格式并保存至磁盘。我们使用TensorFlow Hadoop库的写入器。

TFRecord Deserializer: 反序列化器,将TFRecord转化为Spark InternalRow。

TFRecord Serializer: 序列化器,将Spark InternalRow转化为TFRecord。

如何使用Spark-TFRecord

Spark-TFRecord与Spark-TensorFlow-Connector完全后向兼容。迁移十分方便,只需要加入spark-tfrecord jar包并且指定数据格式为“tfrecord”。下面的例子显示了如何使用Spark-TFRecord去读取倾斜和partition TFRecord文件。更多的例子可以参照github仓库

// launch spark-shell with the following command:
// SPARK_HOME/bin/spark-shell --jar target/spark-tfrecord_2.11-0.1.jar
import org.apache.spark.sql.SaveMode
val df = Seq((8, "bat"),(8, "abc"), (1, "xyz"), (2, "aaa")).toDF("number", "word")
df.show
// scala> df.show
// +------+----+
// |number|word|
// +------+----+
// |     8| bat|
// |     8| abc|
// |     1| xyz|
// |     2| aaa|
// +------+----+
val tf_output_dir = "/tmp/tfrecord-test"
// dump the tfrecords to files.
df.repartition(3, col("number")).write.mode(SaveMode.Overwrite).partitionBy("number").format("tfrecord").option("recordType", "Example").save(tf_output_dir)
// ls /tmp/tfrecord-test
// _SUCCESS        number=1        number=2        number=8
// read back the tfrecords from files.
val new_df = spark.read.format("tfrecord").option("recordType", "Example").load(tf_output_dir)
new_df.show
// scala> new_df.show
// +----+------+
// |word|number|
// +----+------+
// | bat|     8|
// | abc|     8|
// | xyz|     1|
// | aaa|     2|

总结

Spark-TFRecord使得Record可以作为Spark 数据格式的一等公民与其他数据格式一起使用。包含了所有dataframe API的功能,比如读、写、分区等。目前我们仅限于schemas符合Spark-Tensorflow-Connector要求。未来的工作将会提供更复杂的schemas支持。


原文链接:

https://engineering.linkedin.com/blog/2020/spark-tfrecord


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
二维码spark群.JPG

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。

image.png

Apache Spark技术交流社区公众号,微信扫一扫关注

image.png

相关实践学习
数据湖构建DLF快速入门
本教程通过使⽤数据湖构建DLF产品对于淘宝用户行为样例数据的分析,介绍数据湖构建DLF产品的数据发现和数据探索功能。
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
3月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
159 0
|
14天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
3月前
|
机器学习/深度学习 SQL 分布式计算
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
|
2天前
|
分布式计算 大数据 数据处理
[AIGC大数据基础] Spark 入门
[AIGC大数据基础] Spark 入门
|
2月前
|
分布式计算 大数据 Java
Spark 大数据实战:基于 RDD 的大数据处理分析
Spark 大数据实战:基于 RDD 的大数据处理分析
122 0
|
3月前
|
分布式计算 监控 大数据
Spark RDD分区和数据分布:优化大数据处理
Spark RDD分区和数据分布:优化大数据处理
|
4月前
|
分布式计算 大数据 Linux
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
107 0
|
4月前
|
SQL 分布式计算 大数据
Hudi数据湖技术引领大数据新风口(三)解决spark模块依赖冲突
Hudi数据湖技术引领大数据新风口(三)解决spark模块依赖冲突
102 0
|
4月前
|
分布式计算 资源调度 大数据
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day30】——Spark数据调优(文末附完整文档)
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day30】——Spark数据调优(文末附完整文档)
65 0
|
4月前
|
SQL 机器学习/深度学习 分布式计算
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day17】——Spark4
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day17】——Spark4
44 0