flink cdc作业(多个数据库)同步数据库下的表运行一段时间,后面需要下线某些表?

flink cdc作业(多个数据库)同步数据库下的表运行一段时间,后面需要下线某些表?从savepoint或者ck启动就报这个文件找不到,但是这个文件在mysql中是有的,如果不上线和下线表,就能正常从保存点启动?Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.9.7.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1703557118495,db=,server_id=0,file=mysql-bin.000149,pos=194708651,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.

展开
收起
真的很搞笑 2023-12-31 20:30:50 92 分享 版权
阿里云 AI 助理回答

问题分析

根据您提供的错误信息,Flink CDC作业在从保存点(savepoint)或检查点(checkpoint)启动时,报错提示无法找到指定的Binlog文件。具体错误如下:

Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.9.7.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1703557118495,db=,server_id=0,file=mysql-bin.000149,pos=194708651,row=0}, but this is no longer available on the server.

这表明Flink CDC尝试从某个Binlog位点开始读取数据,但该位点对应的Binlog文件已经被MySQL服务器清理或过期删除。

此外,您提到在下线某些表后,从保存点启动会报错,而不上线和下线表时可以正常启动。这说明下线表的操作可能影响了CDC源表的元数据状态,导致Flink CDC无法正确恢复到保存点的状态。


解决方案

1. 确保Binlog文件未被清理

MySQL的Binlog文件默认会定期清理,清理策略由expire_logs_days参数控制。如果Binlog文件已被清理,Flink CDC将无法从保存点恢复。因此,首先需要确认以下内容: - 检查MySQL服务器上的Binlog文件是否仍然存在。

SHOW BINARY LOGS;

如果目标Binlog文件(如mysql-bin.000149)不存在,则需要调整MySQL的Binlog保留策略,确保在Flink CDC作业运行期间不会清理所需的Binlog文件。 - 修改expire_logs_days参数以延长Binlog保留时间: sql SET GLOBAL expire_logs_days = 7; -- 保留7天的Binlog文件 - 或者使用binlog_expire_logs_seconds参数(MySQL 8.0及以上版本支持): sql SET GLOBAL binlog_expire_logs_seconds = 604800; -- 保留7天的Binlog文件

重要提醒:在生产环境中,建议定期备份Binlog文件,以防止因清理策略导致数据丢失。


2. 下线表对CDC源表的影响

当您下线某些表时,Flink CDC的元数据状态可能会发生变化,导致从保存点恢复时无法正确解析Binlog位点。以下是可能的原因及解决方法:

原因分析
  • Flink CDC在保存点中记录了每个表的Binlog消费位点。当下线某些表后,这些表的元数据可能被移除,导致Flink CDC无法正确恢复到保存点的状态。
  • 如果下线表的操作涉及修改CDC源表的配置(如table-name正则表达式),可能导致Flink CDC无法匹配到正确的表结构。
解决方法
  1. 避免直接下线表
    如果需要下线某些表,建议通过以下方式操作:

    • 在Flink SQL中,使用EXCLUDING TABLE语法排除不需要同步的表,而不是直接修改CDC源表的配置。例如:
      CREATE DATABASE IF NOT EXISTS target_db AS DATABASE source_db EXCLUDING TABLE 'unwanted_table';
      

      这样可以确保Flink CDC的元数据状态保持一致,避免因下线表导致保存点无法恢复的问题。

  2. 重新生成保存点
    如果已经下线了某些表,并且无法从现有保存点启动,可以尝试以下步骤:

    • 停止当前作业。
    • 修改CDC源表配置,确保只包含需要同步的表。
    • 重新启动作业并生成新的保存点。
  3. 启用全量同步
    如果Binlog文件已丢失且无法恢复,可以通过重新执行全量同步来解决问题。在Flink CDC中,可以通过设置scan.startup.modeinitial来触发全量同步:

    CREATE TABLE mysql_source (...) WITH (
       'connector' = 'mysql-cdc',
       'scan.startup.mode' = 'initial',
       ...
    );
    

    全量同步完成后,Flink CDC会自动切换到增量同步模式。


3. 开启CDC Source复用

如果您的作业中有多个MySQL CDC源表,建议开启CDC Source复用功能,以减少对MySQL数据库的压力,并提高作业的稳定性。开启方法如下:

SET 'table.optimizer.source-merge.enabled' = 'true';

注意:在VVR 8.0.8及8.0.9版本中,还需要额外设置以下参数:

SET 'sql-gateway.exec-plan.enabled' = 'false';

开启Source复用后,建议保持pipeline.operator-chaining为默认值(true),以避免序列化和反序列化的性能开销。


4. 使用Paimon Tag管理保存点

为了避免保存点对应的快照文件因过期被清理,可以结合Paimon的Tag功能长期保留快照状态。具体操作如下: 1. 开启Tag自动创建功能:

ALTER TABLE your_table SET ('sink.savepoint.auto-tag' = 'true');
  1. 在需要从保存点恢复时,找到对应的Tag名称(格式为savepoint-${savepointID})。
  2. 执行回退操作:
    CALL `<catalog-name>`.sys.rollback_to('<database-name>.<table-name>', 'savepoint-${savepointID}');
    
  3. 从保存点恢复作业。

总结与建议

  1. 确保Binlog文件未被清理:调整MySQL的Binlog保留策略,避免因文件过期导致无法恢复。
  2. 避免直接下线表:通过EXCLUDING TABLE语法排除不需要同步的表,保持元数据一致性。
  3. 重新生成保存点:如果无法从现有保存点启动,可以重新启动作业并生成新的保存点。
  4. 启用CDC Source复用:减少对MySQL数据库的压力,提高作业稳定性。
  5. 结合Paimon Tag管理保存点:长期保留快照状态,避免因快照过期导致恢复失败。

通过以上措施,您可以有效解决Flink CDC作业从保存点启动时遇到的问题,并确保作业的稳定运行。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理