原文链接: Apache Avro as a Built-in Data Source in Apache Spark 2.4
Apache Avro 是一种流行的数据序列化格式。它广泛使用于 Apache Spark 和 Apache Hadoop 生态中,尤其适用于基于 Kafka 的数据流场景。从 Apache Spark 2.4 版本开始,Spark 原生支持了 Avro 数据的读写。新的内置 spark-avro 模块最初来自 Databricks 开源项目 Avro Data Source for Apache Spark(后文简称为 spark-avro )。 此外, 它还提供了:
- 新函数 from_avro() 和 to_avro() 用于在 DataFrame 中读写 Avro 数据,而不仅仅是文件。
- Avro 逻辑类型支持, 包括 Decimal、Timestamp 和日期类型。
- 2 倍的读吞吐量提升和 10% 的写吞吐量提升。
这篇博客中, 我们会通过示例逐条的讲解上述的每个功能,通过例子你会发现其 API 的易用性,高性能等优点。
Load 和 Save 函数
在 Apache Spark 2.4 中,只需要在 DataFrameReander 和 DataFrameWriter 中将文件格式指定为 “avro” 就能够加载和保存 Avro 格式数据。出于一致性考虑,用法和其他内置数据源类似。
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
from_avro() 和 to_avro() 函数
为了进一步简化数据转换流程,我们引入了 2 个新的内置函数: from_avro() 和 to_avro()。Avro 常用于序列化/反序列化基于 Apache Kafka 的数据流中的消息/数据。在读取或写入 Kafka 时,把 Avro 记录当做列是非常有用的。每个 Kafka 键值对记录都会相应的新增一些元数据, 例如摄取时间戳、偏移量等。
上述函数非常有用的三个场景:
- 当 Spark 从 Kafka 读取 Avro 二进制数据时,from_avro() 可以提取数据,清理数据并对其进行转换。
- 如果要将结构转换为 Avro 二进制记录,然后再将其重新推送到的 Kafka 或将其写入文件,使用 to_avro() 。
- 如果要将多个列重新编码为单个列,使用 to_avro() 。
上述函数仅支持 Scala 和 Java 中使用。
import org.apache.spark.sql.avro._
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
.select(from_avro('value, jsonFormatSchema) as 'user)
.where("user.favorite_color == \"red\"")
.select(to_avro($"user.name") as 'value)
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
查看更多示例,点击 Read and Write Streaming Avro Data with DataFrames。
与 Databricks spark-avro 的兼容性
内置的 spark-avro 模块与 Databricks 的开源库 spark-avro 兼容。
使用内置 Avro 模块可以加载/写入先前使用 com.databricks.spark.avro 创建的数据源表,而无需任何代码更改。实际上,如果更喜欢使用自己构建的 spark-avro jar 文件,则只需禁用配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled ,并在部署应用程序时使用选项 --jars。有关详细信息,请阅读 应用程序提交指南 中的 高级依赖管理 部分。
性能改进
通过 SPARK-24800 的 IO 优化,内置的 Avro 数据源在读取和写入 Avro 文件都实现了性能提升。我们进行了一些基准测试,观察到读取性能提高了 2 倍,写入性能提高了 8% 。
配置和方法
我们在 Databricks 社区版 上的单个节点 Apache Spark 集群上运行了基准测试。有关基准测试的详细实施,请查看 Avro 基准测试手册。
如图表所示,读取性能提升接近 2 倍,写入性能也提高了 8%。
配置细节:
- 数据:包含各种数据类型的 1 百万行数据的DataFrame:Int / Double / String / Map / Array / Struct等。
- 集群:6.0 GB 内存,0.88 内核,1 DBU。
- Databricks运行时版本:5.0(新的内置 spark-avro )和 4.0(外部 Databricks spark-avro 库)。
结论
新的内置 spark-avro 模块在 Spark SQL 和 Structured Streaming 中提供了更好的用户体验和 IO 性能。由于Spark本身对 Avro 的内置支持,最初的 spark-avro 将被弃用。
您可以在 Databricks Runtime 5.0 上尝试 Apache Spark 2.4 版本。 要了解有关如何在云中使用 Apache Avro 进行 Structured Streaming 处理的更多信息,请阅读 Azure Databricks 或 AWS 上的文档。