硬核!Apache Hudi中自定义序列化和数据写入逻辑

简介: 硬核!Apache Hudi中自定义序列化和数据写入逻辑

1. 介绍

在Apache Hudi中,Hudi的一条数据使用HoodieRecord这个类表示,其中包含了hoodie的主键,record的分区文件位置,还有今天本文的关键,payload。payload是一个条数据的内容的抽象,决定了同一个主键的数据的增删改查逻辑也决定了其序列化的方式。通过对payload的自定义,可以实现数据的灵活合并,数据的自定义编码序列化等,丰富Hudi现有的语义,提升性能。

2. 场景

包括但不限于如下场景中,我们可以通过自定义payload来实现灵活的需求。

实现同一个主键的数据非row level replace语义的合并,如mvcc语义等实现同一个主键下多时间戳数据灵活排序的语义实现输出redo/undo log的效果实现自定义序列化逻辑

3. 作用方式

首先我们回顾一下一条HoodieRecord在Spark环境中使用RDD API upsert写入MOR表的生命周期。

注意:在这个过程中,shuffle/写入文件/磁盘spill的时候,都需要保证数据是已经被序列化过的格式。

4. 实现方式

在Hudi中,默认的payload实现是DefaultHoodieRecordPayload,它是OverwriteWithLatestAvroPayload子类。而OverwriteWithLatestAvroPayload这个类继承了BaseAvroPayload并implements HoodieRecordPayload这个接口。

其中BaseAvroPayload决定了数据的序列化方式,而HoodieRecordPayload决定了数据的合并方式。后者是必须使用的,但是前者不是。下面来分别分析他们的实现。

BaseAvroPayload

/** * Base class for all AVRO record based payloads, that can be ordered based on a field. */public abstract class BaseAvroPayload implements Serializable {  /**   * Avro data extracted from the source converted to bytes.   */  public final byte[] recordBytes;  /**   * For purposes of preCombining.   */  public final Comparable orderingVal;  /**   * Instantiate {@link BaseAvroPayload}.   *   * @param record      Generic record for the payload.   * @param orderingVal {@link Comparable} to be used in pre combine.   */  public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {    this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) : new byte[0];    this.orderingVal = orderingVal;    if (orderingVal == null) {      throw new HoodieException("Ordering value is null for record: " + record);    }  }}

首先BaseAvroPayload implements了Serializable接口,标志着这个类和它的子类都是为了序列化而设计的,大家在继承的时候需要注意子类相关attribute的可序列化问题。

构造器传入了GenericRecord和一个Comparable的变量。由于Hudi使用avro作为内部的行存序列化格式,所以输入的数据需要以GenericRecord的形式传递给payload。BaseAvroPayload会将数据直接序列化成binary待IO使用。这里的假设是我们只需要做row level操作,直接操作整行的二进制数据毫无疑问是非常高效的,这里的orderingVal是因为基于行级别的record比较在RDBMS的CDC中是非常常见的,所以增加了这个字段。这样处理之后,只需保证comparable的变量也是可序列化的,这个类的所有attribute都已经是可序列化的格式了,使用任意序列化框架直接传输即可。

HoodieRecordPayload

