开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink mysql cdc从全量转增量的时候,有什么标记能被捕获吗?

"问题1:flink mysql cdc从全量转增量的时候,有什么标记能被捕获吗,因为全量阶段对内存资源的占用比较大,想在进入增量阶段时把资源调低,如果要是有这个标记,就可以分两阶段运行,或者有比较成型的方案吗?
问题2:这个日志,我在自己的逻辑代码内部也能获取到么?
问题3:就是全量阶段完成,我的任务自动通过捕获这个关键字,自动终止,这样通过监控发现全量任务完成,再有人工调整参数,启动增量任务。
问题4:普罗米修斯能监控到这个标记吗?
问题5:我知道有metric,我自己定义过,我意思是cdc mysql全量阶段完成这个标记通过metric是拿不到的吧?beb892e4f08fe9b7ee9cccd78b1ea817.png
"

展开
收起
十一0204 2023-07-19 18:38:25 576 3
2 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    在 Flink CDC 中,当从全量同步转为增量同步时,可以使用 CDC Connector 提供的 offset 来标记增量同步的起始位置。具体而言,可以通过 CDC Connector 获取当前的 offset,将其保存到外部存储中,当下一次增量同步时,从保存的 offset 开始读取数据。
    在 MySQL CDC Connector 中,可以通过 MySQLSource 类的 startupOptions() 方法来设置 offset。startupOptions() 方法接收一个 StartupOptions 类型的参数,该参数包含了启动参数的各种配置信息,其中包括 offset 配置。例如,可以通过设置 StartupOptions.initialOffset() 来指定 CDC Connector 的初始 offset。
    以下是一个示例代码:
    java
    Copy
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
    import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
    import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;

    import java.util.Properties;

    public class MySQLCDCIncremental {
    public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment tEnv = StreamTableEnvironment.create(
    env,
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // 创建 Flink MySQL CDC 连接器
        MySQLSource<RowData> source = MySQLSource.<RowData>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test")
                .tableList("test_table")
                .username("root")
                .password("password")
                .deserializer(new RowDataDeserializationSchema())
                .startupOptions(StartupOptions.initialOffset("2022-01-01 00:00:00")) // 设置初始 offset
                .build();
    
        // 读取 MySQL 表中的数据
        DataStream<RowData> mysqlDataStream = env.addSource(source);
    
        // 将数据转换为 Table,并注册为临时表
        tEnv.createTemporaryView("test_table", mysqlDataStream, SupportsReadingMetadata.READ_METADATA_WATERMARK);
    
        // 执行查询
        String sql = "SELECT * FROM test_table WHERE id > 100";
        tEnv.executeSql(sql).print();
    
        env.execute();
    }
    

    }
    在上述示例中,通过设置 StartupOptions.ini

    2023-07-29 19:21:33
    赞同 9 展开评论 打赏
  • 意中人就是我呀!

    看日志,搜索received all关键字,出现的话表示全量阶段完成了。currentEmitEventTimeLag 指标记录的是 Source 发送一条记录到下游节点的时间点和该记录在 DB 里产生时间点差值,用于衡量数据从 DB 产生到离开 Source 节点的延迟。用户可以通过该指标判断 source 是否进入了 binlog 读取阶段:当该指标为 0 时,代表还在全量历史读取阶段;当大于 0 时,则代表进入了 binlog 读取阶段。此回答整理至钉群“Flink CDC 社区”。

    2023-07-19 19:03:22
    赞同 6 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    搭建电商项目架构连接MySQL 立即下载
    搭建4层电商项目架构,实战连接MySQL 立即下载
    PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

    相关镜像