详解 Flink Catalog 在 ChunJun 中的实践之路

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
简介: Catalog 提供元数据,以及访问存储在数据库或其他外部系统中的数据所需的函数和信息,能够极大简化用户现有系统开始使用 Flink 所需的步骤,并增强用户体验,本文将为大家带来 Flink Catalog 的介绍以及 Flink Catalog 在 ChunJun 中的实践之路详解。

我们知道 Flink 有Table(表)、View(视图)、Function(函数/算子)、Database(数据库)的概念,相对于这些耳熟能详的概念,Flink 里还有一个 Catalog(目录) 的概念。

本文将为大家带来 Flink Catalog 的介绍以及 Flink Catalog 在 ChunJun 中的实践之路。

Flink Catalog 简介

Catalog 提供元数据,如数据库、表、分区、视图,以及访问存储在数据库或其他外部系统中的数据所需的函数和信息。

Flink Catalog 作用

数据处理中最关键的一个方面是管理元数据:

· 可能是暂时性的元数据,如临时表,或针对表环境注册的 UDFs;

· 或者是永久性的元数据,比如 Hive 元存储中的元数据。

Catalog 提供了一个统一的 API 来管理元数据,并使其可以从表 API 和 SQL 查询语句中来访问。

Catalog 使用户能够引用他们数据系统中的现有元数据,并自动将它们映射到 Flink 的相应元数据。例如,Flink 可以将 JDBC 表自动映射到 Flink 表,用户不必在 Flink 中手动重写 DDL。Catalog 大大简化了用户现有系统开始使用 Flink 所需的步骤,并增强了用户体验。

Flink Catalog 的结构

● Flink Catalog 原生结构

• GenericInMemoryCatalog:基于内存实现的 Catalog

• Jdbc Catalog:可以将 Flink 通过 JDBC 协议连接到关系数据库,目前 Flink 在1.12和1.13中有不同的实现,包括 MySql Catalog 和 Postgres Catalog

• Hive Catalog:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口

● Flink Iceberg Catalog

● Flink Hudi Catalog

HoodieCatalog、HoodieHiveCatalog

image.png

image.png

Flink Catalog 详解

GenericInMemoryCatalog

final CatalogManager catalogManager =
        CatalogManager.newBuilder()
                .classLoader(userClassLoader)
                .config(tableConfig)
                .defaultCatalog(
                        settings.getBuiltInCatalogName(),
                        new GenericInMemoryCatalog(
                                settings.getBuiltInCatalogName(),
                                settings.getBuiltInDatabaseName()))
                .build();
defaultCatalog =
                    new GenericInMemoryCatalog(
                            defaultCatalogName, settings.getBuiltInDatabaseName());
CatalogManager catalogManager =
                builder.defaultCatalog(defaultCatalogName, defaultCatalog).build();

GenericInMemoryCatalog 所有的数据都保存在 HashMap 里面,无法持久化。

JDBC Catalog

CREATE CATALOG my_catalog WITH(
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);
USE CATALOG my_catalog;

如果创建并使用 Postgres Catalog 或 MySQL Catalog,请配置 JDBC 连接器和相应的驱动。

JDBC Catalog 支持以下参数:

• name:必填,Catalog 的名称

• default-database:必填,默认要连接的数据库

• username:必填,Postgres/MySQL 账户的用户名

• password:必填,账户的密码

• base-url: 必填,(不应该包含数据库名)

对于 Postgres Catalog base-url 应为 "jdbc:postgresql://:" 的格式

对于 MySQL   Catalog base-url 应为 "jdbc:mysql://:" 的格式

Hive Catalog

CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'mydatabase',
    'hive-conf-dir' = '/opt/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;

image.png

Iceberg Catalog

● Hive Catalog 管理 Iceberg 表

(Flink) default_database.flink_table -> 
(Iceberg) default_database.flink_table
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
(Flink)default_database.flink_table -> 
(Iceberg) hive_db.hive_iceberg_table
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'catalog-database'='hive_db',
    'catalog-table'='hive_iceberg_table',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

● Hadoop Catalog 管理 Iceberg 表

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hadoop_prod',
    'catalog-type'='hadoop',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

● 自定义 Catalog 管理 Iceberg 表

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='custom_prod',
    'catalog-impl'='com.my.custom.CatalogImpl',
     -- More table properties for the customized catalog
    'my-additional-catalog-config'='my-value',
     ...
);

• connector:iceberg

• catalog-name:用户指定的目录名称,这是必须的,因为连接器没有任何默认值

• catalog-type:内置目录的 hive 或 hadoop(默认为hive),或者对于使用 catalog-impl 的自定义目录实现,不做设置

