Apache Flink 1.12.2集成Hudi 0.9.0运行指南

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink 1.12.2集成Hudi 0.9.0运行指南

1. 准备工作

1. 编译包下载

下载Flink 1.12.2包:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.11.tgzHudi编译:https://github.com/apache/hudigit clone https://github.com/apache/hudi.git && cd hudimvn clean package -DskipTests注意:默认是用scala-2.11编译的 如果我们用的是flink1.12.2-2.12版本,可以自己编译成scala-2.12版本的 mvn clean package -DskipTests -Dscala-2.12 包的路径在packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.12-..*-SNAPSHOT.jar上述包打好后其他步骤可参考官网步骤:https://hudi.apache.org/docs/flink-quick-start-guide.html(注意:官网使用的是Flink 1.11.x版本,测试时报如下错误

建议使用Flink1.12.2 + Hudi 0.9.0-SNAPSHOT(master)版本。


2. Batch写

2.1 环境启动

启动flink-sql客户端,提前把hudi-flink-bundle_2.12-0.9.0-SNAPSHOT.jar(笔者使用flink scala2.12版本,如果是scala2.11版本需要编译成hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar)拷贝到 $FLINK_HOME/lib目录下

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`./bin/sql-client.sh embedded

2.2 创建表结构

CREATE TABLE t1(  uuid VARCHAR(20),  name VARCHAR(10),  age INT,  ts TIMESTAMP(3),`partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ('connector'= 'hudi','path'= 'hdfs://localhost:9000/hudi/t1','table.type'= 'MERGE_ON_READ');

2.3 插入数据

INSERT INTO t1 VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

2.4 查询数据

设置查询模式为tableau

-- sets up the result mode to tableau to show the results directly in the CLIset execution.result-mode=tableau;

2.5 更新数据

INSERT INTO t1 VALUES ('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1');

id1的数据age由23变为了24

3. Streaming读

3.1 创建表结构

CREATE TABLE t1(  uuid VARCHAR(20),  name VARCHAR(10),  age INT,  ts TIMESTAMP(3),`partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ('connector'= 'hudi','path'= 'hdfs://localhost:9000/hudi/t1','table.type'= 'MERGE_ON_READ','read.streaming.enabled'= 'true',  'read.streaming.start-commit'= '20210401134557','read.streaming.check-interval'= '4');

说明:这里将 read.streaming.enabled 设置为 true,表明通过 streaming 的方式读取表数据; read.streaming.check-interval 指定了 source 监控新的 commits 的间隔为 4s; table.type 设置表类型为 MERGE_ON_READ

3.2 查询数据

流表t1表中的数据就是刚刚批模式写入的数据

3.3 插入数据

使用批模式插入一条数据

insert into t1 values ('id9','test',27,TIMESTAMP '1970-01-01 00:00:01','par5');

3.4 查询数据

几秒后在流表中可以读取到一条新增的数据(前面插入的一条数据)

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
12天前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
293 33
The Past, Present and Future of Apache Flink
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
141 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
848 13
Apache Flink 2.0-preview released
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
90 3
|
25天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
62 5
|
27天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
43 1
|
1月前
|
数据库连接 PHP Apache
PHP在Apache中如何运行?
PHP在Apache中如何运行?
43 5
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
72 1
|
2月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
81 1
|
2月前
|
Java 测试技术 API
如何在 Apache JMeter 中集成 Elastic APM
如何在 Apache JMeter 中集成 Elastic APM
49 1

热门文章

最新文章

推荐镜像

更多