真香!PySpark整合Apache Hudi实战

简介: 笔记

1. 准备


Hudi支持Spark-2.x版本,你可以点击如下链接安装Spark,并使用pyspark启动

# pyspark
export PYSPARK_PYTHON=$(which python3)
spark-2.4.4-bin-hadoop2.7/bin/pyspark \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
  • spark-avro模块需要在--packages显示指定
  • spark-avro和spark的版本必须匹配
  • 本示例中,由于依赖spark-avro_2.11,因此使用的是scala2.11构建hudi-spark-bundle,如果使用spark-avro_2.12,相应的需要使用hudi-spark-bundle_2.12

进行一些前置变量初始化

# pyspark
tableName = "hudi_trips_cow"
basePath = "file:///tmp/hudi_trips_cow"
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()

其中DataGenerator可以用来基于行程schema生成插入和删除的样例数据。


2. 插入数据


生成一些新的行程数据,加载到DataFrame中,并将DataFrame写入Hudi表

# pyspark
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
hudi_options = {
  'hoodie.table.name': tableName,
  'hoodie.datasource.write.recordkey.field': 'uuid',
  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
  'hoodie.datasource.write.table.name': tableName,
  'hoodie.datasource.write.operation': 'insert',
  'hoodie.datasource.write.precombine.field': 'ts',
  'hoodie.upsert.shuffle.parallelism': 2, 
  'hoodie.insert.shuffle.parallelism': 2
}
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("overwrite"). \
  save(basePath)

mode(Overwrite)会覆盖并重新创建数据集。示例中提供了一个主键 (schema中的uuid),分区字段(region/county/city)和组合字段(schema中的ts) 以确保行程记录在每个分区中都是唯一的。


3. 查询数据


将数据加载至DataFrame

# pyspark
tripsSnapshotDF = spark. \
  read. \
  format("hudi"). \
  load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

该查询提供读取优化视图,由于我们的分区路径格式为region/country/city),从基本路径(basepath)开始,我们使用load(basePath + "/*/*/*/*")来加载数据。


4. 更新数据


与插入新数据类似,还是使用DataGenerator生成更新数据,然后使用DataFrame写入Hudi表。

# pyspark
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("append"). \
  save(basePath)

注意,现在保存模式现在为append。通常,除非是第一次尝试创建数据集,否则请始终使用追加模式。每个写操作都会生成一个新的由时间戳表示的commit


5. 增量查询


Hudi提供了增量拉取的能力,即可以拉取从指定commit时间之后的变更,如不指定结束时间,那么将会拉取最新的变更。

# pyspark
# reload data
spark. \
  read. \
  format("hudi"). \
  load(basePath + "/*/*/*/*"). \
  createOrReplaceTempView("hudi_trips_snapshot")
commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
beginTime = commits[len(commits) - 2] # commit time we are interested in
# incrementally query data
incremental_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': beginTime,
}
tripsIncrementalDF = spark.read.format("hudi"). \
  options(**incremental_read_options). \
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()

这表示查询在开始时间提交之后的所有变更,此增量拉取功能可以在批量数据上构建流式管道。


6. 特定时间点查询


即如何查询特定时间的数据,可以通过将结束时间指向特定的提交时间,将开始时间指向”000”(表示最早的提交时间)来表示特定时间。

# pyspark
beginTime = "000" # Represents all commits > this time.
endTime = commits[len(commits) - 2]
# query point in time data
point_in_time_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.end.instanttime': endTime,
  'hoodie.datasource.read.begin.instanttime': beginTime
}
tripsPointInTimeDF = spark.read.format("hudi"). \
  options(**point_in_time_read_options). \
  load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()


7. 删除数据


删除传入的HoodieKey集合,注意:删除操作只支持append模式

# pyspark
# fetch total records count
spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
# fetch two records to be deleted
ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
# issue deletes
hudi_delete_options = {
  'hoodie.table.name': tableName,
  'hoodie.datasource.write.recordkey.field': 'uuid',
  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
  'hoodie.datasource.write.table.name': tableName,
  'hoodie.datasource.write.operation': 'delete',
  'hoodie.datasource.write.precombine.field': 'ts',
  'hoodie.upsert.shuffle.parallelism': 2, 
  'hoodie.insert.shuffle.parallelism': 2
}
from pyspark.sql.functions import lit
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
df = spark.sparkContext.parallelize(deletes).toDF(['partitionpath', 'uuid']).withColumn('ts', lit(0.0))
df.write.format("hudi"). \
  options(**hudi_delete_options). \
  mode("append"). \
  save(basePath)
# run the same read query as above.
roAfterDeleteViewDF = spark. \
  read. \
  format("hudi"). \
  load(basePath + "/*/*/*/*") 
roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
# fetch should return (total - 2) records
spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()


8. 总结


本篇博文展示了如何使用pyspark来插入、删除、更新Hudi表,有pyspark和Hudi需求的小伙伴不妨一试!

目录
相关文章
|
8月前
|
运维 Linux Apache
Linux Apache服务详解——Apache虚拟目录与禁止显示目录列表实战
Linux Apache服务详解——Apache虚拟目录与禁止显示目录列表实战
119 2
|
8月前
|
存储 Apache
Apache Hudi Savepoint实现分析
Apache Hudi Savepoint实现分析
131 0
|
2月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
108 5
|
3月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
63 3
|
5月前
|
关系型数据库 Linux 网络安全
"Linux系统实战:从零开始部署Apache+PHP Web项目,轻松搭建您的在线应用"
【8月更文挑战第9天】Linux作为服务器操作系统,凭借其稳定性和安全性成为部署Web项目的优选平台。本文以Apache Web服务器和PHP项目为例,介绍部署流程。首先,通过包管理器安装Apache与PHP;接着创建项目目录,并上传项目文件至该目录;根据需要配置Apache虚拟主机;最后重启Apache服务并测试项目。确保防火墙允许HTTP流量,正确配置数据库连接,并定期更新系统以维持安全。随着项目复杂度提升,进一步学习高级配置将变得必要。
423 0
|
6月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
7月前
|
存储 Apache 文件存储
在Apache环境下为Web网站增设访问控制:实战指南
在Apache服务器上保护网站资源涉及启用访问控制模块(`mod_authz_core`和`mod_auth_basic`),在`.htaccess`或`httpd.conf`中设定权限,如限制对特定目录的访问。创建`.htpasswd`文件存储用户名和密码,并使用`htpasswd`工具管理用户。完成配置后重启Apache服务,访问受限目录时需提供有效的用户名和密码。对于高安全性需求,可考虑更复杂的认证方法。【6月更文挑战第20天】
458 4
|
7月前
|
弹性计算 应用服务中间件 Linux
双剑合璧:在同一ECS服务器上共存Apache与Nginx的实战攻略
在ECS服务器上同时部署Apache和Nginx的实战:安装更新系统,Ubuntu用`sudo apt install apache2 nginx`,CentOS用`sudo yum install httpd nginx`。配置Nginx作为反向代理,处理静态内容及转发动态请求到Apache(监听8080端口)。调整Apache的`ports.conf`监听8080。重启服务测试,实现两者高效协同,提升Web服务性能。记得根据流量和需求优化配置。【6月更文挑战第21天】
645 1
|
6月前
|
分布式计算 Apache Spark
|
7月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
140 0

推荐镜像

更多