Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(2)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
简介: Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(1)https://developer.aliyun.com/article/1532333

2、sql-client 使用 savepoint

1)提交一个insert作业,可以给作业设置名称

Flink SQL> create table sink(
> id int,
> ts bigint,
> vc int
> )with(
> 'connector' = 'print'
> );
insert into sink select * from source;

2)查看 job 列表

查看 job 列表是为了获得 job id,我们提交作业的时候会返回一个 job id 可以在 shell 命令行看到,或者从 web ui 端也可以看到,再或者通过下面的命令看:

show jobs;

3)停止作业,触发 savepoint

SET state.checkpoints.dir='hdfs://hadoop102:8020/chk';
SET state.savepoints.dir='hdfs://hadoop102:8020/sp';
-- 结束作业不设置保存点
stop job 'e6d3e9afed97aee7819c460a6e109445';
-- 结束作业设置保存点
stop job 'e6d3e9afed97aee7819c460a6e109445' with savepoint;

4)从 savepoint 恢复

-- 设置从savepoint恢复的路径 
SET execution.savepoint.path='hdfs://hadoop102:8020/sp/savepoint-0e0742-7e2154873185';  
 
-- 之后直接提交sql,就会从savepoint恢复
 
--允许跳过无法还原的保存点状态
set 'execution.savepoint.ignore-unclaimed-state' = 'true'; 

5)恢复后重置路径

注意:我们设置 savepoint 恢复路径后,之后的所有 insert 任务都会默认使用这个 savepoint,所以下一个作业一定要重置这个配置参数:

指定execution.savepoint.path后,将影响后面执行的所有DML语句,可以使用RESET命令重置这个配置选项。

RESET execution.savepoint.path;

如果出现reset没生效的问题,可能是个bug(包括 pipeline.name 这个参数也是),我们可以退出sql-client,再重新进,不需要重启flink的集群。

3、CateLog

       Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。它本来翻译过来就是目录,我们可以理解为它就是数据库的目录。

       数据处理最关键的方面之一是管理元数据。元数据可以是临时的,例如临时表、UDF。我们之前上面使用的表都是基于内存的一个 Catelog ,所以每次我们退出 sql-client 客户端的时候,这些表和数据库就不见了。元数据也可以是持久化的,例如 Hive MetaStore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。

       Catalog 允许用户引用其数据存储系统中现有的元数据,并自动将其映射到 Flink 的相应元数据。例如,Flink 可以直接使用 Hive MetaStore 中的表的元数据,不必在Flink中手动重写ddl,也可以将 Flink SQL 中的元数据存储到 Hive MetaStore 中。Catalog 极大地简化了用户开始使用 Flink 的步骤,并极大地提升了用户体验。

       注意:catalog 可以使得 mysql 、hive 和 flink 互通有无,互通就是可以操作读写(除了建表),而不是说只是在某个生命周期内起作用,只要连接上,flink 操作的就是实实在在的 hive 、mysql 本身,这才叫互通,而不是自嗨。

3.1、CateLog 类型

目前 Flink 包含了以下四种 Catalog:

  • GenericInMemoryCatalog:基于内存实现的 Catalog,所有元数据只在session 的生命周期(即一个 Flink 任务一次运行生命周期内)内可用。默认自动创建,会有名为“default_catalog”的内存Catalog,这个Catalog默认只有一个名为“default_database”的数据库。
  • JdbcCatalog:JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。Postgres Catalog和MySQL Catalog是目前仅有的两种JDBC Catalog实现,将元数据存储在数据库中。
  • HiveCatalog:有两个用途,一是单纯作为 Flink 元数据的持久化存储,二是作为读写现有 Hive 元数据的接口。注意:Hive MetaStore 以小写形式存储所有元数据对象名称。Hive Metastore以小写形式存储所有元对象名称,而 GenericInMemoryCatalog会区分大小写。
  • 用户自定义 Catalog:用户可以实现 Catalog 接口实现自定义 Catalog。从Flink1.16开始引入了用户类加载器,通过CatalogFactory.Context#getClassLoader访问,否则会报错ClassNotFoundException。

3.2、JdbcCatalog(MySQL)

JdbcCatalog不支持建表,只是打通flink与mysql的连接,可以去读写mysql现有的库表。

1)上传所需jar包到lib下

  • flink-connector-jdbc-1.17-20230109.003314-120.jar
  • mysql-connector-j-5.1.7.jar

注意:Flink 是冷加载,所以上传后需要重启 yarn-session 和 sql-client

2)创建Catalog

JdbcCatalog支持以下选项:

  • name:必需,Catalog名称。
  • default-database:必需,连接到的默认数据库。
  • username: 必需,Postgres/MySQL帐户的用户名。
  • password:必需,该帐号的密码。
  • base-url:必需,数据库的jdbc url(不包含数据库名)

对于Postgres Catalog,是"jdbc:postgresql://:<端口>"

对于MySQL Catalog,是"jdbc: mysql://:<端口>"

CREATE CATALOG my_jdbc_catalog WITH(
    'type' = 'jdbc',
    -- 这里指定的只是默认使用的数据库 它会把所有数据库导进这个catalog下
    'default-database' = 'test',
    'username' = 'root',
    'password' = '123456',
    'base-url' = 'jdbc:mysql://hadoop102:3306'
);

