Flink CDC + Hudi + Hive + Presto构建实时数据湖最佳实践

本文涉及的产品
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: Flink CDC + Hudi + Hive + Presto构建实时数据湖最佳实践

1. 测试过程环境版本说明

Flink1.13.1

Scala2.11

CDH6.2.0

Hadoop3.0.0

Hive2.1.1

Hudi0.10(master)

PrestoDB0.256

Mysql5.7

2. 集群服务器基础环境

2.1 Maven和JDK环境版本

2.2 Hadoop 集群环境版本

2.3 HADOOP环境变量配置

export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoopexport HADOOP_CALSSPATH=`$HADOOP_HOME/bin/hadoop classpath`

3. Hudi编译环境配置

3.1 Maven Home settings.xml配置修改

说明:指定aliyun maven地址(支持CDH cloudera依赖) mirror库

<mirrors><mirror><id>alimaven</id><mirrorOf>central,!cloudera</mirrorOf><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url></mirror></mirrors>

3.2 下载Hudi源码包

git clone https://github.com/apache/hudi.git

Hudi社区建议版本适配

hudi0.9 适配 flink1.12.2

hudi0.10(master) 适配 flink1.13.X ( 说明master分支上版本还未release)

3.3 Hudi 客户端命令行

3.4 修改Hudi集成flink和Hive编译依赖版本配置

hudi-master/packaging/hudi-flink-bundle

pom.xml文件 ( 笔者环境CDH6.2.0 hive2.1.1)

<profile><id>flink-bundle-shade-hive2</id><properties><hive.version>2.1.1-cdh6.2.0</hive.version><flink.bundle.hive.scope>compile</flink.bundle.hive.scope></properties><dependencies><dependency><groupId>${hive.groupid}</groupId><artifactId>hive-service-rpc</artifactId><version>${hive.version}</version><scope>${flink.bundle.hive.scope}</scope></dependency></dependencies></profile>

3.5 编译Hudi 指定Hadoop和Hive版本信息

mvn clean install -DskipTests-Drat.skip=true-Dscala-2.11-Dhadoop.version=3.0.0-Pflink-bundle-shade-hive2

(可加 –e –X 参数查看编译ERROR异常和DEBUG信息)

说明:默认scala2.11、默认不包含hive依赖

首次编译耗时较长 笔者首次编译大概花费 50min+(也和服务器网络有关)

后续编译会快一些 大约15min左右

3.6 Hudi编译异常

修改Hudi master pom.xml 增加 CDH repository地址

3.7 Hudi重新编译

3.8 Hudi编译结果说明

hudi-master/packaging/hudi-flink-bundle/target

hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar

说明:hudi-flink-bundle jar 是 flink 用来写入和读取数据

hudi-master/packaging/hudi-hadoop-mr-bundle/target

hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar

说明:hudi-mr-bundle jar 是 hive 需要用来读hudi数据

4. Flink环境配置

版本说明:Flink 1.13.1 scala2.11版本

4.1 FLINK_HOME 下 sql-client-defaults.yaml 配置

4.2 flink-conf.yaml配置修改

# state.backend: filesystemstate.backend: rocksdb# 开启增量checkpointstate.backend.incremental: true# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpointsstate.checkpoints.dir: hdfs://nameservice/flink/flink-checkpointsclassloader.check-leaked-classloader: falseclassloader.resolve-order: parent-first

4.3 FLINK_HOME lib下添加依赖

flink-sql-connector-mysql-cdc-1.4.0.jarflink-sql-connector-oracle-cdc-2.1-SNAPSHOT.jar.BAK – oracle cdc 依赖 flink-format-changelog-json-1.4.0.jarflink-sql-connector-kafka_2.11-1.13.1.jar--- Hadoop home lib下copy过来hadoop-mapreduce-client-common-3.0.0-cdh6.2.0.jarhadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jarhadoop-mapreduce-client-jobclient-3.0.0-cdh6.2.0.jar--- hudi编译jar copy过来hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar

