MOR与COW
Hudi提供两类型表:写时复制(Copy on Write, COW)表和读时合并(Merge On Read, MOR)表。
表类型:
Copy On Write
COW,顾名思义,它是在数据写入的时候,复制一份原来的拷贝,在其基础上添加新数据。
正在读数据的请求,读取的是最近的完整副本,这类似Mysql 的MVCC的思想。
- 优点:读取时,只读取对应分区的一个数据文件即可,较为高效;
- 缺点:数据写入的时候,需要复制一个先前的副本再在其基础上生成新的数据文件,这个过程比较耗时。
- 适用场景:对于一些读多写少的数据,写入时复制的做法就很不错,例如配置、黑名单、物流地址等变化非常少的数据,这是一种无锁的实现。可以帮我们实现程序更高的并发。
- COW表主要使用列式文件格式(Parquet)存储数据,在写入数据过程中,执行同步合并,更新数据版本并重写数据文件,类似RDBMS中的B-Tree更新。
- 更新update:在更新记录时,Hudi会先找到包含更新数据的文件,然后再使用更新值(最新的数据)重写该文件,包含其他记录的文件保持不变。当突然有大量写操作时会导致重写大量文件,从而导致极大的I/O开销。
- 读取read:在读取数据时,通过读取最新的数据文件来获取最新的更新,此存储类型适用于少量写入和大量读取的场景
COW缺陷
- 数据一致性问题
cow这种实现只是保证数据的最终一致性,在添加到拷贝数据但还没进行替换的时候,读到的仍然是旧数据。 - 内存占用问题
如果对象比较大,频繁地进行替换会消耗内存,从而引发 Java 的 GC 问题,这个时候,我们应该考虑其他的容器,例如 ConcurrentHashMap。
Merge On Read
简称MOR,新插入的数据存储在delta log 中,定期再将delta log合并进行parquet数据文件。
读取数据时,会将delta log跟老的数据文件做merge,得到完整的数据返回。下图演示了MOR的两 种数据读写方式。
- 优点:由于写入数据先写delta log,且delta log较小,所以写入成本较低;
- 缺点:需要定期合并整理compact,否则碎片文件较多。读取性能较差,因为需要将delta log和老数据文件合并
- MOR表是COW表的升级版,它使用列式(parquet)与行式(avro)文件混合的方式存储数据。在更新记录时,类似NoSQL中的LSM-Tree更新。
- 更新:在更新记录时,仅更新到增量文件(Avro)中,然后进行异步(或同步)的compaction,最后创建列式文件(parquet)的新版本。此存储类型适合频繁写的工作负载,因为新记录是以追加的模式写入增量文件中。
- 读取:在读取数据集时,需要先将增量文件与旧文件进行合并,然后生成列式文件成功后,再进行查询
写时拷贝(COW)-这类似于RDBMS B-Tree更新
读时合并(MOR)-这类似于No-SQL LSM-Tree更新
查询方式:
Hudi支持以下存储数据的视图
读优化视图 (Read Optimized Queries)
- 直接查询基本文件(数据集的最新快照),其实就是列式文件(Parquet)。并保证与非Hudi列式数据集相比,具有相同的列式查询性能。
- 读优化查询和快照查询相同仅访问基本文件,提供给定文件片自上次执行压缩操作以来的数据。通常查询数据的最新程度的保证取决于压缩策
- 可查看给定的commit/compact即时操作的表的最新快照。
增量视图 (Snapshot Queries)
- 仅查询新写入数据集的文件,需要指定一个Commit/Compaction的即时时间(位于Timeline上的某个Instant)作为条件,来查询此条件之后的新数据。
- 可查看自给定commit/delta commit即时操作以来新写入的数据,有效的提供变更流来启用增量数据管道。
实时视图 (Snapshot Queries)
- 查询某个增量提交操作中数据集的最新快照,先进行动态合并最新的基本文件(Parquet)和增量文件(Avro)来提供近实时数据集(通常会存在几分钟的延迟)。
- 读取所有 partiiton 下每个 FileGroup 最新的 FileSlice 中的文件,Copy On Write 表读 parquet 文件,Merge On Read 表读 parquet + log 文件
对于 MergeOnRead 表选择查询类型需做以下权衡:
hudi 在hive中有两张表:
xxx_ro:历史(compact策略触发后能查询到的数据,Read Optimized)
xxx_rt:实时
hudi payload
Hudi 的Payload是一种可扩展的数据处理机制,通过不同的Payload我们可以实现复杂场景的定制化数据写入方式,大大增加了数据处理的灵活性。Hudi Payload在写入和读取Hudi表时对数据进行去重、过滤、合并等操作的工具类,通过使用参数 "hoodie.datasource.write.payload.class"指定我们需要使用的Payload class。本文我们会深入探讨Hudi Payload的机制和不同预制Payload之前的区别及使用场景。
在数据写入的时候,现有整行插入、整行覆盖的方式是无法满足所以场景要求的,写入的数据也会有一些定制化处理需求,因此需要有更加灵活的写入方式以及对写入数据进行一定的处理,Hudi提供的playload方式可以很好的解决该问题,例如:可以解决写入时是数据去重问题,针对部分字段进行更新等等。
写入Hudi表时需要指定一个参数hoodie.datasource.write.precombine.field
,这个字段也称为Precombine Key,Hudi Payload就是根据这个指定的字段来处理数据,它将每条数据都构建成一个Payload,因此数据间的比较就变成了Payload之间的比较。只需要根据业务需求实现Payload的比较方法,即可实现对数据的处理。
数据湖hudi的spark优化参数
小文件优化
为了便于说明,本文只考虑 COPY_ON_WRITE 表的小文件自动合并功能。在阅读下文之前,我们先来看看几个相关的参数:
hoodie.parquet.max.file.size
:数据文件的最大大小。Hudi 会尝试将文件大小保持在此配置值;hoodie.parquet.small.file.limit
:文件大小小于这个配置值的均视为小文件;hoodie.copyonwrite.insert.split.size
:单分区插入的数据条数,这个值应该和单个文件的记录条数相同。可以根据 hoodie.parquet.max.file.size 和单条记录的大小进行调整。spark+hudi优化
通过Spark作业将数据写入Hudi时,需要注意的调优手段如下:
输入并行性: Hudi对输入进行分区默认并发度为1500,以确保每个Spark分区都在2GB的限制内(在Spark2.4.0版本之后去除了该限制),如果有更大的输入,则相应地进行调整。我们建议设置shuffle的并发度,配置项为hoodie.[insert|upsert|bulkinsert].shuffle.parallelism
,以使其至少达到input_data_size/500MB
。
Off-heap(堆外)内存: Hudi写入parquet文件,需要使用一定的堆外内存,如果遇到此类故障,请考虑设置类似spark.yarn.executor.memoryOverhead
或spark.yarn.driver.memoryOverhead
的值。
Spark 内存: 通常Hudi需要能够将单个文件读入内存以执行合并或压缩操作,因此执行程序的内存应足以容纳此文件。另外,Hudi会缓存输入数据以便能够智能地放置数据,因此预留一些spark.memory.storageFraction
通常有助于提高性能。
调整文件大小: 设置limitFileSize
以平衡接收/写入延迟与文件数量,并平衡与文件数据相关的元数据开销。
时间序列/日志数据: 对于单条记录较大的数据库/nosql变更日志,可调整默认配置。另一类非常流行的数据是时间序列/事件/日志数据,它往往更加庞大,每个分区的记录更多。在这种情况下,请考虑通过bloomFilterFPP()/bloomFilterNumEntries()
来调整Bloom过滤器的精度,以加速目标索引查找时间,另外可考虑一个以事件时间为前缀的键,这将使用范围修剪并显着加快索引查找的速度。
GC调优: 请确保遵循Spark调优指南中的垃圾收集调优技巧,以避免OutOfMemory错误。使用G1 / CMS收集器,其中添加到spark.executor.extraJavaOptions的示例如下:
-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
OutOfMemory错误: 如果出现OOM错误,则可尝试通过如下配置处理:
spark.memory.fraction = 0.2 spark.memory.storageFraction = 0.2
允许其溢出而不是OOM(速度变慢与间歇性崩溃相比)。
以下是可以参考的完整的生产配置:
spark.driver.extraClassPath /etc/hive/conf spark.driver.extraJavaOptions -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof spark.driver.maxResultSize 2g spark.driver.memory 4g spark.executor.cores 1 spark.executor.extraJavaOptions -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof spark.executor.id driver spark.executor.instances 300 spark.executor.memory 6g spark.rdd.compress true spark.kryoserializer.buffer.max 512m spark.serializer org.apache.spark.serializer.KryoSerializer spark.shuffle.service.enabled true spark.sql.hive.convertMetastoreParquet false spark.submit.deployMode cluster spark.task.cpus 1 spark.task.maxFailures 4 spark.yarn.driver.memoryOverhead 1024 spark.yarn.executor.memoryOverhead 3072 spark.yarn.max.executor.failures 100
数据湖hudi的flink优化参数
表参数
1、Memory
2. Parallelism
3. Compaction
只适用于online compaction
内存优化
MOR
- Flink 的状态后端设置为
rocksdb
(默认的in memory
状态后端非常的消耗内存) - 如果内存足够,
compaction.max_memory
可以设置得更大些(默认为100MB
,可以调大到1024MB
) - 关注 taskManager 分配给每个 write task 的内存,保证每个 write task 能够分配到
write.task.max.size
所配置的内存大小。比如 taskManager 的内存是4GB
, 运行了2
个StreamWriteFunction
,那每个 write function 能分到2GB
,尽量预留一些缓存。因为网络缓存,taskManager 上其他类型的 task (比如BucketAssignFunction
)也会消耗一些内存 - 需要关注 compaction 的内存变化。
compaction.max_memory
控制了每个 compaction task 读 log 时可以利用的内存大小。compaction.tasks
控制了 compaction task 的并发
COW
- 把 Flink 的状态后端设置为 rocksdb (默认的 in memory 状态后端非常的消耗内存)
- 同时调大 write.task.max.size 和 write.merge.max_memory (默认值分别是 1024MB 和 100MB,可以调整为 2014MB 和 1024MB)
- 关注 taskManager 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的内存大小。比如 taskManager 的内存是 4GB, 运行了 2 个 StreamWriteFunction,那每个 write function 能分到 2GB,尽量预留一些缓存。因为网络缓存,taskManager 上其他类型的 task (比如 BucketAssignFunction)也会消耗一些内存
离线批量导入
针对存量数据导入的需求,如果存量数据来源于其他数据源,可以使用离线批量导入功能(bulk_insert
),快速将存量数据导入 Hudi。
NOTE
bulk_insert
省去了 avro 的序列化以及数据的 merge 过程,后续也不会再有去重操作。所以,数据的唯一性需要自己来保证。
NOTE
bulk_insert
在 batch execution mode
模式下执行更加高效。 batch execution mode
模式默认会按照 partition path 排序输入消息再写入 Hudi, 避免 file handle 频繁切换导致性能下降。
NOTE
bulk_insert
的 write tasks 的并发是通过参数 write.tasks
来指定,并发的数量会影响到小文件的数量,理论上,bulk_insert
的 write tasks 的并发数就是划分的 bucket 数, 当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会回滚到新的文件句柄,所以最后:写文件数量 >= write.bucket_assign.tasks
。
参数:
全量接增量
针对全量数据导入后,接增量的需求。如果已经有全量的离线 Hudi 表,需要接上实时写入,并且保证数据不重复,可以开启 全量接增量(index bootstrap
)功能。
NOTE
如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启 写入限流 )。
参数
使用流程
1、CREATE TABLE 创建和 Hudi 表对应的语句,注意 table.type
必须正确
2、设置 index.bootstrap.enabled
= true
开启索引加载功能
3、在 flink-conf.yaml
中设置 Checkpoint 失败容忍 :execution.checkpointing.tolerable-failed-checkpoints = n
(取决于checkpoint 调度次数)
4、等待第一次 Checkpoint 完成,表示索引加载完成
5、索引加载完成后可以退出并保存 savepoint(也可以直接用 externalized checkpoint)
6、重启任务,将 index.bootstrap.enable
设置为 false
,参数配置到合适的大小
# NOTE 索引加载是阻塞式,所以在索引加载过程中 Checkpoint 无法完成 索引加载由数据流触发,需要确保每个 partition 都至少有1条数据,即上游 source 有数据进来 索引加载为并发加载,根据数据量大小加载时间不同,可以在log中搜索 finish loading the index under partition 和 Load record form file 日志内容来观察索引加载的进 第一次 Checkpoint 成功就表示索引已经加载完成,后续从 Checkpoint 恢复时无需再次加载索引
Changelog 模式
针对使用 Hudi 保留消息的所有变更(I / -U / U / D),之后接上 Flink 引擎的有状态计算实现全链路近实时数仓生产(增量计算)的需求,Hudi 的 MOR 表 通过行存原生支持保留消息的所有变更(format 层面的集成),通过流读 MOR 表可以消费到所有的变更记录。
参数
NOTE 批(快照)读仍然会合并所有的中间结果,不管 FORMAT 是否已存储中间状态。
NOTE 设置 CHANGELOG.ENABLE
为 TRUE
后,中间的变更也只是 BEST EFFORT
:异步的压缩任务会将中间变更合并成 1
条,所以如果流读消费不够及时,被压缩后 只能读到最后一条记录。当然,通过调整压缩的缓存时间可以预留一定的时间缓冲给 READER,比如调整压缩的两个参数:COMPACTION.DELTA_COMMITS
AND COMPACTION.DELTA_SECONDS
。
Insert 模式
当前 Hudi 对于 Insert 模式
默认会采用小文件策略:MOR 会追加写 avro log 文件,COW 会不断合并之前的 parquet 文件(并且增量的数据会去重),这样会导致性能下降。
如果想关闭文件合并,可以设置 write.insert.deduplicate
为 false
。关闭后,不会有任何的去重行为,每次 flush 都是直接写独立的 parquet(MOR 表也会直接写 parquet)。
参数
参考链接:https://hudi.apache.org/cn/docs/next/flink-quick-start-guide