问下大佬们,Flink CDC mysql到pg。sink节点busy 100%,导致source也报错,有人遇到吗
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您描述的问题,Flink CDC 从 MySQL 同步到 PostgreSQL 的过程中,Sink 节点的负载达到 100%,导致 Source 节点也报错。这种情况通常与作业的资源分配、反压问题或 Sink 节点的性能瓶颈有关。以下是详细的分析和解决方案:
启用批量写入:
batch-size
)和刷新间隔(flush-interval
)。例如:CREATE TABLE pg_sink (
id BIGINT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://<host>:<port>/<database>',
'table-name' = '<table>',
'username' = '<username>',
'password' = '<password>',
'sink.buffer-flush.max-rows' = '5000', -- 批量写入行数
'sink.buffer-flush.interval' = '2s' -- 刷新间隔
);
优化 PostgreSQL 性能:
max_connections
),确保有足够的连接供 Flink 使用。work_mem
和 maintenance_work_mem
参数,以提高写入性能。检查网络延迟:
增加 TaskManager 内存:
taskmanager.memory.process.size
参数来增加内存。例如:taskmanager.memory.process.size: 4g
调整并行度:
开启自动调优:
SET 'table.optimizer.auto-tuning.enabled' = 'true';
监控反压:
调整 Source 并发度:
避免 Binlog 清理:
SET GLOBAL binlog_expire_logs_seconds = 86400; -- 保留 24 小时
重新启动作业:
避免不兼容的表结构变更:
启用 CDC Source 复用:
SET 'table.optimizer.source-merge.enabled' = 'true';
升级 Flink 版本:
通过上述优化措施,您可以有效缓解 Sink 节点的性能瓶颈,解决反压问题,并确保 Flink 作业的稳定运行。如果问题仍然存在,建议结合 Flink Web UI 的监控数据进一步分析具体瓶颈。您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。