01 引言
我们知道 Flink 有Table
(表)、View
(视图)、Function
(函数/算子)、Database
(数据库)的概念,这都类似于我们平常使用的关系型数据库里面的概念。
相对于关系型数据库的这些概念,Flink 里还有一个 Catalog
(目录) 的概念,本文来讲解下。
02 Catalog
2.1 Catalog概述
数据处理最关键的方面之一是管理元数据:
- 元数据可以是临时的,例如在
Flink
中临时表、或者通过TableEnvironment
注册的UDF
; - 元数据也可以是持久化的,例如
Hive Metastore
中的元数据。
Catalog在Flink中提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。Catalog
提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。
2.2 Catalog分类
Catalog目前分为以下几类:
分类 | 描述 | 缺陷 |
GenericInMemoryCatalog | 基于内存实现的 Catalog | 所有元数据只在 session 的生命周期内可用 |
JdbcCatalog | 可以将 Flink 通过 JDBC 协议连接到关系数据库 | JDBC Catalog只实现了PostgresCatalog |
HiveCatalog | 作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口 | Hive Metastore 以小写形式存储所有元数据对象名称。而 GenericInMemoryCatalog 区分大小写。 |
自定义 Catalog | 通过实现 Catalog 接口来开发自定义 Catalog | - |
2.3 Catalog API
2.3.1 数据库操作
// create database catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false); // drop database catalog.dropDatabase("mydb", false); // alter database catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false); // get database catalog.getDatabase("mydb"); // check if a database exist catalog.databaseExists("mydb"); // list databases in a catalog catalog.listDatabases("mycatalog");
2.3.2 表操作
// create table catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false); // drop table catalog.dropTable(new ObjectPath("mydb", "mytable"), false); // alter table catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false); // rename table catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table"); // get table catalog.getTable("mytable"); // check if a table exist or not catalog.tableExists("mytable"); // list tables in a database catalog.listTables("mydb");
2.3.3 视图操作
// create view catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false); // drop view catalog.dropTable(new ObjectPath("mydb", "myview"), false); // alter view catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false); // rename view catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false); // get view catalog.getTable("myview"); // check if a view exist or not catalog.tableExists("mytable"); // list views in a database catalog.listViews("mydb");
2.3.4 分区操作
// create view catalog.createPartition( new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), new CatalogPartitionImpl(...), false); // drop partition catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false); // alter partition catalog.alterPartition( new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), new CatalogPartitionImpl(...), false); // get partition catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...)); // check if a partition exist or not catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...)); // list partitions of a table catalog.listPartitions(new ObjectPath("mydb", "mytable")); // list partitions of a table under a give partition spec catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...)); // list partitions of a table by expression filter catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));
2.3.5 函数操作
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false); // drop function catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false); // alter function catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false); // get function catalog.getFunction("myfunc"); // check if a function exist or not catalog.functionExists("myfunc"); // list functions in a database catalog.listFunctions("mydb");
2.4 Catalog 示例(SQL Client的方式)
① 首先需要注册Catalog:用户可以访问默认创建的内存 Catalog default_catalog,这个 Catalog 默认拥有一个默认数据库 default_database。 用户也可以注册其他的 Catalog 到现有的 Flink 会话中,创建方式如下(可以使用Flink里面的Factory工厂模式动态加载):
tableEnv.registerCatalog(new CustomCatalog("myCatalog"));
② 指定使用的内容:Flink 始终在当前的 Catalog 和数据库中寻找表、视图和 UDF,代码如下:
Flink SQL> USE CATALOG myCatalog; Flink SQL> USE myDB;
也可以通过提供全限定名 catalog.database.object 来访问不在当前 Catalog 中的元数据信息,代码如下:
Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table; • 1
③ 其它常规命令:
-- 列出可用的 Catalog Flink SQL> show catalogs; -- 列出可用的数据库 Flink SQL> show databases; -- 列出可用的表 Flink SQL> show tables;
03 文末
本文主要讲解了Flink Catalog的概念以及用法,如果大家有兴趣可以进一步去官网查看相关的文档,这里我列出相关比较核心的文档:
- catalogs:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/catalogs/
- jdbc catalog(postgress):https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/table/jdbc/#postgres-%e6%95%b0%e6%8d%ae%e5%ba%93%e4%bd%9c%e4%b8%ba-catalog
- hive catalog:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/table/hive/overview/
接下来我的计划是编写 “如何自定义Catalog” ,以及Catalog的应用场景(有兴趣可先阅读《Ververica Platform-阿里巴巴全新Flink企业版揭秘》)相关的博客,谢谢大家的阅读,希望能帮助到大家,本文完!