开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

有没有对flink 离线任务压缩 hudi表 增大一次压缩的commit个数,感觉默认的太慢了

有没有对flink 离线任务压缩 hudi表 增大一次压缩的commit个数,感觉默认的太慢了

展开
收起
游客6vdkhpqtie2h2 2022-09-16 10:35:15 799 0
12 条回答
写回答
取消 提交回答
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    是的,可以通过在 Flink 的 Hudi 写入操作中配置压缩参数来调整 Hudi 表的压缩速度。具体来说,可以通过设置 HoodieWriteConfig.TABLE_FILE_LOG_BLOCK_SIZE_BYTES 和 HoodieWriteConfig.TABLE_FILE_MAX_SIZE_BYTES 参数来调整一次写入操作中压缩的 commit 个数。

    HoodieWriteConfig.TABLE_FILE_LOG_BLOCK_SIZE_BYTES 参数表示写入操作中要压缩的单个 commit 的大小,可以适当增加该参数的值来增大一次压缩的 commit 个数。例如,将其默认值 16777216 增加到 67108864,即将单个 commit 的大小从 16MB 增大到 64MB。

    HoodieWriteConfig.TABLE_FILE_MAX_SIZE_BYTES 参数表示当一个数据文件达到该大小时,会触发一个压缩 commit。可以将其适当减小来加快单个 commit 的完成时间。

    需要注意的是,增加一次压缩的 commit 个数也会导致单个 commit 的完成时间变长,因此需要根据实际情况进行调整。

    以下是一个示例代码片段:

        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            ...
            .withTableFileMaxSize(1024 * 1024 * 1024) // 设置单个文件大小上限
            .withLogFileSize(256 * 1024 * 1024)// 设置 log 日志单个文件大小上限
            .withTableFileLogBlockMaxSize(512 * 1024 * 1024) // 设置单个 commit 的大小
            .build();
    
        HoodieFlinkWriteConfig hoodieConfig = HoodieFlinkWriteConfig.newBuilder()
            ...
            .withWriteConfig(writeConfig)
            .build();
            
        HoodieSink hoodieSink = new FlinkOptionsBuilder()
            ...
            .withWritePayloadClass(hoodieRecord.getClass())
            .buildHoodieSink(hoodieConfig);
    

    在上述示例中,可以看到通过 HoodieWriteConfig.TABLE_FILE_LOG_BLOCK_SIZE_BYTES 参数来控制单个 commit 的大小。具体配置方法可以根据实际情况进行调整。

    2023-05-05 21:00:55
    赞同 展开评论 打赏
  • 对于Flink离线任务压缩Hudi表,可以通过增加一次压缩的commit个数来提高压缩速度。默认情况下,Hudi会在每个commit后进行一次压缩,可以通过设置以下参数来改变压缩行为:

    hoodie.compaction.max.commit:设置一次压缩的最大commit个数。 hoodie.compaction.small.file.limit:设置一个小文件的最大大小,当文件大小小于该值时,会被合并成一个大文件进行压缩。 通过调整以上参数,可以提高Hudi表的压缩效率。

    2023-05-05 18:09:08
    赞同 展开评论 打赏
  • 在Flink离线任务中压缩Hudi表时,默认情况下只压缩一个commit。如果您需要增大一次压缩的commit个数,可以使用HoodieConfig中的以下参数:

    1、hoodie.parquet.small.file.limit:默认值为104857600(100MB),表示当一个commit中的small files(小于该值的文件)超过该限制时,会被合并为一个更大的文件。您可以将这个值调大以增大一次压缩的commit个数,但需要注意文件大小的总和应不超过可用的内存大小。

    2、hoodie.table.max.commits:默认值为10,表示每个表可以最多保存的commit数。当这个限制被达到时,可以手动触发Hudi的compaction任务来压缩多个commit。您可以将这个值调大以增大一次压缩的commit个数,但需要注意可能会造成Hudi元数据存储压力增大。

    在Flink中,可以通过HoodieConfig来设置这些参数。例如:

    HoodieConfig config = ...  // 构造配置对象
    
    config.setValue(HoodieTableConfig.BASE_FILE_SIZE, "107374182400"); // 设置每个文件的大小为100GB
    config.setValue(HoodieTableConfig.FILTER_STAT_BEFORE_COMBINE, "false"); // 关闭filter stat,能够提升性能
    config.setValue(HoodieTableConfig.TABLE_TYPE, "COPY_ON_WRITE");
    
    // 增大一次压缩的commit个数
    config.setValue(HoodieTableConfig.MAX_COMMIT_METADATA_LIST, "20");
    config.setValue(HoodieTableConfig.PAYLOAD_SIZE_LIMIT_BYTES, "1073741824"); // 设置payload大小限制为1GB
    
    HoodieFlinkWriteConfig writeConfig = ...  // 构造写入配置
    writeConfig.withHoodieConfig(config);
    

    需要注意的是,增大一次压缩的commit个数会带来一些额外的开销和风险,包括元数据存储增大、压缩时间延长、任务占用的内存增多等。因此,在增大commit个数之前,需要评估您的系统环境和需求,并进行充分测试,以确保操作的安全和有效性。

    2023-05-03 07:50:20
    赞同 展开评论 打赏
  • 可以通过调整 Hudi 的配置参数来增大一次压缩的 commit 个数。

    具体可以调整以下两个参数:

    1. hoodie.compact.inline.max.delta.commits: 这个参数控制一个 inline 的 commit 中最多包含多少个 delta commit。默认值是 5,可以根据具体场景适当调高。

    2. hoodie.compact.inline.max.delta.seconds: 这个参数控制一个 inline 的 commit 中包含的 delta commit 的时间跨度。默认值是 60 秒,可以根据具体场景适当调高或调低。

    注意,调整这两个参数可能会对系统的性能产生一定的影响,所以需要根据实际情况进行调整。同时,压缩任务还与数据量、硬件配置等因素有关,如果压缩速度过慢,也可以考虑增加节点数量或升级硬件配置等措施来提升压缩速度。

    2023-04-28 20:26:09
    赞同 展开评论 打赏
  • 云端行者觅知音, 技术前沿我独行。 前言探索无边界, 阿里风光引我情。

    这个你可以在Flink中使用Hudi进行离线任务压缩时,可以通过调整CompactionAPI的参数来增大一次压缩的commit个数。具体来说,可以通过设置CompactionAPI的hoodie.compact.max.commits参数来控制一次压缩的commit个数。该参数默认值为3,表示每次压缩最多处理3个commit对应的数据。如果需要增大一次压缩的commit个数,可以将该参数设置为更大的值。

    2023-04-28 10:25:56
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。
    调整 commit 策略:Hudi 支持多种不同的 commit 策略,如按时间、按数据大小等。如果默认的 commit 策略不能满足您的需求,可以调整为更适合的 commit 策略,如使用 BucketizedWritingMiniBatcher 进行基于时间的批量写入。
    
    调整并行度:Flink 作为分布式计算框架,支持对任务进行并行度调整。如果 Hudi 表的压缩 commit 操作过慢,可以适当增加并行度,从而提高压缩 commit 的效率。
    
    调整存储格式:Hudi 支持多种存储格式,如 Parquet、ORC 等。根据实际情况和需求,可以选择合适的存储格式,以提高压缩 commit 的效率。
    
    2023-04-27 09:07:22
    赞同 展开评论 打赏
  • 值得去的地方都没有捷径

    首先,以下是一些关于压缩和Hudi表的一般建议:

    压缩操作需要消耗大量的计算资源,因此在增加一次压缩的提交个数之前,您需要评估您的计算资源是否足够支持这样的操作。如果您在资源不足的情况下尝试压缩数据,则可能会导致性能问题或任务失败。

    Hudi表采用增量存储方式,支持快速数据插入和查询,同时支持更新和删除操作。在进行Hudi表的压缩时,您需要考虑到这些差异,以便优化您的压缩操作。

    您可以考虑使用分区操作来减少压缩操作的负担,这样可以使操作更加高效。为此,您可以将数据分成大小合适的分区,然后对分区进行压缩,而不是一次压缩整个Hudi表。

    接下来,以下是一些关于Flink的一般建议:

    Flink是一个针对大规模数据流处理的开源流处理框架。它具有高吞吐量、低延迟以及高容错性的特点,同时支持流式和批式数据处理模式。

    当您需要对大规模数据进行离线处理时,Flink可以帮助您处理不同类型的数据集。它的任务处理过程可以分为以下几个步骤:数据输入、数据转换/转换/操作、数据输出。

    在Flink中,您可以使用不同的数据源(如Kafka、HDFS等)将数据输入到Flink任务中。然后,您可以使用不同的算子(例如Map、Reduce、Join等)转换数据,最后可以将结果写回到数据存储中。如果您使用Hudi表来保存数据,则可以使用Hudi提供的API将数据写入到Hudi表中。

    关于离线任务的压缩操作,您可以在Flink任务中使用压缩库(如gzip或Snappy)来压缩数据。Flink也提供了与Hadoop兼容的API,这意味着您可以使用Hadoop压缩库来进行压缩操作。

    最后,如果您想要进一步了解如何在Flink中优化离线任务的性能,我建议您查看官方文档。Flink官网提供了丰富的文档和教程,可以帮助您更好地了解Flink的工作方式并进行优化。

    2023-04-26 16:21:29
    赞同 展开评论 打赏
  • 默认情况下,Hudi会在每个commit完成后进行一次离线压缩,如果数据量较大,会导致压缩时间较长。可以考虑增大一次压缩的commit个数来提高离线压缩的效率。

    具体的做法是在Flink中配置Hudi Writer的参数,设置"hoodie.insert.shuffle.parallelism"和"hoodie.bulkinsert.shuffle.parallelism"参数,将其设置为较大的值,例如100、200等,表示一次commit包含的数据量增大,从而减少离线压缩的次数,提高离线压缩的效率。需要注意的是,增大一次压缩的commit个数会增加内存和磁盘的消耗,需要根据实际情况进行调整。另外,还可以考虑使用增量合并或流式压缩等Hudi的高级特性,以提高数据压缩的效率和性能。

    2023-04-25 08:55:43
    赞同 展开评论 打赏
  • 十分耕耘,一定会有一分收获!

    楼主你好,根据你的描述可以知道,慢的原因是因为Flink的离线任务压缩Hudi表时,默认只压缩一个commit的数据,你可以增大一次压缩的commit个数来解决慢的问题。

    2023-04-24 22:45:21
    赞同 展开评论 打赏
  • 全栈JAVA领域创作者

    可以通过调整hudi的配置文件来增大一次压缩的commit个数,具体可以参考官方文档中的配置项:

    hoodie.compact.inline.max.delta.commits: 在压缩过程中,一次性压缩的最大delta commits数量,默认为3。 hoodie.cleaner.commits.retained: 保留的commit个数,超过这个数量就会被压缩,默认为2。 如果想要进一步优化压缩效率,还可以考虑调整Hudi WriteConfig中的参数,例如:

    insertShuffleParallelism: 插入数据时的并行度。 upsertShuffleParallelism: 更新数据时的并行度。 hoodie.bulkinsert.shuffle.parallelism: 批量插入数据时的并行度。 需要注意的是,增大压缩的commit个数会增加一次压缩所需要的资源和时间,具体应该根据实际场景进行调整。

    2023-04-24 08:02:21
    赞同 展开评论 打赏
  • 热爱开发

    Flink 离线任务压缩 Hudi 表时,默认只压缩一个 commit 的数据,这可能会导致任务执行速度较慢。想要增大一次压缩的 commit 个数,可以尝试调整以下参数:

    hoodie.bulkinsert.shuffle.parallelism:该参数是 BulkInsert 操作使用的 Shuffle 并行度,设为较大的值可以增加压缩速度。

    hoodie.compact.max.delta.commits:该参数是控制 Compact 操作每次最多处理的 delta commits 数量,设为较大的值可以增加压缩速度。

    需要注意的是,在增大一次压缩的 commit 个数时,也需要考虑到系统资源的限制,如果 commit 数量过大,可能会导致系统卡顿或者 OOM 错误等问题。因此,建议在适当提高批量压缩的同时,根据实际情况进行测试和调整,以保证系统的稳定性和性能。

    2023-04-23 17:52:45
    赞同 展开评论 打赏
  • Flink 离线任务压缩 hudi 表 增大一次压缩的commit个数,可以通过以下步骤实现:

    在 Flink 任务中,使用 org.apache.flink.streaming.api.functions.source.SourceFunction 类来实现 Flink 离线任务压缩 hudi 表的功能。 在 SourceFunction 类中,可以通过 setCommits 方法来设置每次压缩的commit个数。 在 Flink 任务中,可以使用 org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext 类来获取当前的 Flink 流水线执行环境,并使用 setCommits 方法来设置每次压缩的commit个数。 以下是一个示例代码,演示如何使用 Flink 离线任务压缩 hudi 表并增大一次压缩的commit个数:

    java import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceState;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceStateFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceStateFunction.SourceStateConsumer;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceStateFunction.SourceStateFunctionResult;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceStateFunctionResult.SourceStateFunctionResultBuilder;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceStateFunctionResult.SourceStateFunctionResultWithException;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceStateFunctionResultWithException.ExceptionType;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceStateFunctionResultWithException.ExceptionResult;

    import java.util.Arrays;
    import java.util.List;

    public cla

    2023-04-23 17:24:04
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载