开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink cdc 怎么做断点续传啊

flink cdc 怎么做断点续传啊我程序不小心出了问题。异常了。重启,默认的init全量拉数据

展开
收起
雪哥哥 2022-11-13 20:18:52 5705 1
6 条回答
写回答
取消 提交回答
  • 学无止境!

    通过采集模块-业务数据采集之FlinkCDC DataStreamAPI 进行打包进行断点续传测试

    2022-11-30 11:41:08
    赞同 1 展开评论 打赏
  • 注意:测试环境最好使用linux系统的jar提交 在idea上可能会出现ck保存失败问题 断点续传测试: - 1.自动保存的ck(关闭自动删除) 用ck启动 - 2.使用手动的savepoint启动

    因为设置的ck文件系统是hadoop 所以需要添加flink和hadoop的继承 在主机环境变量添加 - FLINK集成HADOOP需要 export HADOOP_CLASSPATH=hadoop classpath export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

    • source一下 source /etc/profile.d/my_ini.sh

    • 使用单机模式启动Flink bin/start-cluster.sh

    • 提交任务给web bin/flink run -m Ava01:8081 -c 全类名 jar包位置 -d 后台执行 bin/flink run -m Ava01:8081 -c com.otto.gmall.cdc.DataStreamAPITest myjar/Flink-CDC-1.0-SNAPSHOT-jar-with-dependencies.jar -d
    • 在Ava01:8081上查看webUI的输出
    • 手动触发保存点 flink savepoint jobId sp保存在hdfs路径

    在webUI上手动cancel job作业 变更监听的mysql库的表的数据 观察是否断点续传

    • 使用savepoint来恢复任务 观察能否断点续传 flink run -s 保存点的hdfs路径 -c 全类名 jar包
    2022-11-29 07:55:08
    赞同 1 展开评论 打赏
  • flinkcdc 做断点续传,需要将flinkcdc读取binlog的位置信息以状态方式保存在checkpoint中即可。 开启checkpoint,每隔5s执行一次ck,指定ck的一致性语义。

         env.enableCheckpointing(5000L);     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
    2022-11-23 16:58:02
    赞同 1 展开评论 打赏
  • 十年摸盘键,代码未曾试。 今日码示君,谁有上云事。

    Flink-CDC将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断点续传, 需要从Checkpoint或者Savepoint启动程序,通过这种方式来实现断点续传

    2022-11-23 14:41:49
    赞同 1 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    public class flinkCDC {

    public static void main(String[] args) throws Exception {
    
    
        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
    
        //2.flinkcdc 做断点续传,需要将flinkcdc读取binlog的位置信息以状态方式保存在checkpoint中即可.
    
        //(1)开启checkpoint 每隔5s 执行一次ck 指定ck的一致性语义
    
       env.enableCheckpointing(5000L);
     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
        //3.设置任务关闭后,保存最后后一次ck数据.
       env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000L));
    
       env.setStateBackend(new FsStateBackend("hdfs://192.168.1.204:9000/flinkCDC"));
        //4.设置访问HDFS的用户名
        System.setProperty("HADOOP_USER_NAME","root");
    
        //5.创建Sources数据源
        Properties prop = new Properties();
        prop.setProperty("scan.startup.mode","initial");  //"scan.startup.mode","initial" 三种要补充解释下
    
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("192.168.1.205")
                .port(3306)
                .username("root")
                .password("Root@123")
                .tableList("ssm.order") //这里的表名称,书写格式:db.table  不然会报错
                .databaseList("ssm")
                .debeziumProperties(prop)
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();
        
        //6.添加数据源
        DataStreamSource<String> source = env.addSource(mysqlSource);
    
         //7.打印数据
        source.print();
    
        //8.执行任务
    
        env.execute();
    
    }
    

    }

    2022-11-23 11:49:56
    赞同 展开评论 打赏
  • 网站:http://ixiancheng.cn/ 微信订阅号:小马哥学JAVA

    这个可用提供一种思路,首先需要设置一个检查点,每五秒执行一次,检查是否应答了,如果没有应答就重新进行请求处理,保存上次已经处理的进度,下次可用在这个基础上面继续进行传送。里面最重要的一个点是保存最后一次的ck数据。

    2022-11-22 19:56:16
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载