flink cdc抽取hologres 2张表采用的增量,这个问题怎么解决?

flink cdc抽取hologres 2张表采用的增量模式,2个表的增量数据每天都在8000w左右,现在的问题是刚开始运行的时候sink端busy会随着时间增大,最后产生背压,需要无状态重启一下才能恢复?aec8093818bb53391fd300fdb0c589ec.png

展开
收起
小小鹿鹿鹿 2024-05-01 08:55:14 49 发布于山西 分享
分享
版权
举报
阿里云 AI 助理回答

问题分析与解决方案

根据您描述的问题,Flink CDC 抽取 Hologres 的两张表采用增量模式,每天增量数据量约为 8000w。在运行过程中,sink 端的 busy 状态会随着时间逐渐增大,最终导致背压(backpressure),需要通过无状态重启才能恢复。以下是针对该问题的详细分析和解决方案。


1. 问题可能的原因

1.1 数据写入性能瓶颈

Hologres 的写入性能受多种因素影响,包括存储格式、Insert 模式、Table Group 配置以及 Shard 数量等。如果目标表的配置不合理,可能会导致写入延迟增加,进而引发背压。

  • 列存表的局部更新:列存表在高 QPS 场景下容易出现局部更新问题,导致写入性能下降。
  • Shard 数量不足:如果目标表的 Shard 数量不足以支撑高并发写入,可能会导致数据倾斜或写入瓶颈。
  • Insert 模式选择不当:不同 Insert 模式的性能差异较大,例如 InsertOrReplaceInsertOrUpdate 在行存表中性能较好,但在列存表中可能表现较差。

1.2 数据倾斜

如果 Flink 作业的数据分发不均匀,可能导致某些分区或 Shard 的写入压力过大,从而引发背压。可以通过检查数据分布来确认是否存在数据倾斜。

1.3 后端集群压力

Hologres 实例的整体负载较高时,写入性能会显著下降。如果实例资源不足,可能会导致 sink 端的 busy 状态持续升高。

1.4 连接数限制

JDBC 模式下,Hologres Connector 会占用一定数量的连接数。如果作业并发较高且未启用连接复用,可能会导致连接数耗尽,进而影响写入性能。


2. 解决方案

2.1 优化目标表配置

  • 调整存储格式
    • 如果目标表是列存表,建议评估是否可以切换为行存表或行列混存表,以提升写入性能。
    • 行存表在点查和写入性能上优于列存表,适合高 QPS 场景。
  • 优化 Insert 模式
    • 根据业务需求选择合适的 Insert 模式。例如,对于列存表,优先使用 InsertOrIgnore;对于行存表,优先使用 InsertOrReplaceInsertOrUpdate
  • 增加 Shard 数量
    • 检查目标表的 Shard 数量是否合理。如果 Shard 数量不足,可以通过调整 Table Group 或重新创建表来增加 Shard 数量。

2.2 检查数据分布

  • 使用 SQL 命令检查数据分布,确认是否存在数据倾斜。例如:
    SELECT shard_id, COUNT(*) AS record_count
    FROM hologres_table
    GROUP BY shard_id
    ORDER BY record_count DESC;
    

    如果发现某些 Shard 的数据量显著高于其他 Shard,说明存在数据倾斜。可以通过调整 Distribution Key 来优化数据分布。

2.3 提升后端集群资源

  • 扩容实例:如果 Hologres 实例的整体负载较高,建议扩容实例资源(如 CPU、内存)以提升写入性能。
  • 监控指标:定期查看 Hologres 的监控指标,确认是否有高延迟或高负载的情况。

2.4 启用连接复用

  • 从 Flink 引擎 VVR-8.0.5-Flink-1.17 版本开始,默认启用了连接复用功能。如果您的作业未启用连接复用,建议为热点表单独配置 connectionPoolName 参数以优化性能。例如:
    'connectionPoolName' = 'hot_table_pool'
    
  • 调整 connectionSize 参数,确保每个并发的连接数足够支持高并发写入。

2.5 调整 Flink 作业配置

  • 增加并发度:适当增加 Flink 作业的并发度,以分担写入压力。
  • 优化 Checkpoint 配置:确保 Checkpoint 配置合理,避免因 Checkpoint 失败导致作业性能下降。
  • 启用异步维表查询:如果作业中涉及维表 Join,建议启用异步模式以提升性能。例如:
    'async' = 'true'
    

3. 实施步骤

  1. 检查目标表配置

    • 确认存储格式、Insert 模式和 Shard 数量是否合理。
    • 如果需要调整,重新创建目标表并迁移数据。
  2. 检查数据分布

    • 使用 SQL 命令检查数据分布,确认是否存在数据倾斜。
    • 如果存在数据倾斜,调整 Distribution Key 并重新分发数据。
  3. 监控后端集群负载

    • 查看 Hologres 的监控指标,确认是否有高延迟或高负载的情况。
    • 如果负载较高,考虑扩容实例资源。
  4. 优化连接数配置

    • 启用连接复用功能,并为热点表单独配置 connectionPoolName
    • 调整 connectionSize 参数以支持高并发写入。
  5. 调整 Flink 作业配置

    • 增加并发度并优化 Checkpoint 配置。
    • 如果涉及维表 Join,启用异步模式。

4. 重要提醒

  • 定期监控:建议定期监控 Hologres 和 Flink 的性能指标,及时发现并解决潜在问题。
  • 测试环境验证:在生产环境实施上述优化前,建议在测试环境中进行验证,确保优化效果符合预期。
  • 备份数据:在调整表结构或迁移数据时,务必提前备份数据,避免数据丢失。

通过以上优化措施,可以有效缓解 sink 端 busy 状态升高和背压问题,提升作业的稳定性和性能。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理