使用FLINK SQL从savepoint恢复hudi作业 (flink 1.13)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: Flink从1.13版本开始支持在SQL Client从savepoint恢复作业,本文介绍如何在flink sql client中恢复flink作业。

Flink从1.13版本开始支持在SQL Client从savepoint恢复作业。flink-savepoint介绍

接下来我们从Flink SQL Client构建一个mysql cdc数据经kafka入hudi数据湖的例子。整体流程如下:

在上述第二步中,我们通过手工停止kafka→hudi的Flink任务,然后在Flink SQL Client从savepoint进行恢复。

下述工作类似于Flink SQL Client实战CDC数据入湖只是本文的flink版本为1.13.1,可参考其完成本文验证。

环境依赖

hadoop 3.2.0

zookeeper 3.6.3

kafka 2.8.0

mysql 5.7.35

flink 1.13.1-scala_2.12

flink cdc 1.4

hudi 0.10.0-SNAPSHOT (最新master分支编译 [2021/10/08])

datafaker 0.7.6

各组件说明见Flink SQL Client实战CDC数据入湖

组件说明、安装以及使用可直接在上述文章页面右上角搜索框使用关键词进行搜索。

操作指南

使用datafaker将测试数据导入mysql

  1. 在数据库中新建stu8表
mysql -u root -p

create database test;
use test;
create table stu8 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '学生名字',
  school varchar(20) not null comment '学校名字',
  nickname varchar(20) not null comment '学生小名',
  age int not null comment '学生年龄',
  score decimal(4,2) not null comment '成绩',
  class_num int not null comment '班级人数',
  phone bigint not null comment '电话号码',
  email varchar(64) comment '家庭网络邮箱',
  ip varchar(32) comment 'IP地址'
  ) engine=InnoDB default charset=utf8;
  1. 新建meta.txt文件,文件内容为:
id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
  1. 生成1000000条数据并写入到mysql中的test.stu8表(将数据设置尽量大,让写入hudi的任务能够不断进行)
datafaker rdb mysql+mysqldb://root:Pass-123-root@hadoop:3306/test?charset=utf8 stu8 1000000 --meta meta.txt 

hudi、flink-mysql-cdc、flink-kafka相关jar包下载

本文提供编译好的hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar,如果你想自己编译hudi那么直接clone master分支进行编译即可。(注意指定hadoop版本)

将jar包下载到flink的lib目录下

cd flink-1.13.1/lib
wget https://obs-githubhelper.obs.cn-east-3.myhuaweicloud.com/blog-images/category/bigdata/flink/flink-sql-client-savepoint-example/hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar
wget https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.4.0/flink-sql-connector-mysql-cdc-1.4.0.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.1/flink-sql-connector-kafka_2.12-1.13.1.jar

如果你在启动以及运行flink任务中遇到缺少某些类问题,请下载相关jar包并放置到flink-1.13.1/lib目录下,本实验在操作过程中遇到的缺少的包如下(点击可下载):

在yarn上启动flink session集群

首先确保已经配置好HADOOP_CLASSPATH,对于开源版本hadoop3.2.0,可通过如下方式设置:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HADOOP_HOME/share/hadoop/client/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/tools/*:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/etc/hadoop/*

flink需要开启checkpoint,和配置savepoint目录,修改flink-conf.yaml配置文件

execution.checkpointing.interval: 150000ms
state.backend: rocksdb
state.checkpoints.dir: hdfs://hadoop:9000/flink-chk
state.backend.rocksdb.localdir: /tmp/rocksdb
state.savepoints.dir: hdfs://hadoop:9000/flink-1.13-savepoints

启动flink session集群

cd flink-1.13.1
bin/yarn-session.sh -s 4 -jm 2048 -tm 2048 -nm flink-hudi-test -d

启动flink sql client

cd flink-1.13.1
bin/sql-client.sh embedded -s yarn-session -j ./lib/hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar shell

flink读取mysql binlog并写入kafka

创建mysql源表

create table stu8_binlog(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'database-name' = 'test',
  'table-name' = 'stu8'
);

创建kafka目标表

create table stu8_binlog_sink_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'kafka'
  ,'topic' = 'cdc_mysql_test_stu8_sink'
  ,'properties.zookeeper.connect' = 'hadoop1:2181'
  ,'properties.bootstrap.servers' = 'hadoop1:9092'
  ,'format' = 'debezium-json'
);

创建任务将mysql binlog日志写入kafka

insert into stu8_binlog_sink_kafka
select * from stu8_binlog;

flink读取kafka数据并写入hudi数据湖

创建kafka源表

create table stu8_binlog_source_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
 ) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_test_stu8_sink',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testGroup'
  );

创建hudi目标表

 create table stu8_binlog_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:9000/tmp/test_stu8_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school'
  );

创建任务将kafka数据写入到hudi中

insert into stu8_binlog_sink_hudi
select * from  stu8_binlog_source_kafka;

待任务运行一段时间后,我们手动保存hudi作业并停止任务

bin/flink stop --savepointPath hdfs://hadoop:9000/flink-1.13-savepoint/ 0128b183276022367e15b017cb682d61 -yid application_1633660054258_0001

得到该任务的具体savepoint路径:

[root@hadoop flink]# bin/flink stop --savepointPath hdfs://hadoop:9000/flink-1.13-savepoint/ 0128b183276022367e15b017cb682d61 -yid application_1633660054258_0001
2021-10-07 22:55:51,171 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2021-10-07 22:55:51,171 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Suspending job "0128b183276022367e15b017cb682d61" with a savepoint.
2021-10-07 22:55:51,317 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/data/flink-1.13.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-10-07 22:55:51,364 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at hadoop/192.168.241.128:8032
2021-10-07 22:55:51,504 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-10-07 22:55:51,506 WARN  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2021-10-07 22:55:51,567 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface hadoop:34681 of application 'application_1633660054258_0001'.
Savepoint completed. Path: hdfs://hadoop:9000/flink-1.13-savepoint/savepoint-0128b1-8970a7371adb

从savepoint恢复任务:(在Flink SQL Client执行)

 SET execution.savepoint.path=hdfs://hadoop:9000/flink-1.13-savepoint/savepoint-0128b1-8970a7371adb
 
 insert into stu8_binlog_sink_hudi
select * from  stu8_binlog_source_kafka;

可以看到该任务从上述检查点恢复:

最新hudi基础入门、实战经验、进阶提升、经典案例,请参考:

hudi | 从大数据到人工智能

hudi-1
hudi-2

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
224 15
|
3月前
|
消息中间件 分布式计算 大数据
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
200 0
|
1月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
125 14
|
3月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
69 0
|
3月前
|
消息中间件 分布式计算 大数据
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
196 0
|
4月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
103 2
|
4月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
59 1
|
4月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
6月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
155 13