flink CDC任务监听mysql数据。只要不是从savepoint/checkpoint中恢复,都是能成功运行并监听数据的,但是只要从savepoint/checkpoint中恢复作业,就会报如下错误:
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:85)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1544)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1079)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
... 1 more
Caused by: io.debezium.DebeziumException: Client requested master to start replication from position > file size Error code: 1236; SQLSTATE: HY000.
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1488)
... 5 more
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: Client requested master to start replication from position > file size
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1043)
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
验证 Binlog 文件是否存在
在尝试恢复 Flink 任务前,你可以先去 MySQL 服务器上确认 Flink 需要的 binlog 文件是否还存在。
查看 Flink 需要的 binlog 位置: 这个信息通常在 Flink 的错误日志中可以找到,或者在 Flink UI 的 Checkpoint 详情里。
查看 MySQL 服务器上可用的 binlog 文件: 在 MySQL 中执行:
SQL
SHOW BINARY LOGS;
这个命令会列出所有当前可用的 binlog 文件。检查一下 Flink 需要的文件是否在这个列表里。如果不在,就说明它已经被清理了。
这是一种下策,只有在你可以接受数据丢失(从任务停止到重启这段时间的数据)的情况下才能使用。
当你确定无法从旧的 savepoint 恢复时,你可以选择放弃这个 savepoint,然后重新启动一个新任务,并设置启动参数从最新的位置开始消费。
Java
// Flink SQL
'scan.startup.mode' = 'latest-offset'
// DataStream API
MySqlSource.builder()
.startupOptions(StartupOptions.latest())
// ... 其他配置
.build();
⚠️ 警告:这样做会导致停机期间的所有数据变更全部丢失,请务必谨慎操作!
这个错误表明 Flink CDC 任务在从 checkpoint/savepoint 恢复时,尝试从 MySQL binlog 中一个无效的位置开始读取数据,导致连接失败。核心问题在于:当任务恢复时,它尝试从保存的 binlog 位置继续读取,但这个位置对应的 binlog 文件可能已被 MySQL 清理或轮转。
Caused by: com.github.shyiko.mysql.binlog.network.ServerException:
Client requested master to start replication from position > file size
这个错误明确表示:
purge binary logs 命令清理expire_logs_days 设置被自动删除-- 查看当前设置
SHOW VARIABLES LIKE 'expire_logs_days';
SHOW VARIABLES LIKE 'binlog_expire_logs_seconds'; -- MySQL 8.0+
-- 延长 binlog 保留时间(推荐 7-14 天)
SET GLOBAL expire_logs_days = 14; -- MySQL 5.7
SET GLOBAL binlog_expire_logs_seconds = 1209600; -- MySQL 8.0 (14天)
在 CDC 源配置中添加 GTID 模式和故障转移策略:
MySqlSource<String> source = MySqlSource.<String>builder()
.hostname("your-host")
.port(3306)
.databaseList("your-db")
.username("user")
.password("pass")
.serverId("5400-5404") // 设置服务器ID范围
.serverTimeZone("Asia/Shanghai")
// 关键配置:启用GTID和故障转移
.startupOptions(StartupOptions.timestamp(0L)) // 初始启动位置
.includeSchemaChanges(true)
.debeziumProperties(debeziumProps())
.build();
private static Properties debeziumProps() {
Properties props = new Properties();
// 启用GTID模式
props.setProperty("gtid.source", "enabled");
props.setProperty("gtid.new.channel.position", "latest");
// 配置binlog失效时的恢复策略
props.setProperty("database.history.skip.unparseable.ddl", "true");
props.setProperty("database.history.store.only.monitored.tables.ddl", "true");
props.setProperty("database.history.store.only.captured.tables.ddl", "true");
props.setProperty("snapshot.mode", "when_needed"); // 必要时重新快照
props.setProperty("tombstones.on.delete", "false");
return props;
}
在任务恢复前检查 binlog 位置有效性:
public class BinlogPositionValidator {
public static boolean isValidPosition(String binlogFilename, long position) {
try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password)) {
// 检查binlog文件是否存在
PreparedStatement stmt = conn.prepareStatement(
"SHOW BINARY LOGS"
);
ResultSet rs = stmt.executeQuery();
boolean fileExists = false;
while (rs.next()) {
if (rs.getString("Log_name").equals(binlogFilename)) {
fileExists = true;
// 检查位置是否有效
if (position <= rs.getLong("File_size")) {
return true;
}
}
}
return false;
} catch (SQLException e) {
throw new RuntimeException("Binlog position validation failed", e);
}
}
}
在 Flink 任务中实现自定义恢复逻辑:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 最大重启次数
Time.seconds(10) // 重启间隔
));
// 注册状态监听器
streamExecutionEnvironment.addOperatorEventListener(new OperatorEventListener() {
@Override
public void onCheckpointError(OperatorID operatorId, Throwable cause) {
if (cause instanceof BinlogPositionException) {
// 重置到最新binlog位置
resetBinlogPosition();
}
}
});
private void resetBinlogPosition() {
// 获取当前最新的binlog位置
try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password)) {
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS");
if (rs.next()) {
String currentFile = rs.getString("File");
long currentPos = rs.getLong("Position");
// 更新CDC源配置
sourceBuilder.startupOptions(
StartupOptions.specificOffset(currentFile, currentPos)
);
}
} catch (SQLException e) {
logger.error("Failed to reset binlog position", e);
}
}
# 监控binlog文件过期时间
mysql -e "SHOW VARIABLES LIKE 'expire_logs_days'"
# 设置定期检查任务
*/30 * * * * /path/to/check_binlog_retention.sh
# check_binlog_retention.sh 内容:
#!/bin/bash
EXPIRY_DAYS=$(mysql -N -e "SHOW VARIABLES LIKE 'expire_logs_days'" | awk '{print $2}')
if [ $EXPIRY_DAYS -lt 7 ]; then
echo "Binlog retention too low: $EXPIRY_DAYS days" | mail -s "MySQL Binlog Alert" admin@example.com
fi
| 配置项 | 推荐值 | 作用 |
|---|---|---|
expire_logs_days | ≥7 天 | 确保 binlog 保留时间足够长 |
gtid.source | enabled | 使用 GTID 实现精确位置恢复 |
snapshot.mode | when_needed | binlog 失效时自动重新快照 |
startupOptions | specificOffset | 支持精确指定恢复位置 |
| Flink 重启策略 | fixedDelayRestart | 提供自动恢复机制 |
Binlog 备份系统:
# 定期备份binlog到HDFS/S3
*/30 * * * * mysqlbinlog --read-from-remote-server --raw \
--host=mysql-host --user=backup --password=pass \
--result-file=/backup/binlogs/ $(mysql -e "SHOW BINARY LOGS" | tail -n +2 | awk '{print $1}')
CDC 位置管理服务:
// 实现外部化的binlog位置存储
public class BinlogPositionStore implements ListCheckpointed<BinlogPosition> {
@Override
public List<BinlogPosition> snapshotState(long checkpointId, long timestamp) {
// 存储到DB/Redis
store.save(getCurrentPosition());
return Collections.singletonList(getCurrentPosition());
}
@Override
public void restoreState(List<BinlogPosition> state) {
BinlogPosition position = state.get(0);
if (!BinlogPositionValidator.isValid(position)) {
position = getLatestPositionFromMaster();
}
setStartPosition(position);
}
}
Flink CDC 版本升级:
使用最新版本(至少 2.2+)包含以下修复:
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
when_needed模式)这些配置和策略组合使用,能有效解决 Flink CDC 从 checkpoint 恢复时的 binlog 位置失效问题,确保任务稳定运行。
这个错误是Flink CDC从savepoint/checkpoint恢复时的一个典型问题。错误信息显示:Client requested master to start replication from position > file size,这意味着保存在checkpoint中的binlog位置已经超出了MySQL当前binlog文件的大小。
问题原因
针对Flink CDC任务从savepoint/checkpoint恢复作业时出现的错误问题,以下是系统化的排查与解决方案指南:
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。