flink 读取hudi 表元数据信息

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: flink 如何获取hudi 表的元数据信息

一:catalog 配置

String createCatalog = "CREATE CATALOG hudi_catalog WITH (\n" +        

"    'type' = 'hudi',\n" +      

 "    'mode' = 'hms',\n" +        

"    'default-database' = 'default',\n" +        

"    'hive.conf.dir' = '/Users/xuchao/conf/hadoopconf/dev_hadoop_flink01',\n" +        

"    'table.external' = 'true'\n" +        

")";

二:表信息读取org.apache.hudi.table.catalog.HoodieHiveCatalog#getTable

  •  通过HiveMetaStoreClient 获取当前db.tablename 的 org.apache.hadoop.hive.metastore.api.Table 信息,并转化为flink 需要属性的表结构

image.png

  •  获取最新的表结构,构建org.apache.avro.Schema

   1:  获取Schema 的逻辑入口在StreamerUtil.getLatestTableSchema,主要通过表所在路径(eg:hdfs://ns1/dtInsight/hive/warehouse/flink_db/test_hudi_flink_cow1)+ hiveConf 来解析,具体逻辑后面的步骤会逐一解析。

   2: 获取ActiveTimeLine .     通过调用元数据接口获取对应目录下需要读取的扩展名的文件,

image.png

其中timelinePath ---> hdfs://ns1/dtInsight/hive/warehouse/flink_db/test_hudi_flink_cow1/.hoodie

includedExtensions ---> [.restore, .rollback, .clean.inflight, .schemacommit.inflight, .inflight, .savepoint.inflight, .restore.inflight, .deltacommit.requested, .restore.requested, .replacecommit, .deltacommit, .savepoint, .replacecommit.requested, .deltacommit.inflight, .indexing.inflight, .schemacommit.requested, .compaction.requested, .indexing.requested, .rollback.requested, .replacecommit.inflight, .clean.requested, .rollback.inflight, .compaction.inflight, .commit.requested, .commit, .schemacommit, .indexing, .clean]

 3: 根据上面扫描目录获取到的元数据信息进行过滤处理,过滤逻辑:根据 时间+action 进行分组,

     分组之后根据组里面的state 对比获取最高级的instance

image.png

  HoodieInstant 属性action,state 的含义:

 action 指的是对Hudi表执行的操作类型 目前包括:

       commit(表示一批记录原子性的写入到一张表中),

      deltacommit(增量提交指的是将一批记录原子地写入MergeOnRead类型表,其中一些/所有数据都可以写入增量日志),

      clean(清除表中不再需要的旧版本文件),

      savepoint(将文件组标记为“saved”,cleans执行时不会删除对应的数据),

      restore,

      rollback(Commits或者Delta_commit执行不成功时回滚数据,删除期间产生的任意文件),

      compaction(将行式文件转化为列式文件),

      replacecommit,

      indexing;

 state表示在指定的时间点(Instant Time)对Hudi表执行操作(Instant Action)后,表所处的状态,目前包括:REQUESTED(已调度但未初始化),INFLIGHT(当前正在执行),COMPLETED(操作执行完成),NIL( Invalid instant)  ==> 注意在代码中是enum,比较的时候是基于顺序比较大小;

   4:获取最新的timeline

     metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema()

    基于上面获取到的HoodieInstant 构建

image.png


image.png

按上面的规则获取到最新的HoodiInstant;

hdfs dfs -cat /dtInsight/hive/warehouse/flink_db/test_hudi_flink_cow1/.hoodie/20230113135706243.commit, 查看其中的内容:

image.png

从commit 的schema str 中解析出Schema,eg:

image.png

  • 从paramaters 里面获取对应的primaryKey 信息,最后在基于flink 的CatalogTable 构建出CatalogBaseTable

image.png

至此hudi 表的元数据信息构建完成

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
容灾 流计算
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
|
3月前
|
SQL 流计算
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
47 1
|
4月前
|
SQL 数据处理 流计算
实时计算 Flink版产品使用问题之怎么创建永久表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用问题之怎么创建永久表
|
3月前
|
Java 微服务 Spring
驾驭复杂性:Spring Cloud在微服务构建中的决胜法则
【8月更文挑战第31天】Spring Cloud是在Spring Framework基础上打造的微服务解决方案,提供服务发现、配置管理、消息路由等功能,适用于构建复杂的微服务架构。本文介绍如何利用Spring Cloud搭建微服务,包括Eureka服务发现、Config Server配置管理和Zuul API网关等组件的配置与使用。通过Spring Cloud,可实现快速开发、自动化配置,并提升系统的伸缩性和容错性,尽管仍需面对分布式事务等挑战,但其强大的社区支持有助于解决问题。
73 0
|
3月前
|
消息中间件 Java 数据处理
揭秘Apache Flink的Exactly-Once神技:如何在数据流海中确保每条信息精准无误,不丢不重?
【8月更文挑战第26天】Apache Flink 是一款先进的流处理框架,其核心特性 Exactly-Once 语义保证了数据处理的精准无误。尤其在金融及电商等高要求场景下,该特性极为关键。本文深入解析 Flink 如何实现 Exactly-Once 语义:通过状态管理确保中间结果可靠存储;利用一致的检查点机制定期保存状态快照;以及通过精确的状态恢复避免数据重复处理或丢失。最后,提供一个 Java 示例,展示如何计算用户访问次数,并确保 Exactly-Once 语义的应用。
88 0
|
4月前
|
存储 JSON Kubernetes
实时计算 Flink版操作报错合集之 写入hudi时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
SQL 监控 关系型数据库
实时计算 Flink版操作报错合集之在设置监控PostgreSQL数据库时,将wal_level设置为logical,出现一些表更新和删除操作报错,怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
SQL 消息中间件 Java
实时计算 Flink版操作报错合集之错误信息显示找不到MysqlsnapshotsplitAssimer类,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之同步tidb到hudi报错,一般是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
SQL 分布式计算 Hadoop
实时计算 Flink版产品使用问题之建了一张upsert-kafka的flink表,但是数据为空,该如何排查
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。