一:catalog 配置
String createCatalog = "CREATE CATALOG hudi_catalog WITH (\n" +
" 'type' = 'hudi',\n" +
" 'mode' = 'hms',\n" +
" 'default-database' = 'default',\n" +
" 'hive.conf.dir' = '/Users/xuchao/conf/hadoopconf/dev_hadoop_flink01',\n" +
" 'table.external' = 'true'\n" +
")";
二:表信息读取org.apache.hudi.table.catalog.HoodieHiveCatalog#getTable
- 通过HiveMetaStoreClient 获取当前db.tablename 的 org.apache.hadoop.hive.metastore.api.Table 信息,并转化为flink 需要属性的表结构
- 获取最新的表结构,构建org.apache.avro.Schema
1: 获取Schema 的逻辑入口在StreamerUtil.getLatestTableSchema,主要通过表所在路径(eg:hdfs://ns1/dtInsight/hive/warehouse/flink_db/test_hudi_flink_cow1)+ hiveConf 来解析,具体逻辑后面的步骤会逐一解析。
2: 获取ActiveTimeLine . 通过调用元数据接口获取对应目录下需要读取的扩展名的文件,
其中timelinePath ---> hdfs://ns1/dtInsight/hive/warehouse/flink_db/test_hudi_flink_cow1/.hoodie
includedExtensions ---> [.restore, .rollback, .clean.inflight, .schemacommit.inflight, .inflight, .savepoint.inflight, .restore.inflight, .deltacommit.requested, .restore.requested, .replacecommit, .deltacommit, .savepoint, .replacecommit.requested, .deltacommit.inflight, .indexing.inflight, .schemacommit.requested, .compaction.requested, .indexing.requested, .rollback.requested, .replacecommit.inflight, .clean.requested, .rollback.inflight, .compaction.inflight, .commit.requested, .commit, .schemacommit, .indexing, .clean]
3: 根据上面扫描目录获取到的元数据信息进行过滤处理,过滤逻辑:根据 时间+action 进行分组,
分组之后根据组里面的state 对比获取最高级的instance
HoodieInstant 属性action,state 的含义:
action 指的是对Hudi表执行的操作类型 目前包括:
commit(表示一批记录原子性的写入到一张表中),
deltacommit(增量提交指的是将一批记录原子地写入MergeOnRead类型表,其中一些/所有数据都可以写入增量日志),
clean(清除表中不再需要的旧版本文件),
savepoint(将文件组标记为“saved”,cleans执行时不会删除对应的数据),
restore,
rollback(Commits或者Delta_commit执行不成功时回滚数据,删除期间产生的任意文件),
compaction(将行式文件转化为列式文件),
replacecommit,
indexing;
state表示在指定的时间点(Instant Time)对Hudi表执行操作(Instant Action)后,表所处的状态,目前包括:REQUESTED(已调度但未初始化),INFLIGHT(当前正在执行),COMPLETED(操作执行完成),NIL( Invalid instant) ==> 注意在代码中是enum,比较的时候是基于顺序比较大小;
4:获取最新的timeline
metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema()
基于上面获取到的HoodieInstant 构建
按上面的规则获取到最新的HoodiInstant;
hdfs dfs -cat /dtInsight/hive/warehouse/flink_db/test_hudi_flink_cow1/.hoodie/20230113135706243.commit, 查看其中的内容:
从commit 的schema str 中解析出Schema,eg:
- 从paramaters 里面获取对应的primaryKey 信息,最后在基于flink 的CatalogTable 构建出CatalogBaseTable
至此hudi 表的元数据信息构建完成