flinkcdc3.0.1整库同步到Doris2.0.4,分库分表合成一张表,Doris中数据突然不更新了,但flink程序以及Doris使用都是正常的。 这个是要调整flink集群的参数吗?源是mysql->doris。之前我用oracle也类似,后面加了参数就可以了。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您描述的问题,Flink CDC 3.0.1 整库同步到 Doris 2.0.4 的过程中,分库分表合并成一张表后,Doris 中的数据突然停止更新,但 Flink 程序和 Doris 本身运行正常。这种情况可能与 Flink 的配置参数、CDC 数据捕获逻辑或 Doris Sink 的写入机制有关。以下是详细的分析和解决方案:
Flink 的 Checkpoint 是数据同步的核心机制,如果 Checkpoint 配置不当,可能导致数据同步中断或延迟。以下是一些关键参数的检查和调整建议:
execution.checkpointing.interval
这是 Flink Checkpoint 的时间间隔,推荐值为 10 秒。如果间隔过长,可能导致数据同步延迟;如果间隔过短,可能增加系统负载。建议设置为 10 秒左右。
table.exec.state.ttl
如果启用了 minibatch
,State TTL(Time-To-Live)过期后,累积的计算结果会被清除,导致无新数据更新。建议根据业务需求调整 TTL 值,确保 State 不会因过期而被清除。
Checkpoint 最小间隔时间
Checkpoint 的最小间隔时间会影响下一次 Checkpoint 的触发时机。如果 Checkpoint 失败或超时,可能导致数据同步中断。建议检查日志中是否有 Checkpoint 异常,并调整相关参数。
MySQL CDC Source 的配置直接影响数据捕获的稳定性和完整性。以下是一些常见问题及解决方案:
jdbc.properties.useSSL
如果使用的是 MySQL 8.0.27 及以上版本,默认连接需要启用 SSL 协议。如果未正确配置,可能导致连接异常。建议在 MySQL CDC 表的 with
参数中添加 'jdbc.properties.useSSL'='false'
。
Binlog 配置
确保 MySQL 的 Binlog 格式为 ROW
,并且开启了 log_bin
和 binlog_row_image=FULL
。如果 Binlog 配置不正确,可能导致数据捕获不完整。
分库分表合并逻辑
分库分表合并时,Flink CDC 需要正确解析多个表的变更事件。如果某些表的变更未被捕获,可能导致数据更新中断。建议检查 including-tables
和 excluding-tables
参数是否配置正确。
Doris Sink 的写入机制可能影响数据更新的及时性。以下是一些关键配置的检查点:
sink-conf
参数
Doris Sink 的配置项(如批量写入大小、写入超时时间等)可能影响数据同步性能。建议根据 Doris 的集群规模和负载情况调整这些参数。
table-conf
参数
在创建 Doris 表时,properties
中的配置项(如分区策略、索引设置等)可能影响数据写入效率。建议检查 Doris 表的定义是否合理。
写入模式
如果 Doris 表使用了主键模型(Primary Key Model),确保 Flink 写入的数据能够正确更新主键对应的记录。如果写入模式不匹配,可能导致数据更新失败。
Flink 集群资源不足
如果 Flink 集群的资源(如 TaskManager 内存、并行度等)不足,可能导致任务运行不稳定。建议适当增加 parallelism.default
参数值以提高数据同步速度。
网络或连接问题
检查 Flink 集群与 MySQL、Doris 之间的网络连接是否稳定。如果存在网络抖动或连接超时,可能导致数据同步中断。
日志排查
查看 Flink 任务的日志,重点关注是否有异常信息(如 EOFException
、NegativeArraySizeException
等)。根据日志中的错误信息,进一步定位问题原因。
您提到之前在 Oracle 场景中通过添加参数解决了类似问题。可以尝试将 Oracle 场景中的参数迁移到当前 MySQL 场景中,例如: - 调整 execution.checkpointing.interval
和 table.exec.state.ttl
。 - 增加 parallelism.default
提高并行度。 - 检查 sink-conf
和 table-conf
是否有类似的优化参数。
针对您描述的问题,建议从以下几个方面逐步排查和调整: 1. 检查 Flink Checkpoint 配置,确保 execution.checkpointing.interval
和 table.exec.state.ttl
设置合理。 2. 检查 MySQL CDC Source 配置,确保 Binlog 格式正确且 jdbc.properties.useSSL
参数已添加。 3. 检查 Doris Sink 配置,优化 sink-conf
和 table-conf
参数。 4. 排查 Flink 集群资源和网络连接问题,确保任务运行环境稳定。
如果问题仍未解决,建议提供 Flink 任务的日志信息,以便进一步分析具体原因。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。