Apache Avro as a Built-in Data Source in Apache Spark 2.4

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: Apache Avro 是一种流行的数据序列化格式。它广泛用于 Apache Spark 和 Apache Hadoop 生态系统,尤其适用于基于 Kafka 的数据管道。从 Apache Spark 2.

Apache Avro 是一种流行的数据序列化格式。它广泛用于 Apache Spark 和 Apache Hadoop 生态系统,尤其适用于基于 Kafka 的数据管道。从 Apache Spark 2.4 版本开始(参见 Apache Spark 2.4 正式发布,重要功能详细介绍),Spark 为读取和写入 Avro 数据提供内置支持。新的内置 spark-avro 模块最初来自 Databricks 的开源项目Avro Data Source for Apache Spark。除此之外,它还提供以下功能:

  • 新函数 from_avro() 和 to_avro() 用于在 DataFrame 中读取和写入 Avro 数据,而不仅仅是文件。
  • 支持 Avro 逻辑类型(logical types),包括 Decimal,Timestamp 和 Date类型。
  • 2倍读取吞吐量提高和10%写入吞吐量提升。

本文将通过示例介绍上面的每个功能。

加载和保存函数

在 Apache Spark 2.4 中,为了读写 Avro 格式的数据,你只需在 DataFrameReader 和 DataFrameWriter 中将文件格式指定为“avro”即可。其用法和其他数据源用法很类似。如下所示:

val iteblogDF = spark.read.format("avro").load("examples/src/main/resources/iteblog.avro")
iteblogDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

from_avro() 和 to_avro() 的使用

为了进一步简化数据转换流程(transformation pipeline),社区引入了两个新的内置函数:from_avro() 和 to_avro()。Avro 通常用于序列化/反序列化基于 Apache Kafka 的数据管道中的消息或数据,在读取或写入 Kafka 时,将 Avro records 作为列将非常有用。每个 Kafka 键值记录都会增加一些元数据,例如 Kafka 的摄取时间戳,Kafka 的偏移量等。

在以下三种场景,from_avro() 和 to_avro() 函数将非常有用:

  • 当使用 Spark 从 Kafka 中读取 Avro 格式的数据,可以使用 from_avro() 函数来抽取你要的数据,清理数据并对其进行转换。
  • 当你想要将 structs 格式的数据转换为 Avro 二进制记录,然后将它们发送到 Kafka 或写入到文件,你可以使用 to_avro()。
  • 如果你需要将多个列重新编码为单个列,请使用to_avro().

目前这两个函数仅在 Scala 和 Java 语言中可用。from_avro 和 to_avro 函数的使用除了需要人为指定 Avro schema,其他的和使用 from_json 和 to_json 函数一样,下面是这两个函数的使用示例。

在代码里面指定 Avro 模式

import org.apache.spark.sql.avro._
import org.apache.avro.SchemaBuilder
 
val servers = "www.iteblog.com:9092"
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
 
// Convert structured data to binary from string (key column) and
// int (value column) and save them to a Kafka topic.
iteblogDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .save()

通过 Schema Registry 服务提供 Avro 模式

如果我们有 Schema Registry 服务,那么我们就不需要在代码里面指定 Avro 模式了,如下:

import org.apache.spark.sql.avro._
 
// Read a Kafka topic "t", assuming the key and value are already
// registered in Schema Registry as subjects "t-key" and "t-value" of type
// string and int. The binary key and value columns are turned into string
// and int type with Avro and Schema Registry. The schema of the resulting DataFrame
// is: <key: string, value: int>.
val schemaRegistryAddr = "https://www.iteblog.com"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
 
  // Given that key and value columns are registered in Schema Registry, convert
  // structured data of key and value columns to Avro binary data by reading the schema
  // info from the Schema Registry. The converted data is saved to Kafka as a Kafka topic "t".
  iteblogDF
    .select(
      to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
      to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .save()

通过文件设置 Avro 模式

我们还可以将 Avro 模式写入到文件里面,然后在代码里面读取模式文件:

import org.apache.spark.sql.avro._
 
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
 
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "iteblog1")
  .load()
 
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro('value, jsonFormatSchema) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as 'value)
 
val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "iteblog2")
  .start()

与 Databricks spark-avro的兼容性

因为 Spark 内置对读写 Avro 数据的支持是从 Spark 2.4 才引入的,所以在这些版本之前,可能有用户已经使用了 Databricks 开源的 spark-avro。但是不用急,内置的 spark-avro 模块和这个是完全兼容的。我们仅仅需要将之前引入的 com.databricks.spark.avro 修改成 org.apache.spark.sql.avro._ 即可。

性能测试

基于 SPARK-24800 的优化,内置 Avro 数据源读写 Avro 文件的性能得到很大提升。社区在这方面进行了相关的基准测试,结果表明,在1百万行的数据(包含 Int/Double/String/Map/Array/Struct 等各种数据格式)测试中,读取的性能提升了2倍,写的性能提升了8%。基准测试的代码可参见 这里,测试比较如下:

image

结论

内置的 spark-avro 模块为 Spark SQL 和 Structured Streaming 提供了更好的用户体验以及 IO 性能。

本文转载自: https://blog.csdn.net/bingdianone/article/details/84965861#_Databricks_sparkavro_124
英文原文地址:https://databricks.com/blog/2018/11/30/apache-avro-as-a-built-in-data-source-in-apache-spark-2-4.html

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
6月前
|
分布式计算 大数据 数据处理
Apache Spark:提升大规模数据处理效率的秘籍
【4月更文挑战第7天】本文介绍了Apache Spark的大数据处理优势和核心特性,包括内存计算、RDD、一站式解决方案。分享了Spark实战技巧,如选择部署模式、优化作业执行流程、管理内存与磁盘、Spark SQL优化及监控调优工具的使用。通过这些秘籍,可以提升大规模数据处理效率,发挥Spark在实际项目中的潜力。
479 0
|
28天前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
34 1
|
4月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
149 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
3月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
63 0
|
3月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
203 0
|
4月前
|
分布式计算 Apache Spark
|
5月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
137 6
|
5月前
|
分布式计算 Spark 大数据
深入探究Apache Spark在大数据处理中的实践应用
【6月更文挑战第2天】Apache Spark是流行的开源大数据处理框架,以其内存计算速度和低延迟脱颖而出。本文涵盖Spark概述、核心组件(包括Spark Core、SQL、Streaming和MLlib)及其在数据预处理、批处理分析、交互式查询、实时处理和机器学习中的应用。通过理解Spark内部机制和实践应用,可提升大数据处理效率,发挥其在各行业的潜力。
|
5月前
|
消息中间件 分布式计算 关系型数据库
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
|
6月前
|
消息中间件 分布式计算 Serverless
CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark
CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark
117 2

推荐镜像

更多