超硬核解析!Apache Hudi灵活的Payload机制

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: Apache Hudi 的Payload是一种可扩展的数据处理机制,通过不同的Payload我们可以实现复杂场景的定制化数据写入方式,大大增加了数据处理的灵活性。Hudi Payload在写入和读取Hudi表时对数据进行去重、过滤、合并等操作的工具类,通过使用参数 "hoodie.datasource.write.payload.class"指定我们需要使用的Payload class。​

1.摘要


Apache Hudi 的Payload是一种可扩展的数据处理机制,通过不同的Payload我们可以实现复杂场景的定制化数据写入方式,大大增加了数据处理的灵活性。Hudi Payload在写入和读取Hudi表时对数据进行去重、过滤、合并等操作的工具类,通过使用参数 "hoodie.datasource.write.payload.class"指定我们需要使用的Payload class。本文我们会深入探讨Hudi Payload的机制和不同Payload的区别及使用场景。


2. 为何需要Payload


在数据写入的时候,现有整行插入、整行覆盖的方式无法满足所有场景要求,写入的数据也会有一些定制化处理需求,因此需要有更加灵活的写入方式以及对写入数据进行一定的处理,Hudi提供的playload方式可以很好的解决该问题,例如可以解决写入时数据去重问题,针对部分字段进行更新等等。


3. Payload的作用机制


写入Hudi表时需要指定一个参数hoodie.datasource.write.precombine.field,这个字段也称为Precombine Key,Hudi Payload就是根据这个指定的字段来处理数据,它将每条数据都构建成一个Payload,因此数据间的比较就变成了Payload之间的比较。只需要根据业务需求实现Payload的比较方法,即可实现对数据的处理。

Hudi所有Payload都实现HoodieRecordPayload接口,下面列出了所有实现该接口的预置Payload类。10.png


下图列举了HoodieRecordPayload接口需要实现的方法,这里有两个重要的方法preCombine和combineAndGetUpdateValue,下面我们对这两个方法进行分析。

11.png


3.1 preCombine分析

从下图可以看出,该方法比较当前数据和oldValue,然后返回一条记录。

12.png

从preCombine方法的注释描述也可以知道首先它在多条相同主键的数据同时写入Hudi时,用来进行数据去重。

调用位置

13.png

其实该方法还有另一个调用的地方,即在MOR表读取时会对Log file中的相同主键的数据进行处理。

如果同一条数据多次修改并写入了MOR表的Log文件,在读取时也会进行preCombine。

14.png


3.2 combineAndGetUpdateValue分析

该方法将currentValue(即现有parquet文件中的数据)与新数据进行对比,判断是否需要持久化新数据。

15.png

由于COW表和MOR表的读写原理差异,因此combineAndGetUpdateValue的调用在COW和MOR中也有所不同:

  • 在COW写入时会将新写入的数据与Hudi表中存的currentValue进行比较,返回需要持久化的数据
  • 在MOR读取时会将经过preCombine处理的Log中的数据与Parquet文件中的数据进行比较,返回需要持久化的数据


4.常用Payload处理逻辑的对比


了解了Payload的内核原理,下面我们对比分析下集中常用的Payload实现的方式。


4.1 OverwriteWithLatestAvroPayload

OverwriteWithLatestAvroPayload 的相关方法实现如下

16.png

可以看出使用OverwriteWithLatestAvroPayload 会根据orderingVal进行选择(这里的orderingVal即precombine key的值),而combineAndGetUpdateValue永远返回新数据。


4.2 OverwriteNonDefaultsWithLatestAvroPayload

OverwriteNonDefaultsWithLatestAvroPayload继承OverwriteWithLatestAvroPayload,preCombine方法相同,重写了combineAndGetUpdateValue方法,新数据会按字段跟schema中的default value进行比较,如果default value非null且与新数据中的值不同时,则在新数据中更新该字段。由于通常schema定义的default value都是null,在此场景下可以实现更新非null字段的功能,即如果一条数据有五个字段,使用此Payload更新三个字段时不会影响另外两个字段原来的值。

17.png


4.3 DefaultHoodieRecordPayload

