1. Preparation
Hudi supports Spark 2.x. After installing Spark, launch pyspark with the Hudi packages:
```shell
# pyspark
export PYSPARK_PYTHON=$(which python3)
spark-2.4.4-bin-hadoop2.7/bin/pyspark \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
```
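- The spark-avro module must be specified explicitly in --packages.
- The spark-avro version must match the Spark version.
- This example depends on spark-avro_2.11, so it uses the hudi-spark-bundle built with Scala 2.11. If you use spark-avro_2.12, use hudi-spark-bundle_2.12 instead.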
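The three rules above boil down to "one Scala suffix for both artifacts, and spark-avro pinned to your Spark version". A small sketch that assembles the --packages value under those rules; hudi_packages is a hypothetical helper, and it assumes the Maven coordinate pattern used in the command above.

```python
def hudi_packages(scala_version: str, hudi_version: str, spark_version: str) -> str:
    """Build the --packages value for pyspark (hypothetical helper).

    Both artifacts share the same Scala suffix, and spark-avro is pinned
    to the exact Spark version, following the rules listed above.
    """
    hudi = f"org.apache.hudi:hudi-spark-bundle_{scala_version}:{hudi_version}"
    avro = f"org.apache.spark:spark-avro_{scala_version}:{spark_version}"
    return f"{hudi},{avro}"


# Reproduces the value used in the launch command above
print(hudi_packages("2.11", "0.5.1-incubating", "2.4.4"))
```

Generating both coordinates from a single scala_version argument makes a 2.11/2.12 mismatch impossible by construction.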
Initialize a few variables up front:
```python
# pyspark
tableName = "hudi_trips_cow"
basePath = "file:///tmp/hudi_trips_cow"
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
```
DataGenerator generates sample insert and update records based on the trip schema.
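convertToStringList turns the generated records into a list of JSON strings, which spark.read.json then parses into a DataFrame. The pure-Python stand-in below sketches that data shape without a JVM. make_trip and all field values are hypothetical; the fields are modeled only on the columns this post queries, and the partition paths are illustrative values in region/country/city form.

```python
import json
import random
import uuid

# Illustrative partition paths in region/country/city form (assumed values)
PARTITION_PATHS = [
    "americas/united_states/san_francisco",
    "americas/brazil/sao_paulo",
    "asia/india/chennai",
]


def make_trip(ts: float) -> str:
    """Return one trip record as a JSON string (hypothetical DataGenerator stand-in)."""
    record = {
        "uuid": str(uuid.uuid4()),  # record key
        "ts": ts,  # precombine field (numeric type assumed)
        "rider": "rider-001",
        "driver": "driver-001",
        "begin_lat": random.random(),
        "begin_lon": random.random(),
        "fare": round(random.uniform(10, 100), 2),
        "partitionpath": random.choice(PARTITION_PATHS),
    }
    return json.dumps(record)


inserts = [make_trip(ts=0.0) for _ in range(10)]
print(inserts[0])
```

In pyspark, a list like this could be fed to spark.read.json(spark.sparkContext.parallelize(inserts, 2)), the same way the real generator output is loaded in the next step.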
2. Insert data
Generate some new trip records, load them into a DataFrame, and write the DataFrame to the Hudi table:
```python
# pyspark
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'insert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}

df.write.format("hudi"). \
    options(**hudi_options). \
    mode("overwrite"). \
    save(basePath)
```
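mode("overwrite") overwrites the dataset and recreates it if it already exists. The example supplies a record key (uuid in the schema), a partition path field (partitionpath, in the form region/country/city) and a precombine field (ts in the schema) to make sure trip records are unique within each partition.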
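When Hudi combines incoming records before writing (for example during an upsert), the precombine field decides which record wins if two in the same batch share a key. Below is a simplified pure-Python model of that rule, assuming the default behavior of keeping the record with the larger ts; precombine is a hypothetical helper, not a Hudi API, and its tie handling is a simplification.

```python
from typing import Any, Dict, List, Tuple

Record = Dict[str, Any]


def precombine(batch: List[Record]) -> List[Record]:
    """Keep one record per (partitionpath, uuid) key: the one with the largest ts."""
    winners: Dict[Tuple[str, str], Record] = {}
    for rec in batch:
        key = (rec["partitionpath"], rec["uuid"])
        current = winners.get(key)
        # A larger ordering value (ts) replaces the current winner;
        # on a tie this model simply keeps the first record seen
        if current is None or rec["ts"] > current["ts"]:
            winners[key] = rec
    return list(winners.values())


batch = [
    {"uuid": "a", "partitionpath": "asia/india/chennai", "ts": 1.0, "fare": 10.0},
    {"uuid": "a", "partitionpath": "asia/india/chennai", "ts": 2.0, "fare": 25.0},
    {"uuid": "b", "partitionpath": "asia/india/chennai", "ts": 1.0, "fare": 30.0},
]
print(precombine(batch))
```

The key is the pair (partition path, record key), which is why uuid and partitionpath are configured together.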
3. Query data
Load the data into a DataFrame:
```python
# pyspark
tripsSnapshotDF = spark. \
    read. \
    format("hudi"). \
    load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
```
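This query gives a read-optimized view of the data. Because our partition path has the format region/country/city, we start from the base path (basePath) and load the data with load(basePath + "/*/*/*/*"): one wildcard for each of the three partition levels, plus one for the data files inside them.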
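The number of wildcards follows directly from the partition depth. A tiny sketch of that arithmetic, assuming each partition level is one directory (as with region/country/city); snapshot_glob is a hypothetical helper.

```python
def snapshot_glob(base_path: str, partition_depth: int) -> str:
    """One '*' per partition level, plus one more to match the data files."""
    return base_path.rstrip("/") + "/*" * (partition_depth + 1)


partition_path = "americas/united_states/san_francisco"  # region/country/city
depth = len(partition_path.split("/"))  # 3 levels
print(snapshot_glob("file:///tmp/hudi_trips_cow", depth))
```

With three partition levels this yields the same path pattern as load(basePath + "/*/*/*/*"); a table partitioned by a single column would need only "/*/*".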
4. Update data
As with inserts, use DataGenerator to generate updates, then write the DataFrame to the Hudi table:
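```python
# pyspark
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))

# Use upsert so incoming records are matched against existing ones
# by record key + partition path and updated in place
hudi_options['hoodie.datasource.write.operation'] = 'upsert'

df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(basePath)
```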
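Note that the save mode is now append. In general, always use append mode unless you are creating the dataset for the first time. Every write operation produces a new commit, identified by a timestamp.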
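To make "every write is a new commit" concrete, here is a toy in-memory model of an upserted table. ToyTable is hypothetical and ignores files, indexes and precombine; it only assumes that commit times use the yyyyMMddHHmmss format of Hudi 0.5.x and that records are matched on (partitionpath, uuid).

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List, Tuple

Key = Tuple[str, str]  # (partitionpath, uuid)


class ToyTable:
    """Hypothetical in-memory model of a table written with upserts."""

    def __init__(self) -> None:
        self.rows: Dict[Key, Dict[str, Any]] = {}
        self.commits: List[str] = []

    def upsert(self, batch: List[Dict[str, Any]], when: datetime) -> str:
        # Each write gets a new commit time (yyyyMMddHHmmss style)
        commit_time = when.strftime("%Y%m%d%H%M%S")
        self.commits.append(commit_time)
        for rec in batch:
            key = (rec["partitionpath"], rec["uuid"])
            # Matching keys are replaced, new keys are inserted
            self.rows[key] = {**rec, "_hoodie_commit_time": commit_time}
        return commit_time


t0 = datetime(2020, 4, 1, 12, 0, 0)
table = ToyTable()
table.upsert([{"uuid": "a", "partitionpath": "p", "fare": 10.0},
              {"uuid": "b", "partitionpath": "p", "fare": 20.0}], t0)
table.upsert([{"uuid": "a", "partitionpath": "p", "fare": 15.0}], t0 + timedelta(minutes=5))
print(len(table.rows), table.commits)
```

After the second write the table still holds two records, but record a now carries the newer _hoodie_commit_time, which is exactly the column the incremental query below filters on.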
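5. Incremental query
Hudi supports incremental pulls: you can fetch only the changes committed after a given commit time. If no end time is specified, changes are pulled up to the latest commit.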
```python
# pyspark
# reload data
spark. \
    read. \
    format("hudi"). \
    load(basePath + "/*/*/*/*"). \
    createOrReplaceTempView("hudi_trips_snapshot")

commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").limit(50).collect()))
beginTime = commits[len(commits) - 2]  # commit time we are interested in

# incrementally query data
incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': beginTime,
}

tripsIncrementalDF = spark.read.format("hudi"). \
    options(**incremental_read_options). \
    load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
```
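This returns all changes committed after beginTime. Incremental pulls like this make it possible to build streaming-style pipelines on top of batch data.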
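The selection logic can be modeled in plain Python. This is a simplified sketch, not Hudi's implementation: it filters snapshot rows by _hoodie_commit_time, whereas Hudi reads the files written by the matching commits. It assumes the begin time is exclusive (only commits strictly after it are returned) and relies on fixed-width yyyyMMddHHmmss strings sorting chronologically. pick_begin_time and incremental_pull are hypothetical helpers.

```python
from typing import Any, Dict, List


def pick_begin_time(commits: List[str]) -> str:
    """Same choice as commits[len(commits) - 2]: the second-to-last commit."""
    return sorted(commits)[-2]


def incremental_pull(rows: List[Dict[str, Any]], begin_time: str) -> List[Dict[str, Any]]:
    """Return rows written by commits strictly after begin_time.

    Fixed-width yyyyMMddHHmmss strings sort chronologically,
    so plain string comparison is enough.
    """
    return [r for r in rows if r["_hoodie_commit_time"] > begin_time]


rows = [
    {"uuid": "a", "fare": 15.0, "_hoodie_commit_time": "20200401120500"},
    {"uuid": "b", "fare": 20.0, "_hoodie_commit_time": "20200401120000"},
    {"uuid": "c", "fare": 42.0, "_hoodie_commit_time": "20200401121000"},
]
commits = sorted({r["_hoodie_commit_time"] for r in rows})
begin = pick_begin_time(commits)
print(begin, [r["uuid"] for r in incremental_pull(rows, begin)])
```

Only record c was written after the second-to-last commit, so it is the only one pulled.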
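6. Point-in-time query
To query the data as of a specific time, point the end time at a particular commit time and set the begin time to "000" (which stands for the earliest possible commit time).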
```python
# pyspark
beginTime = "000"  # Represents all commits > this time.
endTime = commits[len(commits) - 2]

# query point in time data
point_in_time_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.end.instanttime': endTime,
    'hoodie.datasource.read.begin.instanttime': beginTime
}

tripsPointInTimeDF = spark.read.format("hudi"). \
    options(**point_in_time_read_options). \
    load(basePath)

tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
```
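A point-in-time read is an incremental read with both bounds set. The sketch below models the window as begin_time < t <= end_time, assuming the begin bound is exclusive and the end bound inclusive; point_in_time is a hypothetical helper that, like the previous model, filters by _hoodie_commit_time instead of reading commit files. "000" works as a lower bound because it sorts before any real yyyyMMddHHmmss commit time.

```python
from typing import Any, Dict, List, Optional


def point_in_time(rows: List[Dict[str, Any]], begin_time: str = "000",
                  end_time: Optional[str] = None) -> List[Dict[str, Any]]:
    """Rows whose commit time t satisfies begin_time < t <= end_time."""
    return [
        r for r in rows
        if r["_hoodie_commit_time"] > begin_time
        and (end_time is None or r["_hoodie_commit_time"] <= end_time)
    ]


rows = [
    {"uuid": "a", "_hoodie_commit_time": "20200401120500"},
    {"uuid": "b", "_hoodie_commit_time": "20200401120000"},
    {"uuid": "c", "_hoodie_commit_time": "20200401121000"},
]
commits = sorted({r["_hoodie_commit_time"] for r in rows})
end = commits[len(commits) - 2]  # same choice as endTime above
print([r["uuid"] for r in point_in_time(rows, "000", end)])
```

Records a and b fall inside the window; c was committed after endTime and is excluded.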
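7. Delete data
Delete the records identified by the HoodieKeys passed in (record key plus partition path). Note: deletes only support append mode.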
```python
# pyspark
from pyspark.sql.functions import lit

# fetch total records count
spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
# fetch two records to be deleted
ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)

# issue deletes
hudi_delete_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'delete',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}

# each tuple is (uuid, partitionPath), so name the columns in that order
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
df.write.format("hudi"). \
    options(**hudi_delete_options). \
    mode("append"). \
    save(basePath)

# run the same read query as above.
roAfterDeleteViewDF = spark. \
    read. \
    format("hudi"). \
    load(basePath + "/*/*/*/*")
roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
# fetch should return (total - 2) records
spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
```
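A delete only removes a record when both halves of its key match, which is why the column names passed to toDF must follow the (uuid, partitionPath) order of the collected tuples. A pure-Python model of that behavior; delete_keys is a hypothetical helper, and the swapped-key case shows what happens in this model when the two columns are mislabeled.

```python
from typing import Any, Dict, Iterable, List, Tuple


def delete_keys(rows: List[Dict[str, Any]],
                keys: Iterable[Tuple[str, str]]) -> List[Dict[str, Any]]:
    """Drop rows whose (uuid, partitionpath) pair appears in keys."""
    doomed = set(keys)
    return [r for r in rows if (r["uuid"], r["partitionpath"]) not in doomed]


rows = [
    {"uuid": "a", "partitionpath": "asia/india/chennai"},
    {"uuid": "b", "partitionpath": "americas/brazil/sao_paulo"},
    {"uuid": "c", "partitionpath": "asia/india/chennai"},
]
# Pick two keys in (uuid, partitionpath) order, like ds.collect() above
to_delete = [(r["uuid"], r["partitionpath"]) for r in rows[:2]]
remaining = delete_keys(rows, to_delete)

# Mislabeled columns: record key and partition path swapped, nothing matches
swapped = delete_keys(rows, [(p, u) for u, p in to_delete])
print(len(rows), len(remaining), len(swapped))
```

With correctly ordered keys the count drops by two (the "total - 2" check above); with swapped keys nothing is deleted.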
8. Summary
This post showed how to use pyspark to insert, update, query and delete data in a Hudi table. If you work with pyspark and Hudi, give it a try!