mysql cdc 抽取抛出异常 设置了重启策略 5次 ,但是任务一直restaring ,请问这是什么原因呢?Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.9.7.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1699323077067,db=,server_id=0,file=mysql-bin.002696,pos=217069012,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:203)
at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:127)
at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:105)
at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:73)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:109)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:80)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
这个错误是由于 Flink CDC 连接器尝试从 MySQL 服务器读取 binlog 开始位置时,发现该位置不再可用。
这可能是因为在任务启动时,MySQL 服务器的 binlog 已经被删除或移动到其他位置,导致连接器无法找到所需的 binlog 文件和位置。
为了解决这个问题,您可以重新配置 Flink CDC 连接器以使用快照功能。快照功能可以捕获当前数据库状态,并将其作为初始偏移量供连接器使用。当连接器启动时,它会从快照开始读取数据,而不是从 binlog 开始位置读取数据。
要启用快照功能,请将 connector 的 "snapshot.mode" 参数设置为 "when_needed" 或 "always"。例如:
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "password",
"database.server.id": "12345",
"database.server.name": "myserver",
"database.whitelist": "mydb",
"snapshot.mode": "when_needed"
}
在上面的例子中,"snapshot.mode" 参数被设置为 "when_needed",这意味着连接器会在需要时自动创建快照,而不需要手动触发快照创建操作。
这个问题可能是由于MySQL的binlog文件已经损坏或者丢失导致的。你可以尝试以下方法来解决这个问题:
mysqlbinlog
工具来查看binlog文件的状态,例如:mysqlbinlog --status mysql-bin.002696
如果发现有损坏的binlog文件,你需要修复它们。
use_snapshot
为true
,这样在需要时会使用快照而不是从binlog文件中读取数据。例如:{
"name": "mysql_binlog_source",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "password",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "mydb",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"database.history.kafka.poll.interval.ms": "5000",
"database.history.kafka.producer.config.auto.offset.reset": "earliest",
"database.history.kafka.ssl.truststore.location": "/tmp/kafka.client.truststore.jks",
"database.history.kafka.ssl.truststore.password": "changeit",
"database.history.kafka.ssl.keystore.location": "/tmp/kafka.client.keystore.jks",
"database.history.kafka.ssl.keystore.password": "changeit",
"database.history.kafka.ssl.key.password": "changeit",
"database.history.kafka.ssl.endpoint.identification.algorithm": "https",
"include.schema.changes": "true",
"table.include.list": "mydb.*",
"use_snapshot": "true"
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。