Apache Hudi 流转批 场景实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Hudi 流转批 场景实践

背景

在某些业务场景下,我们需要一个标志来衡量hudi数据写入的进度,比如:Flink 实时向 Hudi 表写入数据,然后使用这个 Hudi 表来支持批量计算并通过一个 flag 来评估它的分区数据是否完整从而进一步写入分区数据进行分区级别的ETL,这也就是我们通常说的流转批

EventTime计算原理

图中Flink Sink包含了两个算子。第一个writer 算子,它负责把数据写入文件,writer在checkpoint触发时,会把自己写入的最大的一个时间传到commit算子中,然后commit算子从多个上游传过来的时间中选取一个最小值作为这一批提交数据的时间,并写入HUDI表的元数据中。

社区相关工作参考: https://issues.apache.org/jira/browse/HUDI-5095

案例使用

我们的方案是将这个进度值(EventTime)存储为 hudi 提交(版本)元数据的属性里,然后通过访问这个元数据属性获取这个进度值。在下游的批处理任务之前加一个监控任务去监控最新快照元数据。如果它的时间已经超过了当前的分区时间,就认为这个表的数据已经完备了,这个监控任务就会成功触发下游的批处理任务进行计算,这样可以防止在异常场景下数据管道或者批处理任务空跑的情况。

下图是一个flink 1分钟级别入库到HUDI ODS表, 然后通过流转批计算写入HUDI DWD表的一个执行过程。

US调度系统轮询逻辑

如何解决乱序到来问题,  我们可以通过设置spedGapTime来设置允许延迟到来的范围默认是0 不会延迟到来。

Maven pom 依赖

针对此功能特性的Hudi依赖版本如下

org.apache.hudi
    hudi-flink1.13-bundle
    0.12.1
  
  
    org.apache.hudi
    hudi-flink1.15-bundle
    0.12.1


如何设置EventTime

能够解析的字段类型及格式如下:

类型 示例
TIMESTAMP(3) 2012-12-12T12:12:12
TIMESTAMP(3) 2012-12-12 12:12:12
DATE 2012-12-12
BIGINT 100L
INT 100

Flink API

用户只需要设置flink conf指定时间字段作为时间推进字段

Map options = new HashMap<>();
// 这里省略其他表字段
options.put(FlinkOptions.EVENT_TIME_FIELD.key(), "ts");
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
     .column("id int not null")
     .column("ts string")
     .column("dt string")
     .pk("id")
     .partition("dt")
     .options(options);

Flink SQL

通过设置hoodie.payload.event.time.field指定需要计算的eventtime的字段

create table hudi_cow_01(\n" +
"  uuid varchar(20),\n" +
"  name varchar(10),\n" +
"  age int,\n" +
"  ts timestamp(3),\n" +
"  PRIMARY KEY(uuid) NOT ENFORCED\n" +
")\n" +
" with (\n" +
 // 这里省略其他参数
"  'hoodie.payload.event.time.field' = 'ts'\n"
")

如何读取EventTime

Spark SQL

call show_commit_extra_metadata(table => 'hudi_tauth_test.hudi_cow_01', metadata_key => 'hoodie.payload.event.time.field');

Java API

代码获取片段如下

Option commitMetadataOption = MetadataConversionUtils.getHoodieCommitMetadata(metaClient, currentInstant);
if (!commitMetadataOption.isPresent()) {
    throw new HoodieException(String.format("Commit %s not found commitMetadata in Commits %s.", currentInstant, timeline));
}
// 获取到当前版本的时间进度
String eventTime = commitMetadataOption.get().getExtraMetadata().get(FlinkOptions.EVENT_TIME_FIELD.key());
System.out.println("current eventTime: " + eventTime);
输出结果如下
current eventTime: 1667971364742
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
26天前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
64 4
|
3月前
|
消息中间件 分布式计算 Hadoop
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
56 3
|
3月前
|
消息中间件 运维 Kafka
Apache Flink 实践问题之达到网卡的最大速度如何解决
Apache Flink 实践问题之达到网卡的最大速度如何解决
48 2
|
25天前
|
存储 数据挖掘 数据处理
巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践
随着数据湖技术的发展,企业纷纷探索其优化潜力。本文分享了巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践。Paimon 支持流式和批处理,提供高性能、统一的数据访问和流批一体的优势。通过示例代码和实践经验,展示了如何高效处理实时数据,解决了数据一致性和故障恢复等挑战。
107 61
|
22天前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。
|
1月前
|
存储 分布式计算 druid
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
59 1
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
|
1月前
|
SQL 存储 分布式计算
大数据-157 Apache Kylin 背景 历程 特点 场景 架构 组件 详解
大数据-157 Apache Kylin 背景 历程 特点 场景 架构 组件 详解
31 9
|
2月前
|
存储 JSON 物联网
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
本文我们将聚焦企业最普遍使用的 JSON 数据,分别介绍业界传统方案以及 Apache Doris 半结构化数据存储分析的三种方案,并通过图表直观展示这些方案的优势与不足。同时,结合具体应用场景,分享不同需求场景下的使用方式,帮助用户快速选择最合适的 JSON 数据存储及分析方案。
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
|
1月前
|
存储 小程序 Apache
10月26日@杭州,飞轮科技 x 阿里云举办 Apache Doris Meetup,探索保险、游戏、制造及电信领域数据仓库建设实践
10月26日,由飞轮科技与阿里云联手发起的 Apache Doris 杭州站 Meetup 即将开启!
56 0
|
3月前
|
数据采集 分布式计算 Kubernetes
Apache Flink 实践问题之ZooKeeper 网络瞬断时如何解决
Apache Flink 实践问题之ZooKeeper 网络瞬断时如何解决
94 4

推荐镜像

更多
下一篇
无影云桌面