Apache Hudi初探(八)(与spark的结合)--非bulk_insert模式

简介: Apache Hudi初探(八)(与spark的结合)--非bulk_insert模式

背景


之前讨论的都是’hoodie.datasource.write.operation’:'bulk_insert’的前提下,在这种模式下,是没有json文件的已形成如下的文件:

/dt=1/.hoodie_partition_metadata
/dt=1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_20230528233336713.parquet
/dt=1/30b7d5b2-12e8-415a-8ec5-18206fe601c0-0_0-22102-0_20230528231643200.parquet
/dt=1/4abc1c6d-a8aa-4c15-affc-61a35171ce69-0_4-22106-0_20230528231643200.parquet
/dt=1/513dee80-2e8c-4db8-baee-a767b9dba41c-0_2-22104-0_20230528231643200.parquet
/dt=1/57076f86-0a62-4f52-8b50-31a5f769b26a-0_1-22103-0_20230528231643200.parquet
/dt=1/84553727-be9d-4273-bad9-0a38d9240815-0_0-59818-0_20230528233513387.parquet
/dt=1/fecd6a84-9a74-40b1-bfc1-13612a67a785-0_0-26640-0_20230528231723951.parquet

因为是"bulk insert"操作,所以没有去重的需要,所以直接采用spark原生的方式,

以下我们讨论非spark原生的方式,


闲说杂谈


继续Apache Hudi初探(二)(与spark的结合)


剩下的代码:

 val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
 val (writeResult, writeClient: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]) =
 ...
  case _ => { // any other operation
   // register classes & schemas
   val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
   sparkContext.getConf.registerKryoClasses(
     Array(classOf[org.apache.avro.generic.GenericData],
       classOf[org.apache.avro.Schema]))
   // TODO(HUDI-4472) revisit and simplify schema handling
   val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
   val latestTableSchema = getLatestTableSchema(sqlContext.sparkSession, tableMetaClient).getOrElse(sourceSchema)
   val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
   var internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient)
   val writerSchema: Schema =
     if (reconcileSchema) {
       // In case we need to reconcile the schema and schema evolution is enabled,
       // we will force-apply schema evolution to the writer's schema
       if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
         internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))
       }
       if (internalSchemaOpt.isDefined) {
       ...
   // Convert to RDD[HoodieRecord]
   val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
     org.apache.hudi.common.util.Option.of(writerSchema))
   val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
     operation.equals(WriteOperationType.UPSERT) ||
     parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
       HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
   val hoodieAllIncomingRecords = genericRecords.map(gr => {
     val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
     val hoodieRecord = if (shouldCombine) {
       val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse(
         DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
         DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean)
         .asInstanceOf[Comparable[_]]
       DataSourceUtils.createHoodieRecord(processedRecord,
         orderingVal,
         keyGenerator.getKey(gr),
         hoodieConfig.getString(PAYLOAD_CLASS_NAME))
     } else {
       DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
     }
     hoodieRecord
   }).toJavaRDD()
   val writerDataSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) else writerSchema
   // Create a HoodieWriteClient & issue the write.
   val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path,
     tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
   )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
   if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
     asyncCompactionTriggerFn.get.apply(client)
   }
   if (isAsyncClusteringEnabled(client, parameters)) {
     asyncClusteringTriggerFn.get.apply(client)
   }
   val hoodieRecords =
     if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
       DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
     } else {
       hoodieAllIncomingRecords
     }
   client.startCommitWithTime(instantTime, commitActionType)
   val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
   (writeResult, client)
 }

如果开启了Schema Evolution,也就是hoodie.datasource.write.reconcile.schema是true,默认是false,就会进行schema的合并


convertStructTypeToAvroSchema 把df的schema转换成avro的schema

并且从*.hoodie/20230530073115535.deltacommit* 获取internalSchemaOpt,具体的合并就是把即将写入的schema和internalSchemaOpt进行合并


最后赋值给writerSchema,有可能还需要hoodie.schema.on.read.enable,默认是false


  • HoodieSparkUtils.createRdd 创建RDD

  • 把df转换为了RDD[GenericRecord]类型,赋值给genericRecords


val hoodieAllIncomingRecords = genericRecords.map(gr => {


首先如果是hoodie.datasource.write.drop.partition.columns为true(默认是false),则会从schema中删除hoodie.datasource.write.

partitionpath.field字段


如果hoodie.datasource.write.insert.drop.duplicates为true(默认是false)或者hoodie.datasource.write.operation是upsert(默认

是upsert),或者hoodie.combine.before.insert为true(默认是false),

则会创建HoodieAvroRecord<>(hKey, payload)类型的实例,其中HoodieKey以recordkey和partitionpath组成,playload为OverwriteWithLatestAvroPayload实例,


hoodieAllIncomingRecords就变成了RDD[HoodieAvroRecord]


  • writerDataSchema= client 这些就是创建SparkRDDWriteClient 客户端


  • isAsyncCompactionEnabled

  • 默认asyncCompactionTriggerFnDefined是没有的,所以不会开启异步的CompactionisAsyncClusteringEnabled同理也是

val hoodieRecords =


如果配置了hoodie.datasource.write.insert.drop.duplicatestrue(默认是false),则会进行去重处理,具体是调用DataSourceUtils.dropDuplicates方法:

SparkRDDReadClient client = new SparkRDDReadClient<>(new HoodieSparkEngineContext(jssc), writeConfig);
   return client.tagLocation(incomingHoodieRecords)
       .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());

SparkRDDReadClient client 在创建Client的时候,会进行索引的创建this.index = SparkHoodieIndexFactory.createIndex(clientConfig);


如果有hoodie.index.class设置,则实例化对象,否则根据hoodie.index.type的值来建立索引(默认是HoodieSimpleIndex,适合做测试用)


client.tagLocation(incomingHoodieRecords)…


从要插入的记录中过滤出在index中不存在的记录,最终调用的是index.tagLocation方法

如果hoodie.datasource.write.insert.drop.duplicates为false,则保留所有的记录


client.startCommitWithTime 开始写操作,这涉及到回滚的操作


  • 会先过滤出需要回滚的的的写失败的文件,如果hoodie.cleaner.policy.failed.writesEAGER(默认是EAGER),就会在这次提交中回滚失败的文件

  • 然后创建一个后缀为deltacommit.requested的文件,此时没有真正的写

  • val writeResult = DataSourceUtils.doWriteOperation

  • 真正的写操作


相关文章
|
13天前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
56 3
|
20天前
|
SQL 弹性计算 资源调度
云服务器 ECS产品使用问题之bin/spark-sql --master yarn如何进行集群模式运行
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
|
3天前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
19 0
|
1月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
76 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
1月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
1月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
30天前
|
分布式计算 Apache Spark
|
2月前
|
存储 分布式计算 监控
Spark Standalone模式是一种集群部署方式
【6月更文挑战第17天】Spark Standalone模式是一种集群部署方式
32 7
|
2月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
76 6
|
2月前
|
Apache 开发者
Apache Seata 如何解决 TCC 模式的幂等、悬挂和空回滚问题
【6月更文挑战第8天】Apache Seata 是一款分布式事务框架,解决TCC模式下的幂等、悬挂和空回滚问题。通过记录事务状态处理幂等,设置超时机制避免悬挂,明确标记Try操作成功来处理空回滚。Seata 提供丰富配置和管理功能,确保分布式事务的可靠性和效率,支持复杂事务处理场景,为企业业务发展提供支持。
105 7

推荐镜像

更多