Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(2)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(1)https://developer.aliyun.com/article/1532333

2、sql-client 使用 savepoint

1)提交一个insert作业,可以给作业设置名称

Flink SQL> create table sink(
> id int,
> ts bigint,
> vc int
> )with(
> 'connector' = 'print'
> );
insert into sink select * from source;

2)查看 job 列表

查看 job 列表是为了获得 job id,我们提交作业的时候会返回一个 job id 可以在 shell 命令行看到,或者从 web ui 端也可以看到,再或者通过下面的命令看:

show jobs;

3)停止作业,触发 savepoint

SET state.checkpoints.dir='hdfs://hadoop102:8020/chk';
SET state.savepoints.dir='hdfs://hadoop102:8020/sp';
-- 结束作业不设置保存点
stop job 'e6d3e9afed97aee7819c460a6e109445';
-- 结束作业设置保存点
stop job 'e6d3e9afed97aee7819c460a6e109445' with savepoint;

4)从 savepoint 恢复

-- 设置从savepoint恢复的路径 
SET execution.savepoint.path='hdfs://hadoop102:8020/sp/savepoint-0e0742-7e2154873185';  
 
-- 之后直接提交sql,就会从savepoint恢复
 
--允许跳过无法还原的保存点状态
set 'execution.savepoint.ignore-unclaimed-state' = 'true'; 

5)恢复后重置路径

注意:我们设置 savepoint 恢复路径后,之后的所有 insert 任务都会默认使用这个 savepoint,所以下一个作业一定要重置这个配置参数:

指定execution.savepoint.path后,将影响后面执行的所有DML语句,可以使用RESET命令重置这个配置选项。

RESET execution.savepoint.path;

如果出现reset没生效的问题,可能是个bug(包括 pipeline.name 这个参数也是),我们可以退出sql-client,再重新进,不需要重启flink的集群。

3、CateLog

       Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。它本来翻译过来就是目录,我们可以理解为它就是数据库的目录。

       数据处理最关键的方面之一是管理元数据。元数据可以是临时的,例如临时表、UDF。我们之前上面使用的表都是基于内存的一个 Catelog ,所以每次我们退出 sql-client 客户端的时候,这些表和数据库就不见了。元数据也可以是持久化的,例如 Hive MetaStore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。

       Catalog 允许用户引用其数据存储系统中现有的元数据,并自动将其映射到 Flink 的相应元数据。例如,Flink 可以直接使用 Hive MetaStore 中的表的元数据,不必在Flink中手动重写ddl,也可以将 Flink SQL 中的元数据存储到 Hive MetaStore 中。Catalog 极大地简化了用户开始使用 Flink 的步骤,并极大地提升了用户体验。

       注意:catalog 可以使得 mysql 、hive 和 flink 互通有无,互通就是可以操作读写(除了建表),而不是说只是在某个生命周期内起作用,只要连接上,flink 操作的就是实实在在的 hive 、mysql 本身,这才叫互通,而不是自嗨。

3.1、CateLog 类型

目前 Flink 包含了以下四种 Catalog:

  • GenericInMemoryCatalog:基于内存实现的 Catalog,所有元数据只在session 的生命周期(即一个 Flink 任务一次运行生命周期内)内可用。默认自动创建,会有名为“default_catalog”的内存Catalog,这个Catalog默认只有一个名为“default_database”的数据库。
  • JdbcCatalog:JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。Postgres Catalog和MySQL Catalog是目前仅有的两种JDBC Catalog实现,将元数据存储在数据库中。
  • HiveCatalog:有两个用途,一是单纯作为 Flink 元数据的持久化存储,二是作为读写现有 Hive 元数据的接口。注意:Hive MetaStore 以小写形式存储所有元数据对象名称。Hive Metastore以小写形式存储所有元对象名称,而 GenericInMemoryCatalog会区分大小写。
  • 用户自定义 Catalog:用户可以实现 Catalog 接口实现自定义 Catalog。从Flink1.16开始引入了用户类加载器,通过CatalogFactory.Context#getClassLoader访问,否则会报错ClassNotFoundException。

3.2、JdbcCatalog(MySQL)

JdbcCatalog不支持建表,只是打通flink与mysql的连接,可以去读写mysql现有的库表。

1)上传所需jar包到lib下

  • flink-connector-jdbc-1.17-20230109.003314-120.jar
  • mysql-connector-j-5.1.7.jar

注意:Flink 是冷加载,所以上传后需要重启 yarn-session 和 sql-client

2)创建Catalog

JdbcCatalog支持以下选项:

  • name:必需,Catalog名称。
  • default-database:必需,连接到的默认数据库。
  • username: 必需,Postgres/MySQL帐户的用户名。
  • password:必需,该帐号的密码。
  • base-url:必需,数据库的jdbc url(不包含数据库名)

对于Postgres Catalog,是"jdbc:postgresql://:<端口>"

