Apache Flink 1.12.2集成Hudi 0.9.0运行指南

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink 1.12.2集成Hudi 0.9.0运行指南

1. 准备工作

1. 编译包下载

下载Flink 1.12.2包:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.11.tgzHudi编译:https://github.com/apache/hudigit clone https://github.com/apache/hudi.git && cd hudimvn clean package -DskipTests注意:默认是用scala-2.11编译的 如果我们用的是flink1.12.2-2.12版本,可以自己编译成scala-2.12版本的 mvn clean package -DskipTests -Dscala-2.12 包的路径在packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.12-..*-SNAPSHOT.jar上述包打好后其他步骤可参考官网步骤:https://hudi.apache.org/docs/flink-quick-start-guide.html(注意:官网使用的是Flink 1.11.x版本,测试时报如下错误

建议使用Flink1.12.2 + Hudi 0.9.0-SNAPSHOT(master)版本。


2. Batch写

2.1 环境启动

启动flink-sql客户端,提前把hudi-flink-bundle_2.12-0.9.0-SNAPSHOT.jar(笔者使用flink scala2.12版本,如果是scala2.11版本需要编译成hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar)拷贝到 $FLINK_HOME/lib目录下

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`./bin/sql-client.sh embedded

2.2 创建表结构

CREATE TABLE t1(  uuid VARCHAR(20),  name VARCHAR(10),  age INT,  ts TIMESTAMP(3),`partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ('connector'= 'hudi','path'= 'hdfs://localhost:9000/hudi/t1','table.type'= 'MERGE_ON_READ');

2.3 插入数据

INSERT INTO t1 VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

2.4 查询数据

设置查询模式为tableau

-- sets up the result mode to tableau to show the results directly in the CLIset execution.result-mode=tableau;

2.5 更新数据

INSERT INTO t1 VALUES ('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1');

id1的数据age由23变为了24

3. Streaming读

3.1 创建表结构

CREATE TABLE t1(  uuid VARCHAR(20),  name VARCHAR(10),  age INT,  ts TIMESTAMP(3),`partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ('connector'= 'hudi','path'= 'hdfs://localhost:9000/hudi/t1','table.type'= 'MERGE_ON_READ','read.streaming.enabled'= 'true',  'read.streaming.start-commit'= '20210401134557','read.streaming.check-interval'= '4');

说明:这里将 read.streaming.enabled 设置为 true,表明通过 streaming 的方式读取表数据; read.streaming.check-interval 指定了 source 监控新的 commits 的间隔为 4s; table.type 设置表类型为 MERGE_ON_READ

3.2 查询数据

流表t1表中的数据就是刚刚批模式写入的数据

3.3 插入数据

使用批模式插入一条数据

insert into t1 values ('id9','test',27,TIMESTAMP '1970-01-01 00:00:01','par5');

3.4 查询数据

几秒后在流表中可以读取到一条新增的数据(前面插入的一条数据)

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
16天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
48 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
7天前
|
数据库连接 PHP Apache
PHP在Apache中如何运行?
PHP在Apache中如何运行?
15 5
|
17天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
48 1
|
2月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
582 1
Flink CDC:新一代实时数据集成框架
|
1月前
|
消息中间件 监控 Java
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
64 1
|
1月前
|
Java 测试技术 API
如何在 Apache JMeter 中集成 Elastic APM
如何在 Apache JMeter 中集成 Elastic APM
39 1
|
3月前
|
存储 缓存 Java
实时计算 Flink版操作报错合集之怎么处理在运行作业时遇到报错::ClassCastException
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
SQL JSON 缓存
玳数科技集成 Flink CDC 3.0 的实践
本文投稿自玳数科技工程师杨槐老师,介绍了 Flink CDC 3.0 与 ChunJun 框架在玳数科技的集成实践。
590 7
玳数科技集成 Flink CDC 3.0 的实践
|
3月前
|
消息中间件 监控 关系型数据库
实时计算 Flink版产品使用问题之运行后,怎么进行监控和报警
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 Kafka 数据处理
实时数据流处理:Dask Streams 与 Apache Kafka 集成
【8月更文第29天】在现代数据处理领域,实时数据流处理已经成为不可或缺的一部分。随着物联网设备、社交媒体和其他实时数据源的普及,处理这些高吞吐量的数据流成为了一项挑战。Apache Kafka 作为一种高吞吐量的消息队列服务,被广泛应用于实时数据流处理场景中。Dask Streams 是 Dask 库的一个子模块,它为 Python 开发者提供了一个易于使用的实时数据流处理框架。本文将介绍如何将 Dask Streams 与 Apache Kafka 结合使用,以实现高效的数据流处理。
77 0

推荐镜像

更多