1. Introduction
Before the 0.5.1 release, users who wanted to delete a record could do so through the Spark DataSource by setting DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY to EmptyHoodieRecordPayload.class.getName, which deletes the specified records (a sketch of this follows below). The newly released Hudi 0.5.1 no longer requires that configuration and instead offers three ways to delete records: the Hudi API, the Spark DataSource, and DeltaStreamer. The following sections walk through each of them.
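For context, here is a rough sketch of that pre-0.5.1 approach. It assumes the spark-shell setup from the quick-start steps below (imports, tableName, basePath) and a DataFrame df holding the records to delete; the fully-qualified class name of EmptyHoodieRecordPayload may differ across Hudi versions.

// Pre-0.5.1 sketch: an upsert whose payload class is EmptyHoodieRecordPayload,
// which effectively deletes the matching records.
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload"). // class name may vary by version
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)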
2. Steps
2.1 Using the Hudi API
If your application already embeds a HoodieWriteClient, you can delete records directly with the following HoodieWriteClient API:
/**
 * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied commitTime.
 * {@link HoodieKey}s will be deduped and non-existent keys will be removed before deleting.
 *
 * @param keys {@link List} of {@link HoodieKey}s to be deleted
 * @param commitTime Commit time handle
 * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
 */
public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, final String commitTime);
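For illustration, a minimal sketch of calling this API from Scala, assuming writeClient is an already-configured HoodieWriteClient for the target table (the HoodieKey values below are illustrative):

import org.apache.hudi.common.model.HoodieKey
import org.apache.spark.api.java.JavaSparkContext

val jsc = new JavaSparkContext(spark.sparkContext)
// A HoodieKey pairs a record key with its partition path
val keys = jsc.parallelize(java.util.Arrays.asList(
  new HoodieKey("69cdb048-c93e-4532-adf9-f61ce6afe605", "americas/brazil/sao_paulo")))
val commitTime = writeClient.startCommit()          // open a new commit
val statuses = writeClient.delete(keys, commitTime) // delete the keys at that commit time
statuses.collect()                                  // inspect WriteStatus for errors and counts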
2.2 Using the DataSource
This section shows how to delete records from the sample dataset using the Datasource API. The dataset is the same one used in the quick start.
1. Launch spark-shell
bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
2. Import the required classes
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_cow_table"
val basePath = "file:///tmp/hudi_cow_table"
val dataGen = new DataGenerator
3. Insert data
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
4. Query data
val roViewDF = spark.
  read.
  format("org.apache.hudi").
  load(basePath + "/*/*/*/*")
roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select count(*) from hudi_ro_table").show() // should return 10 (number of records inserted above)
spark.sql("select distinct rider from hudi_ro_table").show() // copy a rider value shown here for use in the next step
5. Prepare the dataset to delete
First, build the set of records to delete with a query:
val df = spark.sql("select uuid, partitionPath from hudi_ro_table where rider = 'rider-213'")
6. Delete data
val deletes = dataGen.generateDeletes(df.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY, "delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
7. Verify
Reload the table records and verify that they have been deleted:
val roViewDFAfterDelete = spark.
  read.
  format("org.apache.hudi").
  load(basePath + "/*/*/*/*")
roViewDFAfterDelete.createOrReplaceTempView("hudi_ro_table")
spark.sql("select uuid, partitionPath from hudi_ro_table where rider = 'rider-213'").show() // should not return any rows
2.3 Using DeltaStreamer
Deleting with HoodieDeltaStreamer works the same way as an upsert; it relies on a specific boolean field named "_hoodie_is_deleted" in each record.
- If the field is set to false or is absent from a record, the record is treated as a regular upsert.
- If the field is set to true, the record is treated as deleted.
This means the schema of the data source must be changed to add this field, and every incoming record should set a value for it; we will try to relax this requirement in a future release.
For example, suppose the original source schema is as follows:
{ "type":"record", "name":"example_tbl", "fields":[{ "name": "uuid", "type": "String" }, { "name": "ts", "type": "string" }, { "name": "partitionPath", "type": "string" }, { "name": "rank", "type": "long" } ]}
Then, to use DeltaStreamer's delete capability, the schema must be changed as follows:
{ "type":"record", "name":"example_tbl", "fields":[{ "name": "uuid", "type": "String" }, { "name": "ts", "type": "string" }, { "name": "partitionPath", "type": "string" }, { "name": "rank", "type": "long" }, { "name" : "_hoodie_is_deleted", "type" : "boolean", "default" : false } ]}
A sample incoming record for an upsert looks like this:
{"ts": 0.0, "uuid": "69cdb048-c93e-4532-adf9-f61ce6afe605", "rank": 1034, "partitionpath": "americas/brazil/sao_paulo", "_hoodie_is_deleted" : false}
A sample incoming record for a delete looks like this:
{"ts": 0.0, "uuid": "19tdb048-c93e-4532-adf9-f61ce6afe10", "rank": 1045, "partitionpath": "americas/brazil/sao_paulo", "_hoodie_is_deleted" : true}
Only this one-time schema change is needed; DeltaStreamer then handles both upserts and deletes in every batch, and a single batch may contain a mix of upserts and deletes. No further steps or changes are required.
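As a rough sketch, a DeltaStreamer run that consumes such records might look like the following; the source class, schema provider, jar path, properties file, and target paths are illustrative and must match your environment:

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /path/to/hudi-utilities-bundle.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
  --source-ordering-field ts \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --props file:///path/to/source.properties \
  --target-base-path file:///tmp/example_tbl \
  --target-table example_tbl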
3. Summary
Hudi 0.5.1-incubating introduces three additional ways to delete records; users can adopt any of the approaches above to delete records as needed.