Background
At present, Hudi's integration with Spark is still based on Spark DataSource V1, which is evident from Hudi's DefaultSource implementation:

```scala
class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {
```
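For reference, this V1 entry point is what an ordinary Hudi write resolves to. Below is a minimal sketch of such a write; the table name, path, and the choice of options are placeholders for illustration, and a real job needs more configuration:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class HudiV1WriteSketch {
  public static void main(String[] args) {
    // Local session just for illustration.
    SparkSession spark = SparkSession.builder()
        .appName("hudi-v1-write-sketch")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> df = spark.range(10).toDF("id");

    // "hudi" is the short name exposed via DataSourceRegister, so this
    // resolves to the V1 org.apache.hudi.DefaultSource shown above.
    df.write()
        .format("hudi")
        .option("hoodie.table.name", "demo_tbl")                 // placeholder table name
        .option("hoodie.datasource.write.recordkey.field", "id")
        .option("hoodie.datasource.write.precombine.field", "id")
        .mode(SaveMode.Append)
        .save("/tmp/hudi/demo_tbl");                             // placeholder path

    spark.stop();
  }
}
```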
Casual Discussion
Continuing with the code covered in the previous post, Apache Hudi初探(二):

```scala
hoodieDF.write.format("org.apache.hudi.spark3.internal")
  .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
  .option(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL)
  .options(params)
  .mode(SaveMode.Append)
  .save()
```
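For context: this org.apache.hudi.spark3.internal route is Hudi's bulk-insert "row writer" path (gated by the hoodie.datasource.write.row.writer.enable option), which writes Spark rows directly instead of first converting them to Avro records.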
In other words, the call ultimately lands in the org.apache.hudi.spark3.internal.DefaultSource class:
```java
public class DefaultSource extends BaseDefaultSource implements TableProvider {

  @Override
  public StructType inferSchema(CaseInsensitiveStringMap options) {
    return StructType.fromDDL(options.get(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key()));
  }

  @Override
  public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
    String instantTime = properties.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY);
    String path = properties.get("path");
    String tblName = properties.get(HoodieWriteConfig.TBL_NAME.key());
    boolean populateMetaFields = Boolean.parseBoolean(properties.getOrDefault(
        HoodieTableConfig.POPULATE_META_FIELDS.key(),
        Boolean.toString(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())));
    boolean arePartitionRecordsSorted = Boolean.parseBoolean(properties.getOrDefault(
        HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED,
        Boolean.toString(HoodieInternalConfig.DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED)));
    // Create a new map as the properties is an unmodifiableMap on Spark 3.2.0
    Map<String, String> newProps = new HashMap<>(properties);
    // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
    tryOverrideParquetWriteLegacyFormatProperty(newProps, schema);
    // 1st arg to createHoodieConfig is not really required to be set. but passing it anyways.
    HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(
        newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path, tblName, newProps);
    return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
        getConfiguration(), newProps, populateMetaFields, arePartitionRecordsSorted);
  }
}
```
As you can see, this class implements the TableProvider interface, meaning it is based on DataSource V2, and the save call ultimately ends up invoking its getTable method.
- The call chain of the inferSchema method is as follows:

```
save
 ||
 \/
saveInternal
 ||
 \/
DataSourceV2Utils.getTableFromProvider
 ||
 \/
provider.getTable => provider.inferSchema(options)
```
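The last step of this chain can be paraphrased as follows. This is an illustrative Java rendering of what DataSourceV2Utils.getTableFromProvider does, not Spark's actual (Scala) code:

```java
import java.util.Optional;

import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class GetTableFromProviderSketch {
  // With no user-supplied schema (our case), the provider is asked to infer
  // one, and that inferred schema is handed straight to getTable.
  static Table getTableFromProvider(TableProvider provider,
                                    CaseInsensitiveStringMap options,
                                    Optional<StructType> userSchema) {
    if (userSchema.isPresent()) {
      return provider.getTable(userSchema.get(),
          provider.inferPartitioning(options), options.asCaseSensitiveMap());
    }
    return provider.getTable(provider.inferSchema(options),
        provider.inferPartitioning(options), options.asCaseSensitiveMap());
  }
}
```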
"hoodie.bulkinsert.schema.ddl"会在DF.option方法中传递过来
The call chain of getTable is the same as that of inferSchema. DataSourceUtils.createHoodieConfig has already been analyzed; the end result is a HoodieDataSourceInternalTable object, which the caller then uses as follows:
```scala
val relation = DataSourceV2Relation.create(table, catalog, ident, dsOptions)
checkPartitioningMatchesV2Table(table)
if (mode == SaveMode.Append) {
  runCommand(df.sparkSession, "save") {
    AppendData.byName(relation, df.logicalPlan, finalOptions)
  }
```
Here catalog and ident are None; everything from this point on is Spark's own machinery, which eventually produces an AppendDataExec physical plan:
```scala
case class AppendDataExec(
    table: SupportsWrite,
    writeOptions: CaseInsensitiveStringMap,
    query: SparkPlan,
    refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper {

  override protected def run(): Seq[InternalRow] = {
    val writtenRows = writeWithV2(newWriteBuilder().buildForBatch())
    refreshCache()
    writtenRows
  }
```
This in turn invokes the methods of the HoodieDataSourceInternalBatchWrite class:
```java
...
this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime,
    writeConfig, structType, jss, hadoopConfiguration, extraMetadata);
...

@Override
public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
  dataSourceInternalWriterHelper.createInflightCommit();
  if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) {
    return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(),
        writeConfig, instantTime, structType, populateMetaFields, arePartitionRecordsSorted);
  } else {
    throw new IllegalArgumentException("Write Operation Type + "
        + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported ");
  }
}

@Override
public boolean useCommitCoordinator() {
  return dataSourceInternalWriterHelper.useCommitCoordinator();
}

@Override
public void onDataWriterCommit(WriterCommitMessage message) {
  dataSourceInternalWriterHelper.onDataWriterCommit(message.toString());
}

@Override
public void commit(WriterCommitMessage[] messages) {
  List<HoodieWriteStat> writeStatList = Arrays.stream(messages).map(m -> (HoodieWriterCommitMessage) m)
      .flatMap(m -> m.getWriteStatuses().stream().map(HoodieInternalWriteStatus::getStat))
      .collect(Collectors.toList());
  dataSourceInternalWriterHelper.commit(writeStatList);
}

@Override
public void abort(WriterCommitMessage[] messages) {
  dataSourceInternalWriterHelper.abort();
}
```
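A note on useCommitCoordinator: when it returns true, Spark routes each task's commit through the driver-side OutputCommitCoordinator, which guarantees that at most one attempt of a given task is allowed to commit its output; this matters when speculative execution or task retries produce duplicate attempts.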
For the details of how these methods get invoked, readers can inspect the run method of the AppendDataExec class; the call chain of these methods is as follows:

```
createBatchWriterFactory => dataWriter.write => dataWriter.commit/abort => dataWriter.close
        ||
        \/
onDataWriterCommit
        ||
        \/
commit/abort
```
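Put together, the diagram above is just Hudi's side of Spark's generic V2 commit protocol. Below is an illustrative Java condensation of that protocol, not Spark's actual code; the driver/executor split and the runTask helper are simplifications:

```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

/** Illustrative condensation of Spark's V2 batch write protocol; not Spark source. */
public class V2WriteProtocolSketch {

  // Driver side: build the factory once, run one writer per partition,
  // feed each task's message to onDataWriterCommit, then commit or abort.
  static void driverSide(BatchWrite batchWrite, PhysicalWriteInfo info,
                         Iterator<InternalRow>[] partitions) {
    DataWriterFactory factory = batchWrite.createBatchWriterFactory(info);
    WriterCommitMessage[] messages = new WriterCommitMessage[partitions.length];
    try {
      for (int p = 0; p < partitions.length; p++) {
        // In reality each of these runs as a task on an executor.
        messages[p] = runTask(factory, p, partitions[p]);
        batchWrite.onDataWriterCommit(messages[p]);
      }
      batchWrite.commit(messages);
    } catch (Exception e) {
      batchWrite.abort(messages);
      throw new RuntimeException("V2 write failed and was aborted", e);
    }
  }

  // Executor side: write every row, commit this writer, abort on failure.
  static WriterCommitMessage runTask(DataWriterFactory factory, int partitionId,
                                     Iterator<InternalRow> rows) throws IOException {
    DataWriter<InternalRow> writer = factory.createWriter(partitionId, /* taskId */ 0L);
    try {
      while (rows.hasNext()) {
        writer.write(rows.next());
      }
      return writer.commit();
    } catch (Exception e) {
      writer.abort();
      throw new IOException("Task " + partitionId + " failed and was aborted", e);
    } finally {
      writer.close();
    }
  }
}
```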
Since this process involves quite a few moving parts, it will be covered in the next article.