Catalog types

A catalog stores metadata (databases, table schemas, partitions, views, functions, and so on). Flink provides its own catalog implementations, and it can also use Hive's catalog.
| Catalog | Description |
| --- | --- |
| GenericInMemoryCatalog | An in-memory catalog (case-sensitive). Metadata is available only within the lifetime of the session; this is the default. |
| JdbcCatalog | Lets users connect Flink to relational databases over the JDBC protocol. PostgresCatalog is currently the only JDBC catalog implementation (https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/#postgres-database-as-a-catalog). |
| HiveCatalog | Serves two purposes: (1) persistent storage for Flink metadata (DDL statements persist their metadata in the Hive metastore); (2) reading and writing Hive tables. https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/hive/overview/ |
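To make the catalog concept concrete, the toy class below (plain Java, no Flink dependency; the class and method names are purely illustrative, not Flink's API) mimics what a `GenericInMemoryCatalog` does: it holds database → table → schema metadata in memory, so everything is lost when the session object goes away.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a minimal in-memory "catalog" that maps
// database name -> table name -> schema string, analogous to how
// GenericInMemoryCatalog keeps metadata for the session's lifetime.
public class ToyInMemoryCatalog {
    private final Map<String, Map<String, String>> databases = new HashMap<>();

    public void createDatabase(String db) {
        databases.putIfAbsent(db, new HashMap<>());
    }

    public void createTable(String db, String table, String schema) {
        databases.get(db).put(table, schema);
    }

    public String getTableSchema(String db, String table) {
        return databases.get(db).get(table);
    }

    public static void main(String[] args) {
        ToyInMemoryCatalog catalog = new ToyInMemoryCatalog();
        catalog.createDatabase("default");
        catalog.createTable("default", "user_op_log", "uid BIGINT, op STRING");
        System.out.println(catalog.getTableSchema("default", "user_op_log"));
    }
}
```

A real catalog also tracks partitions, views, and functions, and a persistent catalog (such as HiveCatalog) survives across sessions; this sketch only shows the name-to-metadata mapping.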
Hive integration

To integrate Flink with Hive, use HiveCatalog:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/hive/overview/
Add dependencies

The deployed Hive version is 3.1.0, which requires the following dependencies:
```xml
<!-- Flink dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>${flink.version}</version>
  <!--<scope>provided</scope>-->
</dependency>
<!-- Hive dependency -->
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>${hive.version}</version>
  <exclusions>
    <exclusion>
      <artifactId>hadoop-hdfs</artifactId>
      <groupId>org.apache.hadoop</groupId>
    </exclusion>
  </exclusions>
  <!--<scope>provided</scope>-->
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>${hadoop.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.version}</version>
  <exclusions>
    <exclusion>
      <artifactId>commons-math3</artifactId>
      <groupId>org.apache.commons</groupId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-common</artifactId>
  <version>${hadoop.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
  <version>${hadoop.version}</version>
</dependency>
```
Read from Hive

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkUseHiveCatalog {
    public static void main(String[] args) {
        // 1. Create a TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 2. Initialize the Hive catalog (wrapped in a helper class to avoid code duplication)
        HiveUtils.initHive(tEnv);

        // 3. Query a table in Hive
        tEnv.executeSql("select * from myrs.user_op_log")
                .print();
    }
}
```
```java
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;

public class HiveUtils {
    /**
     * Initialize Hive-related resources.
     * @param tableEnv the table environment to configure
     */
    public static void initHive(TableEnvironment tableEnv) {
        String name = "myhive";
        String version = "3.1.0";
        String defaultDatabase = "default";
        String hiveConfDir = "data/etc/";

        // Load the HiveModule (enables the use of Hive UDFs)
        // https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/modules/
        tableEnv.loadModule(name, new HiveModule(version));

        // Use the Hive dialect (HiveQL-specific syntax)
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // Create and register the HiveCatalog
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog(name, hive);

        // Set the catalog and database for the current session
        tableEnv.useCatalog(name);
        tableEnv.useDatabase("myrs");
    }
}
```
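Once the catalog is registered and the Hive dialect is enabled, statements passed to `executeSql` can use HiveQL syntax, and DDL is persisted in the Hive metastore. A hypothetical sketch (the table and column names below are illustrative, not from the source):

```sql
-- DDL issued under the Hive dialect is stored in the Hive metastore,
-- so the table remains visible across Flink sessions.
CREATE TABLE IF NOT EXISTS myrs.user_op_log_copy (
  uid BIGINT,
  op  STRING
);

-- Write into the new table from an existing Hive table.
INSERT INTO myrs.user_op_log_copy
SELECT uid, op FROM myrs.user_op_log;
```

This is the second HiveCatalog use case from the table above: reading and writing Hive tables directly from Flink.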