说明:目前oracle cdc jar和mysql cdc jar一起在lib下发现有冲突异常

5 启动flink yarn session服务

5.1 FLINK_HOME shell 命令

$FLINK_HOME/bin/yarn-session.sh -s 2-jm 2048-tm 2048-nm ys-hudi01 -d

5.2 Yarn Web UI

5.3 Flinksql Client 启动命令

$FLINK_HOME/bin/sql-client.sh embedded -j ./lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar shell

说明:-j指定hudi-flink 依赖jar

Show table /show catalogs

6. MySQL binlog 开启配置

6.1 创建binlog日志存储路径

mkdir logs

6.2 修改目录属主和group

chown -R mysql:mysql /mysqldata/logs

6.3 修改mysql配置信息

vim /etc/my.cnfserver-id=2log-bin= /mysqldata/logs/mysql-binbinlog_format=rowexpire_logs_days=15binlog_row_image=full

6.4 修改完,重启mysql server

service mysqld restart

6.5 客户端查看binlog日志情况

show master logs;

Mysql 版本:5.7.30

5.6 创建mysql sources 表 DDL

create table users_cdc(   id bigint auto_increment primary key,   name varchar(20) null,   birthday timestamp default CURRENT_TIMESTAMP notnull,   ts timestamp default CURRENT_TIMESTAMP notnull);

7. FlinkCDC sink Hudi测试代码过程

7.1 Flink sql cdc DDL 语句:(具体参数说明可参考flink官网)

CREATE TABLE mysql_users (    id BIGINT PRIMARY KEY NOT ENFORCED ,    name STRING,    birthday TIMESTAMP(3),    ts TIMESTAMP(3)) WITH ('connector'= 'mysql-cdc','hostname'= '127.0.0.1','port'= '3306','username'= '','password'=’’,'server-time-zone'= 'Asia/Shanghai','debezium.snapshot.mode'='initial','database-name'= 'luo','table-name'= 'users_cdc');

7.2 查询mysql cdc 表

Flink SQL> select * from mysql_users;

由于目前MySQL users_cdc表是空,所以flinksql 查询没有数据 只有表结构;

Flink web UI:

7.3 创建一个临时视图,增加分区列 方便后续同步hive分区表

Flink SQL> create view mycdc_v AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as partition FROM mysql_users;

说明:partition 关键字需要 `` 引起来

查询视图数据也是空结构,但增加了分区字段:

Flink SQL> select * from mycdc_v;

Flink web UI:

7.4 设置checkpoint间隔时间,存储路径已在flink-conf配置设置全局路径

建议:测试环境 可设置秒级别(不能太小),生产环境可设置分钟级别。

Flink SQL> set execution.checkpointing.interval=30sec;

7.5 Flinksql 创建 cdc sink hudi文件,并自动同步hive分区表DDL 语句

