开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

在Flink CDC中这种双流join爆红怎么调优,如何解决?

在Flink CDC中这种双流join爆红怎么调优,如何解决?请参考图片:d0e4ac08617d7eef315062db954371b5.png

展开
收起
冲冲冲c 2024-06-26 11:10:34 144 0
7 条回答
写回答
取消 提交回答
  • image.png
    针对Flink CDC中双流Join导致的状态膨胀和资源压力问题,可以通过以下策略进行调优与解决:

    1. 合理设置JOIN_STATE_TTL Hint

      • 通过JOIN_STATE_TTL提示,可以自定义左右流中状态的保存周期。例如,将左流设置为较短的保存周期(如12小时),右流保持较长周期(如18天),确保数据完整性的同时减少状态大小。这样能大幅降低存储需求,从原本的5.8 TB减少到约590 GB。
    2. 利用主键优化连接操作

      • 在建表DDL中明确声明主键,并在Join操作中优先使用主键。当Join Key包含主键时,Flink会使用ValueState存储,仅保留每个键的最新记录,极大节省存储空间。非主键Join则会使用MapState,可能导致状态膨胀。
    3. 优化Append-Only流的去重

      • 使用ROW_NUMBER函数替代FIRST_VALUE或LAST_VALUE进行去重,高效保留首次出现或最新记录,减少状态占用。
    4. 改进聚合查询性能

      • 利用AGG WITH FILTER语法替换CASE WHEN,以共享状态信息,减少读写次数,性能提升可达一倍。
    5. 调整多流Join顺序

      • 先Join数据量小的流,再Join大数据流,减轻状态冗余放大效应,提升处理效率。
    6. 理解状态算子的产生与管理

      • 对于具有主键的表,Flink会使用特定的Sink算子(如SinkUpsertMaterializer)确保upsert语义,通过维护主键状态信息保证数据唯一性和有序性。
      • 针对LookupJoin中的非确定性更新,配置优化选项尝试解决,如通过引入带状态的LookupJoin算子处理维表数据动态变化,确保结果准确性。

    通过上述方法,可以有效应对Flink CDC双流Join中遇到的状态膨胀问题,减少资源消耗,提升作业性能。

    相关链接
    SQL作业大状态导致反压的调优原理与方法 调优方法 https://help.aliyun.com/zh/flink/use-cases/control-state-size-to-prevent-backpressure-in-sql-deployments

    2024-08-03 18:18:10
    赞同 展开评论 打赏
  • 在 Flink CDC 中处理双流 Join(两个来自不同数据源的流进行 Join 操作)时,确实可能会遇到性能瓶颈,尤其是在处理大量数据和高并发场景下。您提到的调优策略是非常有针对性的,下面我将进一步解释这些策略以及它们如何帮助提升性能。

    1. 开启 KV 分离优化
      Gemini StateBackend 是 Flink 的一种优化状态后端,它针对 Keyed State 进行了特别优化。在双流 Join 的场景中,如果两个流都是基于 Key 的(即 Join 条件是基于某个或多个 Key 的),那么 KV 分离优化可以显著提高性能。

    配置方式:通过设置 table.exec.join.kv-separate 为 FORCE 来强制开启 KV 分离。这个配置告诉 Flink 在执行 Join 操作时,尽量将 Key 和 Value 分开存储和处理,以减少内存占用和提高处理速度。

    1. 资源优化
      增加 CPU 和内存:对于 JobManager 和 TaskManager,增加它们的资源分配可以显著提高 Flink 作业的处理能力。特别是在处理大量数据和复杂计算时,更多的资源意味着更高的并行度和更快的处理速度。
      优化资源分配:除了增加总体资源外,还需要合理配置 Task Slots,以确保资源被有效利用。例如,避免单个 TaskManager 上的 Slot 过多而导致资源争用。
    2. 调整 Checkpoint 间隔
      Checkpoint 是 Flink 保证状态一致性和容错性的重要机制。然而,过于频繁的 Checkpoint 会增加系统的延迟和负载。

    设置合适的 Checkpoint 间隔:通过调整 execution.checkpointing.interval 参数,可以在延迟和容错之间找到平衡。较长的间隔可以减少 Checkpoint 对性能的影响,但也会增加数据丢失的风险(在发生故障时)。

    1. 开启 MiniBatch
      MiniBatch 是 Flink Table API 和 SQL 的一种优化手段,它允许将多个小的输入批次合并成一个较大的批次进行处理,从而减少状态访问次数和提高吞吐率。

    配置 MiniBatch:通过设置 table.exec.mini-batch.enabled 为 true 来启用 MiniBatch,并通过 table.exec.mini-batch.allow-latency 来控制允许的额外延迟。这个配置允许 Flink 在一定程度上牺牲延迟来换取更高的吞吐率。
    综合考虑
    在调优 Flink CDC 中的双流 Join 性能时,需要综合考虑上述策略,并根据具体的应用场景和性能要求进行调整。此外,还可以通过以下方式进一步优化:

    优化 Join 条件:确保 Join 条件尽可能高效,避免使用高成本的函数或计算。
    调整并行度:通过调整 Flink 作业的并行度来更好地利用集群资源。
    监控和日志:使用 Flink 的监控和日志功能来实时跟踪作业的性能指标,及时发现并解决问题。
    最终,调优是一个迭代的过程,需要不断地尝试和调整,直到找到最适合当前应用场景的配置。image.png

    2024-07-31 09:45:36
    赞同 展开评论 打赏
  • Flink CDC中双流Join导致的性能问题,可以尝试以下调优策略:

    开启KV分离优化:对于Gemini StateBackend,自动推导并开启KV分离优化,提升双流Join性能。可以通过配置table.exec.join.kv-separate来控制,设置为FORCE强制开启。
    资源优化:增加JobManager和TaskManager的CPU和内存,以应对高并发和复杂拓扑。
    调整Checkpoint间隔:设置合适的execution.checkpointing.interval以平衡延迟和容错。
    开启MiniBatch:使用table.exec.mini-batch.enabled和table.exec.mini-batch.allow-latency来减少State访问,提升吞吐。

    确保您的Flink版本支持这些优化,并参考配置作业运行参数进行设置。
    image.png

    2024-07-23 16:53:06
    赞同 展开评论 打赏
  • 阿里云大降价~

    针对Flink CDC中双流join导致的状态大小膨胀(爆红)问题,可以从以下几个方面进行调优:

    合理设置JOIN_STATE_TTL Hint: 通过设置合适的JOIN_STATE_TTL提示,可以有效管理状态的生命周期。例如,将左流的保存周期缩短至12小时,而右流保持18天,这样在保证数据完整性的同时,减少了状态的大小

    利用主键优化连接操作:

    确保在建表DDL中声明主键,并在双流连接时优先使用主键。当连接键包含主键时,系统会使用ValueState存储,仅保留每个键的最新记录,极大节省存储空间
    若连接非主键字段,系统将使用MapState存储所有相关记录,这会占用更多资源。因此,尽可能使用主键进行连接是优化的关键。
    优化去重操作: 使用ROW_NUMBER函数替代FIRST_VALUE或LAST_VALUE进行去重,能更高效地处理数据,减少不必要的状态存储

    改进聚合查询: 利用AGG WITH FILTER语法替换CASE WHEN,以共享状态信息,减少状态读写,从而提升聚合查询性能,尤其是在多维度统计场景下

    官网给的调优方案:
    image.png

    参考文档 ](https://help.aliyun.com/zh/flink/use-cases/control-state-size-to-prevent-backpressure-in-sql-deployments?spm=a2c6h.13262185.0.0.122f42e4e8zw1g&scm=20140722.S_help@@%E6%96%87%E6%A1%A3@@2807557@@16.S_llmOS0.ID_1096731-RL_FlinkCDC%E5%8F%8C%E6%B5%81join%E7%88%86%E7%BA%A2%E8%B0%83%E4%BC%98%E8%A7%A3%E5%86%B3-LOC_chat~DAS~llm-OR_ser-V_3-P0_0)

    2024-07-23 16:39:16
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    根据集群资源和任务需求,适当调整 parallelism 参数。在您提供的信息中,Parallelism: 10 可能不是最优设置,可以根据任务负载进行调整。

    根据数据特性选择合适的 Join 类型,例如:如果数据量不大,可以使用 broadcast join。
    调整状态后端:

    如果使用状态后端(如 RocksDB),调整状态后端的配置,例如:设置合理的 state.time-to-live 参数,减少状态大小。

    2024-07-21 17:39:05
    赞同 展开评论 打赏
  • 两种解决思路:

    • 减少状态大小:在 Flink Join 中的有效的优化措施就是减少 state key 的数量。在未优化之前 A 流和 B 流的数据往往是存储在单独的两个 State 实例中的,优化思路就是将同 Key 的数据放在一起进行存储,一个 key 的数据只需要存储一份,减少了 key 的数量。

    • 转移状态至外存:大 State 会导致 Flink 任务不稳定,就将 State 存储在外存中,让 Flink 任务轻量化,比如将数据存储在 Redis 中,A 流和 B 流中相同 key 的数据共同维护在一个 Redis 的 hashmap 中,以供相互进行关联。

    ——参考链接

    2024-07-21 15:50:55
    赞同 1 展开评论 打赏
  • 1.爆红通常指的是算子或步骤的执行时间过长,导致整个作业的进度变慢或停滞。双流join操作涉及两个数据流的合并,涉及到大量的数据匹配和转换。如果数据量很大,或者数据的结构复杂,那么join操作的效率就会降低,导致爆红。

    1. 解决方案:可以从以下几个方面进行调优:
      • 数据预处理:在进行join之前,对数据进行预处理,如使用哈希表或其他索引结构来减少数据匹配的时间。
      • 优化join条件:选择合适的join条件,避免不必要的数据匹配,提高join的效率。
      • 并行化:将join操作拆分为多个部分,并行处理,以利用多核或多机器的计算能力。
      • 分区策略:合理地划分数据集,使得join操作能够分布在不同的节点上,减少单节点的负载。
      • 硬件升级:如果硬件资源不足,考虑升级硬件设备,如增加内存、更换更快的CPU等。
      • 网络优化:确保良好的网络连接,减少数据传输的时间和成本。
    2. 示例代码:以下是一个简单的Python代码示例,展示了如何使用pandas库中的merge函数来实现两个DataFrame的双流join操作。这个例子假设我们有两个DataFrame,'df1'和'df2',它们都包含'id'列,我们需要根据'id'列进行join。
      ```python
      import pandas as pd
      df1 = pd.DataFrame({'id': [1, 2, 3], 'name': ['Alice', 'Bob', 'Charlie']})
      df2 = pd.DataFrame({'id': [4, 5, 6], 'age': [18, 20, 22]})
      joined_df = df1.merge(df2, on='id')
      print(joined_df)

    我们首先创建了两个DataFrame,然后使用pandas的merge函数进行了双流join操作。这个函数会自动识别两个DataFrame中相同的列,并进行数据匹配。输出结果应该是一个包含两组数据(来自df1和df2)的DataFrame。
    ```

    2024-07-20 10:16:50
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载