Flink Catalog解读

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 我们知道 Flink 有Table(表)、View(视图)、Function(函数/算子)、Database(数据库)的概念,这都类似于我们平常使用的关系型数据库里面的概念。

01 引言


我们知道 Flink 有Table(表)、View(视图)、Function(函数/算子)、Database(数据库)的概念,这都类似于我们平常使用的关系型数据库里面的概念。


相对于关系型数据库的这些概念,Flink 里还有一个 Catalog(目录) 的概念,本文来讲解下。

1bb899eece314f56a5ca6d3074570690.png


02 Catalog


2.1 Catalog概述


数据处理最关键的方面之一是管理元数据:


  • 元数据可以是临时的,例如在Flink中临时表、或者通过 TableEnvironment 注册的 UDF;
  • 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。


Catalog在Flink中提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。


2.2 Catalog分类


Catalog目前分为以下几类:

微信截图_20221010162139.png


2.3 Catalog API

2.3.1 数据库操作

// create database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
// drop database
catalog.dropDatabase("mydb", false);
// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);
// get database
catalog.getDatabase("mydb");
// check if a database exist
catalog.databaseExists("mydb");
// list databases in a catalog
catalog.listDatabases("mycatalog");


2.3.2 表操作

// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
// get table
catalog.getTable("mytable");
// check if a table exist or not
catalog.tableExists("mytable");
// list tables in a database
catalog.listTables("mydb");


2.3.3 视图操作

// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);
// alter view
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);
// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);
// get view
catalog.getTable("myview");
// check if a view exist or not
catalog.tableExists("mytable");
// list views in a database
catalog.listViews("mydb");


2.3.4 分区操作

// create view
catalog.createPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);
// drop partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);
// alter partition
catalog.alterPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);
// get partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// check if a partition exist or not
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// list partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));
// list partitions of a table under a give partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// list partitions of a table by expression filter
catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));


2.3.5 函数操作

catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// drop function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// get function
catalog.getFunction("myfunc");
// check if a function exist or not
catalog.functionExists("myfunc");
// list functions in a database
catalog.listFunctions("mydb");


2.4 Catalog 示例(SQL Client的方式)


① 首先需要注册Catalog:用户可以访问默认创建的内存 Catalog default_catalog,这个 Catalog 默认拥有一个默认数据库 default_database。 用户也可以注册其他的 Catalog 到现有的 Flink 会话中,创建方式如下(可以使用Flink里面的Factory工厂模式动态加载):

tableEnv.registerCatalog(new CustomCatalog("myCatalog"));


② 指定使用的内容:Flink 始终在当前的 Catalog 和数据库中寻找表、视图和 UDF,代码如下:

Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;


③ 其它常规命令

-- 列出可用的 Catalog
Flink SQL> show catalogs;
-- 列出可用的数据库 
Flink SQL> show databases;
-- 列出可用的表
Flink SQL> show tables;


03 文末


本文主要讲解了Flink Catalog的概念以及用法,如果大家有兴趣可以进一步去官网查看相关的文档,这里我列出相关比较核心的文档:



接下来我的计划是编写 “如何自定义Catalog” ,以及Catalog的应用场景(有兴趣可先阅读《Ververica Platform-阿里巴巴全新Flink企业版揭秘》)相关的博客,谢谢大家的阅读,希望能帮助到大家,本文完!










相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
5月前
|
SQL 存储 数据库
Flink + Paimon 数据 CDC 入湖最佳实践
Flink + Paimon 数据 CDC 入湖最佳实践
1160 59
|
5月前
|
Java 关系型数据库 MySQL
Flink CDC有见这个报错不?
【2月更文挑战第29天】Flink CDC有见这个报错不?
70 2
|
5月前
|
Oracle 关系型数据库 流计算
Flink CDC里我通过oracle的connector往hudi插入数据
【1月更文挑战第23天】【1月更文挑战第115篇】Flink CDC里我通过oracle的connector往hudi插入数据
113 8
|
5月前
|
缓存 NoSQL 数据库
Flink cdc到doris,starrocks,table store
Flink cdc到doris,starrocks,table store
|
5月前
|
SQL 分布式计算 MaxCompute
Apache Flink目前不支持直接写入MaxCompute,但是可以通过Hive Catalog将Flink的数据写入Hive表
Apache Flink目前不支持直接写入MaxCompute,但是可以通过Hive Catalog将Flink的数据写入Hive表
91 3
|
10月前
|
Oracle 关系型数据库 数据库
Flink CDC中database 为什么ORCLDB,不是ORCL吗?
Flink CDC中database 为什么ORCLDB,不是ORCL吗?
39 0
|
11月前
|
SQL Java 数据库
Flink Catalog解读
Flink Catalog解读
203 0
|
存储 SQL 消息中间件
基于 Flink & Paimon 实现 Streaming Warehouse 数据一致性管理
字节跳动基础架构工程师李明,在 Apache Paimon Meetup 的分享。
14827 5
基于 Flink & Paimon 实现 Streaming Warehouse 数据一致性管理
|
SQL 消息中间件 存储
|
SQL 存储 关系型数据库
Flink CDC 系列 - 构建 MySQL 和 Postgres 上的 Streaming ETL
本篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL。
Flink CDC 系列 - 构建 MySQL 和 Postgres 上的 Streaming ETL
下一篇
无影云桌面