Apache Spark 3.0 中的向量化 IO

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 在 Apache Spark 3.0 中,SparkR 中引入了一种新的向量化(vectorized)实现,它利用 Apache Arrow 直接在 JVM 和 R 之间交换数据,且(反)序列化成本非常小

本文转载自:过往记忆大数据
原文链接


R 是数据科学中最流行的计算机语言之一,专门用于统计分析和一些扩展,如用于数据处理和机器学习任务的 RStudio addins 和其他 R 包。此外,它使数据科学家能够轻松地可视化他们的数据集。

通过在 Apache Spark 中使用 SparkR,可以很容易地扩展 R 代码。要交互式地运行作业,可以通过运行 R shell 轻松地在分布式集群中运行 R 的作业。

当 SparkR 不需要与 R 进程交互时,其性能实际上与 Scala、Java 和 Python 等其他语言 API 相同。但是,当 SparkR 作业与本机 R 函数或数据类型交互时,会性能显著下降。

如果在 Spark 和 R 之间使用 Apache Arrow 来进行数据交换,其性能会有很大的提升。这篇博客文章概述了 SparkR 中 Spark 和 R 的交互,并对比了没有向量化执行和有向量化执行的性能差异。

Spark 和 R 交互

SparkR 不仅支持丰富的 ML 和类似 SQL 的 API 集合,而且还支持用于直接与 R 代码进行交互的一组 API。例如,Spark DataFrame 和 R DataFrame 之间的无缝转换以及在 Spark DataFrame 上以分布式的方式执行 R 内置函数。

在大多数情况下,Spark 中的其他语言 API 之间的性能实际上是一致的——例如,当用户代码依赖于 Spark UDF 或者 SQL API 时,执行过程完全在 JVM 中进行, I/O 方面没有任何性能损失。比如下面的两种调用时间都只需要一秒:

// Scala API
// ~1 second
sql("SELECT id FROM range(2000000000)").filter("id > 10").count()

# R API
# ~1 second
count(filter(sql("SELECT * FROM range(2000000000)"), "id > 10"))

但是,在需要执行 R 的内置函数或将其从 R 内置类型转换到其他语言类型的情况下,其性能将有很大不同,如下所示。

// Scala API
val ds = (1L to 100000L).toDS
// ~1 second
ds.mapPartitions(iter => iter.filter(_ < 50000)).count()

# R API
df <- createDataFrame(lapply(seq(100000), function (e) list(value=e)))
# ~15 seconds - 15 times slower
count(dapply(
df, function(x) as.data.frame(x[x$value < 50000,]), schema(df)))

上面其实仅仅是对每个分区中过滤出小于 50000 的数据,然后对其进行 count 操作,但是 SparkR 却比 Scala 编写的代码慢 15 倍!

// Scala API
// ~0.2 seconds
val df = sql("SELECT * FROM range(1000000)").collect()

# R API
# ~8 seconds - 40 times slower
df <- collect(sql("SELECT * FROM range(1000000)"))

上面这个例子情况更糟糕,其仅仅是将数据收集到 Driver 端,但是 SparkR 比 Scala 要慢 40 倍!

这是因为上面计算需要与 R 内置函数或数据类型交互的 API ,但是其实现效率不高。在 SparkR 中类似的函数还有六个:

createDataFrame()
collect()
dapply()
dapplyCollect()
gapply()
gapplyCollect()
简单来说,createDataFrame() 和 collect() 需要在 JVM 和 R 之间进行序列化/反序列化,并且对数据进行转换,比如 Java 中的字符串需要转换成 R 中的 character。

原始实现(Native implementation)

image.png

上图中 SparkR DataFrame 的计算是分布在 Spark 集群上所有可用的节点上。如果不需要将数据以 R 的 data.frame 进行收集(collect)或不需要执行 R 内置函数,则在 Driver 或 executor 端不需要与 R 进程进行通信。但是当它需要使用 R 的 data.frame 或使用 R 的内置函数时,需要 Driver 或 executor 使用 sockets 使得 JVM 和 R 进行通信。

