【译】Apache Spark 2.4 内置数据源 Apache Avro

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 原文链接: Apache Avro as a Built-in Data Source in Apache Spark 2.4 Apache Avro 是一种流行的数据序列化格式。它广泛使用于 Apache Spark 和 Apache Hadoop 生态中,尤其适用于基于 Kafka 的数据流场景。

原文链接: Apache Avro as a Built-in Data Source in Apache Spark 2.4

Apache Avro 是一种流行的数据序列化格式。它广泛使用于 Apache Spark 和 Apache Hadoop 生态中,尤其适用于基于 Kafka 的数据流场景。从 Apache Spark 2.4 版本开始,Spark 原生支持了 Avro 数据的读写。新的内置 spark-avro 模块最初来自 Databricks 开源项目 Avro Data Source for Apache Spark(后文简称为 spark-avro )。 此外, 它还提供了:

  • 新函数 from_avro()to_avro() 用于在 DataFrame 中读写 Avro 数据,而不仅仅是文件。
  • Avro 逻辑类型支持, 包括 Decimal、Timestamp 和日期类型。
  • 2 倍的读吞吐量提升和 10% 的写吞吐量提升。

这篇博客中, 我们会通过示例逐条的讲解上述的每个功能,通过例子你会发现其 API 的易用性,高性能等优点。

Load 和 Save 函数

在 Apache Spark 2.4 中,只需要在 DataFrameReander 和 DataFrameWriter 中将文件格式指定为 “avro” 就能够加载和保存 Avro 格式数据。出于一致性考虑,用法和其他内置数据源类似。

val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

from_avro()to_avro() 函数

为了进一步简化数据转换流程,我们引入了 2 个新的内置函数: from_avro()to_avro()。Avro 常用于序列化/反序列化基于 Apache Kafka 的数据流中的消息/数据。在读取或写入 Kafka 时,把 Avro 记录当做列是非常有用的。每个 Kafka 键值对记录都会相应的新增一些元数据, 例如摄取时间戳、偏移量等。

上述函数非常有用的三个场景:

  • 当 Spark 从 Kafka 读取 Avro 二进制数据时,from_avro() 可以提取数据,清理数据并对其进行转换。
  • 如果要将结构转换为 Avro 二进制记录,然后再将其重新推送到的 Kafka 或将其写入文件,使用 to_avro()
  • 如果要将多个列重新编码为单个列,使用 to_avro()

上述函数仅支持 Scala 和 Java 中使用。

import org.apache.spark.sql.avro._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro('value, jsonFormatSchema) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

查看更多示例,点击 Read and Write Streaming Avro Data with DataFrames

与 Databricks spark-avro 的兼容性

内置的 spark-avro 模块与 Databricks 的开源库 spark-avro 兼容。
使用内置 Avro 模块可以加载/写入先前使用 com.databricks.spark.avro 创建的数据源表,而无需任何代码更改。实际上,如果更喜欢使用自己构建的 spark-avro jar 文件,则只需禁用配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled ,并在部署应用程序时使用选项 --jars。有关详细信息,请阅读 应用程序提交指南 中的 高级依赖管理 部分。

性能改进

通过 SPARK-24800 的 IO 优化,内置的 Avro 数据源在读取和写入 Avro 文件都实现了性能提升。我们进行了一些基准测试,观察到读取性能提高了 2 倍,写入性能提高了 8% 。

配置和方法

我们在 Databricks 社区版 上的单个节点 Apache Spark 集群上运行了基准测试。有关基准测试的详细实施,请查看 Avro 基准测试手册
image1
如图表所示,读取性能提升接近 2 倍,写入性能也提高了 8%。

配置细节:

  • 数据:包含各种数据类型的 1 百万行数据的DataFrame:Int / Double / String / Map / Array / Struct等。
  • 集群:6.0 GB 内存,0.88 内核,1 DBU。
  • Databricks运行时版本:5.0(新的内置 spark-avro )和 4.0(外部 Databricks spark-avro 库)。

结论

新的内置 spark-avro 模块在 Spark SQLStructured Streaming 中提供了更好的用户体验和 IO 性能。由于Spark本身对 Avro 的内置支持,最初的 spark-avro 将被弃用。

您可以在 Databricks Runtime 5.0 上尝试 Apache Spark 2.4 版本。 要了解有关如何在云中使用 Apache Avro 进行 Structured Streaming 处理的更多信息,请阅读 Azure Databricks 或 AWS 上的文档。

欢迎spark感兴趣的同学入群技术交流!

image

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
56 3
|
1月前
|
SQL DataWorks 关系型数据库
阿里云 DataWorks 正式支持 SelectDB & Apache Doris 数据源,实现 MySQL 整库实时同步
阿里云数据库 SelectDB 版是阿里云与飞轮科技联合基于 Apache Doris 内核打造的现代化数据仓库,支持大规模实时数据上的极速查询分析。通过实时、统一、弹性、开放的核心能力,能够为企业提供高性价比、简单易用、安全稳定、低成本的实时大数据分析支持。SelectDB 具备世界领先的实时分析能力,能够实现秒级的数据实时导入与同步,在宽表、复杂多表关联、高并发点查等不同场景下,提供超越一众国际知名的同类产品的优秀性能,多次登顶 ClickBench 全球数据库分析性能排行榜。
|
2月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
60 1
|
2月前
|
SQL 消息中间件 大数据
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
74 1
|
2月前
|
SQL 大数据 Apache
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
87 1
|
2月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
80 1
|
3月前
|
SQL 消息中间件 Java
兼容Trino Connector,扩展Apache Doris数据源接入能力|Lakehouse 使用手册(四)
通过兼容 Connector 插件,Apache Doris 能够支持 Trino/Presto 可对接的所有数据源,而无需改动 Doris 的内核代码。
兼容Trino Connector,扩展Apache Doris数据源接入能力|Lakehouse 使用手册(四)
|
4月前
|
存储 消息中间件 人工智能
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
早期 MiniMax 基于 Grafana Loki 构建了日志系统,在资源消耗、写入性能及系统稳定性上都面临巨大的挑战。为此 MiniMax 开始寻找全新的日志系统方案,并基于阿里云数据库 SelectDB 版内核 Apache Doris 升级了日志系统,新系统已接入 MiniMax 内部所有业务线日志数据,数据规模为 PB 级, 整体可用性达到 99.9% 以上,10 亿级日志数据的检索速度可实现秒级响应。
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
|
3月前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
163 11
|
10天前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
286 33
The Past, Present and Future of Apache Flink

推荐镜像

更多