Flink CDC + Hudi + Hive + Presto构建实时数据湖最佳实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: Flink CDC + Hudi + Hive + Presto构建实时数据湖最佳实践

1. 测试过程环境版本说明

Flink1.13.1

Scala2.11

CDH6.2.0

Hadoop3.0.0

Hive2.1.1

Hudi0.10(master)

PrestoDB0.256

Mysql5.7

2. 集群服务器基础环境

2.1 Maven和JDK环境版本

2.2 Hadoop 集群环境版本

2.3 HADOOP环境变量配置

export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoopexport HADOOP_CALSSPATH=`$HADOOP_HOME/bin/hadoop classpath`

3. Hudi编译环境配置

3.1 Maven Home settings.xml配置修改

说明:指定aliyun maven地址(支持CDH cloudera依赖) mirror库

<mirrors><mirror><id>alimaven</id><mirrorOf>central,!cloudera</mirrorOf><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url></mirror></mirrors>

3.2 下载Hudi源码包

git clone https://github.com/apache/hudi.git

Hudi社区建议版本适配

hudi0.9 适配 flink1.12.2

hudi0.10(master) 适配 flink1.13.X ( 说明master分支上版本还未release)

3.3 Hudi 客户端命令行

3.4 修改Hudi集成flink和Hive编译依赖版本配置

hudi-master/packaging/hudi-flink-bundle

pom.xml文件 ( 笔者环境CDH6.2.0 hive2.1.1)

<profile><id>flink-bundle-shade-hive2</id><properties><hive.version>2.1.1-cdh6.2.0</hive.version><flink.bundle.hive.scope>compile</flink.bundle.hive.scope></properties><dependencies><dependency><groupId>${hive.groupid}</groupId><artifactId>hive-service-rpc</artifactId><version>${hive.version}</version><scope>${flink.bundle.hive.scope}</scope></dependency></dependencies></profile>

3.5 编译Hudi 指定Hadoop和Hive版本信息

mvn clean install -DskipTests-Drat.skip=true-Dscala-2.11-Dhadoop.version=3.0.0-Pflink-bundle-shade-hive2

(可加 –e –X 参数查看编译ERROR异常和DEBUG信息)

说明:默认scala2.11、默认不包含hive依赖

首次编译耗时较长 笔者首次编译大概花费 50min+(也和服务器网络有关)

后续编译会快一些 大约15min左右

3.6 Hudi编译异常

修改Hudi master pom.xml 增加 CDH repository地址

3.7 Hudi重新编译

3.8 Hudi编译结果说明

hudi-master/packaging/hudi-flink-bundle/target

hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar

说明:hudi-flink-bundle jar 是 flink 用来写入和读取数据

hudi-master/packaging/hudi-hadoop-mr-bundle/target

hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar

说明:hudi-mr-bundle jar 是 hive 需要用来读hudi数据

4. Flink环境配置

版本说明:Flink 1.13.1 scala2.11版本

4.1 FLINK_HOME 下 sql-client-defaults.yaml 配置

4.2 flink-conf.yaml配置修改

# state.backend: filesystemstate.backend: rocksdb# 开启增量checkpointstate.backend.incremental: true# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpointsstate.checkpoints.dir: hdfs://nameservice/flink/flink-checkpointsclassloader.check-leaked-classloader: falseclassloader.resolve-order: parent-first

4.3 FLINK_HOME lib下添加依赖

flink-sql-connector-mysql-cdc-1.4.0.jarflink-sql-connector-oracle-cdc-2.1-SNAPSHOT.jar.BAK – oracle cdc 依赖 flink-format-changelog-json-1.4.0.jarflink-sql-connector-kafka_2.11-1.13.1.jar--- Hadoop home lib下copy过来hadoop-mapreduce-client-common-3.0.0-cdh6.2.0.jarhadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jarhadoop-mapreduce-client-jobclient-3.0.0-cdh6.2.0.jar--- hudi编译jar copy过来hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar

说明:目前oracle cdc jar和mysql cdc jar一起在lib下发现有冲突异常

5 启动flink yarn session服务

5.1 FLINK_HOME shell 命令

$FLINK_HOME/bin/yarn-session.sh -s 2-jm 2048-tm 2048-nm ys-hudi01 -d

5.2 Yarn Web UI

5.3 Flinksql Client 启动命令

$FLINK_HOME/bin/sql-client.sh embedded -j ./lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar shell

说明:-j指定hudi-flink 依赖jar

Show table /show catalogs

6. MySQL binlog 开启配置

6.1 创建binlog日志存储路径

mkdir logs

6.2 修改目录属主和group

chown -R mysql:mysql /mysqldata/logs

6.3 修改mysql配置信息

