
Flink 1.11 checkpoint usage

I am using the DDL path of Flink 1.11.0 with the debezium-json format for CDC. After the job is restored from a checkpoint, deletes fail: newly arriving records with op=d are no longer deleted from the sink.

Restart command:

./bin/flink run -m yarn-cluster /root/bigdata-flink-1.0.jar -s hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata

Code:

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(6000L);            // checkpoint timeout
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);         // max checkpoints allowed in flight at once
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L);     // minimum pause between checkpoints
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);   // prefer checkpoints for failure recovery
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // number of tolerable checkpoint failures
// checkpoint file cleanup policy

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// external checkpoint file path
env.setStateBackend(new FsStateBackend(new URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
// (A line was truncated here in the original post; only the fragment
// "TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));" survives.)

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

String sourceDDL = String.format(
        "CREATE TABLE debezium_source (" +
        " id INT NOT NULL," +
        " name STRING," +
        " description STRING," +
        " weight DOUBLE" +
        ") WITH (" +
        " 'connector' = 'kafka-0.11'," +
        " 'topic' = '%s'," +
        " 'properties.bootstrap.servers' = '%s'," +
        " 'scan.startup.mode' = 'group-offsets'," +
        " 'format' = 'debezium-json'" +
        ")", "ddd", "172.22.20.206:9092");

String sinkDDL = "CREATE TABLE sink (" +
        " id INT NOT NULL," +
        " name STRING," +
        " description STRING," +
        " weight DOUBLE," +
        " PRIMARY KEY (id, name, description, weight) NOT ENFORCED" +
        ") WITH (" +
        " 'connector' = 'jdbc'," +
        " 'url' = 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
        " 'table-name' = 'products'," +
        " 'driver' = 'com.mysql.cj.jdbc.Driver'," +
        " 'username' = 'DataPip'," +
        " 'password' = 'DataPip'" +
        ")";

String dml = "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source GROUP BY id, name, description, weight";

tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
tEnv.executeSql(dml);
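For reference, a minimal sketch of what the truncated line may have been, assuming the surviving fragment comes from a failure-rate restart strategy; the failure count and window length are placeholders, not values from the original post:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import java.util.concurrent.TimeUnit;

// Assumed reconstruction (continues the poster's "env" above): restart the job
// unless more than 3 failures occur within a 5-minute window, waiting 10 seconds
// between attempts. Only the MINUTES unit and Time.of(10, TimeUnit.SECONDS)
// appear in the original fragment; 3 and 5 are placeholders.
env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3,                               // placeholder: max failures per interval
        Time.of(5, TimeUnit.MINUTES),    // placeholder: failure measurement interval
        Time.of(10, TimeUnit.SECONDS))); // delay between restart attempts (from the fragment)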

*From the volunteer-compiled Flink mailing-list archive

小阿矿 2021-12-06 15:54:58
1 answer
  • Why GROUP BY id, name, description, weight? Wouldn't a plain "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source" satisfy the requirement? *From the volunteer-compiled Flink mailing-list archive

    2021-12-07 10:05:44
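For illustration, a minimal sketch of the simplification the answer suggests, reusing the poster's tEnv and table definitions; a plain projection forwards the debezium-json changelog (including op=d delete records) to the JDBC sink without any aggregation:

// Replace the aggregating query with a plain projection: the changelog rows,
// including deletes, are passed straight through to the upsert JDBC sink.
String dml = "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source";
tEnv.executeSql(dml);

The GROUP BY over every column effectively deduplicates the stream through aggregation state, which is one plausible reason delete records behave differently after a checkpoint restore; a plain projection avoids that state entirely.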