大数据领域中的小文件问题,也是一个非常棘手的问题,仅次于数据倾斜问题,对于时间和性能能都是毁灭性打击。本文参考网上对于小文件问题的定义和常见系统的解决方案,给大家还原一个大数据系统中小文件问题的系统性解决方案。本文针对目前大数据领域主要的主要框架,讲解了小文件产生的原因和一些解决办法
hive中小文件的处理方法
1. 使用 hive 自带的 concatenate 命令,自动合并小文件
大数据领域中的小文件问题,也是一个非常棘手的问题,仅次于数据倾斜问题,对于时间和性能能都是毁灭性打击。本文参考网上对于小文件问题的定义和常见系统的解决方案,给大家还原一个大数据系统中小文件问题的系统性解决方案。 本文针对目前大数据领域主要的主要框架,讲解了小文件产生的原因和一些解决办法 hive中小文件的处理方法 1. 使用 hive 自带的 concatenate 命令,自动合并小文件
注意:
1、concatenate 命令只支持 RCFILE 和 ORC 文件类型。
2、使用concatenate命令合并小文件时不能指定合并后的文件数量,但可以多次执行该命令。
3、当多次使用concatenate后文件数量不在变化,这个跟参数 mapreduce.input.fileinputformat.split.minsize=256mb 的设置有关,可设定每个文件的最小size。
2. 调整参数减少Map数量
#执行Map前进行小文件合并 #CombineHiveInputFormat底层是 Hadoop的 CombineFileInputFormat 方法 #此方法是在mapper中将多个文件合成一个split作为输入 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- 默认 #每个Map最大输入大小(这个值决定了合并后文件的数量) set mapred.max.split.size=256000000; -- 256M #一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并) set mapred.min.split.size.per.node=100000000; -- 100M #一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并) set mapred.min.split.size.per.rack=100000000; -- 100M
设置map输出和reduce输出进行合并的相关参数:
#设置map端输出进行合并,默认为true set hive.merge.mapfiles = true; #设置reduce端输出进行合并,默认为false set hive.merge.mapredfiles = true; #设置合并文件的大小 set hive.merge.size.per.task = 256*1000*1000; -- 256M #当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge set hive.merge.smallfiles.avgsize=16000000; -- 16M
启用压缩
# hive的查询结果输出是否进行压缩 set hive.exec.compress.output=true; # MapReduce Job的结果输出是否使用压缩 set mapreduce.output.fileoutputformat.compress=true;
3. 减少Reduce的数量
#reduce 的个数决定了输出的文件的个数,所以可以调整reduce的个数控制hive表的文件数量, #hive中的分区函数 distribute by 正好是控制MR中partition分区的, #然后通过设置reduce的数量,结合分区函数让数据均衡的进入每个reduce即可。 #设置reduce的数量有两种方式,第一种是直接设置reduce个数 set mapreduce.job.reduces=10; #第二种是设置每个reduce的大小,Hive会根据数据总大小猜测确定一个reduce个数 set hive.exec.reducers.bytes.per.reducer=5120000000; -- 默认是1G,设置为5G #执行以下语句,将数据均衡的分配到reduce中 set mapreduce.job.reduces=10; insert overwrite table A partition(dt) select * from B distribute by rand(); 解释:如设置reduce数量为10,则使用 rand(), 随机生成一个数 x % 10 , 这样数据就会随机进入 reduce 中,防止出现有的文件过大或过小
4、distribute by
insert overwrite table test [partition(hour=...)] select * from test distribute by floor (rand()*5);
5. 使用hadoop的archive将小文件归档
Hadoop Archive简称HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行透明的访问
#用来控制归档是否可用 set hive.archive.enabled=true; #通知Hive在创建归档时是否可以设置父目录 set hive.archive.har.parentdir.settable=true; #控制需要归档文件的大小 set har.partfile.size=1099511627776; #使用以下命令进行归档 ALTER TABLE A ARCHIVE PARTITION(dt='2020-12-24', hr='12'); #对已归档的分区恢复为原文件 ALTER TABLE A UNARCHIVE PARTITION(dt='2020-12-24', hr='12');
hive小文件调优实践_PONY LEE的博客-CSDN博客_hive设置文件大小
spark处理小文件
1、通过repartition或coalesce算子控制最后的DataSet的分区数(Spark Core)
注意repartition和coalesce的区别,具体可以参考文章《重要|Spark分区并行度决定机制》
2. spark自适应调整
社区在Spark2.3版本之后的AdaptiveExecute特性之中就能很好的解决Partition个数过多导致小文件过多的问题. 通过动态的评估Shuffle输入的个数(通过设置spark.sql.adaptive.shuffle.targetPostShuffleInputSize实现), 可以聚合多个Task任务, 减少Reduce的个数 。
使用方式:
# spark2.3 set spark.sql.adaptive.enabled=true set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=128MB # spark3.0 spark.sql.adaptive.enabled=true spark.sql.adaptive.coalescePartitions.enabled=true
3.HINT方式
社区在Spark2.4版本之后引入HINT模式SPARK-24940, 可以由用户来指定最后分区的个数, 只要在SQL语句之中加入注释文件。
3.HINT方式 社区在Spark2.4版本之后引入HINT模式SPARK-24940, 可以由用户来指定最后分区的个数, 只要在SQL语句之中加入注释文件。
支持简单无Shuffle模式的Reparation
缺点:
需要人工干预, 设计Partition的个数, 而对于变化的场景来说, 难有一个固定的Partition个数无法处理Shuffle输出比过低的场景。
4.独立的小文件合并
处理Shuffle输出比过低的场景, 因此我们需要一种兜底方案: 直接读取HDFS上的数据, 进行合并操作。当插入任务完成之后, 新启动一个Job读取所有的数据, 然后根据设置的文件大小, 进行合并并会写到HDFS之中. 由于是直读直写的方式, 因此对于数据大小的评估是非常精确的, 因此可以很好的避免Shuffle输出比的问题.
优点:
彻底解决小文件问题
缺点:
引入新的一次Job过程, 性能会受影响, 特别对中型任务会有一定的影响(10秒左右)
使用方式:
set spark.sql.merge.enabled=true; set spark.sql.merge.size.per.task=134217728; --128 * 1024 * 1024 bytes
性能优化: ORC和Parquet格式支持按行读取和按Stripe读取, Stripe读取可以认为是GroupRead, 由于不需要解析文件里面具体的数值, 因此可以按照Stripe粒度读取文件, 再写入文件之中, 以Stripe粒度合并文件。
set spark.sql.merge.mode=fast; -- 默认是pretty, 是逐行读写文件的, 性能较慢
fast模式目前只针对ORC格式做了优化性能较快, 其他格式会额外引入一次任务, 尤其在动态分区场景下, 性能会下降不少
5.spark自定义异步合并工具类
object CombineSmallFiles { /** * Step 1:读取指定目录下需要合并文件的目录和目录下的文件名,修改文件名后缀为.old:相当于记录为需要合并 */ def searchFiles(spark: SparkSession, hdfs: FileSystem, srcStr: String, fileType: String): Unit = { val srcPath = new Path(srcStr) val fileStatus = hdfs.listStatus(srcPath) var smallFileCount: Int = 0 fileStatus.foreach(file => { // 对大小小于128M的文件进行标记 if (!file.isDirectory && !file.getPath.getName.startsWith(".") && (hdfs.getContentSummary(file.getPath).getLength < 134217728)) { if (!file.getPath.getName.endsWith(".old")) { hdfs.rename(file.getPath, new Path(file.getPath + ".old")) } smallFileCount += 1 } else if (file.isDirectory && !file.getPath.getName.startsWith(".")) { searchFiles(spark, hdfs, file.getPath.toUri.getPath, fileType) } }) if (smallFileCount > 1) { combineSmallFile(spark, hdfs, srcPath, fileType) } } /** * 合并 */ def combineSmallFile(spark: SparkSession, hdfs: FileSystem, srcPath: Path, fileType: String): Unit = { val srcStr = srcPath.toUri.getPath val combineStr = srcStr + "/.combine" //如果因为程序中断导致combine遗留合并后的文件,则移动后清除 moveCombineFileAndRemove(hdfs, srcStr, combineStr) // Step 2:获取目录下.old的文件,读取写入临时目录并生成文件.combine spark .read .format(fileType) .load(srcStr + "/*.old") .repartition(1) .write .format(fileType) .save(combineStr) // Step3:删除.old -> mv .combine下文件到源目录 -> 删除.combine hdfs.listStatus(srcPath).foreach(file => { // 对.old结尾的文件清除 if (!file.isDirectory && file.getPath.getName.endsWith(".old")) { hdfs.delete(file.getPath, true) } }) moveCombineFileAndRemove(hdfs, srcStr, combineStr) } /** * 移动合并文件并清除合并用的临时目录 * * @param hdfs * @param srcStr * @param combineStr */ def moveCombineFileAndRemove(hdfs: FileSystem, srcStr: String, combineStr: String): Unit = { val combinePath = new Path(combineStr) if (!hdfs.exists(combinePath)) return hdfs.listStatus(combinePath).foreach(combineFile => { if (combineFile.getPath.getName.startsWith("part-")) { hdfs.rename(combineFile.getPath, new Path(srcStr + "/" + combineFile.getPath.getName + ".combine")) hdfs.deleteOnExit(combineFile.getPath) } }) hdfs.delete(new Path(combineStr), true) } }
调用
/** * 运维程序:小文件合并 */ object BT_OPS_CombineSmallFiles { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("BT_OPS_CombineSmallFiles") .config("spark.sql.adaptive.enabled", "true") .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "134217728") .enableHiveSupport() .getOrCreate() val hdfs: FileSystem = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration) val srcPath = "你的HDFS路径" searchFiles(spark, hdfs,srcPath, "orc") spark.stop } }
6. 增加batch大小(spark streaming)
这种方法很容易理解,batch越大,从外部接收的event就越多, 内存积累的数据也就越多,那么输出的文件数也就回变少, 比如将上面例子中的batch时间从10s增加为100s,| 那么一个小时的文件数量就会减少到1152个。 但是此时延迟会比较大,不适合实时性要求高的场景。
7. 自己调用foreach去append
sparkstreaming提供的foreach这个outout类api,可以让我们自定义输出计算结果的方法。 那么我们其实也可以利用这个特性,那就是每个batch在要写文件时,并不是去生成一个新的文件流,而是把之前的文件打开。考虑这种方法的可行性, 首先,HDFS上的文件不支持修改,但是很多都支持追加,那么每个batch的每个partition就对应一个输出文件,每次都去追加这个partition对应的输出文件, 这样也可以实现减少文件数量的目的。这种方法要注意的就是不能无限制的追加,当判断一个文件已经达到某一个阈值时,就要产生一个新的文件进行追加了。
flink处理小文件
1.自定义 PartitionCommitPolicy
一个典型的 Parquet 文件合并的案例代码如下。我们可以通过
'sink.partition-commit.policy.kind' = 'metastore,success-file,custom', 'sink.partition-commit.policy.class' = 'me.lmagics.flinkexp.hiveintegration.util.ParquetFileMergingCommitPolicy
参数进行配置:
package me.lmagics.flinkexp.hiveintegration.util; import org.apache.flink.hive.shaded.parquet.example.data.Group; import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileReader; import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileWriter.Mode; import org.apache.flink.hive.shaded.parquet.hadoop.ParquetReader; import org.apache.flink.hive.shaded.parquet.hadoop.ParquetWriter; import org.apache.flink.hive.shaded.parquet.hadoop.example.ExampleParquetWriter; import org.apache.flink.hive.shaded.parquet.hadoop.example.GroupReadSupport; import org.apache.flink.hive.shaded.parquet.hadoop.metadata.CompressionCodecName; import org.apache.flink.hive.shaded.parquet.hadoop.metadata.ParquetMetadata; import org.apache.flink.hive.shaded.parquet.hadoop.util.HadoopInputFile; import org.apache.flink.hive.shaded.parquet.schema.MessageType; import org.apache.flink.table.filesystem.PartitionCommitPolicy; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class ParquetFileMergingCommitPolicy implements PartitionCommitPolicy { private static final Logger LOGGER = LoggerFactory.getLogger(ParquetFileMergingCommitPolicy.class); @Override public void commit(Context context) throws Exception { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); String partitionPath = context.partitionPath().getPath(); List<Path> files = listAllFiles(fs, new Path(partitionPath), "part-"); LOGGER.info("{} files in path {}", files.size(), partitionPath); MessageType schema = getParquetSchema(files, conf); if (schema == null) { return; } LOGGER.info("Fetched parquet schema: {}", schema.toString()); Path result = merge(partitionPath, schema, files, fs); LOGGER.info("Files merged into {}", result.toString()); } private List<Path> listAllFiles(FileSystem fs, Path dir, String prefix) throws IOException { List<Path> result = new ArrayList<>(); RemoteIterator<LocatedFileStatus> dirIterator = fs.listFiles(dir, false); while (dirIterator.hasNext()) { LocatedFileStatus fileStatus = dirIterator.next(); Path filePath = fileStatus.getPath(); if (fileStatus.isFile() && filePath.getName().startsWith(prefix)) { result.add(filePath); } } return result; } private MessageType getParquetSchema(List<Path> files, Configuration conf) throws IOException { if (files.size() == 0) { return null; } HadoopInputFile inputFile = HadoopInputFile.fromPath(files.get(0), conf); ParquetFileReader reader = ParquetFileReader.open(inputFile); ParquetMetadata metadata = reader.getFooter(); MessageType schema = metadata.getFileMetaData().getSchema(); reader.close(); return schema; } private Path merge(String partitionPath, MessageType schema, List<Path> files, FileSystem fs) throws IOException { Path mergeDest = new Path(partitionPath + "/result-" + System.currentTimeMillis() + ".parquet"); ParquetWriter<Group> writer = ExampleParquetWriter.builder(mergeDest) .withType(schema) .withConf(fs.getConf()) .withWriteMode(Mode.CREATE) .withCompressionCodec(CompressionCodecName.SNAPPY) .build(); for (Path file : files) { ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file) .withConf(fs.getConf()) .build(); Group data; while((data = reader.read()) != null) { writer.write(data); } reader.close(); } writer.close(); for (Path file : files) { fs.delete(file, false); } return mergeDest; } }
2. flink1.12之后新增的 table/SQL 参数配置
auto-compaction=true compaction.file-size=1024M
3. flink datastream api
从 1.15 版本开始 FileSink
开始支持已经提交 pending
文件的合并,从而允许应用设置一个较小的时间周期并且避免生成大量的小文件。尤其是当用户使用 bulk 格式 的时候:这种格式要求用户必须在 checkpoint 的时候切换文件。
文件合并功能可以通过以下代码打开:
FileSink<Integer> fileSink= FileSink.forRowFormat(new Path(path),new SimpleStringEncoder<Integer>()) .enableCompact( FileCompactStrategy.Builder.newBuilder() .setNumCompactThreads(1024) .enableCompactionOnCheckpoint(5) .build(), new RecordWiseFileCompactor<>( new DecoderBasedReader.Factory<>(SimpleStringDecoder::new))) .build();
这一功能开启后,在文件转为 pending
状态与文件最终提交之间会进行文件合并。这些 pending
状态的文件将首先被提交为一个以 .
开头的 临时文件。这些文件随后将会按照用户指定的策略和合并方式进行合并并生成合并后的 pending
状态的文件。然后这些文件将被发送给 Committer 并提交为正式文件,在这之后,原始的临时文件也会被删除掉。
当开启文件合并功能时,用户需要指定 FileCompactStrategy 与 FileCompactor 。
FileCompactStrategy 指定何时以及哪些文件将被合并。目前有两个并行的条件:目标文件大小与间隔的 Checkpoint 数量。当目前缓存的文件的总大小达到指定的阈值,或自上次合并后经过的 Checkpoint 次数已经达到指定次数时, FileSink
将创建一个异步任务来合并当前缓存的文件。
FileCompactor 指定如何将给定的路径列表对应的文件进行合并将结果写入到文件中。根据如何写文件,它可以分为两类:
- OutputStreamBasedFileCompactor : 用户将合并后的结果写入一个输出流中。通常在用户不希望或者无法从输入文件中读取记录时使用。这种类型的
CompactingFileWriter
的一个例子是 ConcatFileCompactor ,它直接将给定的文件进行合并并将结果写到输出流中。 - RecordWiseFileCompactor :这种类型的
CompactingFileWriter
会逐条读出输入文件的记录用户,然后和FileWriter
一样写入输出文件中。CompactingFileWriter
的一个例子是 RecordWiseFileCompactor ,它从给定的文件中读出记录并写出到CompactingFileWriter
中。用户需要指定如何从原始文件中读出记录。
注意事项1 一旦启用了文件合并功能,此后若需要再关闭,必须在构建
FileSink
时显式调用disableCompact
方法。注意事项2 如果启用了文件合并功能,文件可见的时间会被延长。
iceberg处理小文件
使用spark+iceberg合并小文件
import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.actions.Actions; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hive.HiveCatalog; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.internal.SQLConf; import java.util.HashMap; import java.util.Map; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; /** * @author: zhushang * @create: 2021-04-02 14:30 */ public class SparkCompaction { public static void main(String[] args) { TableIdentifier identifier = TableIdentifier.of(Namespace.of("db"), "table"); Map<String, String> config = new HashMap<>(); config.put("type", "iceberg"); config.put("catalog-type", "hive"); config.put("property-version", "1"); config.put("warehouse", "warehouse"); config.put("uri", "thrift://local:9083"); config.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO"); config.put("oss.endpoint", "https://xxx.aliyuncs.com"); config.put("oss.access.key.id", "key"); config.put("oss.access.key.secret", "secret"); sparkSession(); HiveCatalog hiveCatalog = new HiveCatalog(new Configuration()); hiveCatalog.initialize("iceberg_hive_catalog", config); Table table = hiveCatalog.loadTable(identifier); Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(128 * 1024 * 1024).execute(); Snapshot snapshot = table.currentSnapshot(); if (snapshot != null) { table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit(); } } private static void sparkSession() { SparkSession.builder() .master("local[*]") .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") .config("spark.hadoop." + METASTOREURIS.varname, "localhost:9083") .config("spark.sql.warehouse.dir", "warehouse") .config("spark.executor.heartbeatInterval", "100000") .config("spark.network.timeoutInterval", "100000") .enableHiveSupport() .getOrCreate(); } }
使用flink+iceberg合并小文件
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.actions.Actions; import java.util.HashMap; import java.util.Map; public class FlinkCompaction { public static void main(String[] args) throws Exception { ParameterTool tool = ParameterTool.fromArgs(args); Map<String, String> properties = new HashMap<>(); properties.put("type", "iceberg"); properties.put("catalog-type", "hive"); properties.put("property-version", "1"); properties.put("warehouse", tool.get("warehouse")); properties.put("uri", tool.get("uri")); if (tool.has("oss.endpoint")) { properties.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO"); properties.put("oss.endpoint", tool.get("oss.endpoint")); properties.put("oss.access.key.id", tool.get("oss.access.key.id")); properties.put("oss.access.key.secret", tool.get("oss.access.key.secret")); } CatalogLoader loader = CatalogLoader.hive(tool.get("catalog"), new Configuration(), properties); Catalog catalog = loader.loadCatalog(); TableIdentifier identifier = TableIdentifier.of(Namespace.of(tool.get("db")), tool.get("table")); Table table = catalog.loadTable(identifier); /** * 合并小文件,核心代码 */ Actions.forTable(table) .rewriteDataFiles() .maxParallelism(5) .targetSizeInBytes(128 * 1024 * 1024) .execute(); Snapshot snapshot = table.currentSnapshot(); if (snapshot != null) { table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit(); } } }
总结:无论spark还是flink写入iceberg,合并小文件的关键为:
Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(128 * 1024 * 1024).
.targetSizeInBytes(128 * 1024 * 1024)参数决定了最终结果文件的大小。
hudi合并小文件
我们使用COPY_ON_WRITE表来演示Hudi如何自动处理文件大小特性。
关键配置项如下:
- hoodie.parquet.max.file.size:数据文件最大大小,Hudi将试着维护文件大小到该指定值;
- hoodie.parquet.small.file.limit:小于该大小的文件均被视为小文件;
- hoodie.copyonwrite.insert.split.size:单文件中插入记录条数,此值应与单个文件中的记录数匹配(可以根据最大文件大小和每个记录大小来确定)