/** * Every Hoodie table has an implementation of the <code>HoodieRecordPayload</code> This abstracts out callbacks which depend on record specific logic. */@PublicAPIClass(maturity = ApiMaturityLevel.STABLE)public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Serializable {  /**   * This method is deprecated. Please use this {@link #preCombine(HoodieRecordPayload, Properties)} method.   */  @Deprecated  @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED)  T preCombine(T oldValue);  /**   * When more than one HoodieRecord have the same HoodieKey, this function combines them before attempting to insert/upsert by taking in a property map.   * Implementation can leverage the property to decide their business logic to do preCombine.   *   * @param oldValue   instance of the old {@link HoodieRecordPayload} to be combined with.   * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage.   *   * @return the combined value   */  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)  default T preCombine(T oldValue, Properties properties) {    return preCombine(oldValue);  }  /**   * This methods is deprecated. Please refer to {@link #combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for java docs.   */  @Deprecated  @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED)  Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException;  /**   * This methods lets you write custom merging/combining logic to produce new values as a function of current value on storage and whats contained   * in this object. Implementations can leverage properties if required.   * <p>   * eg:   * 1) You are updating counters, you may want to add counts to currentValue and write back updated counts   * 2) You may be reading DB redo logs, and merge them with current image for a database row on storage   * </p>   *   * @param currentValue Current value in storage, to merge/combine this payload with   * @param schema Schema used for record   * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage.   * @return new combined/merged value to be written back to storage. EMPTY to skip writing this record.   */  default Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {    return combineAndGetUpdateValue(currentValue, schema);  }  /**   * This method is deprecated. Refer to {@link #getInsertValue(Schema, Properties)} for java docs.   * @param schema Schema used for record   * @return the {@link IndexedRecord} to be inserted.   */  @Deprecated  @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED)  Option<IndexedRecord> getInsertValue(Schema schema) throws IOException;  /**   * Generates an avro record out of the given HoodieRecordPayload, to be written out to storage. Called when writing a new value for the given   * HoodieKey, wherein there is no existing record in storage to be combined against. (i.e insert) Return EMPTY to skip writing this record.   * Implementations can leverage properties if required.   * @param schema Schema used for record   * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage.   * @return the {@link IndexedRecord} to be inserted.   */  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)  default Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {    return getInsertValue(schema);  }  /**   * This method can be used to extract some metadata from HoodieRecordPayload. The metadata is passed to {@code WriteStatus.markSuccess()} and   * {@code WriteStatus.markFailure()} in order to compute some aggregate metrics using the metadata in the context of a write success or failure.   * @return the metadata in the form of Map<String, String> if any.   */  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)  default Option<Map<String, String>> getMetadata() {    return Option.empty();  }}

这个类的注释写得非常清楚,其中每个方法都有定义两个个不同接口,截止本文发出时候(0.9.0版本),部分内部逻辑还在使用deprecated的旧版本,所以在使用时需要注意,逻辑最好放在旧接口里。

简单来说,preCombine 这个方法定义了两个payload合并的逻辑,在两个场景下会被调用:

1.当deduplicated 开启时,写入的数据两两合并时用到2.在MOR表发生compaction时,两条从log中读取的payload合并时用到3.MOR表使用RT视图读取时

combineAndGetUpdateValue 则定义了写入数据和baseFile中的数据(这里已经被转化成avro的行存格式)的合并方式。通常情况下,这合并逻辑应该和preCombine保持语义上的一致。

最后getInsertValue则定义了如何将数据从payload形式转化成GenericRecord。在Hoodie相关的WriteHandle中被大量使用。通常是被用在写入Log/BaseFile时调用的。

几点额外注意的是:

1.combineAndGetUpdateValue和getInsertValue返回的都是Option,在这里,如果返回Option.empty(),就是指数据删除的意思。EmptyHoodieRecordPayload 正是这一逻辑的payload表达,如果preCombine的返回结果是删除,则可以返回这个类的实例而hoodie中,在insert和upsert中通过添加_hoodie_is_deleted字段来实现删除的原理,本质上也是在payload中判断到这个字段,就返回空来实现的。2.不论是否继承BaseAvroPayload这个类/是否需要Comparable类型的orderingVal, 最好保留(GenericRecord, Comparable)这个构造器,因为Hudi中存在反射调用创建对象,默认寻找的构造器就是这个。

5. 使用场景

5. 1 Column Level的数据合并

有时候我们希望能够实现两个数据合并时,能够按照每个列的实现不同的合并逻辑。这时候就可以在preCombinecombineAndGetUpdateValue方法中借助schema遍历所有列,然后做不同的处理。如果需要在preCombine中使用Schema,可以在构造器初始化的时候保留GenericRecord中schema的引用。如果发生序列化后的传输,同时又没有使用schema可以序列化的版本(avro 1.8.2中 schema是不可序列化的对象),那么可以从方法中传递的properties中传递的信息构建schema。

