(1)Catalogs主要定义
Catalog提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。
元数据可以是临时的,例如临时表、或者通过TableEnvironment注册的UDF,也可以是持久化的,例如Hive Metastore中的元数据。
Catalog提供了一个统一的API,用于管理元数据,并使其可以从Table API和 SQL查询语句中来访问。
(2)Catalogs类型
GenericInMemoryCatalog
GenericInMemoryCatalog 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。
JdbcCatalog
JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。PostgresCatalog 是当前实现的唯一一种 JDBC Catalog。
HiveCatalog
HiveCatalog 有两个用途:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。
警告 Hive Metastore 以小写形式存储所有元数据对象名称。而 GenericInMemoryCatalog 区分大小写。
(3)Catalogs在Flink SQL架构中的位置
(4)Catalogs 操作
使用 SQL DDL
TableEnvironment tableEnv = ... // Create a HiveCatalog Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>"); // Register the catalog tableEnv.registerCatalog("myhive", catalog); // Create a catalog database tableEnv.executeSql("CREATE DATABASE mydb WITH (...)"); // Create a catalog table tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)"); tableEnv.listTables(); // should return the tables in current catalog and database.
数据库操作
// create database catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false); // drop database catalog.dropDatabase("mydb", false); // alter database catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false); // get databse catalog.getDatabase("mydb"); // check if a database exist catalog.databaseExists("mydb"); // list databases in a catalog catalog.listDatabases("mycatalog");
表操作
// create table catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false); // drop table catalog.dropTable(new ObjectPath("mydb", "mytable"), false); // alter table catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false); // rename table catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table"); // get table catalog.getTable("mytable"); // check if a table exist or not catalog.tableExists("mytable"); // list tables in a database catalog.listTables("mydb");