Apache Hudi + Flink作业运行指南

简介: Apache Hudi + Flink作业运行指南

近日Apache Hudi社区合并了Flink引擎的基础实现(HUDI-1327),这意味着 Hudi 开始支持 Flink 引擎。有很多小伙伴在交流群里咨询 Hudi on Flink 的使用姿势,三言两语不好描述,不如实操演示一把,于是有了这篇文章。

当前 Flink 版本的Hudi还只支持读取 Kafka 数据,Sink到 COW(COPY_ON_WRITE) 类型的 Hudi 表中,其他功能还在继续完善中。

这里我们简要介绍下如何从 Kafka 读取数据写出到Hudi表。

1. 打包

由于还没有正式发布, 我们需要到Github下载源码自行打包。

git clone https://github.com/apache/hudi.git && cd hudimvn clean package -DskipTests

Windows 系统用户打包时会报如下错误:

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:exec (Setup HUDI_WS) on project hudi-integ-test: Command execution failed. Cannot run program "\bin\bash" (in directory "D:\github\hudi\hudi-integ-test"): CreateProcess error=2, 系统找不到指定的文件。 -> [Help 1][ERROR][ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.[ERROR] Re-run Maven using the -X switch to enable full debug logging.[ERROR][ERROR] For more information about the errors and possible solutions, please read the following articles:[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException[ERROR][ERROR] After correcting the problems, you can resume the build with the command[ERROR]   mvn <goals> -rf :hudi-integ-test

这是 hudi-integ-test 模块的一个bash脚本无法执行导致的错误,我们可以把它注释掉。

修改D:\github\hudi\pom.xml根pom文件

<modules>    <module>hudi-common</module>    <module>hudi-cli</module>    <module>hudi-client</module>    <module>hudi-hadoop-mr</module>    <module>hudi-spark</module>    <module>hudi-timeline-service</module>    <module>hudi-utilities</module>    <module>hudi-sync</module>    <module>packaging/hudi-hadoop-mr-bundle</module>    <module>packaging/hudi-hive-sync-bundle</module>    <module>packaging/hudi-spark-bundle</module>    <module>packaging/hudi-presto-bundle</module>    <module>packaging/hudi-utilities-bundle</module>    <module>packaging/hudi-timeline-server-bundle</module>    <module>docker/hoodie/hadoop</module><!--    <module>hudi-integ-test</module>--><!--    <module>packaging/hudi-integ-test-bundle</module>-->    <module>hudi-examples</module>    <module>hudi-flink</module>    <module>packaging/hudi-flink-bundle</module>  </modules>

再次执行 mvn clean package -DskipTests, 执行成功后,找到这个jar : D:\github\hudi\packaging\hudi-flink-bundle\target\hudi-flink-bundle_2.11-0.6.1-SNAPSHOT.jar (笔者Hudi源码在D:\github\ 路径下,大家根据自己实际路径找一下)

这个 hudi-flink-bundle_2.11-0.6.1-SNAPSHOT.jar 就是我们需要使用的flink客户端,类似于原版的 hudi-utilities-bundle_2.11-x.x.x.jar

2. 入参介绍

有几个必传的参数介绍下:

--kafka-topic :Kafka 主题--kafka-group-id :消费组--kafka-bootstrap-servers : Kafka brokers--target-base-path : Hudi 表基本路径--target-table :Hudi 表名--table-type :Hudi 表类型--props : 任务配置

其他参数可以参考 org.apache.hudi.HoodieFlinkStreamer.Config,里面每个参数都有介绍 。

3. 启动准备清单

1.Kafka 主题,消费组2.jar上传到服务器3.schema 文件4.Hudi任务配置文件

注意根据自己的配置把配置文件放到合适的地方,笔者的 hudi-conf.properties和schem.avsc文件均上传在HDFS。

-rw-r--r-- 1 user user      592 Nov 19 09:32 hudi-conf.properties-rw-r--r-- 1 user user 39086937 Nov 30 15:51 hudi-flink-bundle_2.11-0.6.1-SNAPSHOT.jar-rw-r--r-- 1 user user 1410 Nov 17 17:52 schema.avsc

hudi-conf.properties内容如下

hoodie.datasource.write.recordkey.field=uuidhoodie.datasource.write.partitionpath.field=tsbootstrap.servers=xxx:9092hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDShoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/ddhoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedAvroKeyGeneratorhoodie.embed.timeline.server=falsehoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://olap/hudi/test/config/flink/schema.avschoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://olap/hudi/test/config/flink/schema.avsc

schema.avsc内容如下

{  "type":"record",  "name":"stock_ticks",  "fields":[{     "name": "uuid",     "type": "string"  }, {     "name": "ts",     "type": "long"  }, {     "name": "symbol",     "type": "string"  },{     "name": "year",     "type": "int"  },{     "name": "month",     "type": "int"  },{     "name": "high",     "type": "double"  },{     "name": "low",     "type": "double"  },{     "name": "key",     "type": "string"  },{     "name": "close",     "type": "double"  }, {     "name": "open",     "type": "double"  }, {     "name": "day",     "type":"string"  }]}

4. 启动任务

/opt/flink-1.11.2/bin/flink run -c org.apache.hudi.HoodieFlinkStreamer -m yarn-cluster -d -yjm 1024 -ytm 1024 -p 4 -ys 3 -ynm hudi_on_flink_test hudi-flink-bundle_2.11-0.6.1-SNAPSHOT.jar --kafka-topic hudi_test_flink --kafka-group-id hudi_on_flink --kafka-bootstrap-servers xxx:9092 --table-type COPY_ON_WRITE --target-base-path hdfs://olap/hudi/test/data/hudi_on_flink --target-table hudi_on_flink  --props hdfs://olap/hudi/test/config/flink/hudi-conf.properties --checkpoint-interval 3000 --flink-checkpoint-path hdfs://olap/hudi/hudi_on_flink_cp

查看监控页面,任务已经跑起来了

现在在Hdfs路径下已经创建了一个空表(Hudi自动创建)

我们向 topic 中发数据(发了 900 条,本地写的 Producer 就不贴代码了)

我们查一下结果:

@Test  public void query() {    spark.read().format("hudi")        .load(basePath + "/*/*/*/*")        .createOrReplaceTempView("tmp_view");    spark.sql("select * from tmp_view limit 2").show();    spark.sql("select count(1) from tmp_view").show();  }
+-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+-------------+--------------------+----+-----+-------------------+------------------+------+------------------+-------------------+---+|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|                uuid|           ts|              symbol|year|month|               high|               low|   key|             close|               open|day|+-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+-------------+--------------------+----+-----+-------------------+------------------+------+------------------+-------------------+---+|     20201130162542| 20201130162542_0_20|01e11b9c-012a-461...|            2020/10/29|c8f3a30a-0523-4c8...|01e11b9c-012a-461...|1603947341061|12a-4614-89c3-f62...| 120|   10|0.45757580489415417|0.0816472025173598|01e11b|0.5795817262998396|0.15864898816336837|  1||     20201130162542| 20201130162542_0_21|22e96b41-344a-4be...|            2020/10/29|c8f3a30a-0523-4c8...|22e96b41-344a-4be...|1603921161580|44a-4be2-8454-832...| 120|   10| 0.6200960168557579| 0.946080636091312|22e96b|0.6138608980526853| 0.5445994550724997|  1|+-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+-------------+--------------------+----+-----+-------------------+------------------+------+------------------+-------------------+---+
+--------+|count(1)|+--------+|     900|+--------+

5. 总结

本文简要介绍了使用 Flink 引擎将数据写出到Hudi表的过程。主要包括自主打可执行jar、启动参数介绍、Schema配置、Hudi任务参数配置等步骤

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
5月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
935 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
存储 数据管理 物联网
491 0
存储 SQL 分布式计算
310 0
|
6月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
2160 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
483 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
7月前
|
消息中间件 存储 Kafka
Apache Flink错误处理实战手册:2年生产环境调试经验总结
本文由 Ververica 客户成功经理 Naci Simsek 撰写,基于其在多个行业 Flink 项目中的实战经验,总结了 Apache Flink 生产环境中常见的三大典型问题及其解决方案。内容涵盖 Kafka 连接器迁移导致的状态管理问题、任务槽负载不均问题以及 Kryo 序列化引发的性能陷阱,旨在帮助企业开发者避免常见误区,提升实时流处理系统的稳定性与性能。
634 0
Apache Flink错误处理实战手册:2年生产环境调试经验总结
|
7月前
|
存储 人工智能 数据处理
对话王峰:Apache Flink 在 AI 时代的“剑锋”所向
Flink 2.0 架构升级实现存算分离,迈向彻底云原生化,支持更大规模状态管理、提升资源效率、增强容灾能力。通过流批一体与 AI 场景融合,推动实时计算向智能化演进。生态项目如 Paimon、Fluss 和 Flink CDC 构建湖流一体架构,实现分钟级时效性与低成本平衡。未来,Flink 将深化 AI Agents 框架,引领事件驱动的智能数据处理新方向。
753 6
|
7月前
|
SQL 人工智能 API
Apache Flink 2.1.0: 面向实时 Data + AI 全面升级,开启智能流处理新纪元
Apache Flink 2.1.0 正式发布,标志着实时数据处理引擎向统一 Data + AI 平台迈进。新版本强化了实时 AI 能力,支持通过 Flink SQL 和 Table API 创建及调用 AI 模型,新增 Model DDL、ML_PREDICT 表值函数等功能,实现端到端的实时 AI 工作流。同时增强了 Flink SQL 的流处理能力,引入 Process Table Functions(PTFs)、Variant 数据类型,优化流式 Join 及状态管理,显著提升作业稳定性与资源利用率。
761 0
|
7月前
|
SQL 人工智能 数据挖掘
Apache Flink:从实时数据分析到实时AI
Apache Flink 是实时数据处理领域的核心技术,历经十年发展,已从学术项目成长为实时计算的事实标准。它在现代数据架构中发挥着关键作用,支持实时数据分析、湖仓集成及实时 AI 应用。随着 Flink 2.0 的发布,其在流式湖仓、AI 驱动决策等方面展现出强大潜力,正推动企业迈向智能化、实时化的新阶段。
868 9
Apache Flink:从实时数据分析到实时AI

热门文章

最新文章

推荐镜像

更多