Apache Hudi初探(二)(与spark的结合)

简介: Apache Hudi初探(二)(与spark的结合)

背景


目前hudi的与spark的集合还是基于spark datasource V1来的,这一点可以查看hudi的source实现就可以知道:

class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {

闲说杂谈


我们先从hudi的写数据说起(毕竟没有写哪来的读),对应的流程:

createRelation
     ||
     \/
HoodieSparkSqlWriter.write

具体的代码


继续上一次Apache Hudi初探(与spark的结合)的代码:

      handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
      val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties
      (parameters))
      val tableMetaClient = if (tableExists) {
        HoodieTableMetaClient.builder
          .setConf(sparkContext.hadoopConfiguration)
          .setBasePath(path)
          .build()
      } else {
        ...
      }
      val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
      if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) &&
       operation == WriteOperationType.BULK_INSERT) {
       val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
         basePath, path, instantTime, partitionColumns, tableConfig.isTablePartitioned)
       return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
      }

handleSaveModes 是对spark SaveMode和hoodie的hoodie.datasource.write.operation配置进行校验验证


如 如果根据现有spark.sessionState.conf.resolver配置计算出来的表名(source中配置的hoodie.table.name和tableconfig获取的hoodie.table.name)不一致则报错


partitionColumns 获取分区字段,一般是 “field1,field2”格式


val tableMetaClient =


构造tableMetaClient,如果表存在,则复用现有的,
如果不存在则会新建,主要的是新建目录以及初始化对应的目录结构:


创建.hoodie目录


创建.hoodie/.schema目录


创建.hoodie/archived目录


创建.hoodie/.temp目录


创建.hoodie/.aux目录


创建.hoodie/.aux/.bootstrap目录


创建.hoodie/.aux/.bootstrap/.partitions目录


创建.hoodie/.aux/.bootstrap/.fileids目录


创建.hoodie/hoodie.properties文件

并向hoodie.properties写入属性值

最终会形成如下的文件目录机构:

  hudi_result_mor/.hoodie/.aux
  hudi_result_mor/.hoodie/.aux/.bootstrap/.partitions
  hudi_result_mor/.hoodie/.aux/.bootstrap/.fileids
  hudi_result_mor/.hoodie/.schema
  hudi_result_mor/.hoodie/.temp
  hudi_result_mor/.hoodie/archived
  hudi_result_mor/.hoodie/hoodie.properties
  hudi_result_mor/.hoodie/metadata

val commitActionType = CommitUtils.getCommitActionType


这个决定了commit的类型,如果是COW表则是commit,如果是MOR表是deltacommit,这会在文件的后缀上有体现


bulkInsertAsRow


如果同时满足“hoodie.datasource.write.row.writer.enable”(默认是true)和“hoodie.datasource.write.operation”是bulk_insert,则会按照spark原生的ROW格式写入数据,否则会有额外的转换操作


bulkInsertAsRow解析


由于bulkInsertAsRow是写入数据的重点,所以逐一分析:

    val sparkContext = sqlContext.sparkContext
    val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
      String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())))
    val dropPartitionColumns = parameters.get(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key()).map(_.toBoolean)
      .getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue())
    // register classes & schemas
    val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
    sparkContext.getConf.registerKryoClasses(
      Array(classOf[org.apache.avro.generic.GenericData],
        classOf[org.apache.avro.Schema]))
    var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
    if (dropPartitionColumns) {
      schema = generateSchemaWithoutPartitionColumns(partitionColumns, schema)
    }
    validateSchemaForHoodieIsDeleted(schema)
    sparkContext.getConf.registerAvroSchemas(schema)
    log.info(s"Registered avro schema : ${schema.toString(true)}")
    if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
      throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
    }
  • populateMetaFields= ,如果是True,会在每行记录中添加Hudi的元数据字段(如_hoodie_commit_time等),这在后面的bulkInsertPartitionerRows时候用到,默认是True


  • dropPartitionColumns 是否删除分区字段,默认是否,也就是会保留分区字段

  • sparkContext.getConf.registerKryoClassesGenericData和Schema使用Kyro序列化

  • var schema = AvroConversionUtils.convertStructTypeToAvroSchema 把spark sql Schema转换为Avro Schema

  • sparkContext.getConf.registerAvroSchemas 注册Avro序列化

  • “hoodie.datasource.write.insert.drop.duplicates” 不允许为True
 val params: mutable.Map[String, String] = collection.mutable.Map(parameters.toSeq: _*)
    params(HoodieWriteConfig.AVRO_SCHEMA_STRING.key) = schema.toString
    val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
    val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
      val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
      if (userDefinedBulkInsertPartitionerOpt.isPresent) {
        userDefinedBulkInsertPartitionerOpt.get
      } else {
        BulkInsertInternalPartitionerWithRowsFactory.get(
          writeConfig.getBulkInsertSortMode, isTablePartitioned)
      }
    } else {
      // Sort modes are not yet supported when meta fields are disabled
      new NonSortPartitionerWithRows()
    }
    val arePartitionRecordsSorted = bulkInsertPartitionerRows.arePartitionRecordsSorted()
    params(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED) = arePartitionRecordsSorted.toString
    val isGlobalIndex = if (populateMetaFields) {
      SparkHoodieIndexFactory.isGlobalIndex(writeConfig)
    } else {
      false
    }
  • 注册“hoodie.avro.schema”为刚才的Avro Schema

  • val writeConfig = DataSourceUtils.createHoodieConfig

  • 创建hudiConfig对象,其中包括:

  • “hoodie.datasource.compaction.async.enable” 是否异步compaction,默认是true

        如果不是异步compaction,且满足是MOR表,则表明是同步Compaction


      “hoodie.datasource.write.insert.drop.duplicates”如果是True(默认False),则会在插入记    录的时候去重


        设置“hoodie.datasource.write.payload.class”,默认是“OverwriteWithLatestAvroPayload”


        设置“hoodie.datasource.write.precombine.field”,默认是ts字段,这个字段用在Playload的时候进行record的比较


       这里还会在在最后的build()步骤里设置"hoodie.index.type",如果是spark引擎,则是"SIMPLE"


