Apache Hudi又双叕被国内顶级云服务提供商集成了!

简介: Apache Hudi 在 HDFS 的数据集上提供了插入更新和增量拉取的流原语。

是的,最近国内云服务提供商腾讯云在其EMR-V2.2.0版本中优先集成了Hudi 0.5.1版本作为其云上的数据湖解决方案对外提供服务

Apache Hudi 在 HDFS 的数据集上提供了插入更新和增量拉取的流原语。

一般来说,我们会将大量数据存储到 HDFS,新数据增量写入,而旧数据鲜有改动,特别是在经过数据清洗,放入数据仓库的场景。而且在数据仓库如 hive 中,对于 update 的支持非常有限,计算昂贵。另一方面,若是有仅对某段时间内新增数据进行分析的场景,则 hive、presto、hbase 等也未提供原生方式,而是需要根据时间戳进行过滤分析。

在此需求下,Hudi 可以提供这两种需求的实现。第一个是对 record 级别的更新,另一个是仅对增量数据的查询。且 Hudi 提供了对 Hive、presto、Spark 的支持,可以直接使用这些组件对 Hudi 管理的数据进行查询。

Hudi 是一个通用的大数据存储系统,主要特性:

  • 摄取和查询引擎之间的快照隔离,包括 Apache Hive、Presto 和 Apache Spark。
  • 支持回滚和存储点,可以恢复数据集。
  • 自动管理文件大小和布局,以优化查询性能准实时摄取,为查询提供最新数据。
  • 实时数据和列数据的异步压缩。


时间轴


在它的核心,Hudi 维护一条包含在不同的即时时间所有对数据集操作的时间轴,从而提供了从不同时间点出发得到不同的视图下的数据集。

Hudi 即时包含以下组件:

  • 操作类型:对数据集执行的操作类型。
  • 即时时间:即时时间通常是一个时间戳(例如:20190117010349),该时间戳按操作开始时间的顺序单调增加。
  • 状态:即时的状态。


文件组织


Hudi 将 DFS 上的数据集组织到基本路径下的目录结构中。数据集分为多个分区,这些分区是包含该分区的数据文件的文件夹,这与 Hive 表非常相似。

每个分区被相对于基本路径的特定分区路径区分开来。

在每个分区内,文件被组织为文件组,由文件id唯一标识。

每个文件组包含多个文件切片,其中每个切片包含在某个提交/压缩即时时间生成的基本列文件*.parquet以及一组日志文件*.log*,该文件包含自生成基本文件以来对基本文件的插入/更新。

Hudi 采用 MVCC 设计,其中压缩操作将日志和基本文件合并以产生新的文件片,而清理操作则将未使用的/较旧的文件片删除以回收 DFS 上的空间。

Hudi 通过索引机制将给定的 hoodie 键(记录键+分区路径)映射到文件组,从而提供了高效的 Upsert。

一旦将记录的第一个版本写入文件,记录键和文件组/文件id之间的映射就永远不会改变。简而言之,映射的文件组包含一组记录的所有版本。


存储类型


Hudi 支持以下存储类型:

  • 写时复制:仅使用列文件格式(例如 parquet)存储数据。通过在写入过程中执行同步合并以更新版本并重写文件。
  • 读时合并:使用列式(例如 parquet)+ 基于行(例如 avro)的文件格式组合来存储数据。 更新记录到增量文件中,然后进行同步或异步压缩以生成列文件的新版本。

下表总结了这两种存储类型之间的权衡:

权衡 写时复制 读时合并
数据延迟 更高 更低
更新代价(I/O) 更高(重写整个parquet文件) 更低(追加到增量日志)
Parquet文件大小 更小(高更新代价(I/o)) 更大(低更新代价)
写放大 更高 更低(取决于压缩策略)


hudi 对 EMR 底层存储支持


  • HDFS
  • COS


安装 Hudi


进入 EMR 购买页,选择【产品版本】为 EMR-V2.2.0,选择【可选组件】为【hudi 0.5.1】。hudi 组件默认安装在 master 和 router 节点上。

! hudi 组件依赖 hive 和 spark 组件, 如果选择安装 hudi 组件,EMR 将自动安装 hive 和 spark 组件。


使用示例


可参考 hudi 官网示例

  1. 登录 master 节点,切换为 hadoop 用户。
  2. 加载 spark 配置。
cd /usr/local/service/hudi
ln -s /usr/local/service/spark/conf/spark-defaults.conf /usr/local/service/hudi/demo/config/spark-defaults.conf

上传配置到 hdfs:

