解锁Apache Hudi删除记录新姿势

简介: 解锁Apache Hudi删除记录新姿势

1. 引入

在0.5.1版本之前,用户若想删除某条记录,可以使用Spark DataSource,并将 DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY设置为 EmptyHoodieRecordPayload.class.getName,便可删除指定记录,在Hudi新发布的0.5.1版本,可不使用上述配置项删除记录,而提供三种方式删除记录:Hudi APISpark DataSourceDeltaStreamer,下面逐一介绍如何使用。

2. 步骤

2.1 使用Hudi API

如果应用程序中已经内嵌了HoodieWriteClient,可以直接使用HoodieWriteClient如下API删除记录

/**
   * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied commitTime {@link HoodieKey}s will be
   * deduped and non existant keys will be removed before deleting.
   *
   * @param keys {@link List} of {@link HoodieKey}s to be deleted
   * @param commitTime Commit time handle
   * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
   */
  public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, final String commitTime);

2.2 使用DataSource

介绍如何使用Datasource API对示例数据集执行删除的示例。与快速入门中的示例相同。

1. 启动spark-shell
bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
2. 导入必要的Import
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_cow_table"
val basePath = "file:///tmp/hudi_cow_table"
val dataGen = new DataGenerator
3. 插入数据
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Overwrite).
    save(basePath);
4. 查询数据
val roViewDF = spark.
    read.
    format("org.apache.hudi").
    load(basePath + "/*/*/*/*")
roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select count(*) from hudi_ro_table").show() // should return 10 (number of records inserted above)
val riderValue = spark.sql("select distinct rider from hudi_ro_table").show()
// copy the value displayed to be used in next step
5. 准备待删除数据集

首先通过查询准备好待删除的数据集

val df = spark.sql("select uuid, partitionPath from hudi_ro_table where rider = 'rider-213'")
6. 删除数据
val deletes = dataGen.generateDeletes(df.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath);
7. 验证

重新加载表记录,验证记录是否被删除

val roViewDFAfterDelete = spark.
    read.
    format("org.apache.hudi").
    load(basePath + "/*/*/*/*")
roViewDFAfterDelete.createOrReplaceTempView("hudi_ro_table")
spark.sql("select uuid, partitionPath from hudi_ro_table where rider = 'rider-213'").show() // should not return any rows

2.3 使用DeltaStreamer

使用HoodieDeltaStreamer进行删除与upsert相同,它依赖每个记录中名为“hoodieisdeleted”的boolean类型的特定字段。

  • 如果记录的字段值设置为false或不存在,则将其视为常规upsert。
  • 如果不是(如果该值设置为true),则将其视为已删除记录。

这意味着必须更改数据源的schema来添加此字段,并且所有传入记录都应设置此字段值,在未来的版本中我们将尽量放开这点。

如原始数据源的schema如下。

{
  "type":"record",
  "name":"example_tbl",
  "fields":[{
     "name": "uuid",
     "type": "String"
  }, {
     "name": "ts",
     "type": "string"
  },  {
     "name": "partitionPath",
     "type": "string"
  }, {
     "name": "rank",
     "type": "long"
  }
]}

那么要利用DeltaStreamer的删除功能,必须更改schema如下。

{
  "type":"record",
  "name":"example_tbl",
  "fields":[{
     "name": "uuid",
     "type": "String"
  }, {
     "name": "ts",
     "type": "string"
  },  {
     "name": "partitionPath",
     "type": "string"
  }, {
     "name": "rank",
     "type": "long"
  }, {
    "name" : "_hoodie_is_deleted",
    "type" : "boolean",
    "default" : false
  }
]}

upsert传入记录示例数据如下

{"ts": 0.0, "uuid": "69cdb048-c93e-4532-adf9-f61ce6afe605", "rank": 1034, "partitionpath": "americas/brazil/sao_paulo", "_hoodie_is_deleted" : false}

delete传入记录示例数据如下

{"ts": 0.0, "uuid": "19tdb048-c93e-4532-adf9-f61ce6afe10", "rank": 1045, "partitionpath": "americas/brazil/sao_paulo", "_hoodie_is_deleted" : true}

只需要进行一次性的变更,DeltasDreamer将处理每批中的upsert和delete,并且每一批都可以包含upsert和deletes的混合,之后不需要额外的步骤或更改。

3. 总结

在Hudi 0.5.1-incubating版本中引入了额外三种删除记录的能力,用户可使用上述任意一种方案来达到删除记录的目的。

目录
相关文章
|
6月前
|
存储 Apache
Apache Hudi Savepoint实现分析
Apache Hudi Savepoint实现分析
113 0
|
6月前
|
存储 机器学习/深度学习 Apache
如何将Apache Hudi应用于机器学习
如何将Apache Hudi应用于机器学习
59 0
|
6月前
|
Apache 索引
精进Hudi系列|Apache Hudi索引实现分析(五)之基于List的IndexFileFilter
精进Hudi系列|Apache Hudi索引实现分析(五)之基于List的IndexFileFilter
67 0
|
6月前
|
存储 SQL Apache
Apache Hudi与Delta Lake对比
Apache Hudi与Delta Lake对比
100 0
|
6月前
|
Apache
Apache Hudi Rollback实现分析
Apache Hudi Rollback实现分析
93 0
|
4月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
5月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
120 0
|
6月前
|
存储 SQL 分布式计算
使用Amazon EMR和Apache Hudi在S3上插入,更新,删除数据
使用Amazon EMR和Apache Hudi在S3上插入,更新,删除数据
222 0
|
6月前
|
存储 分布式计算 Hadoop
一文了解Apache Hudi架构、工具和最佳实践
一文了解Apache Hudi架构、工具和最佳实践
1122 0
|
6月前
|
SQL 分布式计算 NoSQL
使用Apache Hudi和Debezium构建健壮的CDC管道
使用Apache Hudi和Debezium构建健壮的CDC管道
73 0

推荐镜像

更多