超硬核解析!Apache Hudi灵活的Payload机制

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: Apache Hudi 的Payload是一种可扩展的数据处理机制,通过不同的Payload我们可以实现复杂场景的定制化数据写入方式,大大增加了数据处理的灵活性。Hudi Payload在写入和读取Hudi表时对数据进行去重、过滤、合并等操作的工具类,通过使用参数 "hoodie.datasource.write.payload.class"指定我们需要使用的Payload class。​

1.摘要


Apache Hudi 的Payload是一种可扩展的数据处理机制,通过不同的Payload我们可以实现复杂场景的定制化数据写入方式,大大增加了数据处理的灵活性。Hudi Payload在写入和读取Hudi表时对数据进行去重、过滤、合并等操作的工具类,通过使用参数 "hoodie.datasource.write.payload.class"指定我们需要使用的Payload class。本文我们会深入探讨Hudi Payload的机制和不同Payload的区别及使用场景。


2. 为何需要Payload


在数据写入的时候,现有整行插入、整行覆盖的方式无法满足所有场景要求,写入的数据也会有一些定制化处理需求,因此需要有更加灵活的写入方式以及对写入数据进行一定的处理,Hudi提供的playload方式可以很好的解决该问题,例如可以解决写入时数据去重问题,针对部分字段进行更新等等。


3. Payload的作用机制


写入Hudi表时需要指定一个参数hoodie.datasource.write.precombine.field,这个字段也称为Precombine Key,Hudi Payload就是根据这个指定的字段来处理数据,它将每条数据都构建成一个Payload,因此数据间的比较就变成了Payload之间的比较。只需要根据业务需求实现Payload的比较方法,即可实现对数据的处理。

Hudi所有Payload都实现HoodieRecordPayload接口,下面列出了所有实现该接口的预置Payload类。10.png


下图列举了HoodieRecordPayload接口需要实现的方法,这里有两个重要的方法preCombine和combineAndGetUpdateValue,下面我们对这两个方法进行分析。

11.png


3.1 preCombine分析

从下图可以看出,该方法比较当前数据和oldValue,然后返回一条记录。

12.png

从preCombine方法的注释描述也可以知道首先它在多条相同主键的数据同时写入Hudi时,用来进行数据去重。

调用位置

13.png

其实该方法还有另一个调用的地方,即在MOR表读取时会对Log file中的相同主键的数据进行处理。

如果同一条数据多次修改并写入了MOR表的Log文件,在读取时也会进行preCombine。

14.png


3.2 combineAndGetUpdateValue分析

该方法将currentValue(即现有parquet文件中的数据)与新数据进行对比,判断是否需要持久化新数据。

15.png

由于COW表和MOR表的读写原理差异,因此combineAndGetUpdateValue的调用在COW和MOR中也有所不同:

  • 在COW写入时会将新写入的数据与Hudi表中存的currentValue进行比较,返回需要持久化的数据
  • 在MOR读取时会将经过preCombine处理的Log中的数据与Parquet文件中的数据进行比较,返回需要持久化的数据


4.常用Payload处理逻辑的对比


了解了Payload的内核原理,下面我们对比分析下集中常用的Payload实现的方式。


4.1 OverwriteWithLatestAvroPayload

OverwriteWithLatestAvroPayload 的相关方法实现如下

16.png

可以看出使用OverwriteWithLatestAvroPayload 会根据orderingVal进行选择(这里的orderingVal即precombine key的值),而combineAndGetUpdateValue永远返回新数据。


4.2 OverwriteNonDefaultsWithLatestAvroPayload

OverwriteNonDefaultsWithLatestAvroPayload继承OverwriteWithLatestAvroPayload,preCombine方法相同,重写了combineAndGetUpdateValue方法,新数据会按字段跟schema中的default value进行比较,如果default value非null且与新数据中的值不同时,则在新数据中更新该字段。由于通常schema定义的default value都是null,在此场景下可以实现更新非null字段的功能,即如果一条数据有五个字段,使用此Payload更新三个字段时不会影响另外两个字段原来的值。

17.png


4.3 DefaultHoodieRecordPayload

DefaultHoodieRecordPayload同样继承OverwriteWithLatestAvroPayload重写了combineAndGetUpdateValue方法,通过下面代码可以看出该Payload使用precombine key对现有数据和新数据进行比较,判断是否要更新该条数据

18.png

下面我们以COW表为例展示不同Payload读写结果测试


5. 测试


我们使用如下几条源数据,以key为主键,col3为preCombine key写Hudi表。

19.png

首先我们一次写入col0是'aa'、'bb'的两条数据,由于他们的主键相同,所以在precombine时会根据col3比较去重,最终写入Hudi表的只有一条数据。(注意如果写入方式是insert或bulk_insert则不会去重)

20.png

查询结果

21.png

下面我们使用col0是'cc'的数据进行更新,这是由于三种Payload的处理逻辑不同,最终写入的数据结果也不同。

OverwriteWithLatestAvroPayload

完全用新数据覆盖了旧数据。

22.png

OverwriteNonDefaultsWithLatestAvroPayload

由于更新数据中col1 col2为null,因此该字段未被更新。

1.png

DefaultHoodieRecordPayload

由于cc的col3小于bb的,因此该数据未被更新。24.png



6. 总结


通过上面分析我们清楚了Hudi常用的几种Payload机制,总结对比如下

