开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC同步无主键表的时候,怎么通过datastream mysqlsource添加呢?

Flink CDC同步无主键表的时候,查到scan.incremental.snapshot.enabled配置false,但是都是sql创建with配置这个参数,怎么通过datastream mysqlsource添加呢?没找到入口

展开
收起
真的很搞笑 2023-11-07 08:05:06 141 0
3 条回答
写回答
取消 提交回答
  • jdbcproperties里面添加image.png
    ,此回答整理自钉群“Flink CDC 社区”

    2023-11-08 02:48:42
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    在 Flink CDC 中,scan.incremental.snapshot.enabled 参数是用于控制是否使用 Snapshot 策略的配置项。如果使用 SQL 创建 Flink CDC Source,则可以直接在 SQL 中设置这个参数;如果使用 DataStream API 创建,则需要通过特定的 DataStream API 来设置这个参数。
    具体来说,可以通过以下步骤设置 scan.incremental.snapshot.enabled 参数:

    1. 创建 DataStream MySQL Source,并设置必要的参数,如 hostportdatabase 等。
    2. 设置 scan.incremental.snapshot.enabled 参数为 false。例如:
      ```kotlin

    val source = new MySQLOutputFormat()
    val properties = new Properties()

    properties.setProperty("scan.incremental.snapshot.enabled", "false")

    val dataStream = env.addSource(source, new SimpleStringSchema(), properties)
    ```

    需要注意的是,这个属性是针对 DataStream API 的,而不是针对 SQL API 的,所以无法直接在 SQL 中设置。
    此外,在设置 scan.incremental.snapshot.enabled 参数时,请确保不要与 LogMiner 模式一起使用,因为 LogMiner 模式需要设置 logMiningContext 参数,否则可能会导致同步失败。

    2023-11-07 14:32:40
    赞同 展开评论 打赏
  • 在Flink CDC中,scan.incremental.snapshot.enabled参数是用来控制是否启用快照增量扫描的。这个参数的默认值是false,也就是说,Flink CDC默认使用的是全量扫描的方式来进行数据同步。

    如果你想通过Datastream MySqlSource来添加这个参数,你可以这样做:

    首先,你需要在你的Flink任务中引入org.apache.flink.connector.mysql.source.MySqlSourceOptions类。你可以通过以下代码来实现:

    import org.apache.flink.connector.mysql.source.MySqlSourceOptions;
    

    然后,你可以在创建MySqlSource的时候,通过MySqlSourceOptions类的setScanIncrementalSnapshotEnabled方法来设置scan.incremental.snapshot.enabled参数的值。例如:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    // ...
    
    MySqlSourceOptions sourceOptions = new MySqlSourceOptions()
        .setDatabaseName("your_database")
        .setTableName("your_table")
        .setUsername("your_username")
        .setPassword("your_password")
        .setServerName("your_server")
        .setPort(3306)
        .setScanIncrementalSnapshotEnabled(true); // 这里设置成true
    
    ConnectionProvider connectionProvider = new MySqlConnectionProvider(sourceOptions);
    
    Datastream<Row> stream = env.fromSource(
        new MySqlSource(connectionProvider),
        WatermarkStrategy.forMonotonousTimestamps(),
        "MySqlSource");
    
    // ...
    

    在这段代码中,我设置了scan.incremental.snapshot.enabled参数的值为true,这样就可以启用快照增量扫描的方式来进行数据同步。

    2023-11-07 10:07:48
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载