DefaultHoodieRecordPayload同样继承OverwriteWithLatestAvroPayload重写了combineAndGetUpdateValue方法,通过下面代码可以看出该Payload使用precombine key对现有数据和新数据进行比较,判断是否要更新该条数据

18.png

下面我们以COW表为例展示不同Payload读写结果测试


5. 测试


我们使用如下几条源数据,以key为主键,col3为preCombine key写Hudi表。

19.png

首先我们一次写入col0是'aa'、'bb'的两条数据,由于他们的主键相同,所以在precombine时会根据col3比较去重,最终写入Hudi表的只有一条数据。(注意如果写入方式是insert或bulk_insert则不会去重)

20.png

查询结果

21.png

下面我们使用col0是'cc'的数据进行更新,这是由于三种Payload的处理逻辑不同,最终写入的数据结果也不同。

OverwriteWithLatestAvroPayload

完全用新数据覆盖了旧数据。

22.png

OverwriteNonDefaultsWithLatestAvroPayload

由于更新数据中col1 col2为null,因此该字段未被更新。

1.png

DefaultHoodieRecordPayload

由于cc的col3小于bb的,因此该数据未被更新。24.png



6. 总结


通过上面分析我们清楚了Hudi常用的几种Payload机制,总结对比如下

Payload 更新逻辑与适用场景
OverwriteWithLatestAvroPayload 永远用新数据更新老数据全部字段,适合每次更新数据都是完整的
OverwriteNonDefaultsWithLatestAvroPayload 将新数据中的非空字段更新到老数据中,适合每次更新数据只有部分字段
DefaultHoodieRecordPayload 根据precombine key比较是否要更新数据,适合实时入湖且入湖顺序乱序

虽然Hudi提供了多个预置Payload,但是仍不能满足一些特殊场景的数据处理工作:例如用户在使用Kafka-Hudi实时入湖,但是用户的一条数据的修改不在一条Kafka消息中,而是多条相同主键的数据消息到,第一条里面有col0,col1的数据,第二条有col2,col3的数据,第三条有col4的数据,这时使用Hudi自带的Payload就无法完成将这三条数据合并之后写入Hudi表的工作,要实现这个逻辑就要通过自定义Payload,重写Payload中的preCombine和combineAndGetUpdateValue方法来实现相应的业务逻辑,并在写入时通过hoodie.datasource.write.payload.class指定我们自定义的Payload实现。

目录
相关文章
|
5天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
16 2
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
68 3
|
7天前
|
存储 消息中间件 算法
深入探索操作系统的心脏——内核机制解析
本文旨在揭示操作系统核心——内核的工作原理,通过剖析其关键组件与机制,为读者提供一个清晰的内核结构图景。不同于常规摘要的概述性内容,本文摘要将直接聚焦于内核的核心概念、主要功能以及其在系统管理中扮演的角色,旨在激发读者对操作系统深层次运作原理的兴趣与理解。
|
19天前
|
存储 缓存 安全
🌟Java零基础:深入解析Java序列化机制
【10月更文挑战第20天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
22 3
|
20天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
51 2
|
24天前
|
Java 开发者 UED
Java编程中的异常处理机制解析
在Java的世界里,异常处理是确保程序稳定性和可靠性的关键。本文将深入探讨Java的异常处理机制,包括异常的类型、如何捕获和处理异常以及自定义异常的创建和使用。通过理解这些概念,开发者可以编写更加健壮和易于维护的代码。
|
1月前
|
JavaScript 前端开发 开发者
原型链深入解析:JavaScript中的核心机制
【10月更文挑战第13天】原型链深入解析:JavaScript中的核心机制
29 0
|
1月前
|
Java 开发者
Java中的异常处理机制深度解析
【10月更文挑战第8天】 在Java编程中,异常处理不仅是保证程序鲁棒性的关键手段,更是每个开发者必须精通的核心技能。本文将深入探讨Java异常处理的各个方面,包括异常的分类、捕获与处理、自定义异常以及最佳实践等,旨在帮助读者全面理解并有效应用这一机制,提升代码的可靠性和可维护性。
|
30天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
589 13
Apache Flink 2.0-preview released
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。

推荐镜像

更多