Payload 更新逻辑与适用场景
OverwriteWithLatestAvroPayload 永远用新数据更新老数据全部字段,适合每次更新数据都是完整的
OverwriteNonDefaultsWithLatestAvroPayload 将新数据中的非空字段更新到老数据中,适合每次更新数据只有部分字段
DefaultHoodieRecordPayload 根据precombine key比较是否要更新数据,适合实时入湖且入湖顺序乱序

虽然Hudi提供了多个预置Payload,但是仍不能满足一些特殊场景的数据处理工作:例如用户在使用Kafka-Hudi实时入湖,但是用户的一条数据的修改不在一条Kafka消息中,而是多条相同主键的数据消息到,第一条里面有col0,col1的数据,第二条有col2,col3的数据,第三条有col4的数据,这时使用Hudi自带的Payload就无法完成将这三条数据合并之后写入Hudi表的工作,要实现这个逻辑就要通过自定义Payload,重写Payload中的preCombine和combineAndGetUpdateValue方法来实现相应的业务逻辑,并在写入时通过hoodie.datasource.write.payload.class指定我们自定义的Payload实现。

目录
相关文章
|
1月前
|
Java 数据库连接 开发者
Java中的异常处理机制深度解析
【8月更文挑战第13天】本文旨在深入探讨Java编程语言中一个至关重要的组成部分——异常处理机制。我们将从基本概念入手,逐步展开讨论异常处理在Java语言设计中的角色和重要性,以及如何正确利用这一机制来提高代码的健壮性和可维护性。文章将通过分析异常处理的最佳实践,揭示如何在复杂的应用程序中有效地管理和处理错误情况。
|
1天前
|
Java 程序员 开发者
Java中的异常处理机制深度解析
本文旨在深入探讨Java中异常处理的核心概念与实际应用,通过剖析异常的本质、分类、捕获及处理方法,揭示其在程序设计中的关键作用。不同于常规摘要,本文将直接切入主题,以简明扼要的方式概述异常处理的重要性及其在Java编程中的应用策略,引导读者快速把握异常处理的精髓。
|
2天前
|
安全 网络协议 应用服务中间件
AJP Connector:深入解析及在Apache HTTP Server中的应用
【9月更文挑战第6天】在Java Web应用开发中,Tomcat作为广泛使用的Servlet容器,经常与Apache HTTP Server结合使用,以提供高效、稳定的Web服务。而AJP Connector(Apache JServ Protocol Connector)作为连接Tomcat和Apache HTTP Server的重要桥梁,扮演着至关重要的角色
19 2
|
4天前
|
Java 开发者
深入解析Java中的异常处理机制
本文将深入探讨Java中异常处理的核心概念和实际应用,包括异常的分类、捕获、处理以及最佳实践。我们将通过具体示例展示如何有效使用try-catch块、throws关键字和自定义异常类,以帮助读者更好地理解和应用Java异常处理机制。
10 1
|
4天前
|
Java 程序员 开发者
Java中的异常处理机制深度解析
本文旨在深入探讨Java中异常处理的机制,包括异常的分类、如何捕获和处理异常,以及自定义异常的最佳实践。通过实例讲解,帮助读者更好地理解如何在Java编程中有效管理和利用异常处理来提高代码的健壮性和可维护性。
|
4天前
|
分布式计算 Java Apache
Apache Spark Streaming技术深度解析
【9月更文挑战第4天】Apache Spark Streaming是Apache Spark生态系统中用于处理实时数据流的一个重要组件。它将输入数据分成小批次(micro-batch),然后利用Spark的批处理引擎进行处理,从而结合了批处理和流处理的优点。这种处理方式使得Spark Streaming既能够保持高吞吐量,又能够处理实时数据流。
14 0
|
1月前
|
存储 SQL 关系型数据库
深入解析MySQL事务机制和锁机制
深入解析MySQL事务机制和锁机制
|
1月前
|
存储 缓存 NoSQL
深入解析Memcached:内部机制、存储结构及在大数据中的应用
深入解析Memcached:内部机制、存储结构及在大数据中的应用
|
17天前
|
Java Spring
🔥JSF 与 Spring 强强联手:打造高效、灵活的 Web 应用新标杆!💪 你还不知道吗?
【8月更文挑战第31天】JavaServer Faces(JSF)与 Spring 框架是常用的 Java Web 技术。本文介绍如何整合两者,发挥各自优势,构建高效灵活的 Web 应用。首先通过 `web.xml` 和 `ContextLoaderListener` 配置 Spring 上下文,在 `applicationContext.xml` 定义 Bean。接着使用 `@Autowired` 将 Spring 管理的 Bean 注入到 JSF 管理的 Bean 中。
29 0
|
18天前
|
JavaScript 前端开发 开发者
深入解析Angular装饰器:揭秘框架核心机制与应用——从基础用法到内部原理的全面教程
【8月更文挑战第31天】本文深入解析了Angular框架中的装饰器特性,包括其基本概念、使用方法及内部机制。装饰器作为TypeScript的关键特性,在Angular中用于定义组件、服务等。通过具体示例介绍了`@Component`和`@Injectable`装饰器的应用,展示了如何利用装饰器优化代码结构与依赖注入,帮助开发者构建高效、可维护的应用。
20 0

热门文章

最新文章

推荐镜像

更多