【译】Apache Spark 2.4 内置数据源 Apache Avro

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 原文链接: Apache Avro as a Built-in Data Source in Apache Spark 2.4 Apache Avro 是一种流行的数据序列化格式。它广泛使用于 Apache Spark 和 Apache Hadoop 生态中,尤其适用于基于 Kafka 的数据流场景。

原文链接: Apache Avro as a Built-in Data Source in Apache Spark 2.4

Apache Avro 是一种流行的数据序列化格式。它广泛使用于 Apache Spark 和 Apache Hadoop 生态中,尤其适用于基于 Kafka 的数据流场景。从 Apache Spark 2.4 版本开始,Spark 原生支持了 Avro 数据的读写。新的内置 spark-avro 模块最初来自 Databricks 开源项目 Avro Data Source for Apache Spark(后文简称为 spark-avro )。 此外, 它还提供了:

  • 新函数 from_avro()to_avro() 用于在 DataFrame 中读写 Avro 数据,而不仅仅是文件。
  • Avro 逻辑类型支持, 包括 Decimal、Timestamp 和日期类型。
  • 2 倍的读吞吐量提升和 10% 的写吞吐量提升。

这篇博客中, 我们会通过示例逐条的讲解上述的每个功能,通过例子你会发现其 API 的易用性,高性能等优点。

Load 和 Save 函数

在 Apache Spark 2.4 中,只需要在 DataFrameReander 和 DataFrameWriter 中将文件格式指定为 “avro” 就能够加载和保存 Avro 格式数据。出于一致性考虑,用法和其他内置数据源类似。

val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

from_avro()to_avro() 函数

为了进一步简化数据转换流程,我们引入了 2 个新的内置函数: from_avro()to_avro()。Avro 常用于序列化/反序列化基于 Apache Kafka 的数据流中的消息/数据。在读取或写入 Kafka 时,把 Avro 记录当做列是非常有用的。每个 Kafka 键值对记录都会相应的新增一些元数据, 例如摄取时间戳、偏移量等。

上述函数非常有用的三个场景:

  • 当 Spark 从 Kafka 读取 Avro 二进制数据时,from_avro() 可以提取数据,清理数据并对其进行转换。
  • 如果要将结构转换为 Avro 二进制记录,然后再将其重新推送到的 Kafka 或将其写入文件,使用 to_avro()
  • 如果要将多个列重新编码为单个列,使用 to_avro()

上述函数仅支持 Scala 和 Java 中使用。

import org.apache.spark.sql.avro._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro('value, jsonFormatSchema) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

查看更多示例,点击 Read and Write Streaming Avro Data with DataFrames

与 Databricks spark-avro 的兼容性

内置的 spark-avro 模块与 Databricks 的开源库 spark-avro 兼容。
使用内置 Avro 模块可以加载/写入先前使用 com.databricks.spark.avro 创建的数据源表,而无需任何代码更改。实际上,如果更喜欢使用自己构建的 spark-avro jar 文件,则只需禁用配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled ,并在部署应用程序时使用选项 --jars。有关详细信息,请阅读 应用程序提交指南 中的 高级依赖管理 部分。

性能改进

通过 SPARK-24800 的 IO 优化,内置的 Avro 数据源在读取和写入 Avro 文件都实现了性能提升。我们进行了一些基准测试,观察到读取性能提高了 2 倍,写入性能提高了 8% 。

配置和方法

我们在 Databricks 社区版 上的单个节点 Apache Spark 集群上运行了基准测试。有关基准测试的详细实施,请查看 Avro 基准测试手册
image1
如图表所示,读取性能提升接近 2 倍,写入性能也提高了 8%。

配置细节:

  • 数据:包含各种数据类型的 1 百万行数据的DataFrame:Int / Double / String / Map / Array / Struct等。
  • 集群:6.0 GB 内存,0.88 内核,1 DBU。
  • Databricks运行时版本:5.0(新的内置 spark-avro )和 4.0(外部 Databricks spark-avro 库)。

结论

新的内置 spark-avro 模块在 Spark SQLStructured Streaming 中提供了更好的用户体验和 IO 性能。由于Spark本身对 Avro 的内置支持,最初的 spark-avro 将被弃用。

您可以在 Databricks Runtime 5.0 上尝试 Apache Spark 2.4 版本。 要了解有关如何在云中使用 Apache Avro 进行 Structured Streaming 处理的更多信息,请阅读 Azure Databricks 或 AWS 上的文档。

欢迎spark感兴趣的同学入群技术交流!

image

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
14天前
|
SQL DataWorks 关系型数据库
阿里云 DataWorks 正式支持 SelectDB & Apache Doris 数据源,实现 MySQL 整库实时同步
阿里云数据库 SelectDB 版是阿里云与飞轮科技联合基于 Apache Doris 内核打造的现代化数据仓库,支持大规模实时数据上的极速查询分析。通过实时、统一、弹性、开放的核心能力,能够为企业提供高性价比、简单易用、安全稳定、低成本的实时大数据分析支持。SelectDB 具备世界领先的实时分析能力,能够实现秒级的数据实时导入与同步,在宽表、复杂多表关联、高并发点查等不同场景下,提供超越一众国际知名的同类产品的优秀性能,多次登顶 ClickBench 全球数据库分析性能排行榜。
|
28天前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
34 1
|
1月前
|
SQL 消息中间件 大数据
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
50 1
|
1月前
|
SQL 大数据 Apache
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
75 1
|
1月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
55 1
|
2月前
|
SQL 消息中间件 Java
兼容Trino Connector,扩展Apache Doris数据源接入能力|Lakehouse 使用手册(四)
通过兼容 Connector 插件,Apache Doris 能够支持 Trino/Presto 可对接的所有数据源,而无需改动 Doris 的内核代码。
兼容Trino Connector,扩展Apache Doris数据源接入能力|Lakehouse 使用手册(四)
|
3月前
|
存储 消息中间件 人工智能
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
早期 MiniMax 基于 Grafana Loki 构建了日志系统,在资源消耗、写入性能及系统稳定性上都面临巨大的挑战。为此 MiniMax 开始寻找全新的日志系统方案,并基于阿里云数据库 SelectDB 版内核 Apache Doris 升级了日志系统,新系统已接入 MiniMax 内部所有业务线日志数据,数据规模为 PB 级, 整体可用性达到 99.9% 以上,10 亿级日志数据的检索速度可实现秒级响应。
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
|
2月前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
141 11
|
29天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
581 13
Apache Flink 2.0-preview released
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
65 3

推荐镜像

更多