Flink CDC任务从savepoint/checkpoints状态中恢复作业错误问题

flink CDC任务监听mysql数据。只要不是从savepoint/checkpoint中恢复,都是能成功运行并监听数据的,但是只要从savepoint/checkpoint中恢复作业,就会报如下错误:
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:85)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1544)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1079)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
... 1 more
Caused by: io.debezium.DebeziumException: Client requested master to start replication from position > file size Error code: 1236; SQLSTATE: HY000.
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1488)
... 5 more
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: Client requested master to start replication from position > file size
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1043)

展开
收起
游客ia5x5oefr42ge 2025-02-20 09:20:20 5846 分享 版权
25 条回答
写回答
取消 提交回答
  • org.apache.flink.streaming

    2025-12-02 09:16:54
    赞同 16 展开评论
  • 龙年大吉!

    SourceReaderBase.getNextFetch(SourceReaderBase.java

    2025-12-02 09:16:54
    赞同 16 展开评论
  • SplitFetcherManager.checkErrors(SplitFetcherManager.java

    2025-12-02 09:06:56
    赞同 17 展开评论
  • 将军百战死,壮士十年归!

    One or more fetchers have encountered exception

    2025-12-02 09:06:56
    赞同 16 展开评论
  • 2025-11-19 11:25:54
    赞同 25 展开评论
  • 验证 Binlog 文件是否存在
    在尝试恢复 Flink 任务前,你可以先去 MySQL 服务器上确认 Flink 需要的 binlog 文件是否还存在。

    查看 Flink 需要的 binlog 位置: 这个信息通常在 Flink 的错误日志中可以找到,或者在 Flink UI 的 Checkpoint 详情里。

    查看 MySQL 服务器上可用的 binlog 文件: 在 MySQL 中执行:

    SQL

    SHOW BINARY LOGS;
    这个命令会列出所有当前可用的 binlog 文件。检查一下 Flink 需要的文件是否在这个列表里。如果不在,就说明它已经被清理了。

    解决方案 3:放弃旧状态,从最新位置启动(数据会丢失!)

    这是一种下策,只有在你可以接受数据丢失(从任务停止到重启这段时间的数据)的情况下才能使用。

    当你确定无法从旧的 savepoint 恢复时,你可以选择放弃这个 savepoint,然后重新启动一个新任务,并设置启动参数从最新的位置开始消费。

    Java

    // Flink SQL
    'scan.startup.mode' = 'latest-offset'

    // DataStream API
    MySqlSource.builder()
    .startupOptions(StartupOptions.latest())
    // ... 其他配置
    .build();
    ⚠️ 警告:这样做会导致停机期间的所有数据变更全部丢失,请务必谨慎操作!

    2025-10-30 16:34:11
    赞同 31 展开评论
  • 俺也一样

    2025-10-30 10:35:06
    赞同 27 展开评论
  • 那些看似波澜不惊的日复一日,总有一天会看到坚持的意义!

    哈哈,看不懂,完全是问了完成新手任务而来

    2025-10-28 16:46:03
    赞同 22 展开评论
  • 摸鱼来的

    真心看不懂,完全是为了完成新手任务而来

    2025-09-24 16:34:48
    赞同 37 展开评论
  • 大佬看不懂啊1

    2025-09-22 17:11:31
    赞同 34 展开评论
  • 真心看不懂,完全是为了完成新手任务而来

    2025-09-05 11:11:19
    赞同 44 展开评论
  • 真心看不懂,完全是为了完成新手任务而来

    2025-09-03 09:13:04
    赞同 44 展开评论
  • 一名技术小白正在学习中....

    真心看不懂,完全是为了完成新手任务而来

    2025-08-20 23:13:42
    赞同 56 展开评论
  • 哈哈,看不懂,完全是问了完成新手任务而来

    2025-07-25 07:58:21
    赞同 63 展开评论
  • 月移花影,暗香浮动

    这个错误表明 Flink CDC 任务在从 checkpoint/savepoint 恢复时,尝试从 MySQL binlog 中一个无效的位置开始读取数据,导致连接失败。核心问题在于:当任务恢复时,它尝试从保存的 binlog 位置继续读取,但这个位置对应的 binlog 文件可能已被 MySQL 清理或轮转

    问题原因分析:

    Caused by: com.github.shyiko.mysql.binlog.network.ServerException: 
    Client requested master to start replication from position > file size
    

    这个错误明确表示:

    1. Flink CDC 保存了 binlog 位置(文件名 + 偏移量)
    2. 任务恢复时尝试从这个位置继续读取
    3. 但 MySQL 服务器上这个 binlog 文件:
      • 已被 purge binary logs 命令清理
      • 或超过 expire_logs_days 设置被自动删除
      • 或发生了 binlog 轮转(文件被重命名)

    完整解决方案:

    1. 调整 MySQL binlog 保留策略(关键)

    -- 查看当前设置
    SHOW VARIABLES LIKE 'expire_logs_days';
    SHOW VARIABLES LIKE 'binlog_expire_logs_seconds'; -- MySQL 8.0+
    
    -- 延长 binlog 保留时间(推荐 7-14 天)
    SET GLOBAL expire_logs_days = 14; -- MySQL 5.7
    SET GLOBAL binlog_expire_logs_seconds = 1209600; -- MySQL 8.0 (14天)
    

    2. 配置 Flink CDC 容错策略

    在 CDC 源配置中添加 GTID 模式和故障转移策略:

    MySqlSource<String> source = MySqlSource.<String>builder()
        .hostname("your-host")
        .port(3306)
        .databaseList("your-db")
        .username("user")
        .password("pass")
        .serverId("5400-5404") // 设置服务器ID范围
        .serverTimeZone("Asia/Shanghai")
        // 关键配置:启用GTID和故障转移
        .startupOptions(StartupOptions.timestamp(0L)) // 初始启动位置
        .includeSchemaChanges(true)
        .debeziumProperties(debeziumProps())
        .build();
    
    private static Properties debeziumProps() {
        Properties props = new Properties();
        // 启用GTID模式
        props.setProperty("gtid.source", "enabled");
        props.setProperty("gtid.new.channel.position", "latest");
    
        // 配置binlog失效时的恢复策略
        props.setProperty("database.history.skip.unparseable.ddl", "true");
        props.setProperty("database.history.store.only.monitored.tables.ddl", "true");
        props.setProperty("database.history.store.only.captured.tables.ddl", "true");
        props.setProperty("snapshot.mode", "when_needed"); // 必要时重新快照
        props.setProperty("tombstones.on.delete", "false");
        return props;
    }
    

    3. 添加 binlog 位置检查逻辑

    在任务恢复前检查 binlog 位置有效性:

    public class BinlogPositionValidator {
        public static boolean isValidPosition(String binlogFilename, long position) {
            try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password)) {
                // 检查binlog文件是否存在
                PreparedStatement stmt = conn.prepareStatement(
                    "SHOW BINARY LOGS"
                );
                ResultSet rs = stmt.executeQuery();
    
                boolean fileExists = false;
                while (rs.next()) {
                    if (rs.getString("Log_name").equals(binlogFilename)) {
                        fileExists = true;
                        // 检查位置是否有效
                        if (position <= rs.getLong("File_size")) {
                            return true;
                        }
                    }
                }
                return false;
            } catch (SQLException e) {
                throw new RuntimeException("Binlog position validation failed", e);
            }
        }
    }
    

    4. 配置恢复策略

    在 Flink 任务中实现自定义恢复逻辑:

    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3, // 最大重启次数
        Time.seconds(10) // 重启间隔
    ));
    
    // 注册状态监听器
    streamExecutionEnvironment.addOperatorEventListener(new OperatorEventListener() {
        @Override
        public void onCheckpointError(OperatorID operatorId, Throwable cause) {
            if (cause instanceof BinlogPositionException) {
                // 重置到最新binlog位置
                resetBinlogPosition();
            }
        }
    });
    
    private void resetBinlogPosition() {
        // 获取当前最新的binlog位置
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password)) {
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS");
            if (rs.next()) {
                String currentFile = rs.getString("File");
                long currentPos = rs.getLong("Position");
    
                // 更新CDC源配置
                sourceBuilder.startupOptions(
                    StartupOptions.specificOffset(currentFile, currentPos)
                );
            }
        } catch (SQLException e) {
            logger.error("Failed to reset binlog position", e);
        }
    }
    

    5. 监控和告警配置

    # 监控binlog文件过期时间
    mysql -e "SHOW VARIABLES LIKE 'expire_logs_days'"
    
    # 设置定期检查任务
    */30 * * * * /path/to/check_binlog_retention.sh
    
    # check_binlog_retention.sh 内容:
    #!/bin/bash
    EXPIRY_DAYS=$(mysql -N -e "SHOW VARIABLES LIKE 'expire_logs_days'" | awk '{print $2}')
    if [ $EXPIRY_DAYS -lt 7 ]; then
      echo "Binlog retention too low: $EXPIRY_DAYS days" | mail -s "MySQL Binlog Alert" admin@example.com
    fi
    

    关键配置说明:

    配置项推荐值作用
    expire_logs_days≥7 天确保 binlog 保留时间足够长
    gtid.sourceenabled使用 GTID 实现精确位置恢复
    snapshot.modewhen_neededbinlog 失效时自动重新快照
    startupOptionsspecificOffset支持精确指定恢复位置
    Flink 重启策略fixedDelayRestart提供自动恢复机制

    高级方案(适用于生产环境):

    1. Binlog 备份系统

      # 定期备份binlog到HDFS/S3
      */30 * * * * mysqlbinlog --read-from-remote-server --raw \
        --host=mysql-host --user=backup --password=pass \
        --result-file=/backup/binlogs/ $(mysql -e "SHOW BINARY LOGS" | tail -n +2 | awk '{print $1}')
      
    2. CDC 位置管理服务

      // 实现外部化的binlog位置存储
      public class BinlogPositionStore implements ListCheckpointed<BinlogPosition> {
          @Override
          public List<BinlogPosition> snapshotState(long checkpointId, long timestamp) {
              // 存储到DB/Redis
              store.save(getCurrentPosition());
              return Collections.singletonList(getCurrentPosition());
          }
      
          @Override
          public void restoreState(List<BinlogPosition> state) {
              BinlogPosition position = state.get(0);
              if (!BinlogPositionValidator.isValid(position)) {
                  position = getLatestPositionFromMaster();
              }
              setStartPosition(position);
          }
      }
      
    3. Flink CDC 版本升级
      使用最新版本(至少 2.2+)包含以下修复:

      <dependency>
          <groupId>com.ververica</groupId>
          <artifactId>flink-connector-mysql-cdc</artifactId>
          <version>2.3.0</version>
      </dependency>
      

    恢复流程优化:

    1. 任务启动时检查 binlog 位置有效性
    2. 如果位置无效:
      • 尝试查找下一个有效 binlog 文件
      • 如果找不到,触发全量快照(when_needed模式)
      • 发送告警通知管理员
    3. 定期记录 binlog 位置到外部存储
    4. 配置 Flink 定期保存点(至少每天一次)

    这些配置和策略组合使用,能有效解决 Flink CDC 从 checkpoint 恢复时的 binlog 位置失效问题,确保任务稳定运行。

    2025-07-08 18:53:42
    赞同 66 展开评论
  • 这个错误是Flink CDC从savepoint/checkpoint恢复时的一个典型问题。错误信息显示:Client requested master to start replication from position > file size,这意味着保存在checkpoint中的binlog位置已经超出了MySQL当前binlog文件的大小。

    问题原因

    • Binlog文件轮转:MySQL的binlog文件会定期轮转(根据配置的大小或时间),旧的binlog文件可能被删除
    • 位置信息过期:Checkpoint中保存的binlog位置信息已经失效
    • MySQL配置问题:binlog保留时间过短,导致恢复时需要的binlog已被清理
    2025-07-08 10:36:47
    赞同 67 展开评论
  • 哈哈,看不懂,完全是问了完成新手任务而来

    2025-06-25 15:29:26
    赞同 78 展开评论
  • 哈哈,看不懂,完全是问了完成新手任务而来

    2025-06-21 22:44:26
    赞同 86 展开评论
  • 哈哈,看不懂,完全是问了完成新手任务而来

    2025-04-29 17:49:24
    赞同 202 展开评论
  • 乘风破浪

    针对Flink CDC任务从savepoint/checkpoint恢复作业时出现的错误问题,以下是系统化的排查与解决方案指南:

    2025-04-21 16:30:41
    赞同 238 展开评论
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理