开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问flink CDC中这个参数怎么样才能生效?

请问flink CDC中这个参数怎么样才能生效?scan.incremental.close-idle-reader.enabled flink 版本:1.16.1cdc 版本:2.4.0初使并发:3依赖参数设置:execution.checkpointing.checkpoints-after-tasks-finish.enabled = true预期效果:全量结束后,增量阶段并行度为 1。实际还是一直是3。cd7095e6ab7715a82230a9734b2bb9af.png
df0247807300f30f04cb55756e8272f7.png

展开
收起
十一0204 2023-07-19 18:38:26 201 0
2 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    在 Flink CDC 中,如果需要设置 CDC 连接器的参数,可以使用 withParameters() 方法来设置。例如,以下是一个示例代码,演示如何使用 withParameters() 方法设置 CDC 连接器的参数:
    java
    Copy
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
    import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
    import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;

    import java.util.Properties;

    public class MySQLCDCWithParameters {
    public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment tEnv = StreamTableEnvironment.create(
    env,
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // 创建 Flink MySQL CDC 连接器
        MySQLSource<RowData> source = MySQLSource.<RowData>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test")
                .tableList("test_table")
                .username("root")
                .password("password")
                .deserializer(new RowDataDeserializationSchema())
                .withParameters(getParameters()) // 设置参数
                .build();
    
        // 读取 MySQL 表中的数据
        DataStream<RowData> mysqlDataStream = env.addSource(source);
    
        // 将数据转换为 Table,并注册为临时表
        tEnv.createTemporaryView("test_table", mysqlDataStream, SupportsReadingMetadata.READ_METADATA_WATERMARK);
    
        // 执行查询
        String sql = "SELECT * FROM test_table WHERE id > 100";
        tEnv.executeSql(sql).print();
    
        env.execute();
    }
    
    // 获取 CDC 连接器的参数
    private static Map<String, String> getParameters() {
        Map<String, String> params = new HashMap<>();
        params.put("snapshot.mode", "initial"); // 设置参数
        return params;
    }
    

    }
    在上述示例中,通过使用 withParameters() 方法设置 CDC 连接器的参数,例如

    2023-07-29 19:21:31
    赞同 展开评论 打赏
  • 存在即是合理

    scan.incremental.close-idle-reader.enabled 参数用于控制 Flink CDC 增量流的关闭行为。当设置为 true 时,Flink CDC 将尝试在任务完成后关闭空闲的读取器,以释放资源并提高性能。

    如果希望在全量结束后,增量阶段并行度为 1,则需要确保以下两个条件:

    1. 在全量阶段之后,没有其他任务正在运行。这可以通过将全量阶段与其他任务分离来实现,例如使用不同的 Flink 作业或 JobManager。

    2. 在增量阶段中,所有读取器都已正确配置并启动。这包括确保每个读取器都有足够的资源(例如内存和 CPU)来处理数据流,并且它们之间没有竞争关系。

    如果以上两个条件都满足,但仍然无法使增量阶段并行度降为 1,则可能需要进一步检查 Flink CDC 的配置和代码实现,以确定是否存在其他问题。

    2023-07-24 19:52:20
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载