背景
在某些业务场景下,我们需要一个标志来衡量hudi数据写入的进度,比如:Flink 实时向 Hudi 表写入数据,然后使用这个 Hudi 表来支持批量计算并通过一个 flag 来评估它的分区数据是否完整从而进一步写入分区数据进行分区级别的ETL,这也就是我们通常说的流转批
。
EventTime计算原理
图中Flink Sink包含了两个算子。第一个writer 算子,它负责把数据写入文件,writer在checkpoint触发时,会把自己写入的最大的一个时间传到commit算子中,然后commit算子从多个上游传过来的时间中选取一个最小值作为这一批提交数据的时间,并写入HUDI表的元数据中。
社区相关工作参考: https://issues.apache.org/jira/browse/HUDI-5095
案例使用
我们的方案是将这个进度值(EventTime)存储为 hudi 提交(版本)元数据的属性里,然后通过访问这个元数据属性获取这个进度值。在下游的批处理任务之前加一个监控任务去监控最新快照元数据。如果它的时间已经超过了当前的分区时间,就认为这个表的数据已经完备了,这个监控任务就会成功触发下游的批处理任务进行计算,这样可以防止在异常场景下数据管道或者批处理任务空跑的情况。
下图是一个flink 1分钟级别入库到HUDI ODS表, 然后通过流转批计算写入HUDI DWD表的一个执行过程。
US调度系统轮询逻辑
如何解决乱序到来问题, 我们可以通过设置spedGapTime来设置允许延迟到来的范围默认是0 不会延迟到来。
Maven pom 依赖
针对此功能特性的Hudi依赖版本如下
org.apache.hudi hudi-flink1.13-bundle 0.12.1 org.apache.hudi hudi-flink1.15-bundle 0.12.1
如何设置EventTime
能够解析的字段类型及格式如下:
类型 | 示例 |
TIMESTAMP(3) | 2012-12-12T12:12:12 |
TIMESTAMP(3) | 2012-12-12 12:12:12 |
DATE | 2012-12-12 |
BIGINT | 100L |
INT | 100 |
Flink API
用户只需要设置flink conf指定时间字段作为时间推进字段
Map options = new HashMap<>(); // 这里省略其他表字段 options.put(FlinkOptions.EVENT_TIME_FIELD.key(), "ts"); HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable) .column("id int not null") .column("ts string") .column("dt string") .pk("id") .partition("dt") .options(options);
Flink SQL
通过设置hoodie.payload.event.time.field
指定需要计算的eventtime的字段
create table hudi_cow_01(\n" + " uuid varchar(20),\n" + " name varchar(10),\n" + " age int,\n" + " ts timestamp(3),\n" + " PRIMARY KEY(uuid) NOT ENFORCED\n" + ")\n" + " with (\n" + // 这里省略其他参数 " 'hoodie.payload.event.time.field' = 'ts'\n" ")
如何读取EventTime
Spark SQL
call show_commit_extra_metadata(table => 'hudi_tauth_test.hudi_cow_01', metadata_key => 'hoodie.payload.event.time.field');
Java API
代码获取片段如下
Option commitMetadataOption = MetadataConversionUtils.getHoodieCommitMetadata(metaClient, currentInstant); if (!commitMetadataOption.isPresent()) { throw new HoodieException(String.format("Commit %s not found commitMetadata in Commits %s.", currentInstant, timeline)); } // 获取到当前版本的时间进度 String eventTime = commitMetadataOption.get().getExtraMetadata().get(FlinkOptions.EVENT_TIME_FIELD.key()); System.out.println("current eventTime: " + eventTime); 输出结果如下 current eventTime: 1667971364742