bulkInsertPartitionerRows,默认是NonSortPartitionerWithRows,也就是原样输出,不做任何改动


设置"hoodie.bulkinsert.are.partitioner.records.sorted",默认为False


val isGlobalIndex = 这里会根据索引类型来判断,因为默认是“SIMPLE”索引,所以是False

val hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(df, writeConfig, bulkInsertPartitionerRows, dropPartitionColumns)
    if (HoodieSparkUtils.isSpark2) {
      hoodieDF.write.format("org.apache.hudi.internal")
        .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
        .options(params)
        .mode(SaveMode.Append)
        .save()
    } else if (HoodieSparkUtils.isSpark3) {
      hoodieDF.write.format("org.apache.hudi.spark3.internal")
        .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
        .option(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL)
        .options(params)
        .mode(SaveMode.Append)
        .save()
    } else {
      throw new HoodieException("Bulk insert using row writer is not supported with current Spark version."
        + " To use row writer please switch to spark 2 or spark 3")
    }
    val syncHiveSuccess = metaSync(sqlContext.sparkSession, writeConfig, basePath, df.schema)
    (syncHiveSuccess, common.util.Option.ofNullable(instantTime))
  }

HoodieDatasetBulkInsertHelper.prepareForBulkInsert 这是插入数据前的准备工作


如果"hoodie.populate.meta.fields"是True,则增加元数据字段:


_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name


“hoodie.combine.before.insert”,是否在写入存储之前,先进行数据去重处理(按照precombine的key),默认是False


  • 默认走的是,只是加上元数据字段

  • 如果是设置为True,则会引入额外的shuffle来进行去重处理

  • 如果"hoodie.datasource.write.drop.partition.columns"为True(默认是False),去掉分区字段

  • 因为这里是Spark3 所以会进入到hoodieDF.write.format(“org.apache.hudi.spark3.internal”)
    这里后续再分析
相关文章
|
5月前
|
分布式计算 监控 大数据
【Spark Streaming】Spark Day10:Spark Streaming 学习笔记
【Spark Streaming】Spark Day10:Spark Streaming 学习笔记
42 0
|
5月前
|
消息中间件 分布式计算 Kafka
【Spark Streaming】Spark Day11:Spark Streaming 学习笔记
【Spark Streaming】Spark Day11:Spark Streaming 学习笔记
34 0
|
5月前
|
消息中间件 分布式计算 Kafka
Spark【Spark Streaming】
Spark【Spark Streaming】
|
11月前
|
分布式计算 Apache Spark
Apache Hudi初探(与spark的结合)
Apache Hudi初探(与spark的结合)
94 0
|
11月前
|
分布式计算 Apache Spark
Apache Hudi初探(五)(与spark的结合)
Apache Hudi初探(五)(与spark的结合)
183 0
|
11月前
|
分布式计算 Java Apache
Apache Hudi初探(四)(与spark的结合)
Apache Hudi初探(四)(与spark的结合)
66 0
|
11月前
|
分布式计算 Apache Spark
Apache Hudi初探(三)(与spark的结合)
Apache Hudi初探(三)(与spark的结合)
53 0
|
11月前
|
SQL 分布式计算 Apache
Apache Hudi初探(七)(与spark的结合)
Apache Hudi初探(七)(与spark的结合)
81 0
|
11月前
|
分布式计算 Apache Spark
Apache Hudi初探(六)(与spark的结合)
Apache Hudi初探(六)(与spark的结合)
156 0
|
12月前
|
SQL 机器学习/深度学习 分布式计算
【大数据架构】Apache Flink和Apache Spark—比较指南
【大数据架构】Apache Flink和Apache Spark—比较指南
【大数据架构】Apache Flink和Apache Spark—比较指南

推荐镜像

更多