hudi概念讲解

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: hudi概念讲解

MOR与COW

Hudi提供两类型表:写时复制(Copy on Write, COW)表和读时合并(Merge On Read, MOR)表。

60a6bcefe26f4b118e50f46e4d0afd1d.png表类型:

Copy On Write

           COW,顾名思义,它是在数据写入的时候,复制一份原来的拷贝,在其基础上添加新数据。

                   正在读数据的请求,读取的是最近的完整副本,这类似Mysql 的MVCC的思想。

  • 优点:读取时,只读取对应分区的一个数据文件即可,较为高效;
  • 缺点:数据写入的时候,需要复制一个先前的副本再在其基础上生成新的数据文件,这个过程比较耗时。
  • 适用场景:对于一些读多写少的数据,写入时复制的做法就很不错,例如配置、黑名单、物流地址等变化非常少的数据,这是一种无锁的实现。可以帮我们实现程序更高的并发。


  • COW表主要使用列式文件格式(Parquet)存储数据,在写入数据过程中,执行同步合并,更新数据版本并重写数据文件,类似RDBMS中的B-Tree更新。
  • 更新update:在更新记录时,Hudi会先找到包含更新数据的文件,然后再使用更新值(最新的数据)重写该文件,包含其他记录的文件保持不变。当突然有大量写操作时会导致重写大量文件,从而导致极大的I/O开销。
  • 读取read:在读取数据时,通过读取最新的数据文件来获取最新的更新,此存储类型适用于少量写入和大量读取的场景

COW缺陷

  • 数据一致性问题
    cow这种实现只是保证数据的最终一致性,在添加到拷贝数据但还没进行替换的时候,读到的仍然是旧数据。
  • 内存占用问题
    如果对象比较大,频繁地进行替换会消耗内存,从而引发 Java 的 GC 问题,这个时候,我们应该考虑其他的容器,例如 ConcurrentHashMap

Merge On Read

     简称MOR,新插入的数据存储在delta log 中,定期再将delta log合并进行parquet数据文件。

     读取数据时,会将delta log跟老的数据文件做merge,得到完整的数据返回。下图演示了MOR的两               种数据读写方式。

           

  • 优点:由于写入数据先写delta log,且delta log较小,所以写入成本较低
  • 缺点:需要定期合并整理compact,否则碎片文件较多。读取性能较差,因为需要将delta log和老数据文件合并
     

  • MOR表是COW表的升级版,它使用列式(parquet)与行式(avro)文件混合的方式存储数据。在更新记录时,类似NoSQL中的LSM-Tree更新。
  • 更新:在更新记录时,仅更新到增量文件(Avro)中,然后进行异步(或同步)的compaction,最后创建列式文件(parquet)的新版本。此存储类型适合频繁写的工作负载,因为新记录是以追加的模式写入增量文件中。
  • 读取:在读取数据集时,需要先将增量文件与旧文件进行合并,然后生成列式文件成功后,再进行查询

写时拷贝(COW)-这类似于RDBMS B-Tree更新

读时合并(MOR)-这类似于No-SQL LSM-Tree更新

60a6bcefe26f4b118e50f46e4d0afd1d.png

查询方式:


Hudi支持以下存储数据的视图

   读优化视图 (Read Optimized Queries)

  • 直接查询基本文件(数据集的最新快照),其实就是列式文件(Parquet)。并保证与非Hudi列式数据集相比,具有相同的列式查询性能。

  • 读优化查询和快照查询相同仅访问基本文件,提供给定文件片自上次执行压缩操作以来的数据。通常查询数据的最新程度的保证取决于压缩策
  • 可查看给定的commit/compact即时操作的表的最新快照

   增量视图 (Snapshot Queries)

  • 仅查询新写入数据集的文件,需要指定一个Commit/Compaction的即时时间(位于Timeline上的某个Instant)作为条件,来查询此条件之后的新数据。
  • 可查看自给定commit/delta commit即时操作以来新写入的数据,有效的提供变更流来启用增量数据管道。

   实时视图 (Snapshot Queries)

  • 查询某个增量提交操作中数据集的最新快照,先进行动态合并最新的基本文件(Parquet)和增量文件(Avro)来提供近实时数据集(通常会存在几分钟的延迟)。
  • 读取所有 partiiton 下每个 FileGroup 最新的 FileSlice 中的文件,Copy On Write 表读 parquet 文件,Merge On Read 表读 parquet + log 文件