• catalog-impl:自定义目录实现的全限定类名,如果 catalog-type 没有被设置,则必须被设置,更多细节请参见自定义目录

• catalog-database: 后台目录中的 iceberg 数据库名称,默认使用当前的 Flink 数据库名称

• catalog-table: 后台目录中的冰山表名,默认使用 Flink CREATE TABLE 句子中的表名

Hudi Catalog

create catalog hudi with(
 'type' = 'hudi',
 'mode' = 'hms',
  'hive.conf.dir'='/etc/hive/conf'
);
--- 创建数据库供hudi使用
create database hudi.hudidb;
--- order表
CREATE TABLE hudi.hudidb.orders_hudi(
  uuid INT,
  ts INT,
  num INT,
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ'
);
select * from hudi.hudidb.orders_hudi;

image.png

image.png

Flink Catalog 在 ChunJun 中的实践

下面将为大家介绍本文的重头戏,Flink Catalog 在 ChunJun 中的实践之路。

直接引入开源 Catalog

ChunJun 目前的所有 Catalog 为以下四种:

image.png

● Hive Catalog 需要的依赖

image.png

● Iceberg Catalog 需要的依赖

image.png

● JDBC Catalog

JDBC 因为 Flink 1.12 和 1.13 API 有变化,因此需要涉及源码的改动,改动一些 API 后,从源码引入。

● DT Catalog

结合内部业务,自定义的一种 Catalog ,下文将会进行详细介绍。

DT Catalog -存储元数据表设计

● 创建 mysql 元数据表 database_info

-- 创建表的 sql
create table database_info
(
    `id`            bigint PRIMARY KEY NOT NULL AUTO_INCREMENT COMMENT '项目ID',-- database id
    `catalog_name`  varchar(255) COMMENT 'catalog 名字',
    `database_name` varchar(255) COMMENT 'database 名字',
    `catalog_type`  varchar(30) COMMENT 'catalog 类型, eg: mysql,oracle...',
    `project_id`    int(11)            NOT NULL COMMENT '项目ID',
    `tenant_id`     int(11)            NOT NULL COMMENT '租户ID'
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;
-- 创建索引
CREATE INDEX idx_catalog_name_database_name_project_id_tenant_id ON database_info (`catalog_name`, `database_name`, `project_id`, `tenant_id`);

● 创建 mysql 元数据表 table_info

-- 创建表的 sql
create table table_info
(
    `id`            bigint PRIMARY KEY NOT NULL AUTO_INCREMENT,
    `database_id`    bigint COMMENT 'database_info 表的 id',
    `table_name`  varchar(255) COMMENT '表名',
    `project_id`    int(11)            NOT NULL COMMENT '项目ID',
    `tenant_id`     int(11)            NOT NULL COMMENT '租户ID'
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;
-- 创建索引
CREATE INDEX idx_catalog_id_project_id_tenant_id ON table_info (`database_id`, `project_id`, `tenant_id`);
CREATE INDEX idx_database_id_table_name_project_id_tenant_id ON table_info (`database_id`, `table_name`, `project_id`, `tenant_id`);

● 创建 mysql 元数据表 properties_info

create table properties_info
(
    `id`       bigint PRIMARY KEY NOT NULL AUTO_INCREMENT ,
    `table_id` bigint(20) COMMENT 'table_info 表的 id',
    `key`      varchar(255) COMMENT '表的属性 key',
    `value`    varchar(255) COMMENT '表的属性 value'
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;
CREATE INDEX idx_table_id ON properties_info (table_id);

● properties_info 里面存了什么?

schema.0.name=id,
  schema.0.data-type=INT NOT NULL,
  schema.1.name=name,
  schema.1.data-type=VARCHAR(2147483647)
  schema.2.name=age,
  schema.2.data-type=BIGINT,
  schema.primary-key.name=PK_3386,
  schema.primary-key.columns=id,
  connector=jdbc,
  url=jdbc:mysql: //172.16.83.218:3306/wujuan?useSSL=false,
  username=drpeco,
  password=DT@Stack#123,
  comment=,
  scan.auto-commit=true,
  lookup.cache.max-rows=20000,
  scan.fetch-size=10,
  lookup.cache.ttl=700000
  table-name=t2,

使用 DT Catalog

● 创建 DT Catalog

CREATE CATALOG catalog1
WITH (
    'type' = 'dt',
    'default-database' = 'default_database',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'url' = 'jdbc:mysql://xxx:3306/catalog_default',
    'username' = 'drpeco',
    'password' = 'DT@Stack#123',
    'project-id' = '1',
    'tenant-id' = '1'
  );

image.png

● 创建 Database

DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
Drop a database with the given database name. If the database to drop does not exist, an exception is thrown.
IF EXISTS
If the database does not exist, nothing happens.
RESTRICT
Dropping a non-empty database triggers an exception. Enabled by default.
CASCADE
Dropping a non-empty database also drops all associated tables and functions.
create database if not exists catalog1.database1
drop database if exists catalog1.database1 
-- 删除非空数据库,连通数据库中的所有表也一起删除
drop database if exists catalog1.database1 CASCADE

● 创建 Table

1)Rename Table

ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
Rename the given table name to another new table name

2)Set or Alter Table Properties

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...) 
Set one or more properties in the specified table. If a particular property is already set in the table, override the old value with the new one.
-- 创建表
CREATE TABLE if not exists catalog1.default_database.table1
(
    id      int,
    name    string,
    age     bigint,
    primary key ( id) not enforced
) with (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false',
    'table-name' = 't2',
    'username' = 'drpeco',
    'password' = 'DT@Stack#123'
);
-- 删除表
drop table if exists mysql_catalog2.wujuan_database2.wujuan_table
-- 重命名表名
ALTER TABLE catalog1.default_database.table1 RENAME TO table2;
-- 设置表属性
ALTER TABLE catalog1.default_database.table1 
SET (
'tablename'='t2',
'url'='dbc:mysql://172.16.83.218:3306/wujuan?useSSL=false'
)

