【译】Apache Spark 2.4 内置数据源 Apache Avro

简介: 原文链接: Apache Avro as a Built-in Data Source in Apache Spark 2.4 Apache Avro 是一种流行的数据序列化格式。它广泛使用于 Apache Spark 和 Apache Hadoop 生态中,尤其适用于基于 Kafka 的数据流场景。

原文链接: Apache Avro as a Built-in Data Source in Apache Spark 2.4

Apache Avro 是一种流行的数据序列化格式。它广泛使用于 Apache Spark 和 Apache Hadoop 生态中,尤其适用于基于 Kafka 的数据流场景。从 Apache Spark 2.4 版本开始,Spark 原生支持了 Avro 数据的读写。新的内置 spark-avro 模块最初来自 Databricks 开源项目 Avro Data Source for Apache Spark(后文简称为 spark-avro )。 此外, 它还提供了:

  • 新函数 from_avro()to_avro() 用于在 DataFrame 中读写 Avro 数据,而不仅仅是文件。
  • Avro 逻辑类型支持, 包括 Decimal、Timestamp 和日期类型。
  • 2 倍的读吞吐量提升和 10% 的写吞吐量提升。

这篇博客中, 我们会通过示例逐条的讲解上述的每个功能,通过例子你会发现其 API 的易用性,高性能等优点。

Load 和 Save 函数

在 Apache Spark 2.4 中,只需要在 DataFrameReander 和 DataFrameWriter 中将文件格式指定为 “avro” 就能够加载和保存 Avro 格式数据。出于一致性考虑,用法和其他内置数据源类似。

val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

from_avro()to_avro() 函数

为了进一步简化数据转换流程,我们引入了 2 个新的内置函数: from_avro()to_avro()。Avro 常用于序列化/反序列化基于 Apache Kafka 的数据流中的消息/数据。在读取或写入 Kafka 时,把 Avro 记录当做列是非常有用的。每个 Kafka 键值对记录都会相应的新增一些元数据, 例如摄取时间戳、偏移量等。

上述函数非常有用的三个场景:

  • 当 Spark 从 Kafka 读取 Avro 二进制数据时,from_avro() 可以提取数据,清理数据并对其进行转换。
  • 如果要将结构转换为 Avro 二进制记录,然后再将其重新推送到的 Kafka 或将其写入文件,使用 to_avro()
  • 如果要将多个列重新编码为单个列,使用 to_avro()

上述函数仅支持 Scala 和 Java 中使用。

import org.apache.spark.sql.avro._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro('value, jsonFormatSchema) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

查看更多示例,点击 Read and Write Streaming Avro Data with DataFrames

与 Databricks spark-avro 的兼容性

内置的 spark-avro 模块与 Databricks 的开源库 spark-avro 兼容。
使用内置 Avro 模块可以加载/写入先前使用 com.databricks.spark.avro 创建的数据源表,而无需任何代码更改。实际上,如果更喜欢使用自己构建的 spark-avro jar 文件,则只需禁用配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled ,并在部署应用程序时使用选项 --jars。有关详细信息,请阅读 应用程序提交指南 中的 高级依赖管理 部分。

性能改进

通过 SPARK-24800 的 IO 优化,内置的 Avro 数据源在读取和写入 Avro 文件都实现了性能提升。我们进行了一些基准测试,观察到读取性能提高了 2 倍,写入性能提高了 8% 。

配置和方法

我们在 Databricks 社区版 上的单个节点 Apache Spark 集群上运行了基准测试。有关基准测试的详细实施,请查看 Avro 基准测试手册
image1
如图表所示,读取性能提升接近 2 倍,写入性能也提高了 8%。

配置细节:

  • 数据:包含各种数据类型的 1 百万行数据的DataFrame:Int / Double / String / Map / Array / Struct等。
  • 集群:6.0 GB 内存,0.88 内核,1 DBU。
  • Databricks运行时版本:5.0(新的内置 spark-avro )和 4.0(外部 Databricks spark-avro 库)。

结论

新的内置 spark-avro 模块在 Spark SQLStructured Streaming 中提供了更好的用户体验和 IO 性能。由于Spark本身对 Avro 的内置支持,最初的 spark-avro 将被弃用。

您可以在 Databricks Runtime 5.0 上尝试 Apache Spark 2.4 版本。 要了解有关如何在云中使用 Apache Avro 进行 Structured Streaming 处理的更多信息,请阅读 Azure Databricks 或 AWS 上的文档。

欢迎spark感兴趣的同学入群技术交流!

image

相关文章
|
11月前
|
存储 人工智能 数据处理
Apache Doris 2025 Roadmap:构建 GenAI 时代实时高效统一的数据底座
秉承“以场景驱动创新” 的核心理念,持续深耕三大核心场景的关键能力,并对大模型 GenAI 场景的融合应用进行重点投入,为智能时代构建实时、高效、统一的数据底座。
589 10
Apache Doris 2025 Roadmap:构建 GenAI 时代实时高效统一的数据底座
|
存储 运维 监控
从 ClickHouse 到 Apache Doris:在网易云音乐日增万亿日志数据场景下的落地
日志数据已成为企业洞察系统状态、监控网络安全及分析业务动态的宝贵资源。网易云音乐引入 Apache Doris 作为日志库新方案,替换了 ClickHouse。解决了 ClickHouse 运维复杂、不支持倒排索引的问题。目前已经稳定运行 3 个季度,规模达到 50 台服务器, 倒排索引将全文检索性能提升7倍,2PB 数据,每天新增日志量超过万亿条,峰值写入吞吐 6GB/s 。
962 5
从 ClickHouse 到 Apache Doris:在网易云音乐日增万亿日志数据场景下的落地
|
存储 SQL 数据挖掘
数据无界、湖仓无界, Apache Doris 湖仓一体解决方案全面解读(上篇)
湖仓一体架构融合了数据湖的低成本、高扩展性,以及数据仓库的高性能、强数据治理能力,高效应对大数据时代的挑战。为助力企业实现湖仓一体的建设,Apache Doris 提出了数据无界和湖仓无界核心理念,并结合自身特性,助力企业加速从 0 到 1 构建湖仓体系,降低转型过程中的风险和成本。本文将对湖仓一体演进及 Apache Doris 湖仓一体方案进行介绍。
1276 1
数据无界、湖仓无界, Apache Doris 湖仓一体解决方案全面解读(上篇)
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
449 1
|
SQL 消息中间件 大数据
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
391 1
|
SQL 大数据 Apache
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
232 1
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
334 1
|
分布式计算 Java Spark
|
分布式计算 Java Spark
Spark Streaming 数据清理机制
大家刚开始用Spark Streaming时,心里肯定嘀咕,对于一个7*24小时运行的数据,cache住的RDD,broadcast 系统会帮忙自己清理掉么?还是说必须自己做清理?如果系统帮忙清理的话,机制是啥?
3227 0
|
9月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
461 0

推荐镜像

更多