使用FLINK SQL从savepoint恢复hudi作业 (flink 1.13)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
简介: Flink从1.13版本开始支持在SQL Client从savepoint恢复作业,本文介绍如何在flink sql client中恢复flink作业。

Flink从1.13版本开始支持在SQL Client从savepoint恢复作业。flink-savepoint介绍

接下来我们从Flink SQL Client构建一个mysql cdc数据经kafka入hudi数据湖的例子。整体流程如下:

在上述第二步中,我们通过手工停止kafka→hudi的Flink任务,然后在Flink SQL Client从savepoint进行恢复。

下述工作类似于Flink SQL Client实战CDC数据入湖只是本文的flink版本为1.13.1,可参考其完成本文验证。

环境依赖

hadoop 3.2.0

zookeeper 3.6.3

kafka 2.8.0

mysql 5.7.35

flink 1.13.1-scala_2.12

flink cdc 1.4

hudi 0.10.0-SNAPSHOT (最新master分支编译 [2021/10/08])

datafaker 0.7.6

各组件说明见Flink SQL Client实战CDC数据入湖

组件说明、安装以及使用可直接在上述文章页面右上角搜索框使用关键词进行搜索。

操作指南

使用datafaker将测试数据导入mysql

  1. 在数据库中新建stu8表
mysql -u root -p

create database test;
use test;
create table stu8 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '学生名字',
  school varchar(20) not null comment '学校名字',
  nickname varchar(20) not null comment '学生小名',
  age int not null comment '学生年龄',
  score decimal(4,2) not null comment '成绩',
  class_num int not null comment '班级人数',
  phone bigint not null comment '电话号码',
  email varchar(64) comment '家庭网络邮箱',
  ip varchar(32) comment 'IP地址'
  ) engine=InnoDB default charset=utf8;
  1. 新建meta.txt文件,文件内容为:
id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
  1. 生成1000000条数据并写入到mysql中的test.stu8表(将数据设置尽量大,让写入hudi的任务能够不断进行)
datafaker rdb mysql+mysqldb://root:Pass-123-root@hadoop:3306/test?charset=utf8 stu8 1000000 --meta meta.txt 

hudi、flink-mysql-cdc、flink-kafka相关jar包下载

本文提供编译好的hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar,如果你想自己编译hudi那么直接clone master分支进行编译即可。(注意指定hadoop版本)

将jar包下载到flink的lib目录下

cd flink-1.13.1/lib
wget https://obs-githubhelper.obs.cn-east-3.myhuaweicloud.com/blog-images/category/bigdata/flink/flink-sql-client-savepoint-example/hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar
wget https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.4.0/flink-sql-connector-mysql-cdc-1.4.0.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.1/flink-sql-connector-kafka_2.12-1.13.1.jar

如果你在启动以及运行flink任务中遇到缺少某些类问题,请下载相关jar包并放置到flink-1.13.1/lib目录下,本实验在操作过程中遇到的缺少的包如下(点击可下载):

在yarn上启动flink session集群

