Apache Hudi 流转批 场景实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Hudi 流转批 场景实践

背景

在某些业务场景下,我们需要一个标志来衡量hudi数据写入的进度,比如:Flink 实时向 Hudi 表写入数据,然后使用这个 Hudi 表来支持批量计算并通过一个 flag 来评估它的分区数据是否完整从而进一步写入分区数据进行分区级别的ETL,这也就是我们通常说的流转批

EventTime计算原理

图中Flink Sink包含了两个算子。第一个writer 算子,它负责把数据写入文件,writer在checkpoint触发时,会把自己写入的最大的一个时间传到commit算子中,然后commit算子从多个上游传过来的时间中选取一个最小值作为这一批提交数据的时间,并写入HUDI表的元数据中。

社区相关工作参考: https://issues.apache.org/jira/browse/HUDI-5095

案例使用

我们的方案是将这个进度值(EventTime)存储为 hudi 提交(版本)元数据的属性里,然后通过访问这个元数据属性获取这个进度值。在下游的批处理任务之前加一个监控任务去监控最新快照元数据。如果它的时间已经超过了当前的分区时间,就认为这个表的数据已经完备了,这个监控任务就会成功触发下游的批处理任务进行计算,这样可以防止在异常场景下数据管道或者批处理任务空跑的情况。

下图是一个flink 1分钟级别入库到HUDI ODS表, 然后通过流转批计算写入HUDI DWD表的一个执行过程。

US调度系统轮询逻辑

如何解决乱序到来问题,  我们可以通过设置spedGapTime来设置允许延迟到来的范围默认是0 不会延迟到来。

Maven pom 依赖

针对此功能特性的Hudi依赖版本如下

org.apache.hudi
    hudi-flink1.13-bundle
    0.12.1
  
  
    org.apache.hudi
    hudi-flink1.15-bundle
    0.12.1


如何设置EventTime

能够解析的字段类型及格式如下:

类型 示例
TIMESTAMP(3) 2012-12-12T12:12:12
TIMESTAMP(3) 2012-12-12 12:12:12
DATE 2012-12-12
BIGINT 100L
INT 100

Flink API

用户只需要设置flink conf指定时间字段作为时间推进字段

Map options = new HashMap<>();
// 这里省略其他表字段
options.put(FlinkOptions.EVENT_TIME_FIELD.key(), "ts");
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
     .column("id int not null")
     .column("ts string")
     .column("dt string")
     .pk("id")
     .partition("dt")
     .options(options);

Flink SQL

通过设置hoodie.payload.event.time.field指定需要计算的eventtime的字段

create table hudi_cow_01(\n" +
"  uuid varchar(20),\n" +
"  name varchar(10),\n" +
"  age int,\n" +
"  ts timestamp(3),\n" +
"  PRIMARY KEY(uuid) NOT ENFORCED\n" +
")\n" +
" with (\n" +
 // 这里省略其他参数
"  'hoodie.payload.event.time.field' = 'ts'\n"
")

如何读取EventTime

Spark SQL

call show_commit_extra_metadata(table => 'hudi_tauth_test.hudi_cow_01', metadata_key => 'hoodie.payload.event.time.field');

Java API

代码获取片段如下

Option commitMetadataOption = MetadataConversionUtils.getHoodieCommitMetadata(metaClient, currentInstant);
if (!commitMetadataOption.isPresent()) {
    throw new HoodieException(String.format("Commit %s not found commitMetadata in Commits %s.", currentInstant, timeline));
}
// 获取到当前版本的时间进度
String eventTime = commitMetadataOption.get().getExtraMetadata().get(FlinkOptions.EVENT_TIME_FIELD.key());
System.out.println("current eventTime: " + eventTime);
输出结果如下
current eventTime: 1667971364742
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
127 4
|
3天前
|
存储 运维 监控
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
中信银行信用卡中心每日新增日志数据 140 亿条(80TB),全量归档日志量超 40PB,早期基于 Elasticsearch 构建的日志云平台,面临存储成本高、实时写入性能差、文本检索慢以及日志分析能力不足等问题。因此使用 Apache Doris 替换 Elasticsearch,实现资源投入降低 50%、查询速度提升 2~4 倍,同时显著提高了运维效率。
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
|
2月前
|
存储 数据挖掘 数据处理
巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践
随着数据湖技术的发展,企业纷纷探索其优化潜力。本文分享了巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践。Paimon 支持流式和批处理,提供高性能、统一的数据访问和流批一体的优势。通过示例代码和实践经验,展示了如何高效处理实时数据,解决了数据一致性和故障恢复等挑战。
140 61
|
2月前
|
存储 消息中间件 分布式计算
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
Cisco WebEx 早期数据平台采用了多系统架构(包括 Trino、Pinot、Iceberg 、 Kyuubi 等),面临架构复杂、数据冗余存储、运维困难、资源利用率低、数据时效性差等问题。因此,引入 Apache Doris 替换了 Trino、Pinot 、 Iceberg 及 Kyuubi 技术栈,依赖于 Doris 的实时数据湖能力及高性能 OLAP 分析能力,统一数据湖仓及查询分析引擎,显著提升了查询性能及系统稳定性,同时实现资源成本降低 30%。
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
|
2月前
|
SQL 存储 数据处理
兼顾高性能与低成本,浅析 Apache Doris 异步物化视图原理及典型场景
Apache Doris 物化视图进行了支持。**早期版本中,Doris 支持同步物化视图;从 2.1 版本开始,正式引入异步物化视图,[并在 3.0 版本中完善了这一功能](https://www.selectdb.com/blog/1058)。**
|
2月前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。
|
3月前
|
存储 分布式计算 druid
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
94 1
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
|
3月前
|
SQL 存储 分布式计算
大数据-157 Apache Kylin 背景 历程 特点 场景 架构 组件 详解
大数据-157 Apache Kylin 背景 历程 特点 场景 架构 组件 详解
56 9
|
3月前
|
存储 小程序 Apache
10月26日@杭州,飞轮科技 x 阿里云举办 Apache Doris Meetup,探索保险、游戏、制造及电信领域数据仓库建设实践
10月26日,由飞轮科技与阿里云联手发起的 Apache Doris 杭州站 Meetup 即将开启!
86 0
|
1月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
355 33
The Past, Present and Future of Apache Flink

热门文章

最新文章

推荐镜像

更多