Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(1)https://developer.aliyun.com/article/1532333
2、sql-client 使用 savepoint
1)提交一个insert作业,可以给作业设置名称
Flink SQL> create table sink( > id int, > ts bigint, > vc int > )with( > 'connector' = 'print' > );
insert into sink select * from source;
2)查看 job 列表
查看 job 列表是为了获得 job id,我们提交作业的时候会返回一个 job id 可以在 shell 命令行看到,或者从 web ui 端也可以看到,再或者通过下面的命令看:
show jobs;
3)停止作业,触发 savepoint
SET state.checkpoints.dir='hdfs://hadoop102:8020/chk'; SET state.savepoints.dir='hdfs://hadoop102:8020/sp'; -- 结束作业不设置保存点 stop job 'e6d3e9afed97aee7819c460a6e109445'; -- 结束作业设置保存点 stop job 'e6d3e9afed97aee7819c460a6e109445' with savepoint;
4)从 savepoint 恢复
-- 设置从savepoint恢复的路径 SET execution.savepoint.path='hdfs://hadoop102:8020/sp/savepoint-0e0742-7e2154873185'; -- 之后直接提交sql,就会从savepoint恢复 --允许跳过无法还原的保存点状态 set 'execution.savepoint.ignore-unclaimed-state' = 'true';
5)恢复后重置路径
注意:我们设置 savepoint 恢复路径后,之后的所有 insert 任务都会默认使用这个 savepoint,所以下一个作业一定要重置这个配置参数:
指定execution.savepoint.path后,将影响后面执行的所有DML语句,可以使用RESET命令重置这个配置选项。
RESET execution.savepoint.path;
如果出现reset没生效的问题,可能是个bug(包括 pipeline.name 这个参数也是),我们可以退出sql-client,再重新进,不需要重启flink的集群。
3、CateLog
Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。它本来翻译过来就是目录,我们可以理解为它就是数据库的目录。
数据处理最关键的方面之一是管理元数据。元数据可以是临时的,例如临时表、UDF。我们之前上面使用的表都是基于内存的一个 Catelog ,所以每次我们退出 sql-client 客户端的时候,这些表和数据库就不见了。元数据也可以是持久化的,例如 Hive MetaStore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。
Catalog 允许用户引用其数据存储系统中现有的元数据,并自动将其映射到 Flink 的相应元数据。例如,Flink 可以直接使用 Hive MetaStore 中的表的元数据,不必在Flink中手动重写ddl,也可以将 Flink SQL 中的元数据存储到 Hive MetaStore 中。Catalog 极大地简化了用户开始使用 Flink 的步骤,并极大地提升了用户体验。
注意:catalog 可以使得 mysql 、hive 和 flink 互通有无,互通就是可以操作读写(除了建表),而不是说只是在某个生命周期内起作用,只要连接上,flink 操作的就是实实在在的 hive 、mysql 本身,这才叫互通,而不是自嗨。
3.1、CateLog 类型
目前 Flink 包含了以下四种 Catalog:
- GenericInMemoryCatalog:基于内存实现的 Catalog,所有元数据只在session 的生命周期(即一个 Flink 任务一次运行生命周期内)内可用。默认自动创建,会有名为“default_catalog”的内存Catalog,这个Catalog默认只有一个名为“default_database”的数据库。
- JdbcCatalog:JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。Postgres Catalog和MySQL Catalog是目前仅有的两种JDBC Catalog实现,将元数据存储在数据库中。
- HiveCatalog:有两个用途,一是单纯作为 Flink 元数据的持久化存储,二是作为读写现有 Hive 元数据的接口。注意:Hive MetaStore 以小写形式存储所有元数据对象名称。Hive Metastore以小写形式存储所有元对象名称,而 GenericInMemoryCatalog会区分大小写。
- 用户自定义 Catalog:用户可以实现 Catalog 接口实现自定义 Catalog。从Flink1.16开始引入了用户类加载器,通过CatalogFactory.Context#getClassLoader访问,否则会报错ClassNotFoundException。
3.2、JdbcCatalog(MySQL)
JdbcCatalog不支持建表,只是打通flink与mysql的连接,可以去读写mysql现有的库表。
1)上传所需jar包到lib下
- flink-connector-jdbc-1.17-20230109.003314-120.jar
- mysql-connector-j-5.1.7.jar
注意:Flink 是冷加载,所以上传后需要重启 yarn-session 和 sql-client
2)创建Catalog
JdbcCatalog支持以下选项:
- name:必需,Catalog名称。
- default-database:必需,连接到的默认数据库。
- username: 必需,Postgres/MySQL帐户的用户名。
- password:必需,该帐号的密码。
- base-url:必需,数据库的jdbc url(不包含数据库名)
对于Postgres Catalog,是"jdbc:postgresql://:<端口>"
对于MySQL Catalog,是"jdbc: mysql://:<端口>"
CREATE CATALOG my_jdbc_catalog WITH( 'type' = 'jdbc', -- 这里指定的只是默认使用的数据库 它会把所有数据库导进这个catalog下 'default-database' = 'test', 'username' = 'root', 'password' = '123456', 'base-url' = 'jdbc:mysql://hadoop102:3306' );
3)查看 Catalog
show catalogs;
4)使用指定的 Catalog
use catalog my_jdbc_catalog;
我们发现,除了 mysql 的系统数据库看不到,别的都别导进来了。
我们也可以直接往表中插入数据,而不用向之前那样去建立映射表:
insert into ws2 values(2,2,2);
注意:在 jdbcCatalog 下是不支持建表的,什么表都不行(映射表或者普通表)!
要建表需要返回到之前默认的 default_catalog 才可以,但是我们是可以从 jdbc_catalog 去查 default_catalog 下的表数据的。
select * from default_catalog.mydatabase.source;
此外,我们也可以把不同类型catalog下不同的表数据关联在一起:
select * from default_catalog.mydatabase.source s join my_jdbc_catalog.test.ws2 w on s.id=w.id;
最后,每次我们退出 sql-client 的时候,其实我们创建的 jdbc_catalog 还是会被删除的,所以我们最好把创建catalog这些命令写进一个 sql 文件,初始化启动 sql-client 的时候执行一下。
3.3、HiveCatalog
同样,HiveCatalog 可以打通所有 Hive 的库和表,这样我们就可以在 Flink 直接读写 Hive 表。此外,我们还可以在 catalog 下创建我们 Flink 的表,比如带有 Kafka 连接器的表,而且即使我们退出客户端,再次进去 HiveCatalog ,那张表还是存在的。
1)上传 jar 包
- flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar
- mysql-connector-j-5.1.7.jar(我的 Hive 元数据存储在 MySQL)
2)更换planner依赖
只有在使用Hive方言或HiveServer2时才需要这样额外的计划器jar移动,但这是Hive集成的推荐设置。
这个我们之前使用 FileSystem 创建映射表的时候已经做过了。
3)重启flink集群和sql-client
4)启动外置的hive metastore服务
Hive metastore必须作为独立服务运行,也就是hive-site中必须配置hive.metastore.uris。(必须启动 hive 的元数据服务,不然我们flink无法获取hive中的数据)
# & 的意思是后台启动 # hive --service metastore & # 这里直接启动我的 hive hiveservice.sh start # 查看hive 启动没有 hiveservice status
启动 hive 后会一直挂在那,我们可以判断一下元数据服务是否启动:
netstat -anp|grep 9083 # 或者 ps -ef|grep -i metastore
5)创建 Catalog
配置项 |
必需 |
默认值 |
类型 |
说明 |
type |
Yes |
(none) |
String |
Catalog类型,创建HiveCatalog时必须设置为'hive'。 |
name |
Yes |
(none) |
String |
Catalog的唯一名称 |
hive-conf-dir |
No |
(none) |
String |
包含hive -site.xml的目录,需要Hadoop文件系统支持。如果没指定hdfs协议,则认为是本地文件系统。如果不指定该选项,则在类路径中搜索hive-site.xml。 |
default-database |
No |
default |
String |
Hive Catalog使用的默认数据库 |
hive-version |
No |
(none) |
String |
HiveCatalog能够自动检测正在使用的Hive版本。建议不要指定Hive版本,除非自动检测失败。 |
hadoop-conf-dir |
No |
(none) |
String |
Hadoop conf目录的路径。只支持本地文件系统路径。设置Hadoop conf的推荐方法是通过HADOOP_CONF_DIR环境变量。只有当环境变量不适合你时才使用该选项,例如,如果你想分别配置每个HiveCatalog。 |
CREATE CATALOG myhive WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/opt/module/hive-3.1.2/conf' );
6)查看 catalog
我们在 hive 中创建一个数据库 test 再创建一张表 ws:
我们再往 ws 中插入一条数据:
hive(test)> insert into ws values(1,1,1);
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(3)https://developer.aliyun.com/article/1532337