A First Look at Apache Hudi (3): Integration with Spark


Background


At present, Hudi's integration with Spark is still based on Spark DataSource V1, which you can see directly from Hudi's DefaultSource implementation:

class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {
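
Because this DefaultSource mixes in CreatableRelationProvider and DataSourceRegister, an ordinary df.write.format("hudi") call is routed through this V1 entry point. Below is a minimal write sketch for orientation only; the table name, base path, and column names are made up, and it assumes a local SparkSession with the hudi-spark bundle on the classpath:

import org.apache.spark.sql.{SaveMode, SparkSession}

object HudiV1WriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-v1-write-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // A tiny DataFrame with a record key, a partition column and a precombine field
    val df = Seq((1, "a", 1000L), (2, "b", 2000L)).toDF("id", "name", "ts")

    df.write
      .format("hudi")                                             // resolved via DataSourceRegister to org.apache.hudi.DefaultSource
      .option("hoodie.table.name", "demo_tbl")                    // HoodieWriteConfig.TBL_NAME
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.partitionpath.field", "name")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .mode(SaveMode.Append)
      .save("/tmp/hudi/demo_tbl")                                 // hypothetical base path

    spark.stop()
  }
}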

Discussion


Continuing with the code covered in the previous article, A First Look at Apache Hudi (2):

      hoodieDF.write.format("org.apache.hudi.spark3.internal")
        .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
        .option(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL)
        .options(params)
        .mode(SaveMode.Append)
        .save()

In other words, the write ultimately reaches the org.apache.hudi.spark3.internal.DefaultSource class:

public class DefaultSource extends BaseDefaultSource implements TableProvider {
  @Override
  public StructType inferSchema(CaseInsensitiveStringMap options) {
    return StructType.fromDDL(options.get(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key()));
  }
  @Override
  public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
    String instantTime = properties.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY);
    String path = properties.get("path");
    String tblName = properties.get(HoodieWriteConfig.TBL_NAME.key());
    boolean populateMetaFields = Boolean.parseBoolean(properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(),
        Boolean.toString(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())));
    boolean arePartitionRecordsSorted = Boolean.parseBoolean(properties.getOrDefault(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED,
        Boolean.toString(HoodieInternalConfig.DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED)));
    // Create a new map as the properties is an unmodifiableMap on Spark 3.2.0
    Map<String, String> newProps = new HashMap<>(properties);
    // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
    tryOverrideParquetWriteLegacyFormatProperty(newProps, schema);
    // 1st arg to createHoodieConfig is not really required to be set. but passing it anyways.
    HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path, tblName, newProps);
    return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
        getConfiguration(), newProps, populateMetaFields, arePartitionRecordsSorted);
  }
}

As you can see, this class implements TableProvider, which means it is based on DataSource V2, and the save call eventually ends up in its getTable method.


  • The call chain of the inferSchema method is as follows:
    save
     ||
     \/
  saveInternal
     ||
     \/
 DataSourceV2Utils.getTableFromProvider
     ||
     \/
  provider.getTable  => provider.inferSchema(options)

The "hoodie.bulkinsert.schema.ddl" value is passed in through the DataFrame writer's option call (the BULKINSERT_INPUT_DATA_SCHEMA_DDL option in the write snippet above).

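As a small illustration of that round trip (field names below are made up, and spark-sql on the classpath is assumed): the writer serializes the DataFrame schema with schema.toDDL into "hoodie.bulkinsert.schema.ddl", and inferSchema restores it with StructType.fromDDL.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SchemaDdlRoundTrip {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))

    val ddl = schema.toDDL                  // what the writer puts into the option value
    val restored = StructType.fromDDL(ddl)  // what inferSchema does with the option value

    println(ddl)                            // e.g. `id` INT,`name` STRING
    assert(restored == schema)              // the schema survives the round trip
  }
}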

The call chain of getTable is the same as that of inferSchema, and DataSourceUtils.createHoodieConfig has already been analyzed in an earlier article.

Ultimately a HoodieDataSourceInternalTable object is produced, and that table object is then consumed by Spark's DataFrameWriter as follows:

        val relation = DataSourceV2Relation.create(table, catalog, ident, dsOptions)
        checkPartitioningMatchesV2Table(table)
        if (mode == SaveMode.Append) {
          runCommand(df.sparkSession, "save") {
            AppendData.byName(relation, df.logicalPlan, finalOptions)
          }

Here both catalog and ident are None; everything from this point on is standard Spark behavior, and the plan ultimately materializes as an AppendDataExec physical plan:

   case class AppendDataExec(
       table: SupportsWrite,
       writeOptions: CaseInsensitiveStringMap,
       query: SparkPlan,
       refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper {

     override protected def run(): Seq[InternalRow] = {
       val writtenRows = writeWithV2(newWriteBuilder().buildForBatch())
       refreshCache()
       writtenRows
     }

The BatchWrite built here is Hudi's HoodieDataSourceInternalBatchWrite, so its methods end up being invoked:

 ...
 this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
      jss, hadoopConfiguration, extraMetadata);
 ...
 @Override
 public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
   dataSourceInternalWriterHelper.createInflightCommit();
   if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) {
     return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(),
         writeConfig, instantTime, structType, populateMetaFields, arePartitionRecordsSorted);
   } else {
     throw new IllegalArgumentException("Write Operation Type + " + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported ");
   }
 }
 @Override
 public boolean useCommitCoordinator() {
   return dataSourceInternalWriterHelper.useCommitCoordinator();
 }
 @Override
 public void onDataWriterCommit(WriterCommitMessage message) {
   dataSourceInternalWriterHelper.onDataWriterCommit(message.toString());
 }
 @Override
 public void commit(WriterCommitMessage[] messages) {
   List<HoodieWriteStat> writeStatList = Arrays.stream(messages).map(m -> (HoodieWriterCommitMessage) m)
       .flatMap(m -> m.getWriteStatuses().stream().map(HoodieInternalWriteStatus::getStat)).collect(Collectors.toList());
   dataSourceInternalWriterHelper.commit(writeStatList);
 }
 @Override
 public void abort(WriterCommitMessage[] messages) {
   dataSourceInternalWriterHelper.abort();
 }

For the details of how these methods get invoked, readers can trace through the run method of the AppendDataExec class.

At present the call chain of these methods is as follows (a toy sketch of this lifecycle follows the diagram):

 createBatchWriterFactory => dataWriter.write => dataWriter.commit/abort => dataWriter.close
     ||
     \/
 onDataWriterCommit
     ||
     \/
 commit/abort
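
To make that ordering concrete, here is a self-contained toy sketch of the same lifecycle. It does not use the real Spark connector interfaces (all trait and class names below are made up for illustration); it only mirrors the order in which the V2 write path drives the callbacks: the driver asks for a writer factory, each task writes rows and commits (or aborts) its writer, the driver is told about each task commit via onDataWriterCommit, and finally the driver issues a single commit or abort over all task messages.

object BatchWriteLifecycleSketch {

  trait CommitMessage
  case class TaskCommit(partitionId: Int, rowCount: Int) extends CommitMessage

  // Per-task writer: write -> commit/abort -> close
  class TaskWriter(partitionId: Int) {
    private var count = 0
    def write(row: String): Unit = count += 1
    def commit(): CommitMessage = TaskCommit(partitionId, count)
    def abort(): Unit = println(s"task $partitionId aborted")
    def close(): Unit = ()
  }

  // Driver-side write: hands out writers, observes task commits, then commits once
  class BatchWrite {
    def createWriterFactory(): Int => TaskWriter = pid => new TaskWriter(pid)
    def onDataWriterCommit(msg: CommitMessage): Unit = println(s"driver saw $msg")
    def commit(msgs: Seq[CommitMessage]): Unit = println(s"final commit of ${msgs.size} task(s)")
    def abort(msgs: Seq[CommitMessage]): Unit = println("final abort")
  }

  def main(args: Array[String]): Unit = {
    val dataByPartition = Map(0 -> Seq("a", "b"), 1 -> Seq("c"))
    val batchWrite = new BatchWrite
    val factory = batchWrite.createWriterFactory()         // createBatchWriterFactory

    val messages = dataByPartition.toSeq.sortBy(_._1).map { case (pid, rows) =>
      val writer = factory(pid)
      try {
        rows.foreach(writer.write)                         // dataWriter.write
        val msg = writer.commit()                          // dataWriter.commit
        batchWrite.onDataWriterCommit(msg)                 // onDataWriterCommit on the driver
        msg
      } catch {
        case e: Exception => writer.abort(); throw e       // dataWriter.abort on failure
      } finally {
        writer.close()                                     // dataWriter.close
      }
    }

    batchWrite.commit(messages)                            // driver-side commit over all task messages
  }
}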

Since this process involves quite a few moving parts, it will be covered in the next article.
