1. Hudi Cleaner是做什么的
Hudi Cleaner(清理程序)通常在 commit
和 deltacommit
之后立即运行,删除不再需要的旧文件。如果在使用增量拉取功能,请确保配置了清理项来保留足够数量的commit(提交),以便可以回退,另一个考虑因素是为长时间运行的作业提供足够的时间来完成运行。否则,Cleaner可能会删除该作业正在读取或可能被其读取的文件,并使该作业失败。通常,默认配置为10会允许每30分钟运行一次提取,以保留长达5(10 * 0.5)个小时的数据。如果以繁进行摄取,或者为查询提供更多运行时间,可增加 hoodie.cleaner.commits.retained
配置项的值。
2. Hudi的模式演进(schema evolution)是什么
Hudi使用 Avro
作为记录的内部表示形式,这主要是由于其良好的架构兼容性和演进特性。这也是摄取或ETL管道保持可靠的关键所在。只要传递给Hudi的模式(无论是在 DeltaStreamer
显示提供还是由 SparkDatasource
的 Dataset
模式隐式)向后兼容(例如不删除任何字段,仅追加新字段),Hudi将无缝处理新旧数据的的读/写操作并会保持Hive模式为最新。
3. 如何压缩(compaction)MOR数据集
在MOR数据集上进行压缩的最简单方法是运行内联压缩(compaction inline),但需要花费更多时间。通常情况下,当有少量的迟到数据落入旧分区时,这可能特别有用,在这种情况下,你可能想压缩最后的N个分区,同时等待较旧的分区积累足够的日志。其最终会将大多数最新数据转化查询优化的列格式,即从日志log文件转化为parquet文件。
还可异步运行压缩,这可以通过单独压缩任务来完成。如果使用的是 DeltaStreamer
,则可以在连续模式下运行压缩,在该模式下,会在单个spark任务内同时进行摄取和压缩。
4. Hudi写入的性能/最大延迟
写入Hudi的速度在写入操作以及在调整文件大小做了权衡。就像数据库在磁盘上的直接/原始文件产生I/O开销一样,与读取/写入原始DFS文件或支持数据库之类的功能相比,Hudi可能会产生开销。Hudi采用了数据库文献中的技术,以使这些开销最少,具体可参考下表。
与许多管理时间序列数据的系统一样,如果键具有时间戳前缀或单调增加/减少,则Hudi的性能会更好,而我们几乎总是可以实现这一目标。即便是UUID密钥,也可以按照以下技巧来获得有序的密钥另请参阅调优指南以获取有关JVM和其他配置的更多提示。
5. Hudi读取/查询的性能
- 对于读优化视图(Read optimized views),可以达到Hive/Spark/Presto的parquet表相同的查询性能。
- 对于增量视图( Incremental views),相对于全表扫描所花费的时间,速度更快。例如,如果在最后一个小时中,在1000个文件的分区中仅更改了100个文件,那么与完全扫描该分区以查找新数据相比,使用Hudi中的增量拉取可以将速度提高10倍。
- 对于实时视图(Real time views),性能类似于Hive/Spark/Presto中Avro格式的表。
6. 如何避免创建大量小文件
Hudi的一项关键设计是避免创建小文件,并且始终写入适当大小的文件,其会在摄取/写入上花费更多时间以保持查询的高效。写入非常小的文件然后进行合并的方法只能解决小文件带来的系统可伸缩性问题,其无论如何都会因为小文件而降低查询速度。
执行插入更新/插入操作时,Hudi可以配置文件大小。(注意:bulk_insert操作不提供此功能,其设计为用来替代 spark.write.parquet
。)
对于写时复制,可以配置基本/parquet文件的最大大小和软限制,小于限制的为小文件。Hudi将在写入时会尝试将足够的记录添加到一个小文件中,以使其达到配置的最大限制。例如,对于 compactionSmallFileSize=100MB
和 limitFileSize=120MB
,Hudi将选择所有小于100MB的文件,并尝试将其增加到120MB。
对于读时合并,几乎没有其他配置。可以配置最大日志大小和一个因子,该因子表示当数据从avro转化到parquet文件时大小减小量。
HUDI-26
将较小的文件组合并成较大的文件组,从而提升提升性能。
7. 如何使用DeltaStreamer或Spark DataSource API写入未分区的Hudi数据集
Hudi支持写入未分区数据集。如果要写入未分区的Hudi数据集并执行配置单元表同步,需要在传递的属性中设置以下配置:
hoodie.datasource.write.keygenerator.class=org.apache.hudi.NonpartitionedKeyGenerator hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
8. 为什么必须进行两种不同的配置才能使Spark与Hudi配合使用
非Hive引擎倾向于自己列举DFS上的文件来查询数据集。例如,Spark直接从文件系统(HDFS或S3)读取路径。
Spark调用如下:
- org.apache.spark.rdd.NewHadoopRDD.getPartitions
- org.apache.parquet.hadoop.ParquetInputFormat.getSplits
- org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits
在不了解Hudi的文件布局的情况下,引擎只会简单地读取所有parquet文件并显示结果,这样结果中可能会出现大量的重复项。
有两种方法可以配置查询引擎来正确读取Hudi数据集
A) 调用 HoodieParquetInputFormat#getSplits
和 HoodieParquetInputFormat#getRecordReader
方法
- Hive原生就会执行此操作,因为InputFormat是Hive中插入表格式的抽象。HoodieParquetInputFormat扩展了MapredParquetInputFormat,其是hive的一种输入格式,将Hudi表注册到Hive metastore中。
- 当使用
UseFileSplitsFromInputFormat
注解时,Presto会使用输入格式来获取分片,然后继续使用自己的优化/矢量化parquet读取器来查询写时复制表。 - 可以使用
--conf spark.sql.hive.convertMetastoreParquet=false
将Spark强制回退到HoodieParquetInputFormat
类。
B) 使引擎调用路径过滤器(path filter)或其他方式来直接调用Hudi类来过滤DFS上的文件并挑选最新的文件切片
- 即使我们可以强制Spark回退到使用InputFormat类,但这样做可能会失去使用Spark的parquet读取器的能力。
- 为保持parquet文件读取性能的优势,我们将
HoodieROTablePathFilter
设置为路径过滤器,并在Spark 的Hadoop Configuration中指定,确保始终选择Hudi相关文件的文件夹(路径)或文件的最新文件片。这将过滤出重复的条目并显示每个记录的最新条目。
9. 已有数据集,如何使用部分数据来评估Hudi
可以将该数据的一部分批量导入到新的hudi表中。例如一个月的数据
spark.read.parquet("your_data_set/path/to/month") .write.format("org.apache.hudi") .option("hoodie.datasource.write.operation", "bulk_insert") .option("hoodie.datasource.write.storage.type", "storage_type") // COPY_ON_WRITE or MERGE_ON_READ .option(RECORDKEY_FIELD_OPT_KEY, ""). .option(PARTITIONPATH_FIELD_OPT_KEY, "") ... .mode(SaveMode.Append) .save(basePath);
一旦有初始副本后,就可选择一些数据样本进行更新插入操作
spark.read.parquet("your_data_set/path/to/month").limit(n) // Limit n records .write.format("org.apache.hudi") .option("hoodie.datasource.write.operation", "upsert") .option(RECORDKEY_FIELD_OPT_KEY, ""). .option(PARTITIONPATH_FIELD_OPT_KEY, "") ... .mode(SaveMode.Append) .save(basePath);
对于读取的表的合并,若还需要调度和运行压缩(compaction)任务。则可以使用 spark sumbit
直接提交 org.apache.hudi.utilities.HoodieCompactor
运行压缩,也可以使用HUDI CLI运行压缩。