hdfs dfs -mkdir -p /hudi/config
hdfs dfs -copyFromLocal demo/config/* /hudi/config/
  1. 修改 kafka 数据源。
/usr/local/service/apps/hudi-0.5.1/demo/config/kafka-source.properties
bootstrap.servers=kafka_ip:kafka_port

上传第一批次数据:

cat demo/data/batch_1.json | kafkacat -b [kafka_ip] -t stock_ticks -P
  1. 摄取第一批数据。
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar   --table-type COPY_ON_WRITE --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts  --target-base-path /usr/hive/warehouse/stock_ticks_cow --target-table stock_ticks_cow --props /hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar  --table-type MERGE_ON_READ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts  --target-base-path /usr/hive/warehouse/stock_ticks_mor --target-table stock_ticks_mor --props /hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider --disable-compaction
  1. 查看 hdfs 数据。
hdfs dfs -ls /usr/hive/warehouse/
  1. 同步 hive 元数据。
bin/run_sync_tool.sh  --jdbc-url jdbc:hive2://[hiveserver2_ip:hiveserver2_port]  --user hadoop --pass [password] --partitioned-by dt --base-path /usr/hive/warehouse/stock_ticks_cow --database default --table stock_ticks_cow
bin/run_sync_tool.sh  --jdbc-url jdbc:hive2://[hiveserver2_ip:hiveserver2_port]   --user hadoop --pass [password]--partitioned-by dt --base-path /usr/hive/warehouse/stock_ticks_mor --database default --table stock_ticks_mor --skip-ro-suffix
  1. 使用计算引擎查询数据。
  • hive 引擎
beeline -u jdbc:hive2://[hiveserver2_ip:hiveserver2_port] -n hadoop --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false

或者 spark 引擎

spark-sql --master yarn --conf spark.sql.hive.convertMetastoreParquet=false

hive/spark 引擎执行如下 sql 语句:

select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_cow where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_mor where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_mor_rt where  symbol = 'GOOG';
  • 进入 presto 引擎
/usr/local/service/presto-client/presto --server localhost:9000 --catalog hive --schema default --user Hadoop

presto 查询有下划线的字段需要用双引号,例如 "_hoodie_commit_time",执行如下 sql 语句:

select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close  from stock_ticks_cow where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close  from stock_ticks_mor where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
  1. 上传第二批数据。
cat demo/data/batch_2.json | kafkacat -b 10.0.1.70 -t stock_ticks -P
  1. 摄取第二批增量数据。
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar   --table-type COPY_ON_WRITE --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts  --target-base-path /usr/hive/warehouse/stock_ticks_cow --target-table stock_ticks_cow --props /hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar  --table-type MERGE_ON_READ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts  --target-base-path /usr/hive/warehouse/stock_ticks_mor --target-table stock_ticks_mor --props /hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider --disable-compaction
  1. 查询增量数据,查询方法同步骤7。
  2. 使用 hive-cli 工具。
cli/bin/hudi-cli.sh
connect --path /usr/hive/warehouse/stock_ticks_mor
compactions show all
compaction schedule
合并执行计划
 compaction run --compactionInstant [requestID] --parallelism 2 --sparkMemory 1G  --schemaFilePath /hudi/config/schema.avsc --retry 1
  1. 使用 tez/spark 引擎查询。
beeline -u jdbc:hive2://[hiveserver2_ip:hiveserver2_port] -n hadoop --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false
set hive.execution.engine=tez;
set hive.execution.engine=spark;

然后执行 sql 查询,可参考步骤7。


与对象存储结合使用


与 hdfs 类似,需要在存储路径前加上cosn://[bucket]。参考如下操作:

bin/kafka-server-start.sh config/server.properties &
cat     demo/data/batch_1.json | kafkacat -b kafkaip -t stock_ticks -P
cat     demo/data/batch_2.json | kafkacat -b kafkaip -t stock_ticks -P
kafkacat -b kafkaip  -L
hdfs dfs -mkdir -p cosn://[bucket]/hudi/config
hdfs dfs -copyFromLocal demo/config/*  cosn://[bucket]/hudi/config/
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar   --table-type COPY_ON_WRITE --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts  --target-base-path cosn://[bucket]/usr/hive/warehouse/stock_ticks_cow --target-table stock_ticks_cow --props cosn://[bucket]/hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar  --table-type MERGE_ON_READ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts  --target-base-path cosn://[bucket]/usr/hive/warehouse/stock_ticks_mor --target-table stock_ticks_mor --props cosn://[bucket]/hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider --disable-compaction
bin/run_sync_tool.sh  --jdbc-url jdbc:hive2://[hiveserver2_ip:hiveserver2_port] --user hadoop --pass isd@cloud --partitioned-by dt --base-path cosn://[bucket]/usr/hive/warehouse/stock_ticks_cow --database default --table stock_ticks_cow
bin/run_sync_tool.sh  --jdbc-url jdbc:hive2://[hiveserver2_ip:hiveserver2_port] --user hadoop --pass hive --partitioned-by dt --base-path cosn://[bucket]/usr/hive/warehouse/stock_ticks_mor --database default --table stock_ticks_mor --skip-ro-suffix
beeline -u jdbc:hive2://[hiveserver2_ip:hiveserver2_port] -n hadoop --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false
spark-sql --master yarn --conf spark.sql.hive.convertMetastoreParquet=false
hivesqls:
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_cow where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_mor where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_mor_rt where  symbol = 'GOOG';
prestosqls:
/usr/local/service/presto-client/presto --server localhost:9000 --catalog hive --schema default --user Hadoop
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close  from stock_ticks_cow where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close  from stock_ticks_mor where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close  from stock_ticks_mor_rt where  symbol = 'GOOG';
cli/bin/hudi-cli.sh
connect --path cosn://[bucket]/usr/hive/warehouse/stock_ticks_mor
compactions show all
compaction schedule
compaction run --compactionInstant [requestid]  --parallelism 2 --sparkMemory 1G  --schemaFilePath cosn://[bucket]/hudi/config/schema.avsc --retry 1
目录
相关文章
|
1月前
|
存储 测试技术 C#
Azure 云服务与 C# 集成浅谈
本文介绍了 Azure 云服务与 C# 的集成方法,涵盖基础概念、资源创建、SDK 使用、常见问题解决及单元测试等内容,通过代码示例详细说明了如何在 C# 中调用 Azure 服务,帮助开发者提高开发效率和代码质量。
40 8
|
26天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
63 5
|
28天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
43 1
|
2月前
|
Java 测试技术 API
如何在 Apache JMeter 中集成 Elastic APM
如何在 Apache JMeter 中集成 Elastic APM
49 1
|
4月前
|
监控 数据可视化 Devops
Grafana 与云服务提供商的集成
【8月更文第29天】Grafana 是一个强大的数据可视化工具,可以与多种数据源集成,从而为用户提供详细的监控和分析仪表板。在云服务时代,Grafana 的这种灵活性使得它能够轻松地与 AWS、Azure 和 Google Cloud 等云服务提供商的数据源集成,帮助 DevOps 和 SRE 团队更好地监控云资源的状态。本文将介绍如何将 Grafana 与这些主流云服务提供商的数据源集成。
70 1
|
5月前
|
关系型数据库 API Apache
Flink CDC:基于 Apache Flink 的流式数据集成框架
本文整理自阿里云 Flink SQL 团队研发工程师于喜千(yux)在 SECon 全球软件工程技术大会中数据集成专场沙龙的分享。
18436 11
Flink CDC:基于 Apache Flink 的流式数据集成框架
|
4月前
|
存储 C# 关系型数据库
“云端融合:WPF应用无缝对接Azure与AWS——从Blob存储到RDS数据库,全面解析跨平台云服务集成的最佳实践”
【8月更文挑战第31天】本文探讨了如何将Windows Presentation Foundation(WPF)应用与Microsoft Azure和Amazon Web Services(AWS)两大主流云平台无缝集成。通过具体示例代码展示了如何利用Azure Blob Storage存储非结构化数据、Azure Cosmos DB进行分布式数据库操作;同时介绍了如何借助Amazon S3实现大规模数据存储及通过Amazon RDS简化数据库管理。这不仅提升了WPF应用的可扩展性和可用性,还降低了基础设施成本。
98 0
|
4月前
|
开发者 存储 API
Xamarin 云服务集成竟然如此强大,简化后端开发不再是梦,数据存储、用户认证、推送通知全搞定!
【8月更文挑战第31天】Xamarin 是一款强大的跨平台移动应用开发工具,通过与云服务集成,显著简化了后端开发。开发者无需自行搭建服务器,即可利用云服务提供的数据存储、用户认证、推送通知等功能,大幅减少数据库设计、服务器配置及 API 开发的时间成本。借助 Azure Mobile Apps 等云服务,Xamarin 可轻松实现数据存取操作,同时增强应用安全性与用户参与度,使开发者更专注于业务逻辑和用户体验,提升开发效率并降低成本。这种方式在快速发展的移动应用领域极具价值。
64 0
|
4月前
|
消息中间件 Kafka 数据处理
实时数据流处理:Dask Streams 与 Apache Kafka 集成
【8月更文第29天】在现代数据处理领域,实时数据流处理已经成为不可或缺的一部分。随着物联网设备、社交媒体和其他实时数据源的普及,处理这些高吞吐量的数据流成为了一项挑战。Apache Kafka 作为一种高吞吐量的消息队列服务,被广泛应用于实时数据流处理场景中。Dask Streams 是 Dask 库的一个子模块,它为 Python 开发者提供了一个易于使用的实时数据流处理框架。本文将介绍如何将 Dask Streams 与 Apache Kafka 结合使用,以实现高效的数据流处理。
96 0
|
5月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)

推荐镜像

更多