Spark-TFRecord: Spark将全面支持TFRecord

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文中,我们将介绍 Spark 的一个新的数据源,Spark-TFRecord。Spark-TFRecord 的目的是提供在Spark中对原生的 TensorFlow 格式进行完全支持。本项目的目的是将 TFRecord 作为Spark数据源社区中的第一等公民,类似于 Avro,JSON,Parquet等。Spark-TFRecord 不仅仅提供简单的功能支持,比如 Data Frame的读取、写入,还支持一些高阶功能,比如ParititonBy。使用 Spark-TFRecord 将会使数据处理流程与训练工程完美结合。

编译:江宇,阿里云EMR技术专家。从事Hadoop内核开发,目前专注于机器学习、深度学习大数据平台的建设。


简介:

在机器学习领域,Apache Spark 由于其支持 SQL 类型的操作以及高效的数据处理,被广泛的用于数据预处理流程,同时 TensorFlow 作为广受欢迎的深度学习框架被广泛的用于模型训练。尽管两个框架有一些共同支持的数据格式,但是,作为 TFRecord—TensorFlow 的原生格式,并没有被 Spark 完全支持。尽管之前有过一些尝试,试图解决两个系统之间的差异(比如 Spark-TensorFlow-Connector),但是现有的实现都缺少很多 Spark 支持的重要特性。

本文中,我们将介绍 Spark 的一个新的数据源,Spark-TFRecord。Spark-TFRecord 的目的是提供在Spark中对原生的 TensorFlow 格式进行完全支持。本项目的目的是将TFRecord 作为Spark数据源社区中的第一类公民,类似于 Avro,JSON,Parquet等。Spark-TFRecord 不仅仅提供简单的功能支持,比如 Data Frame的读取、写入,还支持一些高阶功能,比如ParititonBy。使用 Spark-TFRecord 将会使数据处理流程与训练工程完美结合。

LinkedIn 内部 Spark 和 TensorFlow 都被广泛的使用。Spark 被用于数据处理、训练数据预处理流程中。Spark 同时也是数据分析的领先工具。随着原来越多的商业部门使用深度学习模型,TensorFlow 成为了模型训练和模型服务的主流工具。开源的TensorFlow 模型使用 TFRecord 作为数据格式,而LinkedIn 内部大部分使用 Avro 格式。为了模型训练,我们或者修改代码使模型训练能够读取avro格式,或者将avro格式的datasets转化为TFRecord。Spark-TFRecod主要是解决后者,即将不同格式转化为TFRecord。

现有的项目和之前的尝试

在 Spark-TFRecord 项目之前,社区提供 Spark-TensorFlow-Connector , 在 Spark 中读写 TFRecord 。Spark-TensorFlow-Connector 是 TensorFlow 生态圈的一部分,并且是由 DataBricks,spark 的创始公司提供。尽管 Spark-TensorFlow-Connector 提供基本的读写功能,但是我们在LinkedIn的使用中发现了两个问题。首先,它基于 RelationProvider 接口。这个接口主要用于Spark 与数据库连接,磁盘读写操作都是由数据库来支持。然而 Spark-TensorFlow-Connector 的使用场景是磁盘IO,而不是连接数据库,这块接口需要开发者自己实现 RelationProvider 来支持IO操作。这就是为什么Spark-TensorFlow-Connector 大量代码是用于不同的磁盘读写场景。

此外,Spark-TensorFlow-Connector 缺少一些 Spark支持的重要功能,比如 PartitionBy 用于将dataset 根据不同列进行分片。我们发现这个功能在LinkedIn 中对于模型训练非常重要,提供训练过程中根据实体IDs进行切分进行分布式训练。这个功能在TensorFlow 社区中也是高需求。

Spark-TFRrecord 为了解决上述问题,实现了FileFormat 接口,其他的原生格式比如 Avro,Parquet 等也实现了该接口。使用该接口后,TFRecord 就获取了所有的 DataFrame 和 DataSet 的I/O API,包括之前说的 PartitionBy 功能。此外,之后的 Spark I/O 接口的功能增强也能够自动获取到。

设计

我们起初考虑对 Spark-TensorFlow-Connector 打补丁的方式去获取 PartitionBy 功能。检查过源码后,我们发现 Spark-TensorFlow-Connector 使用的RelationProvider接口,是用于连接 Spark 与 SQL 数据库的,不适用于 TensorFlow 场景。然后并没有一个简单解决方式去解决 RelationProvider 并不提供磁盘I/O操作这一问题。于是,我们决定采取了不同的方式,我们实现了FileFormat,FileFormat是用来实现底层的基于文件的I/O操作。实现这一功能对LinkedIn的场景是非常有用的,我们的datasets基本上都是直接读写磁盘。

下图展示了各个模块
image.png

每个模块作用如下:

Schema Inferencer: 用于将Spark的数据类型推测为TFRecord的数据类型,我们复用了很多Spark-Tensorflow-Connector功能。

TFRecord Reader: 读取磁盘中TFRecord文件并使用反序列化器将TFRecord转换为Spark的InternalRow数据结构。