首先确保已经配置好HADOOP_CLASSPATH,对于开源版本hadoop3.2.0,可通过如下方式设置:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HADOOP_HOME/share/hadoop/client/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/tools/*:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/etc/hadoop/*

flink需要开启checkpoint,和配置savepoint目录,修改flink-conf.yaml配置文件

execution.checkpointing.interval: 150000ms
state.backend: rocksdb
state.checkpoints.dir: hdfs://hadoop:9000/flink-chk
state.backend.rocksdb.localdir: /tmp/rocksdb
state.savepoints.dir: hdfs://hadoop:9000/flink-1.13-savepoints

启动flink session集群

cd flink-1.13.1
bin/yarn-session.sh -s 4 -jm 2048 -tm 2048 -nm flink-hudi-test -d

启动flink sql client

cd flink-1.13.1
bin/sql-client.sh embedded -s yarn-session -j ./lib/hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar shell

flink读取mysql binlog并写入kafka

创建mysql源表

create table stu8_binlog(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'database-name' = 'test',
  'table-name' = 'stu8'
);

创建kafka目标表

create table stu8_binlog_sink_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'kafka'
  ,'topic' = 'cdc_mysql_test_stu8_sink'
  ,'properties.zookeeper.connect' = 'hadoop1:2181'
  ,'properties.bootstrap.servers' = 'hadoop1:9092'
  ,'format' = 'debezium-json'
);

创建任务将mysql binlog日志写入kafka

insert into stu8_binlog_sink_kafka
select * from stu8_binlog;

flink读取kafka数据并写入hudi数据湖

创建kafka源表

create table stu8_binlog_source_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
 ) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_test_stu8_sink',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testGroup'
  );

创建hudi目标表

 create table stu8_binlog_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:9000/tmp/test_stu8_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school'
  );

创建任务将kafka数据写入到hudi中

insert into stu8_binlog_sink_hudi
select * from  stu8_binlog_source_kafka;

待任务运行一段时间后,我们手动保存hudi作业并停止任务

bin/flink stop --savepointPath hdfs://hadoop:9000/flink-1.13-savepoint/ 0128b183276022367e15b017cb682d61 -yid application_1633660054258_0001

得到该任务的具体savepoint路径:

[root@hadoop flink]# bin/flink stop --savepointPath hdfs://hadoop:9000/flink-1.13-savepoint/ 0128b183276022367e15b017cb682d61 -yid application_1633660054258_0001
2021-10-07 22:55:51,171 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2021-10-07 22:55:51,171 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Suspending job "0128b183276022367e15b017cb682d61" with a savepoint.
2021-10-07 22:55:51,317 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/data/flink-1.13.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-10-07 22:55:51,364 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at hadoop/192.168.241.128:8032
2021-10-07 22:55:51,504 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-10-07 22:55:51,506 WARN  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2021-10-07 22:55:51,567 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface hadoop:34681 of application 'application_1633660054258_0001'.
Savepoint completed. Path: hdfs://hadoop:9000/flink-1.13-savepoint/savepoint-0128b1-8970a7371adb

从savepoint恢复任务:(在Flink SQL Client执行)

 SET execution.savepoint.path=hdfs://hadoop:9000/flink-1.13-savepoint/savepoint-0128b1-8970a7371adb
 
 insert into stu8_binlog_sink_hudi
select * from  stu8_binlog_source_kafka;

可以看到该任务从上述检查点恢复:

最新hudi基础入门、实战经验、进阶提升、经典案例,请参考:

hudi | 从大数据到人工智能

hudi-1
hudi-2

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
26天前
|
SQL 缓存 监控
14个Flink SQL性能优化实践分享
【7月更文挑战第12天】 1. **合理设置并行度**: 根据数据量和资源调整以提高处理速度. 2. **优化数据源**: 使用分区表并进行预处理减少输入量. 3. **数据缓存**: 采用 `BROADCAST` 或 `REPARTITION` 缓存常用数据. 4. **索引和分区**: 创建索引并按常用字段分区. 5. **避免不必要的计算**: 检查并移除多余的计算步骤. 6. **调整内存配置**: 分配足够内存避免性能下降. 7. **优化连接操作**: 选择适合大表和小表的连接方式. 8. **数据类型优化**: 选择合适类型以节省资源. ........
|
11天前
|
SQL 物联网 数据处理
"颠覆传统,Hive SQL与Flink激情碰撞!解锁流批一体数据处理新纪元,让数据决策力瞬间爆表,你准备好了吗?"
【8月更文挑战第9天】数据时代,实时性和准确性至关重要。传统上,批处理与流处理各司其职,但Apache Flink打破了这一界限,尤其Flink与Hive SQL的结合,开创了流批一体的数据处理新时代。这不仅简化了数据处理流程,还极大提升了效率和灵活性。例如,通过Flink SQL,可以轻松实现流数据与批数据的融合分析,无需在两者间切换。这种融合不仅降低了技术门槛,还为企业提供了更强大的数据支持,无论是在金融、电商还是物联网领域,都将发挥巨大作用。
31 6
|
11天前
|
SQL 大数据 测试技术
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【8月更文挑战第9天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法深受开发者青睐。本文分享了编写高效 Flink SQL 的实用技巧:首先需深刻理解数据特性与业务目标;其次,合理运用窗口函数(如 TUMBLE 和 HOP)可大幅提升效率;优化连接操作,优先采用等值连接并恰当选择连接表;正确选取数据类型以减少类型转换开销;最后,持续进行性能测试与调优。通过这些方法,我们能在实际项目中(如实时电商数据分析)更高效地处理数据,挖掘出更多价值。
29 6
|
11天前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【8月更文挑战第9天】在大数据时代,Apache Flink以其强大的流处理能力脱颖而出,而Flink SQL则为数据处理带来了灵活性。本文介绍如何运用Flink SQL实现数据脱敏——一项关键的隐私保护技术。通过内置函数与表达式,在SQL查询中加入脱敏逻辑,可有效处理敏感信息,如个人身份与财务数据,以符合GDPR等数据保护法规。示例展示了如何对信用卡号进行脱敏,采用`CASE`语句检查并替换敏感数据。此外,Flink SQL支持自定义函数,适用于更复杂的脱敏需求。掌握此技能对于保障数据安全至关重要。
31 5
|
13天前
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
39 1
|
14天前
|
SQL Java Scala
flink-cdc SQL Server op 字段如何获取?
Flink CDC 是 Apache Flink 的组件,用于捕获数据库变更事件。对 SQL Server,通过 Debezium 连接器支持变更数据捕获。`op` 字段标识操作类型(INSERT、UPDATE、DELETE)。配置包括添加依赖及设定 Source 连接器,可通过 Flink SQL 或 Java/Scala 完成。示例查询利用 `op` 字段筛选处理变更事件。
23 1
|
23天前
|
SQL 数据处理 Apache
Apache Flink SQL:实时计算的核心引擎
Apache Flink SQL 的一些核心功能,并探讨了其在实时计算领域的应用。随着 Flink 社区的不断发展和完善,Flink SQL 将变得越来越强大,为实时数据分析带来更多的可能性。
|
27天前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
45 13
|
24天前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
|
22天前
|
SQL 存储 网络安全
关系数据库SQLserver 安装 SQL Server
【7月更文挑战第26天】
34 6