Apache Hudi初探(六)(与spark的结合)

简介: Apache Hudi初探(六)(与spark的结合)

背景


目前hudi的与spark的集合还是基于spark datasource V1来的,这一点可以查看hudi的source实现就可以知道:

class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {

后续发现也是基于Datasource V2的


闲说杂谈


继续上次的Apache Hudi初探(五)涉及的代码:

  preCommit(inflightInstant, metadata);
  commit(table, commitActionType, instantTime, metadata, stats);
  // already within lock, and so no lock requried for archival
  postCommit(table, metadata, instantTime, extraMetadata, false);
  LOG.info("Committed " + instantTime);
  releaseResources();
  this.txnManager.endTransaction(Option.of(inflightInstant));
  ...
  runTableServicesInline(table, metadata, extraMetadata)
  emitCommitMetrics(instantTime, metadata, commitActionType);
    // callback if needed.
    if (config.writeCommitCallbackOn()) {
      if (null == commitCallback) {
        commitCallback = HoodieCommitCallbackFactory.create(config);
      }
      commitCallback.call(new HoodieWriteCommitCallbackMessage(instantTime, config.getTableName(), config.getBasePath(), stats));
    }
    return true;
  }

之前说到只有获取到分布式锁的线程才可以继续一下操作


preCommit 解决潜在存在的写元数据的文件冲突,因为有可能当前的写入和后台Compaction/Clustering操作存在冲突


commit 真正的写元数据的


  • 在写元数据之前得了解一下marker文件 ,marker文件的创建是HoodieRowCreateHandle类中:
  createMarkerFile(partitionPath, fileName, instantTime, table, writeConfig);

这里会根据hoodie.write.markers.type的值(默认是"TIMELINE_SERVER_BASED")以及hoodie.embed.timeline.server(默认是true)


来创建marker文件,对于默认值来说(也就是TimelineServerBasedWriteMarkers如果是基于HDFS的,就是DirectWriteMarkers),

就会向内置的TimelineServerBasedWriteMarkers服务发送创建marker文件的请求(文件的后缀为*.parquet.marker.CREATE*),


而该服务的创建是在BaseHoodieClient的构造方法中:

 startEmbeddedServerView()

在commit文件的时候,会调用reconcileAgainstMarkers,会删除由于spark speculation导致的部分文件,这也是marker文件存在的意义


并且更新指标如"duration"和“numFilesFinalized”


writeTableMetadata 更新table的元数据


activeTimeline.saveAsComplete 创建类似20230422183552014.deltacommit的元数据文件


postCommit做一些commit后的清理工作


quietDeleteMarkerDir 删除对应的marker文件


autoCleanOnCommit 如果hoodie.clean.automatic为true且hoodie.clean.async 为false的情况下,就进行同步清理


主要是清理不需要的历史文件(parquet数据文件或者log文件)


autoArchiveOnCommit如果hoodie.archive.automatic为true且hoodie.archive.async为false,就进行归档操作


主要是对instant归档,减少timeline的操作压力



  • releaseResources 在这里主要是释放掉持久化的RDD

  • this.txnManager.endTransaction(Option.of(inflightInstant))
    结束事务,释放掉锁


runTableServicesInline 这个是在事务之外的,因为这个操作是个耗时的操作,包含了CompactionClustering


如果 hoodie.metadata.enable 为true(默认就是true) 就会继续进行后续操作


如果 hoodie.compact.inline为false(默认就是false)并且 hoodie.compact.schedule.inline 为true(默认是false)就是会进行Compaction


如果 hoodie.compact.inline 为true,也是会进行Compaction


Clustering的操作也一样,只不过对应的配合为hoodie.clustering.inline和hoodie.clustering.schedule.inline


emitCommitMetrics指标进行采集


如果hoodie.write.commit.callback.on为true(默认是false)


还会回调hoodie.write.commit.callback.class的类(默认为org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback),根据发送一些信息给hoodie.write.commit.callback.http.url配置的http server


相关文章
|
3月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
112 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
2月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
46 0
|
2月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
163 0
|
3月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
3月前
|
分布式计算 Apache Spark
|
4月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
102 6
|
4月前
|
分布式计算 Spark 大数据
深入探究Apache Spark在大数据处理中的实践应用
【6月更文挑战第2天】Apache Spark是流行的开源大数据处理框架,以其内存计算速度和低延迟脱颖而出。本文涵盖Spark概述、核心组件(包括Spark Core、SQL、Streaming和MLlib)及其在数据预处理、批处理分析、交互式查询、实时处理和机器学习中的应用。通过理解Spark内部机制和实践应用,可提升大数据处理效率,发挥其在各行业的潜力。
|
4月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
消息中间件 分布式计算 关系型数据库
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
|
2月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
40 1

推荐镜像

更多