TFRecord Writer:将Spark的InternalRow数据结构通过序列化器转化为TFRecord格式并保存至磁盘。我们使用TensorFlow Hadoop库的写入器。

TFRecord Deserializer: 反序列化器,将TFRecord转化为Spark InternalRow。

TFRecord Serializer: 序列化器,将Spark InternalRow转化为TFRecord。

如何使用Spark-TFRecord

Spark-TFRecord与Spark-TensorFlow-Connector完全后向兼容。迁移十分方便,只需要加入spark-tfrecord jar包并且指定数据格式为“tfrecord”。下面的例子显示了如何使用Spark-TFRecord去读取倾斜和partition TFRecord文件。更多的例子可以参照github仓库

// launch spark-shell with the following command:
// SPARK_HOME/bin/spark-shell --jar target/spark-tfrecord_2.11-0.1.jar
import org.apache.spark.sql.SaveMode
val df = Seq((8, "bat"),(8, "abc"), (1, "xyz"), (2, "aaa")).toDF("number", "word")
df.show
// scala> df.show
// +------+----+
// |number|word|
// +------+----+
// |     8| bat|
// |     8| abc|
// |     1| xyz|
// |     2| aaa|
// +------+----+
val tf_output_dir = "/tmp/tfrecord-test"
// dump the tfrecords to files.
df.repartition(3, col("number")).write.mode(SaveMode.Overwrite).partitionBy("number").format("tfrecord").option("recordType", "Example").save(tf_output_dir)
// ls /tmp/tfrecord-test
// _SUCCESS        number=1        number=2        number=8
// read back the tfrecords from files.
val new_df = spark.read.format("tfrecord").option("recordType", "Example").load(tf_output_dir)
new_df.show
// scala> new_df.show
// +----+------+
// |word|number|
// +----+------+
// | bat|     8|
// | abc|     8|
// | xyz|     1|
// | aaa|     2|

总结

Spark-TFRecord使得Record可以作为Spark 数据格式的一等公民与其他数据格式一起使用。包含了所有dataframe API的功能,比如读、写、分区等。目前我们仅限于schemas符合Spark-Tensorflow-Connector要求。未来的工作将会提供更复杂的schemas支持。


原文链接:

https://engineering.linkedin.com/blog/2020/spark-tfrecord


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
二维码spark群.JPG

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。

image.png

Apache Spark技术交流社区公众号,微信扫一扫关注

image.png

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
3月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
124 1
Spark快速大数据分析PDF下载读书分享推荐
|
2月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
165 3
|
1月前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
38 3
|
2月前
|
分布式计算 大数据 Apache
跨越界限:当.NET遇上Apache Spark,大数据世界的新篇章如何谱写?
【8月更文挑战第28天】随着信息时代的发展,大数据已成为推动企业决策、科研与技术创新的关键力量。Apache Spark凭借其卓越的分布式计算能力和多功能数据处理特性,在大数据领域占据重要地位。然而,对于.NET开发者而言,如何在Spark生态中发挥自身优势成为一个新课题。为此,微软与Apache Spark社区共同推出了.NET for Apache Spark,使开发者能用C#、F#等语言编写Spark应用,不仅保留了Spark的强大功能,还融合了.NET的强类型系统、丰富库支持及良好跨平台能力,极大地降低了学习门槛并拓展了.NET的应用范围。
52 3
|
2月前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
|
4月前
|
存储 分布式计算 Hadoop
Spark和Hadoop都是大数据处理领域的重要工具
【6月更文挑战第17天】Spark和Hadoop都是大数据处理领域的重要工具
164 59
|
2月前
|
Java Spring API
Spring框架与GraphQL的史诗级碰撞:颠覆传统,重塑API开发的未来传奇!
【8月更文挑战第31天】《Spring框架与GraphQL:构建现代API》介绍了如何结合Spring框架与GraphQL构建高效、灵活的API。首先通过引入`spring-boot-starter-data-graphql`等依赖支持GraphQL,然后定义查询和类型,利用`@GraphQLQuery`等注解实现具体功能。Spring的依赖注入和事务管理进一步增强了GraphQL服务的能力。示例展示了从查询到突变的具体实现,证明了Spring与GraphQL结合的强大潜力,适合现代API设计与开发。
55 0
|
2月前
|
分布式计算 Hadoop 大数据
Spark 与 Hadoop 的大数据之战:一场惊心动魄的技术较量,决定数据处理的霸权归属!
【8月更文挑战第7天】无论是 Spark 的高效内存计算,还是 Hadoop 的大规模数据存储和处理能力,它们都为大数据的发展做出了重要贡献。
70 2
|
3月前
|
分布式计算 Hadoop 大数据
Hadoop与Spark在大数据处理中的对比
【7月更文挑战第30天】Hadoop和Spark在大数据处理中各有优势,选择哪个框架取决于具体的应用场景和需求。Hadoop适合处理大规模数据的离线分析,而Spark则更适合需要快速响应和迭代计算的应用场景。在实际应用中,可以根据数据处理的需求、系统的可扩展性、成本效益等因素综合考虑,选择适合的框架进行大数据处理。
|
2月前
|
大数据 RDMA
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
33 0
下一篇
无影云桌面