这需要在 JVM 和 R 直接对交换的数据进行序列化和反序列化操作,而这个操作的编码格式非常低效,完全没有考虑到现代 CPU 的设计,比如 CPU pipelining。

向量化执行(Vectorized implementation)

在 Apache Spark 3.0 中,SparkR 中引入了一种新的向量化(vectorized)实现,它利用 Apache Arrow 直接在 JVM 和 R 之间交换数据,且(反)序列化成本非常小,具体如下:
image.png

新的实现方式并没有在 JVM 和 R 之间使用低效的格式对数据逐行进行(反)序列化,而是利用 Apache Arrow 以高效的列格式进行流水线处理和单指令多数据(SIMD)。

新的矢量化 SparkR API 默认情况下未启用,但可以通过在 Apache Spark 3.0 中将 spark.sql.execution.arrow.sparkr.enabled 设置为 true 来启用。注意,dapplyCollect() 和 gapplyCollect() 矢量化操作尚未实现。建议使用 dapply() 和 gapply() 来替代。

基准测试结果

下面的基准测试使用的数据集为 500,000 条记录。分别测试使用和未使用矢量化的执行时间:

image.png

使用矢量化优化之后,collect() 和 createDataFrame() 性能分别大致提升 17 倍和 42x 倍;而对 dapply() 和 gapply(), 分别提升了43x 和 33x 。

从上面的启发可以看到,如果我们需要在不同系统之间进行数据交互,也可以使用 Apache Arrow。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
二维码spark群.JPG

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。

image.png

Apache Spark技术交流社区公众号,微信扫一扫关注

image.png

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
5月前
|
分布式计算 大数据 数据处理
Apache Spark:提升大规模数据处理效率的秘籍
【4月更文挑战第7天】本文介绍了Apache Spark的大数据处理优势和核心特性,包括内存计算、RDD、一站式解决方案。分享了Spark实战技巧,如选择部署模式、优化作业执行流程、管理内存与磁盘、Spark SQL优化及监控调优工具的使用。通过这些秘籍,可以提升大规模数据处理效率,发挥Spark在实际项目中的潜力。
264 0
|
5月前
|
负载均衡 应用服务中间件 nginx
org.apache.catalina.connector.ClientAbortException: java.io.IOException: 断开的管道
org.apache.catalina.connector.ClientAbortException: java.io.IOException: 断开的管道
557 0
|
2月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
40 0
|
3月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
103 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
2月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
157 0
|
3月前
|
分布式计算 Apache Spark
|
4月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
96 6
|
5月前
|
Dubbo Java 应用服务中间件
Serialized class org.apache.catalina.core.ApplicationPart must implement java.io.Serializable
Serialized class org.apache.catalina.core.ApplicationPart must implement java.io.Serializable
81 0
|
4月前
|
分布式计算 Spark 大数据
深入探究Apache Spark在大数据处理中的实践应用
【6月更文挑战第2天】Apache Spark是流行的开源大数据处理框架,以其内存计算速度和低延迟脱颖而出。本文涵盖Spark概述、核心组件(包括Spark Core、SQL、Streaming和MLlib)及其在数据预处理、批处理分析、交互式查询、实时处理和机器学习中的应用。通过理解Spark内部机制和实践应用,可提升大数据处理效率,发挥其在各行业的潜力。
|
5月前
|
机器学习/深度学习 数据采集 分布式计算
【机器学习】Spark ML 对数据进行规范化预处理 StandardScaler 与向量拆分
标准化Scaler是数据预处理技术,用于将特征值映射到均值0、方差1的标准正态分布,以消除不同尺度特征的影响,提升模型稳定性和精度。Spark ML中的StandardScaler实现此功能,通过`.setInputCol`、`.setOutputCol`等方法配置并应用到DataFrame数据。示例展示了如何在Spark中使用StandardScaler进行数据规范化,包括创建SparkSession,构建DataFrame,使用VectorAssembler和StandardScaler,以及将向量拆分为列。规范化有助于降低特征重要性,提高模型训练速度和计算效率。
下一篇
无影云桌面