CREATE TABLE mysqlcdc_sync_hive01(id bigint ,name string,birthday TIMESTAMP(3),ts TIMESTAMP(3),`partition` VARCHAR(20),primary key(id) not enforced --必须指定uuid 主键)PARTITIONED BY (`partition`)with('connector'='hudi','path'= 'hdfs://nameservice /luo/hudi/mysqlcdc_sync_hive01', 'hoodie.datasource.write.recordkey.field'= 'id'-- 主键, 'write.precombine.field'= 'ts'-- 自动precombine的字段, 'write.tasks'= '1', 'compaction.tasks'= '1', 'write.rate.limit'= '2000'-- 限速, 'table.type'= 'MERGE_ON_READ'-- 默认COPY_ON_WRITE,可选MERGE_ON_READ , 'compaction.async.enabled'= 'true'-- 是否开启异步压缩, 'compaction.trigger.strategy'= 'num_commits'-- 按次数压缩, 'compaction.delta_commits'= '1'-- 默认为5, 'changelog.enabled'= 'true'-- 开启changelog变更, 'read.streaming.enabled'= 'true'-- 开启流读, 'read.streaming.check-interval'= '3'-- 检查间隔,默认60s, 'hive_sync.enable'= 'true'-- 开启自动同步hive, 'hive_sync.mode'= 'hms'-- 自动同步hive模式,默认jdbc模式, 'hive_sync.metastore.uris'= 'thrift://hadoop:9083'-- hive metastore地址-- , 'hive_sync.jdbc_url'= 'jdbc:hive2://hadoop:10000'-- hiveServer地址, 'hive_sync.table'= 'mysqlcdc_sync_hive01'-- hive 新建表名, 'hive_sync.db'= 'luo'-- hive 新建数据库名, 'hive_sync.username'= ''-- HMS 用户名, 'hive_sync.password'= ''-- HMS 密码, 'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型);

说明:Hudi目前支持MOR和COW两种模式

(1) Copy on Write:使用列式存储来存储数据(例如:parquet),通过在写入期间执行同步合并来简单地更新和重现文件

(2) Merge on Read:使用列式存储(parquet)+行式文件(arvo)组合存储数据。更新记录到增量文件中,然后进行同步或异步压缩来生成新版本的列式文件。

COW:Copy on Write (写时复制),快照查询+增量查询

MOR:Merge on Read (读时合并),快照查询+增量查询+读取优化查询(近实时)

使用场景上:

(1)COW适用写少读多的场景 ,MOR 适用写多读少的场景;

(2)MOR适合CDC场景,更新延迟要求较低,COW目前不支持 changelog mode 不适合处理cdc场景;

Flink web UI

7.6 Flink sql mysql cdc数据写入hudi文件数据

Flink SQL> insert into mysqlcdc_sync_hive01 select id,name,birthday,ts,partition from mycdc_v;

Flink web UI DAG图:

7.7 HDFS上Hudi文件目录情况

说明:目前还没写入测试数据,hudi目录只生成一些状态标记文件,还未生成分区目录以及.log 和.parquet数据文件,具体含义可见hudi官方文档。

7.8 Mysql数据源写入测试数据

insert into users_cdc (name) values ('cdc01');

7.9 Flinksql 查询mysql cdc insert数据:

Flink SQL> set execution.result-mode=tableau;

[WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.

[INFO] Session property has been set.

Flink SQL> select * from mysql_users; -- 查询到一条insert数据

7.10 Flink web UI页面可以看到DAG 各个环节产生一条测试数据

7.11 Flinksql 查询 sink的hudi表数据

Flink SQL> select * from mysqlcdc_sync_hive01; --已查询到一条insert数据

7.12 Hdfs上Hudi文件目录变化情况

7.13 Hive分区表和数据自动同步情况

7.14 查看自动创建hive表结构

hive> show create table mysqlcdc_sync_hive01_ro;

hive> show create table mysqlcdc_sync_hive01_rt;

7.15 查看自动生成的表分区信息

hive> show partitions mysqlcdc_sync_hive01_ro;

hive> show partitions mysqlcdc_sync_hive01_rt;

说明:已自动生产hudi MOR模式的

mysqlcdc_sync_hive01_ro

mysqlcdc_sync_hive01_rt

ro表和rt表区别:

ro 表全称 read oprimized table,对于 MOR 表同步的 xxx_ro 表,只暴露压缩后的 parquet。其查询方式和COW表类似。设置完 hiveInputFormat 之后 和普通的 Hive 表一样查询即可;

rt表示增量视图,主要针对增量查询的rt表;

ro表只能查parquet文件数据, rt表 parquet文件数据和log文件数据都可查;

7.16 Hive访问Hudi数据

说明:需要引入hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar

引入Hudi依赖jar方式:

(1) 引入到 $HIVE_HOME/lib下;

(2) 引入到$HIVE_HOME/auxlib 自定义第三方依赖 修改 hive-site.xml配置文件;

(3) Hive shell命令行引入 Session级别有效;

其中(1)和(3)配置完后需要重启 hive-server服务;

查询Hive 分区表数据:

hive> select * from mysqlcdc_sync_hive01_ro; --已查询到mysq insert的一条数据

hive> select * from mysqlcdc_sync_hive01_rt; --已查询到mysq insert的一条数据

Hive 条件查询:

hive> select name,ts from mysqlcdc_sync_hive01_ro where partition='20211109';

Hive ro表 count查询

hive> select count(1) from mysqlcdc_sync_hive01_ro;


Hive Count异常解决:

引入hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar依赖

hive> add jar hdfs://nameservice /luo/hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar;

hive> set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;


hive> select count(1) from mysqlcdc_sync_hive01_ro; --可正常count


Hive rt表 count查询

hive> select count(1) from mysqlcdc_sync_hive01_rt;

说明:rt 表count 还是异常,和Hudi社区人员沟通hudi master目前还没release这块存在bug正在修复中

具体见:https://issues.apache.org/jira/browse/HUDI-2649

7.17 Mysql 数据源写入多条测试数据

insert into users_cdc (name) values ('cdc02');insert into users_cdc (name) values ('cdc03');insert into users_cdc (name) values ('cdc04');insert into users_cdc (name) values ('cdc05');insert into users_cdc (name) values ('cdc06');


Flink web UI DAG中数据链路情况:

7.18 Flinksql中新写入数据查询情况

Yarn web UI application_1626256835287_40351[1]资源使用情况


Hdfs上Hudi文件目录变化情况


Hudi状态文件说明:

(1)requested:表示一个动作已被安排,但尚未启动

(2)inflight:表示当前正在执行操作

(3)completed:表示在时间线上完成了操作

Flink jobmanager log sync hive过程详细日志




7.19 Mysql 数据源更新数据

update users_cdc set name = 'cdc05-bj'where id = 5;


7.20 Flinksql 查询cdc update数据 产生两条binlog数据


说明:flinksql 查询最终只有一条+I有效数据,且数据已更新

Flink web UI DAG接受到两条binlog数据,但最终compact和sink只有一条有效数据


7.21 MySQL 数据源 delete 一条数据:

deletefrom users_cdc where id = 3;


Flink Web UI job DAG中捕获一条新数据:


Flinksql changlog delete数据变化查询

HDFS上Hudi数据文件生成情况



Hudi文件类型说明:

(1)commits: 表示将一批数据原子性写入表中

(2)cleans: 清除表中不在需要的旧版本文件的后台活动

(3)delta_commit:增量提交是指将一批数据原子性写入MergeOnRead类型的表中,其中部分或者所有数据可以写入增量日志中

(4)compaction: 协调hudi中差异数据结构的后台活动,例如:将更新从基于行的日志文件变成列格式。在内部,压缩的表现为时间轴上的特殊提交

(5)rollback:表示提交操作不成功且已经回滚,会删除在写入过程中产生的数据


说明:hudi分区文件以及.log和.parquet文件都已生成

两种文件区别:Hudi会在DFS分布式文件系统上的basepath基本路径下组织成目录结构。每张对应的表都会成多个分区,这些分区是包含该分区的数据文件的文件夹,与hive的目录结构非常相似。在每个分区内,文件被组织成文件组,文件id为唯一标识。每个文件组包含多个切片,其中每个切片包含在某个提交/压缩即时时间生成的基本列文件(parquet文件),以及自生成基本文件以来对基本文件的插入/更新的一组日志文件(*.log)。Hudi采用MVCC设计,其中压缩操作会将日志和基本文件合并成新的文件片,清理操作会将未使用/较旧的文件片删除来回收DFS上的空间。

Flink 任务checkpoint 情况:

设置30s 一次



7.22 Hive shell查询数据update和delete变化情况:

hive> select * from mysqlcdc_sync_hive01_ro;


hive> select * from mysqlcdc_sync_hive01_rt;


7.23 Hudi Client端操作Hudi表

进入Hudi客户端命令行:

hudi-master/hudi-cli/hudi-cli.sh

连接Hudi表,查看表信息

hudi->connect --path hdfs://nameservice1/tmp/luo/hudi/mysqlcdc_sync_hive01


查看Hudi commit信息

hudi:mysqlcdc_sync_hive01->commits show --sortBy "CommitTime"

查看Hudi compactions 计划

hudi:mysqlcdc_sync_hive01->compactions show all

7.24 PrestoDB 查询Hive表Hudi数据

版本说明:PrestoDB 0.256 DBeaver7.0.4

PrestoDB 集群配置和hive集成参考PrestoDB官网

presto-server-***/etc/catalog/hive.properties 配置hive catalog

可通过 presto-cli 连接 hive metastore 开启查询,presto-cli 的设置参考 presto官方配置;

DBeaver客户端查询Hive ro表数据:


Hive ro表count 正常:


查询Hive rt表数据查询异常:


Hive rt表count异常:


Presto Web ui:



相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
消息中间件 存储 传感器
348 0
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
Flink Materialized Table:构建流批一体 ETL
249 3
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
855 2
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
本文整理自鹰角网络大数据开发工程师朱正军在Flink Forward Asia 2024上的分享,主要涵盖四个方面:鹰角数据平台架构、数据湖选型、湖仓一体建设及未来展望。文章详细介绍了鹰角如何构建基于Paimon的数据湖,解决了Hudi入湖的痛点,并通过Trino引擎和Ranger权限管理实现高效的数据查询与管控。此外,还探讨了湖仓一体平台的落地效果及未来技术发展方向,包括Trino与Paimon的集成增强、StarRocks的应用以及Paimon全面替换Hive的计划。
1591 1
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
本文整理自阿里云智能集团 Apache Flink Committer 刘大龙老师在2024FFA流批一体论坛的分享,涵盖三部分内容:数据工程师用户故事、Materialized Table 构建流批一体 ETL 及 Demo。文章通过案例分析传统 Lambda 架构的挑战,介绍了 Materialized Table 如何简化流批处理,提供统一 API 和声明式 ETL,实现高效的数据处理和维护。最后展示了基于 Flink 和 Paimon 的实际演示,帮助用户更好地理解和应用这一技术。
943 7
Flink Materialized Table:构建流批一体 ETL
|
SQL 监控 关系型数据库
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
本文整理自用友畅捷通数据架构师王龙强在FFA2024上的分享,介绍了公司在Flink上构建实时数仓的经验。内容涵盖业务背景、数仓建设、当前挑战、最佳实践和未来展望。随着数据量增长,公司面临数据库性能瓶颈及实时数据处理需求,通过引入Flink技术逐步解决了数据同步、链路稳定性和表结构差异等问题,并计划在未来进一步优化链路稳定性、探索湖仓一体架构以及结合AI技术推进数据资源高效利用。
879 25
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
|
存储 关系型数据库 BI
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。
|
10月前
|
SQL 分布式计算 大数据
大数据新视界 --大数据大厂之Hive与大数据融合:构建强大数据仓库实战指南
本文深入介绍 Hive 与大数据融合构建强大数据仓库的实战指南。涵盖 Hive 简介、优势、安装配置、数据处理、性能优化及安全管理等内容,并通过互联网广告和物流行业案例分析,展示其实际应用。具有专业性、可操作性和参考价值。
大数据新视界 --大数据大厂之Hive与大数据融合:构建强大数据仓库实战指南
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
353 0
|
SQL 分布式计算 大数据
大数据处理平台Hive详解
【7月更文挑战第15天】Hive作为基于Hadoop的数据仓库工具,在大数据处理和分析领域发挥着重要作用。通过提供类SQL的查询语言,Hive降低了数据处理的门槛,使得具有SQL背景的开发者可以轻松地处理大规模数据。然而,Hive也存在查询延迟高、表达能力有限等缺点,需要在实际应用中根据具体场景和需求进行选择和优化。
1197 6