Apache Flink 1.12.2集成Hudi 0.9.0运行指南

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink 1.12.2集成Hudi 0.9.0运行指南

1. 准备工作

1. 编译包下载

下载Flink 1.12.2包:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.11.tgzHudi编译:https://github.com/apache/hudigit clone https://github.com/apache/hudi.git && cd hudimvn clean package -DskipTests注意:默认是用scala-2.11编译的 如果我们用的是flink1.12.2-2.12版本,可以自己编译成scala-2.12版本的 mvn clean package -DskipTests -Dscala-2.12 包的路径在packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.12-..*-SNAPSHOT.jar上述包打好后其他步骤可参考官网步骤:https://hudi.apache.org/docs/flink-quick-start-guide.html(注意:官网使用的是Flink 1.11.x版本,测试时报如下错误

建议使用Flink1.12.2 + Hudi 0.9.0-SNAPSHOT(master)版本。


2. Batch写

2.1 环境启动

启动flink-sql客户端,提前把hudi-flink-bundle_2.12-0.9.0-SNAPSHOT.jar(笔者使用flink scala2.12版本,如果是scala2.11版本需要编译成hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar)拷贝到 $FLINK_HOME/lib目录下

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`./bin/sql-client.sh embedded

2.2 创建表结构

CREATE TABLE t1(  uuid VARCHAR(20),  name VARCHAR(10),  age INT,  ts TIMESTAMP(3),`partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ('connector'= 'hudi','path'= 'hdfs://localhost:9000/hudi/t1','table.type'= 'MERGE_ON_READ');

2.3 插入数据

INSERT INTO t1 VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

2.4 查询数据

设置查询模式为tableau

-- sets up the result mode to tableau to show the results directly in the CLIset execution.result-mode=tableau;

2.5 更新数据

INSERT INTO t1 VALUES ('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1');

id1的数据age由23变为了24

3. Streaming读

3.1 创建表结构

CREATE TABLE t1(  uuid VARCHAR(20),  name VARCHAR(10),  age INT,  ts TIMESTAMP(3),`partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ('connector'= 'hudi','path'= 'hdfs://localhost:9000/hudi/t1','table.type'= 'MERGE_ON_READ','read.streaming.enabled'= 'true',  'read.streaming.start-commit'= '20210401134557','read.streaming.check-interval'= '4');

说明:这里将 read.streaming.enabled 设置为 true,表明通过 streaming 的方式读取表数据; read.streaming.check-interval 指定了 source 监控新的 commits 的间隔为 4s; table.type 设置表类型为 MERGE_ON_READ

3.2 查询数据

流表t1表中的数据就是刚刚批模式写入的数据

3.3 插入数据

使用批模式插入一条数据

insert into t1 values ('id9','test',27,TIMESTAMP '1970-01-01 00:00:01','par5');

3.4 查询数据

几秒后在流表中可以读取到一条新增的数据(前面插入的一条数据)

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
29天前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
363 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
7月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
680 0
|
7月前
|
SQL 存储 人工智能
Apache Flink 2.0.0: 实时数据处理的新纪元
Apache Flink 2.0.0 正式发布!这是自 Flink 1.0 发布九年以来的首次重大更新,凝聚了社区两年的努力。此版本引入分离式状态管理、物化表、流批统一等创新功能,优化云原生环境下的资源利用与性能表现,并强化了对人工智能工作流的支持。同时,Flink 2.0 对 API 和配置进行了全面清理,移除了过时组件,为未来的发展奠定了坚实基础。感谢 165 位贡献者的辛勤付出,共同推动实时计算进入新纪元!
829 1
Apache Flink 2.0.0: 实时数据处理的新纪元
|
8月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
1847 45
|
8月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
600 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
8月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
643 12
Flink CDC YAML:面向数据集成的 API 设计
|
7月前
|
SQL 弹性计算 DataWorks
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
295 6
|
7月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
215 5
|
10月前
|
存储 SQL 人工智能
Apache Flink 2.0:Streaming into the Future
本文整理自阿里云智能高级技术专家宋辛童、资深技术专家梅源和高级技术专家李麟在 Flink Forward Asia 2024 主会场的分享。三位专家详细介绍了 Flink 2.0 的四大技术方向:Streaming、Stream-Batch Unification、Streaming Lakehouse 和 AI。主要内容包括 Flink 2.0 的存算分离云原生化、流批一体的 Materialized Table、Flink 与 Paimon 的深度集成,以及 Flink 在 AI 领域的应用。
1297 13
Apache Flink 2.0:Streaming into the Future

热门文章

最新文章

推荐镜像

更多