对于MySQL Catalog,是"jdbc: mysql://:<端口>"

CREATE CATALOG my_jdbc_catalog WITH(
    'type' = 'jdbc',
    -- 这里指定的只是默认使用的数据库 它会把所有数据库导进这个catalog下
    'default-database' = 'test',
    'username' = 'root',
    'password' = '123456',
    'base-url' = 'jdbc:mysql://hadoop102:3306'
);

3)查看 Catalog

show catalogs;

4)使用指定的 Catalog

use catalog my_jdbc_catalog;

我们发现,除了 mysql 的系统数据库看不到,别的都别导进来了。

我们也可以直接往表中插入数据,而不用向之前那样去建立映射表:

insert into ws2 values(2,2,2);

注意:在 jdbcCatalog 下是不支持建表的,什么表都不行(映射表或者普通表)!

要建表需要返回到之前默认的 default_catalog 才可以,但是我们是可以从 jdbc_catalog 去查 default_catalog 下的表数据的。

select * from default_catalog.mydatabase.source;

此外,我们也可以把不同类型catalog下不同的表数据关联在一起:

select * from default_catalog.mydatabase.source s join my_jdbc_catalog.test.ws2 w on s.id=w.id;

最后,每次我们退出 sql-client 的时候,其实我们创建的 jdbc_catalog 还是会被删除的,所以我们最好把创建catalog这些命令写进一个 sql 文件,初始化启动 sql-client 的时候执行一下。

3.3、HiveCatalog

同样,HiveCatalog 可以打通所有 Hive 的库和表,这样我们就可以在 Flink 直接读写 Hive 表。此外,我们还可以在 catalog 下创建我们 Flink 的表,比如带有 Kafka 连接器的表,而且即使我们退出客户端,再次进去 HiveCatalog ,那张表还是存在的。

1)上传 jar 包

  • flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar
  • mysql-connector-j-5.1.7.jar(我的 Hive 元数据存储在 MySQL)

2)更换planner依赖

只有在使用Hive方言或HiveServer2时才需要这样额外的计划器jar移动,但这是Hive集成的推荐设置。

这个我们之前使用 FileSystem 创建映射表的时候已经做过了。

3)重启flink集群和sql-client

4)启动外置的hive metastore服务

Hive metastore必须作为独立服务运行,也就是hive-site中必须配置hive.metastore.uris。(必须启动 hive 的元数据服务,不然我们flink无法获取hive中的数据)

# & 的意思是后台启动
# hive --service metastore &
# 这里直接启动我的 hive
hiveservice.sh start
# 查看hive 启动没有
hiveservice status

启动 hive 后会一直挂在那,我们可以判断一下元数据服务是否启动:

netstat -anp|grep 9083
# 或者
ps -ef|grep -i metastore

5)创建 Catalog

配置项

必需

默认值

类型

说明

type

Yes

(none)

String

Catalog类型,创建HiveCatalog时必须设置为'hive'。

name

Yes

(none)

String

Catalog的唯一名称

hive-conf-dir

No

(none)

String

包含hive -site.xml的目录,需要Hadoop文件系统支持。如果没指定hdfs协议,则认为是本地文件系统。如果不指定该选项,则在类路径中搜索hive-site.xml。

default-database

No

default

String

Hive Catalog使用的默认数据库

hive-version

No

(none)

String

HiveCatalog能够自动检测正在使用的Hive版本。建议不要指定Hive版本,除非自动检测失败。

hadoop-conf-dir

No

(none)

String

Hadoop conf目录的路径。只支持本地文件系统路径。设置Hadoop conf的推荐方法是通过HADOOP_CONF_DIR环境变量。只有当环境变量不适合你时才使用该选项,例如,如果你想分别配置每个HiveCatalog。

CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/opt/module/hive-3.1.2/conf'
);

6)查看 catalog

我们在 hive 中创建一个数据库 test 再创建一张表 ws:

我们再往 ws 中插入一条数据:

hive(test)> insert into ws values(1,1,1);

Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(3)https://developer.aliyun.com/article/1532337

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
3月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
199 15
|
4天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
53 14
|
4月前
|
存储 API 网络架构
【Azure 存储服务】调用REST API获取Stroage Account Table中所有的Entity计数 -- Count
【Azure 存储服务】调用REST API获取Stroage Account Table中所有的Entity计数 -- Count
|
2月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
56 0
|
3月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
91 2
|
3月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
52 1
|
4月前
|
存储 JSON API
【Azure 存储服务】使用REST API操作Azure Storage Table,删除数据(Delete Entity)
【Azure 存储服务】使用REST API操作Azure Storage Table,删除数据(Delete Entity)
【Azure 存储服务】使用REST API操作Azure Storage Table,删除数据(Delete Entity)
|
4月前
|
SQL 流计算
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
51 1
下一篇
DataWorks