public HoodieRecordPayload preCombine(HoodieRecordPayload oldValue, Properties properties) {    if (schema == null) {        this.schema = new Schema.Parser().parse(properties.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()));    }    initialSchema(properties);    GenericRecord thisRecord = getInsertValue(schema).get();    GenericRecord otherRecord = oldValue.getInsertValue(schema).get();    List<Schema.Field> fields = schema.getFields();    for (Schema.Field field : fields) {        // logic for each column    }    return new HoodieRecordPayload(thisRecord, orderingVal);}

5.2 实现自定义的序列化方式

在默认的BaseAvroPayload中,一次upsert,一条数据通常最少要序列化/反序列化三次,第一次是创建payload的时候,第二次是在写入时反序列化,第三次是写入文件时序列化。如果数据非常复杂,序列化其实是非常耗时的。我们可以通过灵活定义payload来决定序列化的方式,减少触发正反序列化的次数。这个技巧在Compaction的时候也可以获得收益。如考虑如下场景:

对于一条kakfa的数据,我们可以把key和partition相关的内容存在kafka的key/timestamp中。然后使用binary的方式获取kafka的value。通过kafka的key来构建HoodieRecordKey,然后将value直接以二进制方式存在payload中的map/list中,这样不会触发任何关于数据的序列化,额外的开销很低。而后将合并的逻辑放在getInsertValue方法中,在从payload转换成GenericRecord时,才将binary进行同一个key的数据合并和数据,这样只需要一次avro的序列化操作就可以完成写入过程。

需要注意的是,这样的设计方式毫无疑问增加了复杂度,使业务逻辑抽象方式变难,同时因为avro的序列化压缩比例通常比较高,如果直接传输业务数据,可能会有更大的IO和内存占用,需要根据场景评估收益。

6. 总结

本篇文章中我们介绍了Apache Hudi的关键数据抽象payload逻辑,同时介绍了几种关键payload的实现,最后给出基于payload的几种典型应用场景。

目录
相关文章
|
2天前
|
存储 运维 监控
从 ClickHouse 到 Apache Doris:在网易云音乐日增万亿日志数据场景下的落地
日志数据已成为企业洞察系统状态、监控网络安全及分析业务动态的宝贵资源。网易云音乐引入 Apache Doris 作为日志库新方案,替换了 ClickHouse。解决了 ClickHouse 运维复杂、不支持倒排索引的问题。目前已经稳定运行 3 个季度,规模达到 50 台服务器, 倒排索引将全文检索性能提升7倍,2PB 数据,每天新增日志量超过万亿条,峰值写入吞吐 6GB/s 。
从 ClickHouse 到 Apache Doris:在网易云音乐日增万亿日志数据场景下的落地
|
5月前
|
XML 存储 JSON
Twaver-HTML5基础学习(19)数据容器(2)_数据序列化_XML、Json
本文介绍了Twaver HTML5中的数据序列化,包括XML和JSON格式的序列化与反序列化方法。文章通过示例代码展示了如何将DataBox中的数据序列化为XML和JSON字符串,以及如何从这些字符串中反序列化数据,重建DataBox中的对象。此外,还提到了用户自定义属性的序列化注册方法。
61 1
|
3月前
|
JSON 数据格式 索引
Python中序列化/反序列化JSON格式的数据
【11月更文挑战第4天】本文介绍了 Python 中使用 `json` 模块进行序列化和反序列化的操作。序列化是指将 Python 对象(如字典、列表)转换为 JSON 字符串,主要使用 `json.dumps` 方法。示例包括基本的字典和列表序列化,以及自定义类的序列化。反序列化则是将 JSON 字符串转换回 Python 对象,使用 `json.loads` 方法。文中还提供了具体的代码示例,展示了如何处理不同类型的 Python 对象。
|
3月前
|
XML JSON Kubernetes
什么是 YAML?:一种简洁高效的数据序列化格式
什么是 YAML?:一种简洁高效的数据序列化格式
387 0
|
4月前
|
JSON 前端开发 数据格式
前端的全栈之路Meteor篇(五):自定义对象序列化的EJSON介绍 - 跨设备的对象传输
EJSON是Meteor框架中扩展了标准JSON的库,支持更多数据类型如`Date`、`Binary`等。它提供了序列化和反序列化功能,使客户端和服务器之间的复杂数据传输更加便捷高效。EJSON还支持自定义对象的定义和传输,通过`EJSON.addType`注册自定义类型,确保数据在两端无缝传递。
|
4月前
|
消息中间件 存储 Java
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
101 3
|
4月前
|
SQL 消息中间件 大数据
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
125 1
|
4月前
|
SQL 大数据 Apache
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
106 1
|
4月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
119 1
|
4月前
|
消息中间件 存储 分布式计算
大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡
大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡
42 1

推荐镜像

更多