Flink CDC任务从savepoint/checkpoints状态中恢复作业错误问题

flink CDC任务监听mysql数据。只要不是从savepoint/checkpoint中恢复,都是能成功运行并监听数据的,但是只要从savepoint/checkpoint中恢复作业,就会报如下错误:
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:85)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1544)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1079)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
... 1 more
Caused by: io.debezium.DebeziumException: Client requested master to start replication from position > file size Error code: 1236; SQLSTATE: HY000.
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1488)
... 5 more
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: Client requested master to start replication from position > file size
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1043)

展开
收起
游客ia5x5oefr42ge 2025-02-20 09:20:20 805 分享 版权
3 条回答
写回答
取消 提交回答
  • Flink CDC任务从savepoint/checkpoints状态中恢复作业错误问题
    根据您提供的上下文信息,Flink CDC任务在尝试从savepoint或checkpoint恢复时遇到了异常。以下是对该问题的详细解释和可能的原因分析:

    1. 恢复流程概述
      Checkpoint恢复:当Flink任务发生故障(failover)时,系统会自动从最近一次成功的checkpoint位点进行恢复。
      1
      Savepoint恢复:用户可以手动创建savepoint,并在需要时从指定的savepoint恢复作业。
      3
    2. 错误原因分析
      从您提供的堆栈跟踪来看,错误发生在SplitFetcher线程中,具体是在轮询记录时出现了意外异常。以下是可能导致此问题的一些常见原因及解决方法:

    网络问题:如果Flink任务与MongoDB之间的网络连接不稳定,可能会导致SplitFetcher无法正常获取数据。

    解决方法:检查网络连接,确保Flink任务能够稳定地访问MongoDB。
    权限问题:如果用于连接MongoDB的用户名或密码不正确,或者没有足够的权限访问所需的数据,也会导致恢复失败。

    解决方法:确认连接MongoDB时使用的用户名和密码是正确的,并且该用户具有足够的权限。如果使用的是特定数据库下的用户凭证,请确保在WITH参数中添加了'connection.options' = 'authSource=用户所在的DB'。
    2
    Resume Token无效:在恢复过程中,Flink CDC依赖于Change Stream的Resume Token来定位oplog.rs中的位置。如果Resume Token对应的记录已经不在oplog.rs中,会导致恢复失败。

    解决方法:确保oplog.rs集合的大小足够大,以避免过早删除变更日志。可以通过调整oplog.rs的大小来延长其保留时间。
    2
    资源不足:如果Flink任务在恢复过程中缺乏足够的资源(如内存、CPU等),也可能导致恢复失败。

    解决方法:增加Flink任务的资源配置,确保有足够的资源来处理恢复过程中的数据加载和状态重建。
    版本兼容性问题:如果使用的Flink版本与MongoDB版本之间存在兼容性问题,也可能会导致恢复失败。

    解决方法:确保使用的Flink版本与MongoDB版本兼容。如果需要,可以考虑升级或降级相关组件的版本。

    1. 诊断步骤
      为了进一步诊断问题,您可以采取以下步骤:

    使用诊断工具分析算子状态:利用Thread Dump、线程动态分析和火焰图等工具,检查初始化阶段的算子线程栈。重点关注线程栈是否长时间处于等待状态,尤其是在Gemini等状态存储系统上的操作。
    5
    识别状态算子的初始化问题:如果发现某个算子长时间处于初始化状态,且该算子涉及状态处理,那么可以推断问题可能出在状态的下载或重建过程中。
    5

    2025-03-15 23:54:01
    赞同 59 展开评论
  • 可以看看这篇文章
    https://developer.aliyun.com

    2025-03-05 13:59:06
    赞同 131 展开评论
  • 初步排查是因为chpoints中_metadata中记录的binlog文件与数据库当前写入的binlog(SHOW MASTER STATUS查看;)文件不一样导致的,_metadata中正确记录保存了当前监听位置即pos,但是所对应的binlog file却是错误的,把检查点目录、保存点目录删掉,监听配置从指定timestamp改为latest,重新启动作业,然后取消作业再从检查点恢复作业,可以正常恢复。但是为什么一开始checkpoint没有正确记录保存当前监听位置pos所对应的binlog文件,而是记录了错误的binlog文件到_metadata中,这一点还不知道原因,不知道如何排查,但是可能binlog文件截断和跨binlog记录事务有关,可以参考https://blog.csdn.net/kjh2007abc/article/details/85001048

    2025-02-20 19:38:16
    赞同 193 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理