使用 DTCatalog 的具体场景和实现原理

● 全部是 DDL,只有 Catalog 的创建

· 可以执行,但是没有意义,ChunJun 不会存储 Catalog 信息,只有平台存储;

· 不支持语法校验。

CREATE CATALOG catalog1
WITH (
    'type' = 'DT',
    'default-database' = 'default_database',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default?autoReconnect=true&failOverReadOnly=false',
    'username' = 'drpeco',
    'password' = 'DT@Stack#123',
    'project-id' = '1',
    'tenant-id' = '1'
  );

● 全部是 DDL,包含 Catalog、Database、Table 的创建

· 无论创建数据库、表,删除数据库、表,必须包含 create catalog 语句;

· 可以执行,可以创建数据库和表;

· 不支持语法校验。

-- 初始化 Catalog
CREATE CATALOG catalog1
WITH (
    'type' = 'dt',
    'default-database' = 'default_database',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default',
    'username' = 'drpeco',
    'password' = 'DT@Stack#123',
    'project-id' = '1',
    'tenant-id' = '1'
  );
-- 创建数据库
create database if not exists database1
-- 创建表
CREATE TABLE if not exists catalog1.default_database.table1
(
    id      int,
    name    string,
    age     bigint,
    primary key ( id) not enforced
) with (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false',
    'table-name' = 't2',
    'username' = 'drpeco',
    'password' = 'DT@Stack#123'
);
// 抛出异常的逻辑
StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tEnv);
TableResult execute = statementSet.execute();       -->
tableEnvironment.executeInternal(operations);        -->
Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);     -->
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(getExecutionEnvironment(), transformations); -->
// 抛出异常的方法
public static StreamGraph generateStreamGraph(StreamExecutionEnvironment execEnv, List<Transformation<?>> transformations){
        if (transformations.size() <= 0) {
                throw new IllegalStateException(
                        "No operators defined in streaming topology. Cannot generate StreamGraph.");
        }
        ...
        return generator.generate();
}
// 如果没有 insert 语句的时候,无法生成 JobGraph,但是 DDL 是执行成功的。
// 因此捕获 FlinkX 抛出的特殊异常,此语句的异常 Message 是 FlinkX 里面处理的。
try {
    PackagedProgramUtils.createJobGraph(program, flinkConfig, 1, false);
} catch (ProgramInvocationException e) {
    // 仅执行 DDL FlinkX 抛出的异常
    if (!e.getMessage().contains("OnlyExecuteDDL")) {
        throw e;
    }
}

image.png

● DDL + DML,包含 create + insert 语句

1)初始化 Catalog

CREATE CATALOG catalog1
WITH (
    'type' = 'dt',
    'default-database' = 'default_database',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default',
    'username' = 'drpeco',
    'password' = 'DT@Stack#123',
    'project-id' = '1',
    'tenant-id' = '1'
  );

2.1)创建数据库

create database if not exists database1

2.2)创建源表

CREATE TABLE if not exists catalog1.default_database.table1
(
    id      int,
    name    string,
    age     bigint,
    primary key ( id) not enforced
) with (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false',
    'table-name' = 't2',
    'username' = 'drpeco',
    'password' = 'DT@Stack#123'
);

3.1)创建数据库

create database if not exists catalog1.database2;

3.2)创建结果表