3)查看 Catalog

show catalogs;

4)使用指定的 Catalog

use catalog my_jdbc_catalog;

我们发现,除了 mysql 的系统数据库看不到,别的都别导进来了。

我们也可以直接往表中插入数据,而不用向之前那样去建立映射表:

insert into ws2 values(2,2,2);

注意:在 jdbcCatalog 下是不支持建表的,什么表都不行(映射表或者普通表)!

要建表需要返回到之前默认的 default_catalog 才可以,但是我们是可以从 jdbc_catalog 去查 default_catalog 下的表数据的。

select * from default_catalog.mydatabase.source;

此外,我们也可以把不同类型catalog下不同的表数据关联在一起:

select * from default_catalog.mydatabase.source s join my_jdbc_catalog.test.ws2 w on s.id=w.id;

最后,每次我们退出 sql-client 的时候,其实我们创建的 jdbc_catalog 还是会被删除的,所以我们最好把创建catalog这些命令写进一个 sql 文件,初始化启动 sql-client 的时候执行一下。

3.3、HiveCatalog

同样,HiveCatalog 可以打通所有 Hive 的库和表,这样我们就可以在 Flink 直接读写 Hive 表。此外,我们还可以在 catalog 下创建我们 Flink 的表,比如带有 Kafka 连接器的表,而且即使我们退出客户端,再次进去 HiveCatalog ,那张表还是存在的。

1)上传 jar 包

  • flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar
  • mysql-connector-j-5.1.7.jar(我的 Hive 元数据存储在 MySQL)

2)更换planner依赖

只有在使用Hive方言或HiveServer2时才需要这样额外的计划器jar移动,但这是Hive集成的推荐设置。

这个我们之前使用 FileSystem 创建映射表的时候已经做过了。

3)重启flink集群和sql-client

4)启动外置的hive metastore服务

Hive metastore必须作为独立服务运行,也就是hive-site中必须配置hive.metastore.uris。(必须启动 hive 的元数据服务,不然我们flink无法获取hive中的数据)

# & 的意思是后台启动
# hive --service metastore &
# 这里直接启动我的 hive
hiveservice.sh start
# 查看hive 启动没有
hiveservice status

启动 hive 后会一直挂在那,我们可以判断一下元数据服务是否启动:

netstat -anp|grep 9083
# 或者
ps -ef|grep -i metastore

5)创建 Catalog

配置项

必需

默认值

类型

说明

type

Yes

(none)

String

Catalog类型,创建HiveCatalog时必须设置为'hive'。

name

Yes

(none)

String

Catalog的唯一名称

hive-conf-dir

No

(none)

String

包含hive -site.xml的目录,需要Hadoop文件系统支持。如果没指定hdfs协议,则认为是本地文件系统。如果不指定该选项,则在类路径中搜索hive-site.xml。

default-database

No

default

String

Hive Catalog使用的默认数据库

hive-version

No

(none)

String

HiveCatalog能够自动检测正在使用的Hive版本。建议不要指定Hive版本,除非自动检测失败。

hadoop-conf-dir

No

(none)

String

Hadoop conf目录的路径。只支持本地文件系统路径。设置Hadoop conf的推荐方法是通过HADOOP_CONF_DIR环境变量。只有当环境变量不适合你时才使用该选项,例如,如果你想分别配置每个HiveCatalog。

CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/opt/module/hive-3.1.2/conf'
);

6)查看 catalog

我们在 hive 中创建一个数据库 test 再创建一张表 ws:

我们再往 ws 中插入一条数据:

hive(test)> insert into ws values(1,1,1);

Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(3)https://developer.aliyun.com/article/1532337

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
28天前
|
SQL 关系型数据库 API
实时计算 Flink版产品使用问题之如何使用stream api
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
28天前
|
NoSQL 关系型数据库 Java
实时计算 Flink版产品使用问题之如何使用Flink MongoDB Connector连接MongoDB
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
26天前
|
关系型数据库 数据库 流计算
实时计算 Flink版操作报错合集之在使用Flink CDC TiDB Connector时,无法获取到事件,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
379 0
|
26天前
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版操作报错合集之用dinky在k8s上提交作业,会报错:Caused by: org.apache.flink.table.api.ValidationException:,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
139 0
|
26天前
|
SQL 关系型数据库 数据库
实时计算 Flink版操作报错合集之在本地执行代码没有问题,但是在线执行sql命令就会报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
100 0
|
26天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在执行SQL语句时遇到了类找不到,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
102 0
|
1月前
|
SQL JSON 分布式计算
|
1月前
|
SQL 分布式计算 Java
|
SQL API 流计算
Flink关系型API简介
在接触关系型API之前,用户通常会采用DataStream、DataSet API来编写Flink程序,它们都提供了丰富的处理能力,以DataStream为例,它有如下这些优点: 富有表现力的流处理,包括但不限于:转换数据,更新状态,定义窗口、聚合,事件时间语义,有状态且保证正确性等; 高度自定义.
1569 0
|
26天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
923 0