Apache Hudi初探(五)(与spark的结合)

简介: Apache Hudi初探(五)(与spark的结合)

背景


目前hudi的与spark的集合还是基于spark datasource V1来的,这一点可以查看hudi的source实现就可以知道:

class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {

闲说杂谈


继续上次的Apache Hudi初探(四)涉及的代码:

 // HoodieDataSourceInternalBatchWrite 类中的方法:其所涉及的的方法调用链如下:
 createBatchWriterFactory => dataWriter.write => dataWriter.commit/abort => dataWriter.close
     ||
     \/
 onDataWriterCommit
     ||
     \/
 commit/abort
  • 在解释commit做的事情之前,DataSourceInternalWriterHelper在构建器阶段还有做了一件事,那就是writeClient.preWrite
    this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
    // writeClient是 SparkRDDWriteClient 实例
    writeClient.preWrite(instantTime, WriteOperationType.BULK_INSERT, metaClient);

metaClient构建一个HoodieTableMetaClient类型的 hoodie 元数据客户端


如果hoodie.metastore.enable开启(默认是不开启),则新建HoodieTableMetastoreClient类型的实例,否则新建HoodieTableMetastoreClient实例


writeClient.preWrite 这是在写入数据前做的准备工作


  • 根据hoodie.write.concurrency.mode设置的模式来判断(默认是single_writer,还有个选项是optimistic_concurrency_control),如果是OCC则会获取上一次成功的事务,否则为空

  • 根据hoodie.write.concurrency.mode设置的模式来判断(默认是single_writer,还有个选项是optimistic_concurrency_control),如果是OCC则会获取上一次成功的事务,否则为空

  • 是否开启archive归档服务,会根据hoodie.archive.automatic(默认是true)或者hoodie.archive.async(默认是false)和hoodie.table.

  • services.enabled(默认是true) 来启动服务 AsyncCleanerService.startAsyncArchiveIfEnabled

  • 所以默认情况clean和Archive服务都不是异步后台服务


  • 来看commit所做的事情,它最终会调用到dataSourceInternalWriterHelper.commit方法:
public void commit(List<HoodieWriteStat> writeStatList) {
    try {
      writeClient.commitStats(instantTime, writeStatList, Option.of(extraMetadata),
          CommitUtils.getCommitActionType(operationType, metaClient.getTableType()));
    } catch (Exception ioe) {
      throw new HoodieException(ioe.getMessage(), ioe);
    } finally {
      writeClient.close();
    }
  }

这里的writeClientSparkRDDWriteClient的实例,该实例的对一个的commit方法的如下:

public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
                             String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
    // Skip the empty commit if not allowed
    if (!config.allowEmptyCommit() && stats.isEmpty()) {
      return true;
    }
    LOG.info("Committing " + instantTime + " action " + commitActionType);
    // Create a Hoodie table which encapsulated the commits and files visible
    HoodieTable table = createTable(config, hadoopConf);
    HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds,
        extraMetadata, operationType, config.getWriteSchema(), commitActionType);
    HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, table.getMetaClient().getCommitActionType(), instantTime);
    HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config);
    this.txnManager.beginTransaction(Option.of(inflightInstant),
        lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
    try {
      preCommit(inflightInstant, metadata);
      commit(table, commitActionType, instantTime, metadata, stats);
      // already within lock, and so no lock requried for archival
      postCommit(table, metadata, instantTime, extraMetadata, false);
      LOG.info("Committed " + instantTime);
      releaseResources();
    } catch (IOException e) {
      throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e);
    } finally {
      this.txnManager.endTransaction(Option.of(inflightInstant));
    }
    // We don't want to fail the commit if hoodie.fail.writes.on.inline.table.service.exception is false. We catch warn if false
    try {
      // do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period
      runTableServicesInline(table, metadata, extraMetadata);
    } catch (Exception e) {
      if (config.isFailOnInlineTableServiceExceptionEnabled()) {
        throw e;
      }
      LOG.warn("Inline compaction or clustering failed with exception: " + e.getMessage()
          + ". Moving further since \"hoodie.fail.writes.on.inline.table.service.exception\" is set to false.");
    }
    emitCommitMetrics(instantTime, metadata, commitActionType);
    // callback if needed.
    if (config.writeCommitCallbackOn()) {
      if (null == commitCallback) {
        commitCallback = HoodieCommitCallbackFactory.create(config);
      }
      commitCallback.call(new HoodieWriteCommitCallbackMessage(instantTime, config.getTableName(), config.getBasePath(), stats));
    }
    return true;
  }
  • 如果是不允许空提交(hoodie.allow.empty.commit默认是true,也就是允许空提交),也就是没有任何数据插入的情况下,就直接返回这对于比如offset的元数据也是需要记录下来

  • createTable 新建一个HoodieTable,这里我们加入建立了HoodieSparkMergeOnReadTable类型的表

  • CommitUtils.buildMetadata 构造元信息,

  • 其中传入的参数operationTypebulk_insertschemaToStoreInCommit是avro schema(之前有设置),commitActionTypedeltacommit,partitionToReplaceFileIdsMap.empty,这里只是构建了HoodieCommitMetadata对象,把对应的元数据的信息记录了下来

  • HoodieInstant 新建了一个HoodieInstant类型的实例,这里是表明是inflight阶段

  • 判断heartbeat是否超时,如果是hoodie.cleaner.policy.failed.writesLAZY,且超时,则报异常

  • txnManager.beginTransaction 开启事务,主要是获取锁

  • 如果是hoodie.write.concurrency.modeoptimistic_concurrency_control,则会开启事务,因为这种情况下会存在冲突的可能性

  • lockManager.lock()hoodie.write.lock.provider配置中获取锁,默认是ZookeeperBasedLockProvider 实现是基于InterProcessMutex

  • 会基于hoodie.metrics.lock.enable的配置是否开启lock时期的metrics

  • reset(currentTxnOwnerInstant 把这次的TxnOwnerInstant设置为currentTxnOwnerInstant
相关文章
|
3月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
112 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
2月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
46 0
|
2月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
163 0
|
3月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
3月前
|
分布式计算 Apache Spark
|
4月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
102 6
|
4月前
|
分布式计算 Spark 大数据
深入探究Apache Spark在大数据处理中的实践应用
【6月更文挑战第2天】Apache Spark是流行的开源大数据处理框架,以其内存计算速度和低延迟脱颖而出。本文涵盖Spark概述、核心组件(包括Spark Core、SQL、Streaming和MLlib)及其在数据预处理、批处理分析、交互式查询、实时处理和机器学习中的应用。通过理解Spark内部机制和实践应用,可提升大数据处理效率,发挥其在各行业的潜力。
|
4月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
消息中间件 分布式计算 关系型数据库
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

推荐镜像

更多