vim /etc/my.cnfserver-id=2log-bin= /mysqldata/logs/mysql-binbinlog_format=rowexpire_logs_days=15binlog_row_image=full

6.4 修改完,重启mysql server

service mysqld restart

6.5 客户端查看binlog日志情况

show master logs;

Mysql 版本:5.7.30

5.6 创建mysql sources 表 DDL

create table users_cdc(   id bigint auto_increment primary key,   name varchar(20) null,   birthday timestamp default CURRENT_TIMESTAMP notnull,   ts timestamp default CURRENT_TIMESTAMP notnull);

7. FlinkCDC sink Hudi测试代码过程

7.1 Flink sql cdc DDL 语句:(具体参数说明可参考flink官网)

CREATE TABLE mysql_users (    id BIGINT PRIMARY KEY NOT ENFORCED ,    name STRING,    birthday TIMESTAMP(3),    ts TIMESTAMP(3)) WITH ('connector'= 'mysql-cdc','hostname'= '127.0.0.1','port'= '3306','username'= '','password'=’’,'server-time-zone'= 'Asia/Shanghai','debezium.snapshot.mode'='initial','database-name'= 'luo','table-name'= 'users_cdc');

7.2 查询mysql cdc 表

Flink SQL> select * from mysql_users;

由于目前MySQL users_cdc表是空,所以flinksql 查询没有数据 只有表结构;

Flink web UI:

7.3 创建一个临时视图,增加分区列 方便后续同步hive分区表

Flink SQL> create view mycdc_v AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as partition FROM mysql_users;

说明:partition 关键字需要 `` 引起来

查询视图数据也是空结构,但增加了分区字段:

Flink SQL> select * from mycdc_v;

Flink web UI:

7.4 设置checkpoint间隔时间,存储路径已在flink-conf配置设置全局路径

建议:测试环境 可设置秒级别(不能太小),生产环境可设置分钟级别。

Flink SQL> set execution.checkpointing.interval=30sec;

7.5 Flinksql 创建 cdc sink hudi文件,并自动同步hive分区表DDL 语句

