Apache Hudi + Flink作业运行指南

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
简介: Apache Hudi + Flink作业运行指南

近日Apache Hudi社区合并了Flink引擎的基础实现(HUDI-1327),这意味着 Hudi 开始支持 Flink 引擎。有很多小伙伴在交流群里咨询 Hudi on Flink 的使用姿势,三言两语不好描述,不如实操演示一把,于是有了这篇文章。

当前 Flink 版本的Hudi还只支持读取 Kafka 数据,Sink到 COW(COPY_ON_WRITE) 类型的 Hudi 表中,其他功能还在继续完善中。

这里我们简要介绍下如何从 Kafka 读取数据写出到Hudi表。

1. 打包

由于还没有正式发布, 我们需要到Github下载源码自行打包。

git clone https://github.com/apache/hudi.git && cd hudimvn clean package -DskipTests

Windows 系统用户打包时会报如下错误:

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:exec (Setup HUDI_WS) on project hudi-integ-test: Command execution failed. Cannot run program "\bin\bash" (in directory "D:\github\hudi\hudi-integ-test"): CreateProcess error=2, 系统找不到指定的文件。 -> [Help 1][ERROR][ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.[ERROR] Re-run Maven using the -X switch to enable full debug logging.[ERROR][ERROR] For more information about the errors and possible solutions, please read the following articles:[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException[ERROR][ERROR] After correcting the problems, you can resume the build with the command[ERROR]   mvn <goals> -rf :hudi-integ-test

这是 hudi-integ-test 模块的一个bash脚本无法执行导致的错误,我们可以把它注释掉。

修改D:\github\hudi\pom.xml根pom文件

<modules>    <module>hudi-common</module>    <module>hudi-cli</module>    <module>hudi-client</module>    <module>hudi-hadoop-mr</module>    <module>hudi-spark</module>    <module>hudi-timeline-service</module>    <module>hudi-utilities</module>    <module>hudi-sync</module>    <module>packaging/hudi-hadoop-mr-bundle</module>    <module>packaging/hudi-hive-sync-bundle</module>    <module>packaging/hudi-spark-bundle</module>    <module>packaging/hudi-presto-bundle</module>    <module>packaging/hudi-utilities-bundle</module>    <module>packaging/hudi-timeline-server-bundle</module>    <module>docker/hoodie/hadoop</module><!--    <module>hudi-integ-test</module>--><!--    <module>packaging/hudi-integ-test-bundle</module>-->    <module>hudi-examples</module>    <module>hudi-flink</module>    <module>packaging/hudi-flink-bundle</module>  </modules>

再次执行 mvn clean package -DskipTests, 执行成功后,找到这个jar : D:\github\hudi\packaging\hudi-flink-bundle\target\hudi-flink-bundle_2.11-0.6.1-SNAPSHOT.jar (笔者Hudi源码在D:\github\ 路径下,大家根据自己实际路径找一下)

这个 hudi-flink-bundle_2.11-0.6.1-SNAPSHOT.jar 就是我们需要使用的flink客户端,类似于原版的 hudi-utilities-bundle_2.11-x.x.x.jar

2. 入参介绍

有几个必传的参数介绍下:

--kafka-topic :Kafka 主题--kafka-group-id :消费组--kafka-bootstrap-servers : Kafka brokers--target-base-path : Hudi 表基本路径--target-table :Hudi 表名--table-type :Hudi 表类型--props : 任务配置

其他参数可以参考 org.apache.hudi.HoodieFlinkStreamer.Config,里面每个参数都有介绍 。

3. 启动准备清单

1.Kafka 主题,消费组2.jar上传到服务器3.schema 文件4.Hudi任务配置文件

注意根据自己的配置把配置文件放到合适的地方,笔者的 hudi-conf.properties和schem.avsc文件均上传在HDFS。

-rw-r--r-- 1 user user      592 Nov 19 09:32 hudi-conf.properties-rw-r--r-- 1 user user 39086937 Nov 30 15:51 hudi-flink-bundle_2.11-0.6.1-SNAPSHOT.jar-rw-r--r-- 1 user user 1410 Nov 17 17:52 schema.avsc

hudi-conf.properties内容如下

hoodie.datasource.write.recordkey.field=uuidhoodie.datasource.write.partitionpath.field=tsbootstrap.servers=xxx:9092hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDShoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/ddhoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedAvroKeyGeneratorhoodie.embed.timeline.server=falsehoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://olap/hudi/test/config/flink/schema.avschoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://olap/hudi/test/config/flink/schema.avsc

schema.avsc内容如下

{  "type":"record",  "name":"stock_ticks",  "fields":[{     "name": "uuid",     "type": "string"  }, {     "name": "ts",     "type": "long"  }, {     "name": "symbol",     "type": "string"  },{     "name": "year",     "type": "int"  },{     "name": "month",     "type": "int"  },{     "name": "high",     "type": "double"  },{     "name": "low",     "type": "double"  },{     "name": "key",     "type": "string"  },{     "name": "close",     "type": "double"  }, {     "name": "open",     "type": "double"  }, {     "name": "day",     "type":"string"  }]}

4. 启动任务

/opt/flink-1.11.2/bin/flink run -c org.apache.hudi.HoodieFlinkStreamer -m yarn-cluster -d -yjm 1024 -ytm 1024 -p 4 -ys 3 -ynm hudi_on_flink_test hudi-flink-bundle_2.11-0.6.1-SNAPSHOT.jar --kafka-topic hudi_test_flink --kafka-group-id hudi_on_flink --kafka-bootstrap-servers xxx:9092 --table-type COPY_ON_WRITE --target-base-path hdfs://olap/hudi/test/data/hudi_on_flink --target-table hudi_on_flink  --props hdfs://olap/hudi/test/config/flink/hudi-conf.properties --checkpoint-interval 3000 --flink-checkpoint-path hdfs://olap/hudi/hudi_on_flink_cp

查看监控页面,任务已经跑起来了

现在在Hdfs路径下已经创建了一个空表(Hudi自动创建)

我们向 topic 中发数据(发了 900 条,本地写的 Producer 就不贴代码了)

我们查一下结果:

@Test  public void query() {    spark.read().format("hudi")        .load(basePath + "/*/*/*/*")        .createOrReplaceTempView("tmp_view");    spark.sql("select * from tmp_view limit 2").show();    spark.sql("select count(1) from tmp_view").show();  }
+-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+-------------+--------------------+----+-----+-------------------+------------------+------+------------------+-------------------+---+|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|                uuid|           ts|              symbol|year|month|               high|               low|   key|             close|               open|day|+-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+-------------+--------------------+----+-----+-------------------+------------------+------+------------------+-------------------+---+|     20201130162542| 20201130162542_0_20|01e11b9c-012a-461...|            2020/10/29|c8f3a30a-0523-4c8...|01e11b9c-012a-461...|1603947341061|12a-4614-89c3-f62...| 120|   10|0.45757580489415417|0.0816472025173598|01e11b|0.5795817262998396|0.15864898816336837|  1||     20201130162542| 20201130162542_0_21|22e96b41-344a-4be...|            2020/10/29|c8f3a30a-0523-4c8...|22e96b41-344a-4be...|1603921161580|44a-4be2-8454-832...| 120|   10| 0.6200960168557579| 0.946080636091312|22e96b|0.6138608980526853| 0.5445994550724997|  1|+-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+-------------+--------------------+----+-----+-------------------+------------------+------+------------------+-------------------+---+
+--------+|count(1)|+--------+|     900|+--------+

5. 总结

本文简要介绍了使用 Flink 引擎将数据写出到Hudi表的过程。主要包括自主打可执行jar、启动参数介绍、Schema配置、Hudi任务参数配置等步骤

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
30天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
589 13
Apache Flink 2.0-preview released
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
68 3
|
1月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
57 1
|
1月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
|
SQL 存储 Java
Hudi on Flink 快速上手指南
本文由阿里巴巴的陈玉兆分享,主要介绍 Flink 集成 Hudi 的最新版本功能以及快速上手实践指南。
Hudi on Flink 快速上手指南
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
4月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
821 7
阿里云实时计算Flink在多行业的应用和实践
|
17天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
696 10
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
3月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
14天前
|
SQL 运维 数据可视化
阿里云实时计算Flink版产品体验测评
阿里云实时计算Flink基于Apache Flink构建,提供一站式实时大数据分析平台,支持端到端亚秒级实时数据分析,适用于实时大屏、实时报表、实时ETL和风控监测等场景,具备高性价比、开发效率、运维管理和企业安全等优势。

热门文章

最新文章

推荐镜像

更多