每次启动读取全量数据同步到kafka,cancel job之后,再从checkpoint启动,能够正常启动,也显示从指定的checkpoint启动,但是数据是从头读取的。再次cancel job,对mongodb进行数据增删改,从第二个checkpoint启动,这时不是从头读取数据。
代码如下:
//1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MongoDBSource.Builder<String> mongoDBSource = MongoDBSource.<String>builder()
.hosts(hostName)
.username(userName)
.password(password)
.connectionOptions(connectionOptions)
.deserializer(new CustomerMongoDeserialization()) //这里需要自定义序列化格式
// .deserializer(new JsonDebeziumDeserializationSchema())
;
switch (tableFlag){
case 1:
mongoDBSource.databaseList(StringUtils.split(mongoDatabaseList, ","));
break;
case 2:
if(StringUtils.isNotBlank(tableList)){
mongoDBSource.collectionList(StringUtils.split(tableList, ",")); //这个注释,就是多表同步
}else{
throw new RuntimeException("多表同步没有表名");
}
break;
}
env.enableCheckpointing(flinkCheckPointInterval * 1000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
SourceFunction<String> sourceFunction = mongoDBSource.build();
DataStreamSource<String> streamSource = env.addSource(sourceFunction);
//3.打印数据并将数据写入 Kafka
if(StringUtils.isNotEmpty(printLog) && "true".equals(printLog)){
streamSource.print();
}
streamSource.addSink(getKafkaProducer(kafkaIpPort, kafkaTopic, kafkaMaxRequestSize)).name("kafkaSink").disableChaining();
setEnvRestartStrategy(env);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//4.启动任务
try {
env.execute(flinkJobName);
} catch (Exception e) {
throw new RuntimeException(e);
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您的描述,问题的核心在于 MongoDB 数据同步到 Kafka 的过程中,Checkpoint 的行为不符合预期。具体表现为:
这种行为可能与以下因素有关: - MongoDB Source Connector 的配置:是否正确启用了增量同步(Change Streams)。 - Checkpoint 的实现机制:Flink 的 Checkpoint 是否正确记录了 MongoDB 的偏移量(Offset)。 - 全量同步与增量同步的切换逻辑:copy.existing
参数的设置是否合理。
以下是基于知识库资料和问题描述的详细解答。
在使用 MongoDB Source Connector 时,需要确保以下关键参数配置正确:
connector.class
:必须设置为 com.mongodb.kafka.connect.MongoSourceConnector
。tasks.max
:在 MongoDB Source Connector 中,此参数只能设置为 1
。connection.url
:填写 MongoDB 数据库的专有网络连接地址,并替换 ****
为 root 账号的密码。database
和 collection
:明确指定要同步的数据库和集合。topic.namespace.map
:定义目标 Kafka Topic 的映射关系,格式为 {"database.collection": "targetTopic"}
。copy.existing
:控制是否将 MongoDB 中已存在的数据全量同步到 Kafka。建议首次同步完成后将其设置为 false
,以避免重复消费。重要提示:如果 copy.existing
设置为 true
,每次启动任务时都会触发全量同步,这可能是导致您问题的原因之一。
Flink 的 Checkpoint 机制用于记录数据流的处理进度,确保任务在失败或重启时能够从上次的状态继续。以下是关键配置项的说明:
启用 Checkpoint:
env.enableCheckpointing(flinkCheckPointInterval * 1000L);
flinkCheckPointInterval
是 Checkpoint 的时间间隔(单位为秒)。建议设置为合理的值(如 60 秒),以平衡性能和容错能力。设置 Checkpoint 模式:
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
EXACTLY_ONCE
模式确保每条数据只被处理一次,适用于需要强一致性的场景。保存 Checkpoint 状态:
MongoDB 的 Change Streams 功能是实现增量同步的关键。如果未启用 Change Streams,MongoDB Source Connector 将无法记录数据变化的偏移量,从而导致每次启动任务时都从头读取数据。
启用步骤: 1. 确保 MongoDB 版本为 3.6 或更高。 2. 在 MongoDB 实例中启用副本集(Replica Set),因为 Change Streams 依赖于副本集的功能。 3. 在 Connector 配置中添加以下参数:
change.stream.full.document=updateLookup
copy.existing
参数根据您的描述,任务在首次启动时会从头读取数据,这可能是因为 copy.existing
参数设置为 true
。建议按照以下步骤调整:
首次同步时:
copy.existing=true
,以确保 MongoDB 中的存量数据被全量同步到 Kafka。copy.existing=true
首次同步完成后:
copy.existing
更新为 false
,以避免后续任务重启时再次触发全量同步。copy.existing=false
Flink 的 Checkpoint 应该记录 MongoDB 的偏移量(Offset),以便在任务重启时从指定位置继续读取数据。如果 Checkpoint 的偏移量未正确记录,可能会导致任务从头读取数据。
验证方法: 1. 检查 Checkpoint 存储路径中的元数据文件,确认是否包含 MongoDB 的偏移量信息。 2. 如果偏移量信息缺失,可能是以下原因导致: - MongoDB Source Connector 未正确启用 Change Streams。 - Flink 的 Checkpoint 配置存在问题。
完成上述配置后,按照以下步骤进行测试:
首次启动任务:
copy.existing=true
,观察是否从 MongoDB 中读取全量数据并写入 Kafka。取消任务并修改数据:
copy.existing=false
,观察是否从 Checkpoint 记录的位置继续读取数据。多次重复测试:
通过以上步骤,您可以解决 MongoDB 数据同步到 Kafka 时 Checkpoint 行为异常的问题。关键点包括:
copy.existing
和 change.stream.full.document
参数。如果问题仍然存在,请检查 MongoDB 和 Flink 的日志文件,定位具体的错误信息并进一步排查。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。