每次启动读取全量数据同步到kafka,cancel job之后,再从checkpoint启动,能够正常启动,也显示从指定的checkpoint启动,但是数据是从头读取的。再次cancel job,对mongodb进行数据增删改,从第二个checkpoint启动,这时不是从头读取数据。
代码如下:
//1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MongoDBSource.Builder<String> mongoDBSource = MongoDBSource.<String>builder()
.hosts(hostName)
.username(userName)
.password(password)
.connectionOptions(connectionOptions)
.deserializer(new CustomerMongoDeserialization()) //这里需要自定义序列化格式
// .deserializer(new JsonDebeziumDeserializationSchema())
;
switch (tableFlag){
case 1:
mongoDBSource.databaseList(StringUtils.split(mongoDatabaseList, ","));
break;
case 2:
if(StringUtils.isNotBlank(tableList)){
mongoDBSource.collectionList(StringUtils.split(tableList, ",")); //这个注释,就是多表同步
}else{
throw new RuntimeException("多表同步没有表名");
}
break;
}
env.enableCheckpointing(flinkCheckPointInterval * 1000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
SourceFunction<String> sourceFunction = mongoDBSource.build();
DataStreamSource<String> streamSource = env.addSource(sourceFunction);
//3.打印数据并将数据写入 Kafka
if(StringUtils.isNotEmpty(printLog) && "true".equals(printLog)){
streamSource.print();
}
streamSource.addSink(getKafkaProducer(kafkaIpPort, kafkaTopic, kafkaMaxRequestSize)).name("kafkaSink").disableChaining();
setEnvRestartStrategy(env);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//4.启动任务
try {
env.execute(flinkJobName);
} catch (Exception e) {
throw new RuntimeException(e);
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。