Apache Hudi初探(三)(与spark的结合)

简介: Apache Hudi初探(三)(与spark的结合)

背景


目前hudi的与spark的集合还是基于spark datasource V1来的,这一点可以查看hudi的source实现就可以知道:

class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {

闲说杂谈


继续上次的Apache Hudi初探(二)涉及的代码:

      hoodieDF.write.format("org.apache.hudi.spark3.internal")
        .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
        .option(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL)
        .options(params)
        .mode(SaveMode.Append)
        .save()

也就是说最终会调用到org.apache.hudi.spark3.internal.DefaultSource类,

public class DefaultSource extends BaseDefaultSource implements TableProvider {
  @Override
  public StructType inferSchema(CaseInsensitiveStringMap options) {
    return StructType.fromDDL(options.get(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key()));
  }
  @Override
  public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
    String instantTime = properties.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY);
    String path = properties.get("path");
    String tblName = properties.get(HoodieWriteConfig.TBL_NAME.key());
    boolean populateMetaFields = Boolean.parseBoolean(properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(),
        Boolean.toString(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())));
    boolean arePartitionRecordsSorted = Boolean.parseBoolean(properties.getOrDefault(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED,
        Boolean.toString(HoodieInternalConfig.DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED)));
    // Create a new map as the properties is an unmodifiableMap on Spark 3.2.0
    Map<String, String> newProps = new HashMap<>(properties);
    // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
    tryOverrideParquetWriteLegacyFormatProperty(newProps, schema);
    // 1st arg to createHoodieConfig is not really required to be set. but passing it anyways.
    HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path, tblName, newProps);
    return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
        getConfiguration(), newProps, populateMetaFields, arePartitionRecordsSorted);
  }
}

可以看到该类是继承了TableProvider,也就是说是时基于DataSource V2的,而且save的方法最终会调用到getTable方法。


  • inferSchema方法的调用链如下:
    save
     ||
     \/
  saveInternal
     ||
     \/
 DataSourceV2Utils.getTableFromProvider
     ||
     \/
  provider.getTable  => provider.inferSchema(options)

"hoodie.bulkinsert.schema.ddl"会在DF.option方法中传递过来


getTable方法调用链和inferSchema一样,DataSourceUtils.createHoodieConfig已经分析过了,

最终会生成HoodieDataSourceInternalTable对象,该table对象对象的调用如下:

        val relation = DataSourceV2Relation.create(table, catalog, ident, dsOptions)
        checkPartitioningMatchesV2Table(table)
        if (mode == SaveMode.Append) {
          runCommand(df.sparkSession, "save") {
            AppendData.byName(relation, df.logicalPlan, finalOptions)
          }

其中catalogident为None,接下来的就是Spark的行为了,也就是说最终会生成AppendDataExec物理计划:

   case class AppendDataExec(
     table: SupportsWrite,
     writeOptions: CaseInsensitiveStringMap,
     query: SparkPlan,
     refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper {
   override protected def run(): Seq[InternalRow] = {
     val writtenRows = writeWithV2(newWriteBuilder().buildForBatch())
     refreshCache()
     writtenRows
   }

最终会调用HoodieDataSourceInternalBatchWrite类的方法:

 ...
 this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
      jss, hadoopConfiguration, extraMetadata);
 ...
 @Override
 public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
   dataSourceInternalWriterHelper.createInflightCommit();
   if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) {
     return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(),
         writeConfig, instantTime, structType, populateMetaFields, arePartitionRecordsSorted);
   } else {
     throw new IllegalArgumentException("Write Operation Type + " + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported ");
   }
 }
 @Override
 public boolean useCommitCoordinator() {
   return dataSourceInternalWriterHelper.useCommitCoordinator();
 }
 @Override
 public void onDataWriterCommit(WriterCommitMessage message) {
   dataSourceInternalWriterHelper.onDataWriterCommit(message.toString());
 }
 @Override
 public void commit(WriterCommitMessage[] messages) {
   List<HoodieWriteStat> writeStatList = Arrays.stream(messages).map(m -> (HoodieWriterCommitMessage) m)
       .flatMap(m -> m.getWriteStatuses().stream().map(HoodieInternalWriteStatus::getStat)).collect(Collectors.toList());
   dataSourceInternalWriterHelper.commit(writeStatList);
 }
 @Override
 public void abort(WriterCommitMessage[] messages) {
   dataSourceInternalWriterHelper.abort();
 }

具体的这些方法的被调用过程,读者可以自行查看AppendDataExec类的Run方法,

目前这些方法的调用链如下:

 createBatchWriterFactory => dataWriter.write => dataWriter.commit/abort => dataWriter.close
     ||
     \/
 onDataWriterCommit
     ||
     \/
 commit/abort

由于这个过程涉及的东西比较多,所以下篇文章一起说明

相关文章
|
5月前
|
消息中间件 分布式计算 Kafka
【Spark Streaming】Spark Day11:Spark Streaming 学习笔记
【Spark Streaming】Spark Day11:Spark Streaming 学习笔记
34 0
|
5月前
|
分布式计算 监控 大数据
【Spark Streaming】Spark Day10:Spark Streaming 学习笔记
【Spark Streaming】Spark Day10:Spark Streaming 学习笔记
42 0
|
5月前
|
消息中间件 分布式计算 Kafka
Spark【Spark Streaming】
Spark【Spark Streaming】
|
11月前
|
分布式计算 Apache Spark
Apache Hudi初探(五)(与spark的结合)
Apache Hudi初探(五)(与spark的结合)
183 0
|
11月前
|
分布式计算 Apache Spark
Apache Hudi初探(与spark的结合)
Apache Hudi初探(与spark的结合)
94 0
|
11月前
|
SQL 分布式计算 Apache
Apache Hudi初探(七)(与spark的结合)
Apache Hudi初探(七)(与spark的结合)
81 0
|
11月前
|
分布式计算 Java Apache
Apache Hudi初探(四)(与spark的结合)
Apache Hudi初探(四)(与spark的结合)
66 0
|
11月前
|
分布式计算 Apache Spark
Apache Hudi初探(六)(与spark的结合)
Apache Hudi初探(六)(与spark的结合)
156 0
|
11月前
|
存储 SQL 分布式计算
Apache Hudi初探(二)(与spark的结合)
Apache Hudi初探(二)(与spark的结合)
127 0
|
12月前
|
SQL 机器学习/深度学习 分布式计算
【大数据架构】Apache Flink和Apache Spark—比较指南
【大数据架构】Apache Flink和Apache Spark—比较指南
【大数据架构】Apache Flink和Apache Spark—比较指南

推荐镜像

更多