CREATE TABLE if not exists catalog1.database2.table2
(
    id      int,
    name    string,
    age     bigint,
    primary key ( id) not enforced
) with (
 'connector' = 'print'
);

4)执行任务

insert into catalog1.database2.table2 select * from catalog1.database1.table1

· 不可以执行,可以提交;

· 支持语法校验。

● DML,只有 Insert 语句

-- 初始化 Catalog
CREATE CATALOG catalog1
WITH (
  'type' = 'dt',
  'default-database' = 'default_database',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default',
  'username' = 'drpeco',
  'password' = 'DT@Stack#123',
  'project-id' = '1',
  'tenant-id' = '1'
);
-- 执行任务
insert into catalog1.database2.table2 select * from catalog1.database1.table1
· 如果 Catalog 的 数据库和表都已经创建好了,那么直接写 insert 就可以提交任务;

· 不可以执行,可以提交;

· 支持语法校验。


《数据治理行业实践白皮书》下载地址:https://fs80.cn/380a4b

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szalykfz

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术 qun」,交流最新开源技术信息,qun 号码:30537511,项目地址:https://github.com/DTStack

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
21天前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
580 7
阿里云实时计算Flink在多行业的应用和实践
|
26天前
|
SQL 缓存 监控
14个Flink SQL性能优化实践分享
【7月更文挑战第12天】 1. **合理设置并行度**: 根据数据量和资源调整以提高处理速度. 2. **优化数据源**: 使用分区表并进行预处理减少输入量. 3. **数据缓存**: 采用 `BROADCAST` 或 `REPARTITION` 缓存常用数据. 4. **索引和分区**: 创建索引并按常用字段分区. 5. **避免不必要的计算**: 检查并移除多余的计算步骤. 6. **调整内存配置**: 分配足够内存避免性能下降. 7. **优化连接操作**: 选择适合大表和小表的连接方式. 8. **数据类型优化**: 选择合适类型以节省资源. ........
|
1月前
|
SQL Java Serverless
实时计算 Flink版操作报错合集之在写入SLS(Serverless Log Service)时出现报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
数据采集 运维 Cloud Native
Flink+Paimon在阿里云大数据云原生运维数仓的实践
构建实时云原生运维数仓以提升大数据集群的运维能力,采用 Flink+Paimon 方案,解决资源审计、拓扑及趋势分析需求。
18418 54
Flink+Paimon在阿里云大数据云原生运维数仓的实践
|
12天前
|
SQL 分布式计算 数据库
畅捷通基于Flink的实时数仓落地实践
本文整理自畅捷通总架构师、阿里云MVP专家郑芸老师在 Flink Forward Asia 2023 中闭门会上的分享。
8207 14
畅捷通基于Flink的实时数仓落地实践
|
1月前
|
SQL JSON 缓存
玳数科技集成 Flink CDC 3.0 的实践
本文投稿自玳数科技工程师杨槐老师,介绍了 Flink CDC 3.0 与 ChunJun 框架在玳数科技的集成实践。
497 7
玳数科技集成 Flink CDC 3.0 的实践
|
18天前
|
存储 运维 Cloud Native
"Flink+Paimon:阿里云大数据云原生运维数仓的创新实践,引领实时数据处理新纪元"
【8月更文挑战第2天】Flink+Paimon在阿里云大数据云原生运维数仓的实践
181 3
|
23天前
|
数据采集 资源调度 搜索推荐
Flink在实时搜索引擎索引构建中的深度应用与实践
随着数据源规模的扩大和查询请求的增加,如何优化Flink的性能和资源调度成为了一个重要的问题。Flink提供了多种性能优化手段,如并行度调整、状态后端选择、任务链优化等。同时,Flink还支持与YARN、Kubernetes等集群管理系统集成,实现资源的动态调度和弹性伸缩,以适应不同规模的业务需求。
|
2月前
|
SQL 存储 NoSQL
贝壳找房基于Flink+Paimon进行全量数据实时分组排序的实践
本文投稿自贝壳家装数仓团队,在结合家装业务场景下所探索出的一种基于 Flink+Paimon 的排序方案。这种方案可以在实时环境对全量数据进行准确的分组排序,同时减少对内存资源的消耗。在这一方案中,引入了“事件时间分段”的概念,以避免 Flink State 中冗余数据对排序结果的干扰,在保证排序结果准确性的同时,减少了对内存的消耗。并且基于数据湖组件 Paimon 的聚合模型和 Audit Log 数据在数据湖内构建了拉链表,为排序结果提供了灵活的历史数据基础。
28712 8
贝壳找房基于Flink+Paimon进行全量数据实时分组排序的实践
|
1月前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在online模式下增量抓取Oracle数据时,在archive_log切换时,出现数据丢失的情况,是什么原因
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。