对于 MergeOnRead 表选择查询类型需做以下权衡:

60a6bcefe26f4b118e50f46e4d0afd1d.png

 

hudi 在hive中有两张表:
xxx_ro:历史(compact策略触发后能查询到的数据,
Read Optimized
xxx_rt:实时

hudi payload

Hudi 的Payload是一种可扩展的数据处理机制,通过不同的Payload我们可以实现复杂场景的定制化数据写入方式,大大增加了数据处理的灵活性。Hudi Payload在写入和读取Hudi表时对数据进行去重、过滤、合并等操作的工具类,通过使用参数 "hoodie.datasource.write.payload.class"指定我们需要使用的Payload class。本文我们会深入探讨Hudi Payload的机制和不同预制Payload之前的区别及使用场景。


在数据写入的时候,现有整行插入、整行覆盖的方式是无法满足所以场景要求的,写入的数据也会有一些定制化处理需求,因此需要有更加灵活的写入方式以及对写入数据进行一定的处理,Hudi提供的playload方式可以很好的解决该问题,例如:可以解决写入时是数据去重问题,针对部分字段进行更新等等


写入Hudi表时需要指定一个参数hoodie.datasource.write.precombine.field,这个字段也称为Precombine Key,Hudi Payload就是根据这个指定的字段来处理数据,它将每条数据都构建成一个Payload,因此数据间的比较就变成了Payload之间的比较。只需要根据业务需求实现Payload的比较方法,即可实现对数据的处理。

60a6bcefe26f4b118e50f46e4d0afd1d.png

数据湖hudi的spark优化参数

小文件优化

为了便于说明,本文只考虑 COPY_ON_WRITE 表的小文件自动合并功能。在阅读下文之前,我们先来看看几个相关的参数:

hoodie.parquet.max.file.size:数据文件的最大大小。Hudi 会尝试将文件大小保持在此配置值;hoodie.parquet.small.file.limit:文件大小小于这个配置值的均视为小文件;hoodie.copyonwrite.insert.split.size:单分区插入的数据条数,这个值应该和单个文件的记录条数相同。可以根据 hoodie.parquet.max.file.size 和单条记录的大小进行调整。spark+hudi优化

通过Spark作业将数据写入Hudi时,需要注意的调优手段如下:

输入并行性: Hudi对输入进行分区默认并发度为1500,以确保每个Spark分区都在2GB的限制内(在Spark2.4.0版本之后去除了该限制),如果有更大的输入,则相应地进行调整。我们建议设置shuffle的并发度,配置项为hoodie.[insert|upsert|bulkinsert].shuffle.parallelism,以使其至少达到input_data_size/500MB

Off-heap(堆外)内存: Hudi写入parquet文件,需要使用一定的堆外内存,如果遇到此类故障,请考虑设置类似spark.yarn.executor.memoryOverheadspark.yarn.driver.memoryOverhead的值。

Spark 内存: 通常Hudi需要能够将单个文件读入内存以执行合并或压缩操作,因此执行程序的内存应足以容纳此文件。另外,Hudi会缓存输入数据以便能够智能地放置数据,因此预留一些spark.memory.storageFraction通常有助于提高性能。

调整文件大小: 设置limitFileSize以平衡接收/写入延迟与文件数量,并平衡与文件数据相关的元数据开销。

时间序列/日志数据: 对于单条记录较大的数据库/nosql变更日志,可调整默认配置。另一类非常流行的数据是时间序列/事件/日志数据,它往往更加庞大,每个分区的记录更多。在这种情况下,请考虑通过bloomFilterFPP()/bloomFilterNumEntries()来调整Bloom过滤器的精度,以加速目标索引查找时间,另外可考虑一个以事件时间为前缀的键,这将使用范围修剪并显着加快索引查找的速度。

GC调优: 请确保遵循Spark调优指南中的垃圾收集调优技巧,以避免OutOfMemory错误。使用G1 / CMS收集器,其中添加到spark.executor.extraJavaOptions的示例如下

-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof

OutOfMemory错误: 如果出现OOM错误,则可尝试通过如下配置处理:

spark.memory.fraction = 0.2
spark.memory.storageFraction = 0.2

允许其溢出而不是OOM(速度变慢与间歇性崩溃相比)。

以下是可以参考的完整的生产配置:

spark.driver.extraClassPath /etc/hive/conf
spark.driver.extraJavaOptions -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
spark.driver.maxResultSize 2g
spark.driver.memory 4g
spark.executor.cores 1
spark.executor.extraJavaOptions -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
spark.executor.id driver
spark.executor.instances 300
spark.executor.memory 6g
spark.rdd.compress true
spark.kryoserializer.buffer.max 512m
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled true
spark.sql.hive.convertMetastoreParquet false
spark.submit.deployMode cluster
spark.task.cpus 1
spark.task.maxFailures 4
spark.yarn.driver.memoryOverhead 1024
spark.yarn.executor.memoryOverhead 3072
spark.yarn.max.executor.failures 100

数据湖hudi的flink优化参数

表参数
1、Memory

60a6bcefe26f4b118e50f46e4d0afd1d.png

2. Parallelism

60a6bcefe26f4b118e50f46e4d0afd1d.png

3. Compaction

只适用于online compaction

60a6bcefe26f4b118e50f46e4d0afd1d.png

内存优化

MOR

  • Flink 的状态后端设置为 rocksdb (默认的 in memory 状态后端非常的消耗内存)
  • 如果内存足够,compaction.max_memory 可以设置得更大些(默认为 100MB,可以调大到 1024MB
  • 关注 taskManager 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的内存大小。比如 taskManager 的内存是 4GB, 运行了 2StreamWriteFunction,那每个 write function 能分到 2GB,尽量预留一些缓存。因为网络缓存,taskManager 上其他类型的 task (比如 BucketAssignFunction)也会消耗一些内存
  • 需要关注 compaction 的内存变化。 compaction.max_memory 控制了每个 compaction task 读 log 时可以利用的内存大小。compaction.tasks 控制了 compaction task 的并发

COW

  • 把 Flink 的状态后端设置为 rocksdb (默认的 in memory 状态后端非常的消耗内存)
  • 同时调大 write.task.max.size 和 write.merge.max_memory (默认值分别是 1024MB 和 100MB,可以调整为 2014MB 和 1024MB)
  • 关注 taskManager 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的内存大小。比如 taskManager 的内存是 4GB, 运行了 2 个 StreamWriteFunction,那每个 write function 能分到 2GB,尽量预留一些缓存。因为网络缓存,taskManager 上其他类型的 task (比如 BucketAssignFunction)也会消耗一些内存

离线批量导入

针对存量数据导入的需求,如果存量数据来源于其他数据源,可以使用离线批量导入功能(bulk_insert),快速将存量数据导入 Hudi。

NOTE

bulk_insert 省去了 avro 的序列化以及数据的 merge 过程,后续也不会再有去重操作。所以,数据的唯一性需要自己来保证。

NOTE

bulk_insertbatch execution mode 模式下执行更加高效。 batch execution mode 模式默认会按照 partition path 排序输入消息再写入 Hudi, 避免 file handle 频繁切换导致性能下降。

NOTE

bulk_insert 的 write tasks 的并发是通过参数 write.tasks 来指定,并发的数量会影响到小文件的数量,理论上,bulk_insert 的 write tasks 的并发数就是划分的 bucket 数, 当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会回滚到新的文件句柄,所以最后:写文件数量 >= write.bucket_assign.tasks

参数:

60a6bcefe26f4b118e50f46e4d0afd1d.png

全量接增量

针对全量数据导入后,接增量的需求。如果已经有全量的离线 Hudi 表,需要接上实时写入,并且保证数据不重复,可以开启 全量接增量(index bootstrap)功能。

NOTE

如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启 写入限流 )。

参数

60a6bcefe26f4b118e50f46e4d0afd1d.png

使用流程

1、CREATE TABLE 创建和 Hudi 表对应的语句,注意 table.type 必须正确

2、设置 index.bootstrap.enabled = true 开启索引加载功能

3、在 flink-conf.yaml 中设置 Checkpoint 失败容忍 :execution.checkpointing.tolerable-failed-checkpoints = n(取决于checkpoint 调度次数)

4、等待第一次 Checkpoint 完成,表示索引加载完成

5、索引加载完成后可以退出并保存 savepoint(也可以直接用 externalized checkpoint)

6、重启任务,将 index.bootstrap.enable 设置为 false,参数配置到合适的大小

# NOTE
索引加载是阻塞式,所以在索引加载过程中 Checkpoint 无法完成
索引加载由数据流触发,需要确保每个 partition 都至少有1条数据,即上游 source 有数据进来
索引加载为并发加载,根据数据量大小加载时间不同,可以在log中搜索 finish loading the index under partition 和 Load record form file 日志内容来观察索引加载的进
第一次 Checkpoint 成功就表示索引已经加载完成,后续从 Checkpoint 恢复时无需再次加载索引

Changelog 模式

针对使用 Hudi 保留消息的所有变更(I / -U / U / D),之后接上 Flink 引擎的有状态计算实现全链路近实时数仓生产(增量计算)的需求,Hudi 的 MOR 表 通过行存原生支持保留消息的所有变更(format 层面的集成),通过流读 MOR 表可以消费到所有的变更记录。

参数

60a6bcefe26f4b118e50f46e4d0afd1d.png

NOTE     批(快照)读仍然会合并所有的中间结果,不管 FORMAT 是否已存储中间状态。
NOTE  设置 CHANGELOG.ENABLETRUE 后,中间的变更也只是 BEST EFFORT:异步的压缩任务会将中间变更合并成 1 条,所以如果流读消费不够及时,被压缩后 只能读到最后一条记录。当然,通过调整压缩的缓存时间可以预留一定的时间缓冲给 READER,比如调整压缩的两个参数:COMPACTION.DELTA_COMMITS AND COMPACTION.DELTA_SECONDS

Insert 模式

当前 Hudi 对于 Insert 模式 默认会采用小文件策略:MOR 会追加写 avro log 文件,COW 会不断合并之前的 parquet 文件(并且增量的数据会去重),这样会导致性能下降。

如果想关闭文件合并,可以设置 write.insert.deduplicatefalse。关闭后,不会有任何的去重行为,每次 flush 都是直接写独立的 parquet(MOR 表也会直接写 parquet)。

参数

60a6bcefe26f4b118e50f46e4d0afd1d.png

参考链接:https://hudi.apache.org/cn/docs/next/flink-quick-start-guide








相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
存储 分布式计算 Apache
构建 Streaming Lakehouse:使用 Paimon 和 Hudi 的性能对比
Apache Paimon 和 Apache Hudi 作为数据湖存储格式,有着高吞吐的写入和低延迟的查询性能,是构建数据湖的常用组件。本文将在阿里云EMR 上,针对数据实时入湖场景,对 Paimon 和 Hudi 的性能进行比对,然后分别以 Paimon 和 Hudi 作为统一存储搭建准实时数仓。
58864 8
构建 Streaming Lakehouse:使用 Paimon 和 Hudi 的性能对比
|
6月前
|
存储 关系型数据库 Apache
Halodoc使用Apache Hudi构建Lakehouse的关键经验
Halodoc使用Apache Hudi构建Lakehouse的关键经验
78 4
|
6月前
|
SQL 消息中间件 关系型数据库
iceberg实践
iceberg实践
|
6月前
|
SQL 存储 分布式计算
【大数据技术Hadoop+Spark】Hive数据仓库架构、优缺点、数据模型介绍(图文解释 超详细)
【大数据技术Hadoop+Spark】Hive数据仓库架构、优缺点、数据模型介绍(图文解释 超详细)
1067 0
|
存储 消息中间件 SQL
Flink 基础学习(五)数据存储
前面两篇笔记已经写了数据来源和转换如何使用,那么这篇当然就到了数据存储,接下来将会从以下角度介绍一下(喜闻乐见的 What / Why / How)~:
1271 0
Flink 基础学习(五)数据存储
|
3月前
|
缓存 Cloud Native 关系型数据库
MPP架构数据仓库使用问题之Calcite 是一个什么样的类库,它主要用于什么地方
MPP架构数据仓库使用问题之Calcite 是一个什么样的类库,它主要用于什么地方
|
6月前
|
存储 SQL 调度
Hudi基本概念
Hudi基本概念
72 0
|
6月前
|
存储 SQL 分布式计算
使用Apache Hudi构建大规模、事务性数据湖
使用Apache Hudi构建大规模、事务性数据湖
128 0
|
SQL 存储 分布式计算
|
存储 SQL 分布式计算
下一篇
无影云桌面