各位大佬,我用flinkcdc读取mysql,etl之后写入pgsql数据库,每次隔天凌晨就报错。

各位大佬,我用flinkcdc读取mysql,etl之后写入pgsql数据库,每次隔天凌晨就报错。java.lang.RuntimeException: One or more fetchers have encountered exception

at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:223) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]

at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]

at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]

at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:305) ~[flink-dist_2.12-1.13.6.jar:1.13.6]

at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69) ~[flink-dist_2.12-1.13.6.jar:1.13.6]

at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.12-1.13.6.jar:1.13.6]

at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.12-1.13.6.jar:1.13.6]

at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.12-1.13.6.jar:1.13.6]

at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) ~[flink-dist_2.12-1.13.6.jar:1.13.6]

at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) ~[flink-dist_2.12-1.13.6.jar:1.13.6]

at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-dist_2.12-1.13.6.jar:1.13.6]

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) ~[flink-dist_2.12-1.13.6.jar:1.13.6]

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.12-1.13.6.jar:1.13.6]

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.6.jar:1.13.6]

at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]

Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records

at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]

at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_342]

at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_342]

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342]

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342]

... 1 more

Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1668482866321,db=,server_id=0,file=mysql-bin.020557,pos=212473,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.

at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:180) ~[?:?]

at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:113) ~[?:?]

展开
收起
毛毛虫雨 2022-11-27 09:59:42 1482 分享 版权
阿里云 AI 助理回答

遇到这个错误,主要是因为Flink CDC在尝试从MySQL读取binlog时发现之前记录的偏移量(即binlog的位置)在MySQL服务器上已经不再可用。这通常发生在MySQL binlog被清理或者由于某种原因重置之后。为了解决这个问题,你需要重新配置Flink CDC作业以使用快照模式(snapshot)来初始化数据流,然后再切换到基于binlog的增量同步。

以下是解决步骤:

  1. 停止当前的Flink作业:首先,需要停止正在运行并报错的Flink作业。

  2. 重新配置Flink CDC源:在你的Flink CDC配置中,确保设置了正确的重启策略和快照相关设置。对于Debezium MySQL Connector,你可以通过以下方式配置以强制进行一次全量快照:

    # Flink CDC for MySQL configuration example
    execution.startup-mode: initial
    debezium.source.snapshot.locking.mode: none  # 或者 'none' 如果你不需要锁表
    
    • execution.startup-mode: initial 强制作业启动时执行一次全量快照。
    • debezium.source.snapshot.locking.mode 配置快照时的锁行为,根据你的需求选择合适的锁定模式,如果不想在快照期间锁定表,可以设置为none,但需注意这可能会导致数据一致性问题。
  3. 重新提交作业:保存配置更改后,重新提交你的Flink作业。这次它会先执行一个全量数据的快照导入,然后继续监听binlog变化进行增量同步。

  4. 监控与验证:作业重新启动后,密切监控其运行状态,确保数据正确无误地流入目标数据库,并且没有再出现之前的错误。

  5. 考虑定期维护:为了避免未来再次发生类似问题,可能需要考虑MySQL的binlog保留策略,确保binlog不会在Flink CDC处理过程中被删除。同时,也可以考虑在Flink作业设计中加入更健壮的错误处理和自动重启逻辑。

请根据你的具体环境和需求调整上述建议。您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

收录在圈子:
实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
还有其他疑问?
咨询AI助理