开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

使用flinkcdc同步mysql至mysql的数据,只会同步一次,修改源表后目标表没有变化

我用这个代码 同步mysql的数据到另一个mysql 只会同步一次 第一次同步完成后后续源表变更目标表没有变化 flink版本是1.16.3 cdc是2.3.0 而且执行完insert后,web ui也没有任务

055867d84b6b8e2df5b20e401873870.png

这是flink下lib中的jar包

image.png

尝试过更换flink或者cdc的版本,然后还会报其他乱七八糟的错误。都是没有办法解决的。
尝试过直接在flink sql client中写insert into语句,插入数据到目标表,是没有问题的,可以写入数据。但使用insert into 目标表 select * from 源表这样的写法就不行。怀疑可能是源表的binlog日志没有抓取到,但按照其他的方式去查过,数据库的binlog是打开着的,到这就有些卡死了。table api没有写过 有些看不懂

展开
收起
游客3dwbslchfk4p6 2024-03-28 10:58:39 335 0
6 条回答
写回答
取消 提交回答
  • 这个问题可能是由于Flink CDC的快照模式导致的。在Flink CDC中,有两照模式:initial_and_latestonly_snapshot。默认情况下,Flink CDC会使用initial_and_latest模式,这意味着它会在启动时获取源表的初始快照,并在后续时刻获取最新的快照。

    要解决这个问题,你可以尝试将Flink CDC的快照模式更改为only_snapshot。这样,Flink CDC只会在启动时获取源表的初始快照,而不会在后续时刻获取最新的快照。你可以通过以下代码设置快照模式:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableResult;
    impor org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.mysql.MySqlCatalog;
    import org.apache.flink.table.descriptors.ConnectorDescriptor;
    import org.apache.flink.table.descriptors.FormatDescriptor;
    import org.apache.flink.table.descriptors.SchemaDescriptor;
    import org.apache.flink.table.descriptors.SourceDescriptor;
    import org.apache.flink.table.descriptors.TableDescriptor;
    import org.apache.flink.table.descriptors.WatermarkDescriptor;
    import org.apache.flink.table.descriptors.XyzDescriptor;
    import org.apache.flink.table.factories.FactoryUtil;
    import org.apache.flink.table.sources.TableSource;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.utils.TableSchemaUtils;
    import org.apache.flink.table.utils.TableTestBase;
    import org.apache.flink.types.Row;
    
    public class FlinkCDCSync {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            // 创建源表描述符
            SourceDescriptor sourceDescriptor = new SourceDescriptor("mysql", "cdc", "source");
            sourceDescriptor.property("hostname", "localhost");
            sourceDescriptor.property("port", "3306");
            sourceDescriptor.property("username", "root");
            sourceDescriptor.property("password", "123456");
            sourceDescriptor.property("database-name", "test");
            sourceDescriptor.property("table-name", "source_table");
            sourceDescriptor.property("scan.startup.mode", "initial_and_latest"); // 修改为 only_snapshot
    
            // 注册源表
            tableEnv.connect(sourceDescriptor).withSchema().inAppendMode().registerTableSource("source_table");
    
            // 创建目标表描述符
            TableDescriptor targetDescriptor = TableDescriptor.forConnector("jdbc")
                    .schema(new Schema()
                            .field("id", DataTypes.INT())
                            .field("name", DataTypes.STRING())
                            .field("age", DataTypes.INT()))
                    .option("connector", "jdbc")
                    .option("url", "jdbc:mysql://localhost:3306/test")
                    .option("table-name", "target_table")
                    .option("username", "root")
                    .option("password", "123456")
                    .build();
    
            // 注册目标表
            tableEnv.connect(targetDescriptor).withSchema().inAppendMode().registerTableSource("target_table");
    
            // 同步数据
            Table result = tableEnv.sqlQuery("SELECT * FROM source_table");
            TableResult tableResult = tableEnv.executeSql("INSERT INTO target_table SELECT * FROM source_table");
        }
    }
    

    scan.startup.mode属性设置为only_snapshot后,Flink CDC将只获取源表的初始快照,而不会在后续时刻获取最新的快照。这样,当源表发生变更时,目标表也会相应地更新。

    2024-03-30 22:04:02
    赞同 展开评论 打赏
  • 这个问题可能是由于Flink CDC的快照模式导致的。在快照模式下,Flink CDC只会读取源表的初始状态,而不会监听后续的数据变更。要解决这个问题,你可以尝试将Flink CDC切换到实时模式。

    首先,你需要修改Flink CDC的配置文件,将scan.startup.mode设置为latest-offset。这将使Flink CDC从源表的最新偏移量开始读取数据。

    然后,你需要确保源表的数据变更被正确捕获。这可以通过在源表上创建一个触发器来实现。触发器可以在插入、更新或删除操作时记录binlog位置,以便Flink CDC可以从该位置开始读取数据。

    以下是创建触发器的示例代码:

    DELIMITER $$
    
    CREATE TRIGGER update_binlog_position
    AFTER INSERT ON your_source_table
    FOR EACH ROW
    BEGIN
      -- 在这里记录binlog位置,例如将其存储在另一个表中
    END$$
    
    ELIMITER ;
    

    最后,确保Flink CDC任务正在运行,并且可以正常处理源表的数据变更。如果问题仍然存在,建议检查Flink和CDC的日志以获取更多详细信息。

    2024-03-29 12:11:21
    赞同 展开评论 打赏
  • 阿里云大降价~

    这个问题可能是由于Flink CDC的checkpoint设置不正确导致的。请尝试调整以下参数:

    1. 增加execution.checkpointing.interval的值,例如设置为60000(单位为毫秒),表示每60秒进行一次checkpoint。
    2. 增加execution.checkpointing.min-pause的值,例如设置为5000(单位为毫秒),表示在两次checkpoint之间至少暂停5秒。
    3. 增加execution.chenting.max-concurrent-checkpoints的值,例如设置为1,表示最多同时进行一个checkpoint。

    修改后的代码如下:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    
    // 设置chepoint相关参数
    envnableCheckpointing(60000); // 每60秒进行一次checkpoint
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); // 在两次checkpoint之间至少暂停5秒
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 最多同时进行一个checkpoint
    
    String sourceDDL = "CREATE TABLE my_table (id INT PRIMARY KEY, name STRING) WITH ('connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = 'password', 'database-name' = 'mydb', 'table-name' = 'my_table')";
    TableResult result = tableEnv.executeSql(sourceDDL);
    
    String sinkDDL = "CREATE TABLE my_sink_table (id INT PRIMARY KEY, name STRING) WITH ('connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydb', 'username' = 'root', 'password' = 'password', 'table-name' = 'my_sink_table')";
    tableEnv.executeSql(sinkDDL);
    
    Table myTable = tableEnv.from("my_table");
    Table mySinkTable = tableEnv.from("my_sink_table");
    
    myTable.executeInsert("my_sink_table");
    
    env.execute("Flink CDC Job");
    

    如果问题仍然存在,请检查Flink和CDC的日志以获取更多详细信息。

    2024-03-29 10:00:03
    赞同 展开评论 打赏
  • 这个问题可能是由于Flink CDC的checkpoint设置不正确导致的。请尝试以下解决方案:

    1. 确保源表和目标表的数据类型是兼容的,否则同步过程中可能会出现错误。

    2. 检查Flink CDC的配置文件,确保scan.startup.mode设置为latest-offset0,以便从源表的最新位置开始同步数据。

    3. 调整Flink的checkpoint配置,例如增加checkpoint.interval的值(默认为1000毫秒),以便更频繁地触发checkpoint。同时,可以增加state.backend的内存大小,以确保状态数据不会丢失。

    4. 在Flink SQL中添加WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'mydb', 'table-name' = 'mytable')来指定源表和目标表的信息。

    5. 如果问题仍然存在,可以尝试升级Flink和CDC的版本,或者查看Flink的日志以获取更多详细信息。

    2024-03-28 16:00:29
    赞同 展开评论 打赏
  • 这个问题可能是由于Flink CDC的快照模式导致的。在快照模式下,Flink CDC只会读取源表的初始状态,而不会监听后续的数据变更。要解决这个问题,你可以尝试将Flink CDC切换到实时模式。

    首先,你需要修改Flink CDC的配置文件,将scan.startup.mode设置为latest-offset。这将使Flink CDC从源表的最新偏移量开始读取数据,并持续监听后续的数据变更。

    然后,你需要确保源表的数据变更能够被Flink CDC捕获。这通常需要开启MySQL的二进制日志功能。你可以在MySQL的配置文件(如my.cnf)中添加以下配置:

    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server-id=1
    

    重启MySQL服务后,Flink CDC应该能够实时同步源表的数据变更到目标表。如果问题仍然存在,请检查Flink和Flink CDC的日志以获取更多信息。

    2024-03-28 14:47:21
    赞同 展开评论 打赏
  • 将军百战死,壮士十年归!

    您描述的情况是使用Apache Flink CDC同步MySQL数据到另一个MySQL数据库,但第一次同步后,源表数据发生变化时目标表没有相应更新,且Web UI中看不到运行的任务。

    在Flink CDC实现MySQL到MySQL的实时数据同步时,需要确保任务是持久运行的,且能够捕捉到MySQL的增量变更数据。以下是一些可能的问题点和解决思路:

    1. Flink作业未持久化运行

      • 确保Flink作业是部署为流处理模式,即作业提交后会持续监听和处理数据变更事件,而不是一次性作业。
      • 如果使用的是SQL Client提交任务,需要确保任务启动时带有恰当的执行模式,例如 execute_sql --job-type=DataStream
    2. 检查CDC源配置

      • 确认Debezium连接器正确配置了MySQL binlog监听,并且连接和读取binlog无误。
      • 确保Flink CDC任务配置了恰当的offset存储和恢复机制,以便在重启时能够从上次处理的位置继续读取。
    3. 检查Sink配置

      • 确认Flink Sink配置正确,能够将变更事件正确写入目标MySQL数据库。
      • 检查目标数据库的写入权限、表结构以及事务处理是否正常。
    4. 监控与检查点

      • 确认Flink任务启用了检查点或者保存点,以便在作业失败或重启时能够恢复状态并从断点处继续执行。
      • 检查作业的运行状态和日志,查找可能存在的错误信息。
    5. Web UI

      • 确认在Web UI中查看作业的状态是否正确,如果任务已经停止或者失败,需要找出原因并重新启动。
      • 检查Flink集群是否仍在运行,以及任务是否在集群中处于活动状态。
    6. 代码或SQL示例

      • 如果可能,请提供使用的Flink CDC配置代码或SQL脚本,以便进一步分析是否存在配置上的问题。

    如果上述检查都无法解决问题,请查看Flink CDC任务的日志,以及Flink集群的监控信息,这些信息可以帮助诊断问题的真正原因。此外,也可以检查MySQL的binlog配置,确保binlog格式、GTID等相关设置满足Flink CDC的要求。

    2024-03-28 14:28:06
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
搭建电商项目架构连接MySQL 立即下载
搭建4层电商项目架构,实战连接MySQL 立即下载
PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

相关镜像