CREATE TABLE mysqlcdc_sync_hive01(id bigint ,name string,birthday TIMESTAMP(3),ts TIMESTAMP(3),`partition` VARCHAR(20),primary key(id) not enforced --必须指定uuid 主键)PARTITIONED BY (`partition`)with('connector'='hudi','path'= 'hdfs://nameservice /luo/hudi/mysqlcdc_sync_hive01', 'hoodie.datasource.write.recordkey.field'= 'id'-- 主键, 'write.precombine.field'= 'ts'-- 自动precombine的字段, 'write.tasks'= '1', 'compaction.tasks'= '1', 'write.rate.limit'= '2000'-- 限速, 'table.type'= 'MERGE_ON_READ'-- 默认COPY_ON_WRITE,可选MERGE_ON_READ , 'compaction.async.enabled'= 'true'-- 是否开启异步压缩, 'compaction.trigger.strategy'= 'num_commits'-- 按次数压缩, 'compaction.delta_commits'= '1'-- 默认为5, 'changelog.enabled'= 'true'-- 开启changelog变更, 'read.streaming.enabled'= 'true'-- 开启流读, 'read.streaming.check-interval'= '3'-- 检查间隔,默认60s, 'hive_sync.enable'= 'true'-- 开启自动同步hive, 'hive_sync.mode'= 'hms'-- 自动同步hive模式,默认jdbc模式, 'hive_sync.metastore.uris'= 'thrift://hadoop:9083'-- hive metastore地址-- , 'hive_sync.jdbc_url'= 'jdbc:hive2://hadoop:10000'-- hiveServer地址, 'hive_sync.table'= 'mysqlcdc_sync_hive01'-- hive 新建表名, 'hive_sync.db'= 'luo'-- hive 新建数据库名, 'hive_sync.username'= ''-- HMS 用户名, 'hive_sync.password'= ''-- HMS 密码, 'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型);

说明:Hudi目前支持MOR和COW两种模式

(1) Copy on Write:使用列式存储来存储数据(例如:parquet),通过在写入期间执行同步合并来简单地更新和重现文件

(2) Merge on Read:使用列式存储(parquet)+行式文件(arvo)组合存储数据。更新记录到增量文件中,然后进行同步或异步压缩来生成新版本的列式文件。

COW:Copy on Write (写时复制),快照查询+增量查询

MOR:Merge on Read (读时合并),快照查询+增量查询+读取优化查询(近实时)

使用场景上:

(1)COW适用写少读多的场景 ,MOR 适用写多读少的场景;

(2)MOR适合CDC场景,更新延迟要求较低,COW目前不支持 changelog mode 不适合处理cdc场景;

Flink web UI

7.6 Flink sql mysql cdc数据写入hudi文件数据

Flink SQL> insert into mysqlcdc_sync_hive01 select id,name,birthday,ts,partition from mycdc_v;

Flink web UI DAG图:

7.7 HDFS上Hudi文件目录情况

说明:目前还没写入测试数据,hudi目录只生成一些状态标记文件,还未生成分区目录以及.log 和.parquet数据文件,具体含义可见hudi官方文档。

7.8 Mysql数据源写入测试数据

insert into users_cdc (name) values ('cdc01');

7.9 Flinksql 查询mysql cdc insert数据:

Flink SQL> set execution.result-mode=tableau;

[WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.

[INFO] Session property has been set.

Flink SQL> select * from mysql_users; -- 查询到一条insert数据

7.10 Flink web UI页面可以看到DAG 各个环节产生一条测试数据

7.11 Flinksql 查询 sink的hudi表数据

Flink SQL> select * from mysqlcdc_sync_hive01; --已查询到一条insert数据

7.12 Hdfs上Hudi文件目录变化情况

7.13 Hive分区表和数据自动同步情况

7.14 查看自动创建hive表结构

hive> show create table mysqlcdc_sync_hive01_ro;

hive> show create table mysqlcdc_sync_hive01_rt;

7.15 查看自动生成的表分区信息

hive> show partitions mysqlcdc_sync_hive01_ro;

hive> show partitions mysqlcdc_sync_hive01_rt;

说明:已自动生产hudi MOR模式的

mysqlcdc_sync_hive01_ro

mysqlcdc_sync_hive01_rt

ro表和rt表区别:

ro 表全称 read oprimized table,对于 MOR 表同步的 xxx_ro 表,只暴露压缩后的 parquet。其查询方式和COW表类似。设置完 hiveInputFormat 之后 和普通的 Hive 表一样查询即可;

rt表示增量视图,主要针对增量查询的rt表;

ro表只能查parquet文件数据, rt表 parquet文件数据和log文件数据都可查;

7.16 Hive访问Hudi数据

说明:需要引入hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar

引入Hudi依赖jar方式:

(1) 引入到 $HIVE_HOME/lib下;

(2) 引入到$HIVE_HOME/auxlib 自定义第三方依赖 修改 hive-site.xml配置文件;

(3) Hive shell命令行引入 Session级别有效;

其中(1)和(3)配置完后需要重启 hive-server服务;

查询Hive 分区表数据:

hive> select * from mysqlcdc_sync_hive01_ro; --已查询到mysq insert的一条数据

hive> select * from mysqlcdc_sync_hive01_rt; --已查询到mysq insert的一条数据

Hive 条件查询:

hive> select name,ts from mysqlcdc_sync_hive01_ro where partition='20211109';

Hive ro表 count查询

hive> select count(1) from mysqlcdc_sync_hive01_ro;


Hive Count异常解决:

引入hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar依赖

hive> add jar hdfs://nameservice /luo/hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar;

hive> set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;


hive> select count(1) from mysqlcdc_sync_hive01_ro; --可正常count


Hive rt表 count查询

hive> select count(1) from mysqlcdc_sync_hive01_rt;

说明:rt 表count 还是异常,和Hudi社区人员沟通hudi master目前还没release这块存在bug正在修复中

具体见:https://issues.apache.org/jira/browse/HUDI-2649

7.17 Mysql 数据源写入多条测试数据

insert into users_cdc (name) values ('cdc02');insert into users_cdc (name) values ('cdc03');insert into users_cdc (name) values ('cdc04');insert into users_cdc (name) values ('cdc05');insert into users_cdc (name) values ('cdc06');


Flink web UI DAG中数据链路情况:

7.18 Flinksql中新写入数据查询情况

Yarn web UI application_1626256835287_40351[1]资源使用情况


Hdfs上Hudi文件目录变化情况


Hudi状态文件说明:

(1)requested:表示一个动作已被安排,但尚未启动

(2)inflight:表示当前正在执行操作

(3)completed:表示在时间线上完成了操作

Flink jobmanager log sync hive过程详细日志




7.19 Mysql 数据源更新数据

update users_cdc set name = 'cdc05-bj'where id = 5;


7.20 Flinksql 查询cdc update数据 产生两条binlog数据


说明:flinksql 查询最终只有一条+I有效数据,且数据已更新

Flink web UI DAG接受到两条binlog数据,但最终compact和sink只有一条有效数据


7.21 MySQL 数据源 delete 一条数据:

deletefrom users_cdc where id = 3;


Flink Web UI job DAG中捕获一条新数据:


Flinksql changlog delete数据变化查询

HDFS上Hudi数据文件生成情况



Hudi文件类型说明:

(1)commits: 表示将一批数据原子性写入表中

(2)cleans: 清除表中不在需要的旧版本文件的后台活动

(3)delta_commit:增量提交是指将一批数据原子性写入MergeOnRead类型的表中,其中部分或者所有数据可以写入增量日志中

(4)compaction: 协调hudi中差异数据结构的后台活动,例如:将更新从基于行的日志文件变成列格式。在内部,压缩的表现为时间轴上的特殊提交

(5)rollback:表示提交操作不成功且已经回滚,会删除在写入过程中产生的数据


说明:hudi分区文件以及.log和.parquet文件都已生成

两种文件区别:Hudi会在DFS分布式文件系统上的basepath基本路径下组织成目录结构。每张对应的表都会成多个分区,这些分区是包含该分区的数据文件的文件夹,与hive的目录结构非常相似。在每个分区内,文件被组织成文件组,文件id为唯一标识。每个文件组包含多个切片,其中每个切片包含在某个提交/压缩即时时间生成的基本列文件(parquet文件),以及自生成基本文件以来对基本文件的插入/更新的一组日志文件(*.log)。Hudi采用MVCC设计,其中压缩操作会将日志和基本文件合并成新的文件片,清理操作会将未使用/较旧的文件片删除来回收DFS上的空间。

Flink 任务checkpoint 情况:

设置30s 一次



7.22 Hive shell查询数据update和delete变化情况:

hive> select * from mysqlcdc_sync_hive01_ro;


hive> select * from mysqlcdc_sync_hive01_rt;


7.23 Hudi Client端操作Hudi表

进入Hudi客户端命令行:

hudi-master/hudi-cli/hudi-cli.sh

连接Hudi表,查看表信息

hudi->connect --path hdfs://nameservice1/tmp/luo/hudi/mysqlcdc_sync_hive01


查看Hudi commit信息

hudi:mysqlcdc_sync_hive01->commits show --sortBy "CommitTime"

查看Hudi compactions 计划

hudi:mysqlcdc_sync_hive01->compactions show all

7.24 PrestoDB 查询Hive表Hudi数据

版本说明:PrestoDB 0.256 DBeaver7.0.4

PrestoDB 集群配置和hive集成参考PrestoDB官网

presto-server-***/etc/catalog/hive.properties 配置hive catalog

可通过 presto-cli 连接 hive metastore 开启查询,presto-cli 的设置参考 presto官方配置;

DBeaver客户端查询Hive ro表数据:


Hive ro表count 正常:


查询Hive rt表数据查询异常:


Hive rt表count异常:


Presto Web ui:



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
18天前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
45 9
|
2月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
613 1
Flink CDC:新一代实时数据集成框架
|
2月前
|
消息中间件 canal 数据采集
Flink CDC 在货拉拉的落地与实践
陈政羽在Apache Asia Community Over Code 2024上分享了《货拉拉在Flink CDC生产实践落地》。文章介绍了货拉拉业务背景、技术选型及其在实时数据采集中的挑战与解决方案,详细阐述了Flink CDC的技术优势及在稳定性、兼容性等方面的应用成果。通过实际案例展示了Flink CDC在提升数据采集效率、降低延迟等方面的显著成效,并展望了未来发展方向。
555 14
Flink CDC 在货拉拉的落地与实践
|
3月前
|
Oracle 关系型数据库 新能源
Flink CDC 在新能源制造业的实践
本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。
453 13
Flink CDC 在新能源制造业的实践
|
3月前
|
SQL 数据库 流计算
Flink CDC数据读取问题之一致性如何解决
Flink CDC 使用Change Data Capture (CDC)技术从数据库捕获变更事件,并利用Flink的流处理能力确保数据读取一致性。相较于传统工具,它具备全增量一体化数据集成能力,满足实时性需求。在实践中解决了高效数据同步、稳定同步大量表数据等问题。应用场景包括实时数据同步、实时数据集成等。快速上手需学习基本概念与实践操作。未来发展方向包括提升效率与稳定性,并依据用户需求持续优化。
129 1
|
4月前
|
关系型数据库 API Apache
Flink CDC:基于 Apache Flink 的流式数据集成框架
本文整理自阿里云 Flink SQL 团队研发工程师于喜千(yux)在 SECon 全球软件工程技术大会中数据集成专场沙龙的分享。
18264 11
Flink CDC:基于 Apache Flink 的流式数据集成框架
|
4月前
|
SQL JSON 缓存
玳数科技集成 Flink CDC 3.0 的实践
本文投稿自玳数科技工程师杨槐老师,介绍了 Flink CDC 3.0 与 ChunJun 框架在玳数科技的集成实践。
597 7
玳数科技集成 Flink CDC 3.0 的实践
|
3月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到iava.lang.NoClassDefFoundError: ververica/cdc/common/utils/StrinaUtils错误,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
311 2
|
3月前
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
354 1
下一篇
无影云桌面