mongodb-cdc 2.4.1版本checkpoint一直是全量开始读取数据

每次启动读取全量数据同步到kafka,cancel job之后,再从checkpoint启动,能够正常启动,也显示从指定的checkpoint启动,但是数据是从头读取的。再次cancel job,对mongodb进行数据增删改,从第二个checkpoint启动,这时不是从头读取数据。
代码如下:
//1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    MongoDBSource.Builder<String> mongoDBSource = MongoDBSource.<String>builder()
            .hosts(hostName)
            .username(userName)
            .password(password)
            .connectionOptions(connectionOptions)
            .deserializer(new CustomerMongoDeserialization()) //这里需要自定义序列化格式

// .deserializer(new JsonDebeziumDeserializationSchema())
;

    switch (tableFlag){
        case 1:
            mongoDBSource.databaseList(StringUtils.split(mongoDatabaseList, ","));
            break;
        case 2:
            if(StringUtils.isNotBlank(tableList)){
                mongoDBSource.collectionList(StringUtils.split(tableList, ",")); //这个注释,就是多表同步
            }else{
                throw new RuntimeException("多表同步没有表名");
            }
            break;

    }

    env.enableCheckpointing(flinkCheckPointInterval * 1000L);

    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

    SourceFunction<String> sourceFunction = mongoDBSource.build();
    DataStreamSource<String> streamSource = env.addSource(sourceFunction);



    //3.打印数据并将数据写入 Kafka
    if(StringUtils.isNotEmpty(printLog) && "true".equals(printLog)){
        streamSource.print();
    }


    streamSource.addSink(getKafkaProducer(kafkaIpPort, kafkaTopic, kafkaMaxRequestSize)).name("kafkaSink").disableChaining();


    setEnvRestartStrategy(env);

    env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


    //4.启动任务
    try {
        env.execute(flinkJobName);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

展开
收起
游客pbpcyd5spf2lq 2024-01-30 15:40:55 172 分享 版权
阿里云 AI 助理回答

问题分析

根据您的描述,问题的核心在于 MongoDB 数据同步到 Kafka 的过程中,Checkpoint 的行为不符合预期。具体表现为:

  1. 第一次启动任务时:从全量数据开始同步,即使指定了 Checkpoint,仍然会从头读取数据。
  2. 取消任务后对 MongoDB 进行增删改操作,再从第二个 Checkpoint 启动时:不再从头读取数据,而是从指定的 Checkpoint 继续。

这种行为可能与以下因素有关: - MongoDB Source Connector 的配置:是否正确启用了增量同步(Change Streams)。 - Checkpoint 的实现机制:Flink 的 Checkpoint 是否正确记录了 MongoDB 的偏移量(Offset)。 - 全量同步与增量同步的切换逻辑copy.existing 参数的设置是否合理。

以下是基于知识库资料和问题描述的详细解答。


解决方案

1. 确保 MongoDB Source Connector 配置正确

在使用 MongoDB Source Connector 时,需要确保以下关键参数配置正确:

  • connector.class:必须设置为 com.mongodb.kafka.connect.MongoSourceConnector
  • tasks.max:在 MongoDB Source Connector 中,此参数只能设置为 1
  • connection.url:填写 MongoDB 数据库的专有网络连接地址,并替换 **** 为 root 账号的密码。
  • databasecollection:明确指定要同步的数据库和集合。
  • topic.namespace.map:定义目标 Kafka Topic 的映射关系,格式为 {"database.collection": "targetTopic"}
  • copy.existing:控制是否将 MongoDB 中已存在的数据全量同步到 Kafka。建议首次同步完成后将其设置为 false,以避免重复消费。

重要提示:如果 copy.existing 设置为 true,每次启动任务时都会触发全量同步,这可能是导致您问题的原因之一。

2. 检查 Flink Checkpoint 配置

Flink 的 Checkpoint 机制用于记录数据流的处理进度,确保任务在失败或重启时能够从上次的状态继续。以下是关键配置项的说明:

  • 启用 Checkpoint

    env.enableCheckpointing(flinkCheckPointInterval * 1000L);
    
    • flinkCheckPointInterval 是 Checkpoint 的时间间隔(单位为秒)。建议设置为合理的值(如 60 秒),以平衡性能和容错能力。
  • 设置 Checkpoint 模式

    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
    • EXACTLY_ONCE 模式确保每条数据只被处理一次,适用于需要强一致性的场景。
  • 保存 Checkpoint 状态

    • 确保 Checkpoint 的状态存储路径(如 HDFS 或文件系统)配置正确,并且有足够的存储空间。

3. 确认 MongoDB Change Streams 的启用

MongoDB 的 Change Streams 功能是实现增量同步的关键。如果未启用 Change Streams,MongoDB Source Connector 将无法记录数据变化的偏移量,从而导致每次启动任务时都从头读取数据。

启用步骤: 1. 确保 MongoDB 版本为 3.6 或更高。 2. 在 MongoDB 实例中启用副本集(Replica Set),因为 Change Streams 依赖于副本集的功能。 3. 在 Connector 配置中添加以下参数:

change.stream.full.document=updateLookup
  • 此参数确保在更新操作中返回完整的文档内容。

4. 调整 copy.existing 参数

根据您的描述,任务在首次启动时会从头读取数据,这可能是因为 copy.existing 参数设置为 true。建议按照以下步骤调整:

  1. 首次同步时

    • 设置 copy.existing=true,以确保 MongoDB 中的存量数据被全量同步到 Kafka。
    • 示例配置:
      copy.existing=true
      
  2. 首次同步完成后

    • copy.existing 更新为 false,以避免后续任务重启时再次触发全量同步。
    • 示例配置:
      copy.existing=false
      

5. 验证 Checkpoint 的偏移量记录

Flink 的 Checkpoint 应该记录 MongoDB 的偏移量(Offset),以便在任务重启时从指定位置继续读取数据。如果 Checkpoint 的偏移量未正确记录,可能会导致任务从头读取数据。

验证方法: 1. 检查 Checkpoint 存储路径中的元数据文件,确认是否包含 MongoDB 的偏移量信息。 2. 如果偏移量信息缺失,可能是以下原因导致: - MongoDB Source Connector 未正确启用 Change Streams。 - Flink 的 Checkpoint 配置存在问题。

6. 测试与验证

完成上述配置后,按照以下步骤进行测试:

  1. 首次启动任务

    • 设置 copy.existing=true,观察是否从 MongoDB 中读取全量数据并写入 Kafka。
    • 确认 Checkpoint 是否正确记录了偏移量。
  2. 取消任务并修改数据

    • 对 MongoDB 进行增删改操作。
    • 重新启动任务,设置 copy.existing=false,观察是否从 Checkpoint 记录的位置继续读取数据。
  3. 多次重复测试

    • 反复取消任务、修改数据并重启任务,验证 Checkpoint 的行为是否符合预期。

总结

通过以上步骤,您可以解决 MongoDB 数据同步到 Kafka 时 Checkpoint 行为异常的问题。关键点包括:

  1. 确保 MongoDB Source Connector 的配置正确,特别是 copy.existingchange.stream.full.document 参数。
  2. 配置 Flink 的 Checkpoint 机制,确保偏移量信息被正确记录。
  3. 启用 MongoDB 的 Change Streams 功能,支持增量同步。

如果问题仍然存在,请检查 MongoDB 和 Flink 的日志文件,定位具体的错误信息并进一步排查。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理