Apache Hudi初探(与spark的结合)

简介: Apache Hudi初探(与spark的结合)

背景


本文基于hudi 0.12.2


目前hudi的与spark的集合还是基于spark datasource V1来的,这一点可以查看hudi的source实现就可以知道:

class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {

闲说杂谈


我们先从hudi的写数据说起(毕竟没有写哪来的读),对应的流程:

createRelation
     ||
     \/
HoodieSparkSqlWriter.write

###具体的代码


首先是一系列table配置的前置校验:

    assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
    val path = optParams("path")
    val basePath = new Path(path)
    val sparkContext = sqlContext.sparkContext
    val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
    tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
    var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)

assert判断spark中是否传入“path”参数


tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) 判断是否是第一次写入,如果存在.hoodie目录,则说明不是第一次写入


getHoodieTableConfig是从当前表中获取配置,也就是从.hoodile/hoodie.properties中读取配置,其中配置文件的内容见附录


validateTableConfig就是做一系列的校验

其中判断的参数为spark配置的参数和已有参数进行比对,进行如下参数一一比对


“hoodie.datasource.write.recordkey.field”和“hoodie.table.recordkey.fields”


“hoodie.datasource.write.precombine.field”和“hoodie.table.precombine.field”


“hoodie.datasource.write.keygenerator.class”和“hoodie.table.keygenerator.class”


再次是keygen的校验

    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
    val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
    val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
      originKeyGeneratorClassName, parameters)
    //validate datasource and tableconfig keygen are the same
    validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);

mergeParamsAndGetHoodieConfig

 translateSqlOptions
      ||
      \/
 HoodieWriterUtils.parametersWithWriteDefaults
      ||
      \/
 HoodieWriterUtils.convertMapToHoodieConfig

translateSqlOptions


这里传入spark的参数转换为huid的参数:


如果spark配置中有“__partition_columns”参数,则会获取


获取“hoodie.datasource.write.keygenerator.class”的值,并对应用到“__partition_columns” 的值上,并以逗号分隔


最终写入到"hoodie.datasource.write.partitionpath.field"配置中


HoodieWriterUtils.parametersWithWriteDefaults


首先会从classpath下查找hudi-defaults.conf,如果找到则加载,


再次从环境变量HUDI_CONF_DIR查找hudi-defaults.conf文件


保持"hoodie.payload.ordering.field"和"hoodie.datasource.write.precombine.field"一致


HoodieWriterUtils.convertMapToHoodieConfig


把map对象转换为HoodieConfig对象


HoodieWriterUtils.getOriginKeyGenerator


extractConfigsRelatedToTimestampBasedKeyGenerator


获取timestampKeyGeneratorConfigs


validateKeyGeneratorConfig


对spark中配置的keygen和table中配置的进行校验


“hoodie.datasource.write.keygenerator.class”/"hoodie.sql.origin.keygen.class"和“hoodie.table.keygenerator.class”进行比对


其他校验及操作


spark中的参数”hoodie.table.name“必须存在


"spark.serializer"必须是“KryoSerializer”


假如配置了"hoodie.datasource.write.insert.drop.duplicates"为true 且 “hoodie.datasource.write.operation”为“upsert”时,

改“hoodie.datasource.write.operation”为“insert”



附录


  • .hoodile/hoodie.properties 文件内容
hoodie.table.timeline.timezone=LOCAL
hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
hoodie.table.precombine.field=dt
hoodie.table.version=5
hoodie.database.name=
hoodie.datasource.write.hive_style_partitioning=true
hoodie.table.checksum=493353519
hoodie.partition.metafile.use.base.format=false
hoodie.archivelog.folder=archived
hoodie.table.name=test_hudi_mor
hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
hoodie.populate.meta.fields=true
hoodie.table.type=MERGE_ON_READ
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.base.file.format=PARQUET
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.metadata.partitions=files
hoodie.timeline.layout.version=1
hoodie.table.recordkey.fields=id
hoodie.table.partition.fields=dt```
相关文章
|
3天前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
19 0
|
1月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
76 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
1月前
|
分布式计算 数据处理 流计算
实时计算 Flink版产品使用问题之使用Spark ThriftServer查询同步到Hudi的数据时,如何实时查看数据变化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
30天前
|
分布式计算 Apache Spark
|
2月前
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
76 6
|
2月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
17天前
|
SQL API Apache
官宣|Apache Flink 1.20 发布公告
Apache Flink 1.20.0 已发布,这是迈向 Flink 2.0 的最后一个小版本,后者预计年底发布。此版本包含多项改进和新功能,涉及 13 个 FLIPs 和 300 多个问题解决。亮点包括引入物化表简化 ETL 管道开发,统一检查点文件合并机制减轻文件系统压力,以及 SQL 语法增强如支持 `DISTRIBUTED BY` 语句。此外,还进行了大量的配置项清理工作,为 Flink 2.0 铺平道路。这一版本得益于 142 位贡献者的共同努力,其中包括来自中国多家知名企业的开发者。
595 7
官宣|Apache Flink 1.20 发布公告
|
9天前
|
消息中间件 大数据 Kafka
"Apache Flink:重塑大数据实时处理新纪元,卓越性能与灵活性的实时数据流处理王者"
【8月更文挑战第10天】Apache Flink以卓越性能和高度灵活性在大数据实时处理领域崭露头角。它打破批处理与流处理的传统界限,采用统一模型处理有界和无界数据流,提升了开发效率和系统灵活性。Flink支持毫秒级低延迟处理,通过时间窗口、状态管理和自动并行化等关键技术确保高性能与可靠性。示例代码展示了如何使用Flink从Kafka读取实时数据并进行处理,简明扼要地呈现了Flink的强大能力。随着技术进步,Flink将在更多场景中提供高效可靠的解决方案,持续引领大数据实时处理的发展趋势。
30 7

推荐镜像

更多